Source code for mit_d3m.loaders

# -*- coding: utf-8 -*-

import logging
import os
from collections import OrderedDict

import networkx as nx
import numpy as np
import pandas as pd

from mit_d3m.utils import available_memory, used_memory

LOGGER = logging.getLogger(__name__)


class Dataset:

    def __init__(self, name, X=None, y=None, context=None):
        self.name = name
        self.X = X
        self.y = y
        self.context = context or dict()

    def __repr__(self):
        attributes = ['{!r}'.format(self.name)]
        for attribute in ['X', 'y', 'context']:
            if getattr(self, attribute) is not None:
                attributes.append(repr(attribute))

        return "Dataset({})".format(', '.join(attributes))

    def get_split(self, indexes):
        X = self.X
        if hasattr(X, 'iloc'):
            X = X.iloc[indexes]
        else:
            X = X[indexes]

        y = self.y
        if y is not None:
            if hasattr(y, 'iloc'):
                y = y.iloc[indexes]
            else:
                y = y[indexes]

        return X, y
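# A quick sketch of how Dataset and get_split fit together (the DataFrame,
# Series and index list below are made up for illustration):
#
#   >>> import pandas as pd
#   >>> dataset = Dataset('demo', X=pd.DataFrame({'a': [1, 2, 3]}), y=pd.Series([0, 1, 0]))
#   >>> X_split, y_split = dataset.get_split([0, 2])
#   >>> list(X_split['a'])
#   [1, 3]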
class Loader(object):

    def __init__(self, data_modality, task_type):
        self.data_modality = data_modality
        self.task_type = task_type

    def load(self, d3mds):
        """Load X, y and context from D3MDS."""
        X, y = d3mds.get_data()
        return Dataset(d3mds.dataset_id, X, y)

    def to_dict(self):
        return {
            'data_modality': self.data_modality,
            'task_type': self.task_type,
        }
def features_by_type(column_types, columns):
    if not isinstance(column_types, list):
        column_types = [column_types]

    features = []
    for column in columns:
        is_of_type = column['colType'] in column_types
        target = column['role'] == ['suggestedTarget']
        if is_of_type and not target:
            features.append(column['colName'])

    return features
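# Example of features_by_type on a minimal D3M column specification
# (hypothetical values, following the schema the function expects):
#
#   >>> columns = [
#   ...     {'colName': 'd3mIndex', 'colType': 'integer', 'role': ['index']},
#   ...     {'colName': 'age', 'colType': 'integer', 'role': ['attribute']},
#   ...     {'colName': 'label', 'colType': 'integer', 'role': ['suggestedTarget']},
#   ... ]
#   >>> features_by_type('integer', columns)
#   ['d3mIndex', 'age']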
class TabularLoader(Loader):
    @staticmethod
    def find_privileged_features(dataset_doc, tables):
        privileged_features = dict()
        for quality in dataset_doc.get('qualities', []):
            privileged_quality = quality['qualName'] == 'privilegedFeature'
            privileged_true = quality['qualValue'] == 'True'
            restricted_to = quality.get('restrictedTo')

            if privileged_quality and privileged_true and restricted_to:
                res_id = restricted_to['resID']
                privileged_feature = privileged_features.setdefault(res_id, list())

                res_component = restricted_to.get('resComponent')
                if res_component is not None:
                    column_name = res_component.get('columnName')
                    if column_name is None:
                        column_index = res_component.get('columnIndex')
                        if column_index is not None:
                            # Table columns are stored under 'colName' everywhere
                            # else in this module.
                            column_name = tables[res_id]['columns'][column_index]['colName']

                    if column_name:
                        privileged_feature.append(column_name)

        return privileged_features
    @classmethod
    def remove_privileged_features(cls, dataset_doc, tables):
        privileged_features = cls.find_privileged_features(dataset_doc, tables)
        for res_id, columns in privileged_features.items():
            if columns and res_id in tables:
                tables[res_id]['data'].drop(columns, axis=1, inplace=True)
    @staticmethod
    def map_dtype_to_d3m_type(dtype):
        if 'int' in str(dtype):
            return 'integer'
        elif 'float' in str(dtype):
            return 'real'
        elif 'str' in str(dtype):
            return 'string'
        elif 'object' in str(dtype):
            return 'categorical'
        elif 'date' in str(dtype):
            return 'dateTime'
        elif 'bool' in str(dtype):
            return 'boolean'
        else:
            return 'categorical'
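    # A few sample mappings for reference, with dtype names as numpy/pandas
    # report them; anything unrecognized falls through to 'categorical':
    #
    #   >>> TabularLoader.map_dtype_to_d3m_type('int64')
    #   'integer'
    #   >>> TabularLoader.map_dtype_to_d3m_type('float32')
    #   'real'
    #   >>> TabularLoader.map_dtype_to_d3m_type('datetime64[ns]')
    #   'dateTime'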
    @classmethod
    def analyze_columns(cls, columns, data):
        index = None
        time_index = None
        targets = []
        for column in columns:
            column_name = column['colName']
            if 'suggestedTarget' in column['role']:
                targets.append(column_name)
            else:
                if 'index' in column['role']:
                    if index:
                        raise ValueError("Multiple indexes found")

                    index = column_name

                if 'timeIndicator' in column['role']:
                    if time_index:
                        raise ValueError("Multiple time indexes found")

                    time_index = column_name

        if index:
            data.set_index(index, drop=False, inplace=True)

        if targets:
            data.drop(targets, axis=1, errors='ignore', inplace=True)

        return columns, index, time_index
    @classmethod
    def build_columns(cls, data, name):
        index = cls.make_index(data, name)
        columns = [
            {
                'colIndex': column_index,
                'colName': column_name,
                'colType': cls.map_dtype_to_d3m_type(data[column_name].dtype)
            }
            for column_index, column_name in enumerate(data)
        ]

        time_index = None
        if 'time' in data.columns:
            time_index = 'time'

        return columns, index, time_index

    @classmethod
    def get_columns(cls, resource, data, name):
        columns = resource.get('columns')
        if columns:
            columns, index, time_index = cls.analyze_columns(columns, data)
            if not index:
                index = cls.make_index(data, name)

            return columns, index, time_index

        else:
            return cls.build_columns(data, name)
    @classmethod
    def load_table(cls, dataset_root, resource):
        table_path = os.path.join(dataset_root, resource['resPath'])
        table_name = os.path.basename(table_path).split('.')[0]
        dirname = os.path.basename(os.path.normpath(os.path.dirname(table_path)))
        if dirname != 'tables':
            raise ValueError("Found a table outside of the tables folder!")

        data = pd.read_csv(table_path)
        columns, index, time_index = cls.get_columns(resource, data, table_name)

        return {
            'resource_id': resource['resID'],
            'table_name': table_name,
            'columns': columns,
            'data': data,
            'index': index,
            'time_index': time_index
        }
    @staticmethod
    def get_parent(resource_id, tables):
        for table in tables.values():
            for column in table['columns']:
                refers_to = column.get('refersTo', dict()).get('resID')
                if refers_to == resource_id:
                    return table, column['colName']

    @staticmethod
    def get_collection_details(dataset_root, resource):
        collection_path = os.path.join(dataset_root, resource['resPath'])
        if collection_path.endswith('/'):
            collection_path = collection_path[:-1]

        collection_name = os.path.basename(collection_path).split('.')[0]

        return collection_name, collection_path

    @classmethod
    def load_collection_data(cls, path, parent_table, parent_column):
        parent_data = parent_table['data']
        parent_index_name = parent_table['index']

        dataframes = []
        for parent_index, row in parent_data.iterrows():
            filename = row[parent_column]
            df = pd.read_csv(os.path.join(path, filename))
            df[parent_index_name] = parent_index
            dataframes.append(df)

        del parent_data[parent_column]

        return pd.concat(dataframes, ignore_index=True)
    @staticmethod
    def make_index(data, name):
        index_name = name + '_id'
        while index_name in data.columns:
            index_name += '_id'

        data.index.name = index_name
        data.reset_index(inplace=True, drop=False)

        return index_name
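    # make_index in action, as a sketch (the DataFrame is made up; note how the
    # name is suffixed again when it collides with an existing column):
    #
    #   >>> df = pd.DataFrame({'users_id': [10, 20]})
    #   >>> TabularLoader.make_index(df, 'users')
    #   'users_id_id'
    #   >>> list(df.columns)
    #   ['users_id_id', 'users_id']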
    @classmethod
    def load_collection(cls, tables, dataset_root, resource):
        parent_table, parent_column = cls.get_parent(resource['resID'], tables)
        table_name, path = cls.get_collection_details(dataset_root, resource)
        data = cls.load_collection_data(path, parent_table, parent_column)
        columns, index, time_index = cls.get_columns(resource, data, table_name)

        return {
            'resource_id': resource['resID'],
            'table_name': table_name,
            'columns': columns,
            'data': data,
            'index': index,
            'time_index': time_index
        }

    @staticmethod
    def get_resources(d3mds):
        main_table = None
        resources = list()
        for resource in d3mds.dataset_doc['dataResources']:
            if 'learningData.csv' in resource['resPath']:
                main_table = resource
            else:
                resources.append(resource)

        if main_table is None:
            raise RuntimeError('Main table not found')

        return main_table, resources
    @classmethod
    def load_tables(cls, d3mds):
        main_table, resources = cls.get_resources(d3mds)
        dataset_root = d3mds.dataset_root
        main_table = cls.load_table(dataset_root, main_table)
        tables = {
            main_table['resource_id']: main_table
        }
        for resource in resources:
            resource_type = resource['resType']
            is_collection = resource['isCollection']
            if resource_type == 'table' and not is_collection:
                table = cls.load_table(dataset_root, resource)
            elif resource_type == 'timeseries' or is_collection:
                table = cls.load_collection(tables, dataset_root, resource)
            else:
                raise ValueError("Unsupported resource type: {}".format(resource_type))

            tables[table['resource_id']] = table

        cls.remove_privileged_features(d3mds.dataset_doc, tables)

        return tables
    @staticmethod
    def get_relationships(tables):
        relationships = []
        table_names = {
            table['resource_id']: table['table_name']
            for table in tables.values()
        }
        for table in tables.values():
            columns = table['columns']
            df = table['data']
            table_name = table['table_name']
            for column in columns:
                refers_to = column.get('refersTo')
                if refers_to:
                    res_id = refers_to['resID']
                    res_obj = refers_to['resObject']
                    foreign_table_name = table_names[res_id]
                    column_name = column['colName']
                    if column_name in df.columns and isinstance(res_obj, dict):
                        column_index = res_obj.get('columnIndex')
                        if column_index is not None:
                            # tables is keyed by resource id, not by table name.
                            foreign_table = tables[res_id]
                            foreign_column_name = foreign_table['columns'][column_index]['colName']
                        else:
                            foreign_column_name = res_obj['columnName']

                        relationships.append((
                            foreign_table_name,
                            foreign_column_name,
                            table_name,
                            column_name,
                        ))

                    elif res_obj == 'item':
                        foreign_column_name = 'd3mIndex'
                        column_name = 'd3mIndex'
                        relationships.append((
                            table_name,
                            column_name,
                            foreign_table_name,
                            foreign_column_name,
                        ))

        return relationships
    def load(self, d3mds):
        X, y = d3mds.get_data()

        tables = self.load_tables(d3mds)
        relationships = self.get_relationships(tables)

        entities = dict()
        for table in tables.values():
            entities[table['table_name']] = (
                table['data'],
                table['index'],
                table['time_index']
            )

        context = {
            'target_entity': 'learningData',
            'entities': entities,
            'relationships': relationships
        }

        X.reset_index(inplace=True, drop=False)
        X.set_index('d3mIndex', inplace=True, drop=False)

        return Dataset(d3mds.dataset_id, X, y, context)
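# The context built above follows an entityset-style layout: 'entities' maps
# each table name to a (dataframe, index, time_index) triple, and
# 'relationships' holds (parent_table, parent_column, child_table,
# child_column) tuples. A sketch against a hypothetical dataset
# (`my_d3mds` is a stand-in, not a real object from this module):
#
#   >>> dataset = TabularLoader('multi_table', 'regression').load(my_d3mds)
#   >>> sorted(dataset.context)
#   ['entities', 'relationships', 'target_entity']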
class ResourceLoader(Loader):
    def load_resources(self, X, resource_column, d3mds):
        raise NotImplementedError
    def get_context(self, X, y):
        return None

    def load(self, d3mds):
        """Load X, y and context from D3MDS."""
        X, y = d3mds.get_data()

        resource_columns = d3mds.get_related_resources(self.data_modality)
        for resource_column in resource_columns:
            X = self.load_resources(X, resource_column, d3mds)

        context = self.get_context(X, y)

        return Dataset(d3mds.dataset_id, X, y, context=context)
class ImageLoader(ResourceLoader):

    INPUT_SHAPE = (224, 224, 3)
    EPOCHS = 1
    def load_resources(self, X, resource_column, d3mds):
        from keras.preprocessing.image import img_to_array, load_img  # noqa

        LOGGER.info("Loading %s images", len(X))

        image_dir = d3mds.get_resources_dir('image')
        images = np.empty((len(X), *self.INPUT_SHAPE), dtype=np.float32)
        for i, filename in enumerate(X[resource_column]):
            if used_memory() > available_memory():
                raise MemoryError()

            filename = os.path.join(image_dir, filename)
            image = load_img(filename)
            image = image.resize(self.INPUT_SHAPE[:2])
            image = img_to_array(image)
            image = image / 255.0  # Normalize pixel values to the [0, 1] range.
            images[i, :, :, :] = image

        return images
class TextLoader(ResourceLoader):

    def load_resources(self, X, resource_column, d3mds):
        texts_dir = d3mds.get_resources_dir('text')
        texts = []
        for filename in X.pop(resource_column):
            with open(os.path.join(texts_dir, filename), 'r') as text_file:
                texts.append(text_file.read())

        X['texts'] = texts

        return X
class GraphLoader(Loader):

    def load_graphs(self, d3mds, max_graphs=2):
        graphs = d3mds.load_graphs()

        node_columns = d3mds.get_related_resources(self.data_modality)

        graph_names = OrderedDict()
        for _, (column, graph_id) in zip(range(max_graphs), node_columns.items()):
            graph_names[column] = nx.Graph(graphs[graph_id])

        return graph_names
    def get_context(self, X, d3mds):
        if self.task_type == 'community_detection':
            graphs = self.load_graphs(d3mds, 1)
            column, graph = list(graphs.items())[0]
            context = {
                'graph': graph,
            }

        elif self.task_type == 'link_prediction':
            graphs = self.load_graphs(d3mds, 2)
            columns = list(graphs.keys())
            context = {
                'node_columns': columns,
                'graph': graphs[columns[-1]]
            }

        elif self.task_type in ('vertex_nomination', 'vertex_classification'):
            graphs = self.load_graphs(d3mds, 1)
            context = {
                'graphs': graphs
            }

        elif self.task_type == 'graph_matching':
            graphs = self.load_graphs(d3mds, 2)
            columns = list(graphs.keys())
            graph_0, graph_1 = tuple(graphs.values())

            pairs = X[columns].values

            graph = graph_0.copy()
            graph.add_nodes_from(graph_1.nodes(data=True))
            graph.add_edges_from(graph_1.edges)
            graph.add_edges_from(pairs)

            context = {
                'node_columns': columns,
                'graph': graph,
                'graphs': graphs
            }

        else:
            raise ValueError('Unsupported task_type: {}'.format(self.task_type))

        return context
    def load(self, d3mds):
        X, y = d3mds.get_data()
        context = self.get_context(X, d3mds)

        return Dataset(d3mds.dataset_id, X, y, context=context)
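# Example: for a 'graph_matching' task the context exposes the two graphs
# plus a merged graph whose extra edges are the known node pairs (sketch,
# with `my_d3mds` standing in for a real graph D3MDS object):
#
#   >>> loader = GraphLoader('graph', 'graph_matching')
#   >>> dataset = loader.load(my_d3mds)
#   >>> sorted(dataset.context)
#   ['graph', 'graphs', 'node_columns']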
_LOADERS = {
    'single_table': TabularLoader,
    'multi_table': TabularLoader,
    'timeseries': TabularLoader,
    'image': ImageLoader,
    'text': TextLoader,
    'graph': GraphLoader,
}
def get_loader(data_modality, task_type):
    loader_class = _LOADERS.get(data_modality, Loader)
    return loader_class(data_modality, task_type)
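# Typical entry point, as a sketch (`my_d3mds` stands in for a D3MDS object
# built elsewhere in mit_d3m; unknown modalities fall back to the base Loader):
#
#   >>> loader = get_loader('image', 'classification')
#   >>> type(loader).__name__
#   'ImageLoader'
#   >>> dataset = loader.load(my_d3mds)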