Source code for mlprimitives.evaluation

# -*- coding: utf-8 -*-

"""
MLPrimitives Evaluation functions.

Collection of functions and tools to evaluate the performance
of a pipeline over a given dataset.
"""

import json
import logging
from copy import copy

import numpy as np
from mlblocks import MLPipeline
from sklearn import metrics

from mlprimitives.datasets import load_dataset

LOGGER = logging.getLogger(__name__)


# Resolve a single value from the validation specification: strings prefixed
# with '$' are looked up as attributes of the dataset, dicts are resolved
# recursively as context specifications and lists are resolved element-wise.
def get_value(dataset, value):
    if isinstance(value, str) and value.startswith('$'):
        value = getattr(dataset, value[1:])
    elif isinstance(value, dict):
        value = get_context(dataset, value)
    elif isinstance(value, list):
        value = [get_value(dataset, v) for v in value]

    return copy(value)


# Build the fit/predict context dict by resolving each entry of the spec.
def get_context(dataset, context_spec):
    context = dict()
    for key, value in context_spec.items():
        context[key] = get_value(dataset, value)

    return context


# Return a scoring function built from the named sklearn.metrics function.
def get_scorer(name, kwargs):
    metric = getattr(metrics, name, None)
    if not metric:
        raise ValueError('Unknown metric: "{}"'.format(name))

    def scorer(obs, exp):
        return metric(obs, exp, **kwargs)

    return scorer


# Cross validate a pipeline over the dataset declared in its 'validation'
# metadata and return the mean and standard deviation of the scores.
# pipeline_metadata can be a dict or the path to a JSON specification file.
def score_pipeline(pipeline_metadata, n_splits=5, random_state=0, dataset=None):
    if isinstance(pipeline_metadata, str):
        LOGGER.info('Loading pipeline %s', pipeline_metadata)
        with open(pipeline_metadata, 'r') as pipeline_file:
            pipeline_metadata = json.load(pipeline_file)

    validation = pipeline_metadata['validation']
    if dataset is None:
        dataset = validation['dataset']

    LOGGER.info('Loading dataset %s', dataset)
    dataset = load_dataset(dataset)

    metric = validation.get('metric')
    metric_args = validation.get('metric_args', dict())
    if metric:
        scorer = get_scorer(metric, metric_args)
    else:
        # Fall back to the scoring function and metric defined by the dataset.
        scorer = dataset.score
        metric = dataset.metric

    scores = list()
    splits = dataset.get_splits(n_splits, random_state)
    if n_splits == 1:
        splits = [splits]

    for split, (X_train, X_test, y_train, y_test) in enumerate(splits):
        LOGGER.info('Scoring split %s', split + 1)
        context = get_context(dataset, validation.get('context', dict()))

        pipeline = MLPipeline.from_dict(pipeline_metadata)
        pipeline.fit(X_train, y_train, **context)
        predictions = pipeline.predict(X_test, **context)

        score = scorer(y_test, predictions)
        LOGGER.info('Split %s %s: %s', split + 1, metric, score)
        scores.append(score)

    return np.mean(scores), np.std(scores)
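The snippet below is not part of the module source; it is a minimal usage sketch. It assumes a hypothetical pipeline specification file, pipeline.json, whose contents include a 'validation' section naming a dataset (and optionally a metric, metric_args and context), which is what score_pipeline expects. A dict with the same structure could be passed directly instead of a path.

# Minimal usage sketch (assumption: 'pipeline.json' is a hypothetical pipeline
# specification that contains a 'validation' section naming a dataset).
from mlprimitives.evaluation import score_pipeline

score_mean, score_std = score_pipeline('pipeline.json', n_splits=5, random_state=0)
print('Score: {} (std: {})'.format(score_mean, score_std))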