# -*- coding: utf-8 -*-

"""AutoBazaar Search Module.


This module contains the PipelineSearcher, which is the class that
contains the main logic of the Auto Machine Learning process.
"""

import gc
import itertools
import json
import logging
import os
import signal
import warnings
from collections import defaultdict
from datetime import datetime, timedelta

import numpy as np
import pandas as pd
from btb import HyperParameter
from btb.tuning import GP, GPEi, Uniform
from mit_d3m.loaders import get_loader
from mlblocks.mlpipeline import MLPipeline
from sklearn.model_selection import KFold, StratifiedKFold

from autobazaar.pipeline import ABPipeline
from autobazaar.utils import ensure_dir, make_dumpable, remove_dots, restore_dots

LOGGER = logging.getLogger(__name__)

warnings.filterwarnings("ignore", category=DeprecationWarning)

TEMPLATES_DIR = os.path.join(os.path.dirname(__file__), 'templates')

TRIVIAL_PIPELINE_METHOD = {
    'classification': 'mode',
    'regression': 'median',
    'collaborativeFiltering': 'median',
    'graphMatching': 'mode',
}
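
# The mapping above selects the fallback TrivialPredictor strategy for each
# task type: predict the most frequent training label ('mode') or the
# training target's 'median'.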

TUNERS = {
    'gp': GP,
    'gpei': GPEi,
    'uniform': Uniform
}
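
# The tuners above are the BTB classes selectable through the ``tuner_type``
# argument of PipelineSearcher: a plain Gaussian Process ('gp'), Gaussian
# Process with Expected Improvement ('gpei') and uniform random search
# ('uniform').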

PARAM_TYPES = {
    'str': 'string',
}
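
# MLBlocks annotates string hyperparameters as 'str', while BTB's
# HyperParameter expects 'string'; PARAM_TYPES translates between the two
# in ``_create_tuner``.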


class StopSearch(KeyboardInterrupt):
    """Internal signal used to stop the search loop.

    It derives from KeyboardInterrupt so the ``search`` method can catch
    both with a single ``except`` clause.
    """


class UnsupportedProblem(Exception):
    """Raised when no template exists for a problem's modality/task/subtype."""


def log_times(name, append=False):
    """Store the elapsed seconds of each call as an attribute on ``self``.

    If ``append`` is True, the elapsed times are accumulated in a list
    attribute; otherwise the attribute is overwritten on every call.
    """
    def decorator(wrapped):
        def wrapper(self, *args, **kwargs):
            start = datetime.utcnow()
            result = wrapped(self, *args, **kwargs)
            elapsed = (datetime.utcnow() - start).total_seconds()

            if append:
                attribute = getattr(self, name, None)
                if attribute is None:
                    attribute = list()
                    setattr(self, name, attribute)

                attribute.append(elapsed)
            else:
                setattr(self, name, elapsed)

            return result

        return wrapper

    return decorator
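
# Illustrative use of ``log_times`` (hypothetical ``Worker`` class, not part
# of this module): with ``append=True`` every call adds its duration to a
# list on the instance, which is how ``cv_times`` is populated below.
#
#     class Worker:
#         @log_times('step_times', append=True)
#         def step(self):
#             pass
#
#     worker = Worker()
#     worker.step()
#     worker.step()
#     print(worker.step_times)  # two elapsed times, in seconds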

class PipelineSearcher(object):
    """Search the best pipeline to solve a given dataset and problem.

    The search builds a trivial baseline pipeline, cross validates the
    default template pipeline and then tunes its hyperparameters with a
    BTB tuner until the budget or the checkpoints are exhausted.
    """

    def __init__(self, pipelines_dir, db=None, test_id=None, tuner_type='gp',
                 cv_splits=5, random_state=0):
        self._db = db
        self._pipelines_dir = pipelines_dir
        ensure_dir(self._pipelines_dir)

        self._cv_splits = cv_splits
        self._random_state = random_state
        self._tuner_type = tuner_type
        self._tuner_class = TUNERS[tuner_type]
        self._test_id = test_id

    def _dump_pipelines(self):
        LOGGER.info('Dumping best pipelines')

        dumped = list()
        gc.collect()
        for details in self._to_dump:
            pipeline = details['pipeline']
            if not pipeline.dumped:
                pipeline.fit(self.data_params)

                mlpipeline = pipeline.pipeline
                LOGGER.info("Dumping pipeline %s: %s", pipeline.id, pipeline.pipeline)
                LOGGER.info("Hyperparameters: %s", mlpipeline.get_hyperparameters())

                pipeline.dump(self._pipelines_dir)
                details['pipeline'] = pipeline.id
                dumped.append(details)
                gc.collect()
            else:
                LOGGER.info("Skipping already dumped pipeline %s", pipeline.id)

        return dumped

    def _set_for_dump(self, pipeline):
        self._to_dump.append({
            'elapsed': (datetime.utcnow() - self.start_time).total_seconds(),
            'iterations': len(self.pipelines) - 1,
            'cv_score': self.best_pipeline.score,
            'rank': self.best_pipeline.rank,
            'pipeline': pipeline,
            'load_time': self.load_time,
            'trivial_time': self.trivial_time,
            'cv_time': np.sum(self.cv_times),
        })

    def _save_pipeline(self, pipeline):
        pipeline_dict = pipeline.to_dict(True)
        pipeline_dict['_id'] = pipeline.id
        pipeline_dict['ts'] = datetime.utcnow()
        self.pipelines.append(pipeline_dict)

        if self._db:
            insertable = remove_dots(pipeline_dict)
            insertable.pop('problem_doc')
            insertable['dataset'] = self.dataset_id
            insertable['tuner_type'] = self._tuner_type
            insertable['test_id'] = self._test_id
            self._db.pipelines.insert_one(insertable)

    @log_times('trivial_time')
    def _build_trivial_pipeline(self):
        LOGGER.info("Building the Trivial pipeline")
        try:
            method = TRIVIAL_PIPELINE_METHOD.get(self.task_type)
            pipeline_dict = {
                'name': 'trivial.{}'.format(method),
                'primitives': ['mlprimitives.custom.trivial.TrivialPredictor'],
                'init_params': {
                    'mlprimitives.custom.trivial.TrivialPredictor': {
                        'method': method
                    }
                }
            }
            pipeline = ABPipeline(pipeline_dict, self.loader, self.metric, self.problem_doc)
            pipeline.cv_score(self.data_params.X, self.data_params.y,
                              self.data_params.context, cv=self.kf)
            self._save_pipeline(pipeline)

            return pipeline

        except Exception:
            # if the Trivial pipeline crashes we can do nothing,
            # so we just log the error and move on.
LOGGER.exception("The Trivial pipeline crashed.") def _load_template_json(self, template_name): if template_name.endswith('.json'): template_filename = template_name name = template_name[:-5] else: name = template_name template_name = template_name.replace('/', '.') + '.json' template_filename = os.path.join(TEMPLATES_DIR, template_name) if os.path.exists(template_filename): with open(template_filename, 'r') as template_file: template_dict = json.load(template_file) template_dict['name'] = name return template_dict def _find_template(self, template_name): match = { 'metadata.name': template_name } cursor = self._db.pipelines.find(match) templates = list(cursor.sort('metadata.insert_ts', -1).limit(1)) if templates: template = templates[0] template['name'] = template.pop('metadata')['name'] template['template'] = str(template.pop('_id')) return restore_dots(template) def _load_template(self, template_name): if self._db: template = self._find_template(template_name) if template: return template return self._load_template_json(template_name) def _get_template(self, template_name=None): if template_name: template = self._load_template(template_name) if not template: raise ValueError("Template {} not found".format(template_name)) primitives = '\n'.join(template['primitives']) LOGGER.info('Using template %s:\n%s', template_name, primitives) return template else: problem_type = [ self.data_modality, self.task_type, self.task_subtype ] for levels in reversed(range(1, 4)): # Try the following options: # modality/task/subtask/default # modality/task/default # modality/default template_name = '/'.join(problem_type[:levels] + ['default']) template = self._load_template(template_name) if template: primitives = '\n'.join(template['primitives']) LOGGER.info('Using template %s:\n%s', template_name, primitives) return template # Nothing has been found for this modality/task/subtask combination problem_type = '/'.join(problem_type) LOGGER.error('Problem type not supported %s', problem_type) raise UnsupportedProblem(problem_type) @log_times('cv_times', append=True) def _cv_pipeline(self, params=None): pipeline_dict = self.template_dict.copy() if params: pipeline_dict['hyperparameters'] = params pipeline = ABPipeline(pipeline_dict, self.loader, self.metric, self.problem_doc) X = self.data_params.X y = self.data_params.y context = self.data_params.context try: pipeline.cv_score(X, y, context, cv=self.kf) except KeyboardInterrupt: raise except Exception: LOGGER.exception("Crash cross validating pipeline %s", pipeline.id) return None return pipeline def _create_tuner(self, pipeline): # Build an MLPipeline to get the tunables and the default params mlpipeline = MLPipeline.from_dict(self.template_dict) tunable_hyperparameters = mlpipeline.get_tunable_hyperparameters() tunables = [] tunable_keys = [] for block_name, params in tunable_hyperparameters.items(): for param_name, param_details in params.items(): key = (block_name, param_name) param_type = param_details['type'] param_type = PARAM_TYPES.get(param_type, param_type) if param_type == 'bool': param_range = [True, False] else: param_range = param_details.get('range') or param_details.get('values') value = HyperParameter(param_type, param_range) tunables.append((key, value)) tunable_keys.append(key) # Create the tuner LOGGER.info('Creating %s tuner', self._tuner_class.__name__) self.tuner = self._tuner_class(tunables) if pipeline: try: # Add the default params and the score obtained by them to the tuner. 
                default_params = defaultdict(dict)
                for block_name, params in pipeline.pipeline.get_hyperparameters().items():
                    for param, value in params.items():
                        key = (block_name, param)
                        if key in tunable_keys:
                            if value is None:
                                raise ValueError('None value is not supported')

                            default_params[key] = value

                if pipeline.rank is not None:
                    self.tuner.add(default_params, 1 - pipeline.rank)

            except ValueError:
                pass

    def _set_checkpoint(self):
        next_checkpoint = self.checkpoints.pop(0)
        interval = next_checkpoint - self.current_checkpoint
        self._stop_time = datetime.utcnow() + timedelta(seconds=interval)
        LOGGER.info("Setting %s seconds checkpoint in %s seconds: %s",
                    next_checkpoint, interval, self._stop_time)
        signal.alarm(interval)
        self.current_checkpoint = next_checkpoint

    def _checkpoint(self, signum=None, frame=None, final=False):
        signal.alarm(0)
        checkpoint_name = 'Final' if final else str(self.current_checkpoint) + ' seconds'
        LOGGER.info("%s checkpoint reached", checkpoint_name)

        try:
            if self.best_pipeline:
                self._set_for_dump(self.best_pipeline)

        except KeyboardInterrupt:
            raise
        except Exception:
            LOGGER.exception("Checkpoint dump crashed")

        if final or not bool(self.checkpoints):
            self.current_checkpoint = None
            # LOGGER.warn("Stopping Search")
            # raise StopSearch()
        else:
            self._set_checkpoint()

    def _check_stop(self):
        if self._stop_time and self._stop_time < datetime.utcnow():
            LOGGER.warning("Stop Time already passed. Stopping Search!")
            raise StopSearch()

    def _setup_search(self, d3mds, budget, checkpoints, template_name):
        self.start_time = datetime.utcnow()
        self.cv_times = list()

        # Problem variables
        self.problem_id = d3mds.get_problem_id()
        self.task_type = d3mds.get_task_type()
        self.task_subtype = d3mds.problem.get_task_subtype()

        # TODO: put this in mit-d3m loaders
        if self.task_type == 'vertex_classification':
            self.task_type = 'vertex_nomination'

        self.problem_doc = d3mds.problem_doc

        # Dataset variables
        self.dataset_id = d3mds.dataset_id
        self.data_modality = d3mds.get_data_modality()

        # TODO: put this in mit-d3m loaders
        if self.data_modality == 'edgeList':
            self.data_modality = 'graph'

        self.metric = d3mds.get_metric()
        self.loader = get_loader(self.data_modality, self.task_type)

        self.best_pipeline = None
        self.pipelines = []
        self.checkpoints = sorted(checkpoints or [])
        self.current_checkpoint = 0
        self._to_dump = []

        if not self.checkpoints and budget is None:
            self.budget = 1
        else:
            self.budget = budget

        self.template_dict = self._get_template(template_name)

        LOGGER.info("Running TA2 Search")
        LOGGER.info("Problem Id: %s", self.problem_id)
        LOGGER.info("    Data Modality: %s", self.data_modality)
        LOGGER.info("    Task type: %s", self.task_type)
        LOGGER.info("    Task subtype: %s", self.task_subtype)
        LOGGER.info("    Metric: %s", self.metric)
        LOGGER.info("    Checkpoints: %s", self.checkpoints)
        LOGGER.info("    Budget: %s", self.budget)

    @log_times('load_time')
    def _load_data(self, d3mds):
        self.data_params = self.loader.load(d3mds)

    def _setup_cv(self):
        if isinstance(self.data_params.y, pd.Series):
            min_samples = self.data_params.y.value_counts().min()
        else:
            y = self.data_params.y
            min_samples = y.groupby(list(y.columns)).size().min()

        # Stratify the folds only when every class has enough samples.
        if self.task_type == 'classification' and min_samples >= self._cv_splits:
            self.kf = StratifiedKFold(
                n_splits=self._cv_splits,
                shuffle=True,
                random_state=self._random_state
            )
        else:
            self.kf = KFold(
                n_splits=self._cv_splits,
                shuffle=True,
                random_state=self._random_state
            )
    def search(self, d3mds, template_name=None, budget=None, checkpoints=None):
        """Search the best pipeline for the given dataset and problem.

        Run the tuning loop until the budget or the checkpoints are
        exhausted and return the list of dumped pipeline details.
        """
        try:
            self._setup_search(d3mds, budget, checkpoints, template_name)
            self._load_data(d3mds)
            self._setup_cv()

            # Build the trivial pipeline
            self.best_pipeline = self._build_trivial_pipeline()

            # Do not continue if there is no budget or no fit data
            if budget == 0 or not len(self.data_params.X):
                raise StopSearch()

            # Build the default pipeline
            default_pipeline = self._cv_pipeline()
            if default_pipeline:
                self.best_pipeline = default_pipeline
                self._save_pipeline(default_pipeline)

            if budget == 1:
                raise StopSearch()
            elif budget is not None:
                iterator = range(budget - 1)
            else:
                iterator = itertools.count()   # infinite range

            # Build the tuner
            self._create_tuner(default_pipeline)

            LOGGER.info("Starting the tuning loop")
            if self.checkpoints:
                signal.signal(signal.SIGALRM, self._checkpoint)
                self._set_checkpoint()
            else:
                self._stop_time = None

            for iteration in iterator:
                self._check_stop()
                proposed_params = self.tuner.propose()
                params = make_dumpable(proposed_params)

                LOGGER.info("Cross validating pipeline %s", iteration + 1)
                pipeline = self._cv_pipeline(params)

                if pipeline and (pipeline.rank is not None):
                    self.tuner.add(proposed_params, 1 - pipeline.rank)

                    LOGGER.info("Saving pipeline %s: %s", iteration + 1, pipeline.id)
                    self._save_pipeline(pipeline)

                    if not self.best_pipeline or (pipeline.rank < self.best_pipeline.rank):
                        self.best_pipeline = pipeline
                        LOGGER.info('Best pipeline so far: %s; rank: %s, score: %s',
                                    self.best_pipeline, self.best_pipeline.rank,
                                    self.best_pipeline.score)

                else:
                    # Feed the tuner a very bad score so it avoids this region.
                    self.tuner.add(proposed_params, -1000000)

        except KeyboardInterrupt:
            pass

        finally:
            signal.alarm(0)
            if self.current_checkpoint:
                self._checkpoint(final=True)

            elif self.best_pipeline and not checkpoints:
                self._set_for_dump(self.best_pipeline)

        if self.best_pipeline:
            LOGGER.info('Best pipeline for problem %s found: %s; rank: %s, score: %s',
                        self.problem_id, self.best_pipeline,
                        self.best_pipeline.rank, self.best_pipeline.score)
        else:
            LOGGER.info('No pipeline could be found for problem %s', self.problem_id)

        return self._dump_pipelines()
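
# Minimal usage sketch (illustrative, not part of the module). It assumes an
# already loaded ``d3mds`` dataset/problem wrapper from ``mit_d3m`` and a
# writable output directory:
#
#     searcher = PipelineSearcher('pipelines', tuner_type='gp', cv_splits=5)
#     dumped = searcher.search(d3mds, budget=10)
#     best = dumped[-1]
#     print(best['pipeline'], best['cv_score'], best['rank'])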