Source code for ta2.ta3.client

import logging
import os
import re
from datetime import datetime

import grpc
from d3m.metadata.problem import Problem
from google.protobuf.timestamp_pb2 import Timestamp
from ta3ta2_api import core_pb2, core_pb2_grpc
from ta3ta2_api.utils import encode_problem_description
from ta3ta2_api.value_pb2 import Value, ValueType

LOGGER = logging.getLogger(__name__)

RE_PYTHONIZE = re.compile(r'[A-Z]')


[docs]def pythonize(name): return re.sub(r'[A-Z]', r'_\g<0>', name).upper()
[docs]class TA3APIClient(object): def __init__(self, port, local_input='input', remote_input='input', verbose=False): channel = grpc.insecure_channel('localhost:' + str(port)) self.stub = core_pb2_grpc.CoreStub(channel) self.local_input = local_input self.remote_input = remote_input self.verbose = verbose def _get_dataset_doc_path(self, dataset): return os.path.join( 'file://' + os.path.abspath(self.remote_input), dataset, 'TRAIN/dataset_TRAIN/datasetDoc.json' ) def _get_problem_doc_path(self, dataset): return os.path.join( 'file://' + os.path.abspath(self.local_input), dataset, 'TRAIN/problem_TRAIN/problemDoc.json' ) def _build_problem(self, dataset): problem = Problem.load(problem_uri=self._get_problem_doc_path(dataset)) return encode_problem_description(problem)
[docs] def search_solutions(self, dataset, time_bound_search=1.): created_at = Timestamp() created_at.FromDatetime(datetime.utcnow()) request = core_pb2.SearchSolutionsRequest( user_agent='ta3_api_test.py', version='2020.2.11', time_bound_search=time_bound_search, priority=0., allowed_value_types=[ ValueType.Value('RAW'), ValueType.Value('DATASET_URI'), ValueType.Value('CSV_URI'), ], inputs=[ Value(dataset_uri=self._get_dataset_doc_path(dataset)) ], problem=self._build_problem(dataset) # template=pipeline_pb2.PipelineDescription( # id='dummy', # source=pipeline_pb2.PipelineSource( # name='dummy', # contact='dummy', # pipelines=['dummy'], # ), # created=created_at, # context=pipeline_pb2.PipelineContext.Value('PIPELINE_CONTEXT_UNKNOWN'), # name='dummy', # description='dummy', # users=[ # pipeline_pb2.PipelineDescriptionUser( # id='dummy', # reason='dummy', # rationale='dummy' # ) # ], # inputs=[ # pipeline_pb2.PipelineDescriptionInput( # name='dummy' # ) # ], # outputs=[ # pipeline_pb2.PipelineDescriptionOutput( # name='dummy', # data='dummy' # ) # ], # steps=[ # pipeline_pb2.PipelineDescriptionStep( # primitive=pipeline_pb2.PrimitivePipelineDescriptionStep( # primitive=Primitive( # id='dummy', # version='dummy', # python_path='dummy', # name='dummy', # digest='dummy' # ), # arguments={ # 'dummy': pipeline_pb2.PrimitiveStepArgument( # data=pipeline_pb2.DataArgument( # data='dummy' # ) # ) # }, # outputs=[ # pipeline_pb2.StepOutput( # id='dummy' # ) # ], # hyperparams={ # 'dummy': pipeline_pb2.PrimitiveStepHyperparameter( # data=pipeline_pb2.DataArgument( # data='dummy' # ) # ) # }, # users=[ # pipeline_pb2.PipelineDescriptionUser( # id='dummy', # reason='dummy', # rationale='dummy' # ) # ], # ) # ) # ] # ), ) LOGGER.debug("%s: %s", request.__class__.__name__, request) response = self.stub.SearchSolutions(request) LOGGER.debug("%s: %s", response.__class__.__name__, response) return response
[docs] def get_search_solutions_results(self, search_id, max_results=None): request = core_pb2.GetSearchSolutionsResultsRequest( search_id=search_id, ) LOGGER.debug("%s: %s", request.__class__.__name__, request) solutions = [] for solution in self.stub.GetSearchSolutionsResults(request): LOGGER.debug("%s: %s", solution.__class__.__name__, solution) solutions.append(solution) if max_results and len(solutions) >= max_results: break return solutions
[docs] def end_search_solutions(self, search_id): request = core_pb2.EndSearchSolutionsRequest( search_id=search_id, ) LOGGER.debug("%s: %s", request.__class__.__name__, request) response = self.stub.EndSearchSolutions(request) LOGGER.debug("%s: %s", response.__class__.__name__, response) return response
[docs] def stop_search_solutions(self, search_id): request = core_pb2.StopSearchSolutionsRequest( search_id=search_id, ) LOGGER.debug("%s: %s", request.__class__.__name__, request) response = self.stub.StopSearchSolutions(request) LOGGER.debug("%s: %s", response.__class__.__name__, response) return response
[docs] def describe_solution(self, solution_id): request = core_pb2.DescribeSolutionRequest( solution_id=solution_id ) LOGGER.debug("%s: %s", request.__class__.__name__, request) response = self.stub.DescribeSolution(request) LOGGER.debug("%s: %s", response.__class__.__name__, response) return response
[docs] def score_solution(self, solution_id, dataset): problem = self._build_problem(dataset) request = core_pb2.ScoreSolutionRequest( solution_id=solution_id, inputs=[ Value(dataset_uri=self._get_dataset_doc_path(dataset)) ], performance_metrics=problem.problem.performance_metrics, # users=[ # core_pb2.SolutionRunUser( # id='dummy', # choosen=True, # reason='dummy' # ) # ], configuration=core_pb2.ScoringConfiguration( method=core_pb2.EvaluationMethod.Value('K_FOLD'), folds=3, train_test_ratio=3, shuffle=True, random_seed=0, stratified=False ) ) LOGGER.debug("%s: %s", request.__class__.__name__, request) response = self.stub.ScoreSolution(request) LOGGER.debug("%s: %s", response.__class__.__name__, response) return response
[docs] def get_score_solution_results(self, request_id): request = core_pb2.GetScoreSolutionResultsRequest( request_id=request_id, ) LOGGER.debug("%s: %s", request.__class__.__name__, request) score = None for score in self.stub.GetScoreSolutionResults(request): LOGGER.debug("%s: %s", score.__class__.__name__, score) # return only the last one return score
[docs] def fit_solution(self, solution_id, dataset): request = core_pb2.FitSolutionRequest( solution_id=solution_id, inputs=[ Value(dataset_uri=self._get_dataset_doc_path(dataset)) ], expose_outputs=[ 'outputs.0' # 'steps.0.produce' ], expose_value_types=[ ValueType.Value('CSV_URI') ], # users=[ # core_pb2.SolutionRunUser( # id='dummy', # choosen=True, # reason='dummy' # ) # ] ) LOGGER.debug("%s: %s", request.__class__.__name__, request) response = self.stub.FitSolution(request) LOGGER.debug("%s: %s", response.__class__.__name__, response) return response
[docs] def get_fit_solution_results(self, request_id, max_results=None): request = core_pb2.GetFitSolutionResultsRequest( request_id=request_id, ) LOGGER.debug("%s: %s", request.__class__.__name__, request) fitted_solutions = [] for solution in self.stub.GetFitSolutionResults(request): LOGGER.debug("%s: %s", solution.__class__.__name__, solution) fitted_solutions.append(solution) if max_results and len(fitted_solutions) >= max_results: break return fitted_solutions
[docs] def produce_solution(self, fitted_solution_id, dataset): request = core_pb2.ProduceSolutionRequest( fitted_solution_id=fitted_solution_id, inputs=[ Value(dataset_uri=self._get_dataset_doc_path(dataset)) ], expose_outputs=[ 'outputs.0' ], expose_value_types=[ ValueType.Value('CSV_URI') ], # users=[ # core_pb2.SolutionRunUser( # id='dummy', # choosen=True, # reason='dummy' # ) # ] ) LOGGER.debug("%s: %s", request.__class__.__name__, request) response = self.stub.ProduceSolution(request) LOGGER.debug("%s: %s", response.__class__.__name__, response) return response
[docs] def get_produce_solution_results(self, request_id): request = core_pb2.GetProduceSolutionResultsRequest( request_id=request_id, ) LOGGER.debug("%s: %s", request.__class__.__name__, request) results = [] for result in self.stub.GetProduceSolutionResults(request): LOGGER.debug("%s: %s", result.__class__.__name__, result) return results
[docs] def solution_export(self, fitted_solution_id, rank): request = core_pb2.SolutionExportRequest( solution_id=fitted_solution_id, rank=rank, ) LOGGER.debug("%s: %s", request.__class__.__name__, request) response = self.stub.SolutionExport(request) LOGGER.debug("%s: %s", response.__class__.__name__, response) return response
[docs] def list_primitives(self): request = core_pb2.ListPrimitivesRequest() LOGGER.debug("%s: %s", request.__class__.__name__, request) response = self.stub.ListPrimitives(request) LOGGER.debug("%s: %s", response.__class__.__name__, response) return response
[docs] def hello(self): request = core_pb2.HelloRequest() LOGGER.debug("%s: %s", request.__class__.__name__, request) response = self.stub.Hello(request) LOGGER.debug("%s: %s", response.__class__.__name__, response) return response