Source code for autobazaar.pipeline

# -*- coding: utf-8 -*-

"""AutoBazaar Pipeline Module."""

import json
import logging
import os
import random
import uuid
from collections import Counter

import cloudpickle
import numpy as np
import pandas as pd
from mit_d3m.loaders import get_loader
from mit_d3m.metrics import METRICS_DICT
from mlblocks import MLPipeline

from autobazaar.utils import encode_score

LOGGER = logging.getLogger(__name__)


class ABPipeline(object):
    """AutoBazaar Pipeline Class."""

    def _extract_hyperparameters(self, preprocessing_primitives):
        # Build the MLBlocks block names ('primitive#count') for the
        # preprocessing primitives, then split the pipeline hyperparameters
        # into a preprocessing group and a tunable group.
        block_names_count = Counter()
        block_names = list()
        for primitive in preprocessing_primitives:
            block_names_count.update([primitive])
            block_count = block_names_count[primitive]
            block_names.append('{}#{}'.format(primitive, block_count))

        pre_params = dict()
        hyperparameters = self.pipeline_dict['hyperparameters'].copy()
        for block_name in block_names:
            block_params = hyperparameters.pop(block_name, None)
            if block_params:
                pre_params[block_name] = block_params

        return pre_params, hyperparameters

    def __init__(self, pipeline_dict, loader, metric, problem_doc):
        self.pipeline_dict = pipeline_dict
        self.name = pipeline_dict['name']
        self.template = pipeline_dict.get('template')
        self.loader = loader
        self.metric = metric
        self.problem_doc = problem_doc

        preprocessing_blocks = self.pipeline_dict.get('preprocessing_blocks')
        if preprocessing_blocks:
            # Split the pipeline into a static preprocessing part and a
            # tunable part, each with its own primitives and hyperparameters.
            preprocessing = pipeline_dict.copy()
            preprocessing_primitives = preprocessing['primitives'][:preprocessing_blocks]
            preprocessing['primitives'] = preprocessing_primitives
            self._preprocessing = preprocessing

            tunable = pipeline_dict.copy()
            tunable_primitives = tunable['primitives'][preprocessing_blocks:]
            tunable['primitives'] = tunable_primitives
            self._tunable = tunable

            pre_params, tun_params = self._extract_hyperparameters(preprocessing_primitives)
            self._preprocessing['hyperparameters'] = pre_params
            self._tunable['hyperparameters'] = tun_params

        else:
            self._preprocessing = None
            self._tunable = pipeline_dict

        self.id = str(uuid.uuid4())
        self.cv_scores = list()
        self.rank = None
        self.score = None
        self.dumped = False
        self.fitted = False

        self.pipeline = MLPipeline.from_dict(pipeline_dict)
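
    # A sketch of the `pipeline_dict` shape this constructor expects,
    # inferred from the keys accessed above. The primitive names and values
    # are illustrative, not a real AutoBazaar template:
    #
    #     pipeline_dict = {
    #         'name': 'single_table.classification',
    #         'template': 'single_table.classification.xgb',
    #         'primitives': [
    #             'featuretools.dfs',           # preprocessing block
    #             'xgboost.XGBClassifier',      # tunable block
    #         ],
    #         'hyperparameters': {
    #             'featuretools.dfs#1': {'max_depth': 2},
    #             'xgboost.XGBClassifier#1': {'n_estimators': 100},
    #         },
    #         'preprocessing_blocks': 1,
    #     }
    #
    # With `preprocessing_blocks` set to 1, the first primitive and its
    # 'primitive#count' hyperparameters go into `self._preprocessing` and
    # the rest into `self._tunable`.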

    def fit(self, data_params):
        """Fit the pipeline on the given params."""
        X, y = data_params.X, data_params.y
        self.pipeline = MLPipeline.from_dict(self.pipeline_dict)
        self.pipeline.fit(X, y, **data_params.context)
        self.fitted = True

    def predict(self, d3mds):
        """Get predictions for the given D3MDS."""
        data_params = self.loader.load(d3mds)
        predictions = self.pipeline.predict(data_params.X, **data_params.context)

        out_df = pd.DataFrame()
        out_df['d3mIndex'] = data_params.y.index
        out_df[d3mds.target_column] = predictions
        return out_df
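
    # Usage sketch (hypothetical variable names; `train_d3mds` and
    # `test_d3mds` are D3MDS dataset objects from the mit_d3m package):
    #
    #     data_params = pipeline.loader.load(train_d3mds)
    #     pipeline.fit(data_params)
    #     predictions = pipeline.predict(test_d3mds)
    #     predictions.to_csv('predictions.csv', index=False)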

    def _get_split(self, X, y, indexes):
        # Select rows by position, supporting both pandas objects (iloc)
        # and plain numpy arrays.
        if hasattr(X, 'iloc'):
            X = X.iloc[indexes]
        else:
            X = X[indexes]

        if y is not None:
            if hasattr(y, 'iloc'):
                y = y.iloc[indexes]
            else:
                y = y[indexes]

        return X, y

    def _get_score(self):
        # Aggregate the fold scores and derive a rank where lower is better:
        # error metrics rank by the score itself, bounded metrics by 1 - score.
        score = np.mean(self.cv_scores)
        std = np.std(self.cv_scores)

        if 'Error' in self.metric:
            rank = score
        elif score <= 1:
            rank = 1 - score
        else:
            raise ValueError("Found a score > 1 in a maximization problem: {}".format(score))

        return score, std, rank
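
    # Worked example of the ranking rule: for an error-style metric such as
    # meanSquaredError the rank equals the score, while for a metric bounded
    # by 1 such as accuracy or f1 the rank is `1 - score`. Values below are
    # illustrative:
    #
    #     cv_scores = [0.90, 0.85, 0.95]          # accuracy per fold
    #     score = mean(cv_scores) = 0.90
    #     rank = 1 - 0.90 = 0.10                  # lower rank is better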

    def preprocess(self, X, y, context):
        """Execute the preprocessing steps of the pipeline."""
        if self._preprocessing:
            LOGGER.info("Executing preprocessing pipeline")
            pipeline = MLPipeline.from_dict(self._preprocessing)
            pipeline.fit(X, y, **context)
            return pipeline.predict(X, **context)

        else:
            LOGGER.info("No preprocessing steps found")
            return X

    def cv_score(self, X, y, context, metric=None, cv=None):
        """Cross Validate this pipeline."""
        scorer = METRICS_DICT[metric or self.metric]

        LOGGER.debug('CV Scoring pipeline %s', self)
        self.cv_scores = list()

        for fold, (train_index, test_index) in enumerate(cv.split(X, y)):
            LOGGER.debug('Scoring fold: %s', fold)

            X_train, y_train = self._get_split(X, y, train_index)
            X_test, y_test = self._get_split(X, y, test_index)

            # Fit and score a fresh copy of the tunable part on each fold.
            pipeline = MLPipeline.from_dict(self._tunable)
            pipeline.fit(X_train, y_train, **context)
            pred = pipeline.predict(X_test, **context)

            score = encode_score(scorer, y_test, pred)
            self.cv_scores.append(score)

            LOGGER.debug('Fold %s score: %s', fold, score)

        score, std, rank = self._get_score()
        LOGGER.debug('CV score: %s +/- %s; rank: %s', score, std, rank)

        self.score = score
        self.std = std
        self.rank = rank + random.random() * 1.e-12    # to avoid collisions
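
    # Usage sketch, assuming a scikit-learn style splitter (any object with
    # a `split(X, y)` method works; `cv` has no default here, so it must be
    # passed explicitly):
    #
    #     from sklearn.model_selection import KFold
    #
    #     pipeline.cv_score(X, y, context={}, cv=KFold(n_splits=5))
    #     print(pipeline.score, pipeline.std, pipeline.rank)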

    def to_dict(self, problem_doc=False):
        """Return the details of this pipeline in a dict."""
        pipeline_dict = self.pipeline.to_dict().copy()
        pipeline_dict.update({
            'id': self.id,
            'name': self.name,
            'template': self.template,
            'loader': self.loader.to_dict(),
            'score': self.score,
            'rank': self.rank,
            'metric': self.metric
        })

        if problem_doc:
            pipeline_dict['problem_doc'] = self.problem_doc

        return pipeline_dict

    def __repr__(self):
        return 'ABPipeline({})'.format(json.dumps(self.to_dict(), indent=4))

    @classmethod
    def from_dict(cls, pipeline_dict):
        """Load a pipeline from a dict."""
        pipeline_dict = pipeline_dict.copy()
        loader = get_loader(**pipeline_dict.pop('loader'))
        metric = pipeline_dict['metric']
        problem_doc = pipeline_dict.pop('problem_doc')
        return cls(pipeline_dict, loader, metric, problem_doc)
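
    # `to_dict` and `from_dict` are designed to round-trip, e.g. through
    # JSON (sketch; `problem_doc=True` is needed because `from_dict` pops
    # the 'problem_doc' key back out):
    #
    #     spec = json.loads(json.dumps(pipeline.to_dict(problem_doc=True)))
    #     restored = ABPipeline.from_dict(spec)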

    def dump(self, output_dir, rank=None):
        """Dump this pipeline using pickle."""
        if rank is None:
            rank = self.rank

        LOGGER.info('Dumping pipeline with rank %s: %s', rank, self.id)
        self.dumped = True

        pickle_path = os.path.join(output_dir, '{}.pkl'.format(self.id))
        with open(pickle_path, "wb") as pickle_file:
            LOGGER.info("Outputting pipeline %s", pickle_file.name)
            cloudpickle.dump(self, pickle_file)

        json_path = os.path.join(output_dir, '{}.json'.format(self.id))
        self.pipeline.save(json_path)
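
    # A dumped pipeline can be loaded back with the standard pickle module,
    # since cloudpickle output is pickle-compatible (sketch; the path is
    # illustrative):
    #
    #     import pickle
    #
    #     with open('output/<pipeline-id>.pkl', 'rb') as pickle_file:
    #         pipeline = pickle.load(pickle_file)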