Source code for ml_wrappers.dataset.dataset_wrapper

# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------

"""Defines a helpful dataset wrapper to allow operations such as summarizing data, taking the subset or sampling."""

import logging
import warnings

import numpy as np
import pandas as pd
from scipy.sparse import issparse

from ..common.constants import Defaults
from .dataset_utils import (_convert_batch_dataset_to_numpy,
                            _generate_augmented_data, _summarize_data)
from .timestamp_featurizer import CustomTimestampFeaturizer

# NOTE(review): catch_warnings() restores the previous warning filters when the
# block exits, and nothing besides filterwarnings() runs inside it, so this
# filter has no lasting effect. Presumably it once wrapped an import that
# emitted this UserWarning - confirm before removing.
with warnings.catch_warnings():
    warnings.filterwarnings('ignore', 'Starting from version 2.2.1', UserWarning)


# Module-level logger shared by everything in this file; its level is pinned to INFO.
module_logger = logging.getLogger(__name__)
module_logger.setLevel(logging.INFO)


# tensorflow is an optional dependency. It is only referenced (as `tf`) when a
# wrapped BatchDataset is converted back to its original type. If the import
# fails, the name `tf` is simply left undefined.
try:
    import tensorflow as tf
except ImportError:
    # Emitted at DEBUG, which is below the INFO level configured on this logger above.
    module_logger.debug('Could not import tensorflow, required if using a Tensorflow model')

# Number of leading rows DatasetWrapper.string_index inspects when detecting string-typed columns.
SAMPLED_STRING_ROWS = 10


class DatasetWrapper(object):
    """A wrapper around a dataset to make dataset operations more uniform across explainers."""

    def __init__(self, dataset, clear_references=False):
        """Initialize the dataset wrapper.

        :param dataset: A matrix of feature vector examples (# examples x # features) for
            initializing the explainer.
        :type dataset: numpy.ndarray or pandas.DataFrame or pandas.Series or scipy.sparse.csr_matrix
            or shap.DenseData or torch.Tensor or tensorflow.python.data.ops.dataset_ops.BatchDataset
        :param clear_references: A memory optimization that clears all references after use in explainers.
        :type clear_references: bool
        :raises TypeError: If the dataset is not one of the supported types.
        """
        # Optional dependencies (shap, torch, tensorflow) are recognized by type name so that
        # they do not have to be imported here.
        type_name = str(type(dataset))
        is_supported = (
            isinstance(dataset, (pd.DataFrame, pd.Series, np.ndarray))
            or issparse(dataset)
            or type_name.endswith((".DenseData'>", "torch.Tensor'>", "BatchDataset'>"))
        )
        if not is_supported:
            raise TypeError("Got type {0} which is not supported in DatasetWrapper".format(
                type(dataset))
            )
        self._features = None
        self._original_dataset_with_type = dataset
        self._dataset_is_df = isinstance(dataset, pd.DataFrame)
        self._dataset_is_series = isinstance(dataset, pd.Series)
        self._dataset_is_batch = type_name.endswith("BatchDataset'>")
        self._default_index_cols = ['index']
        self._default_index = True
        if self._dataset_is_df:
            self._features = dataset.columns.values.tolist()
        if self._dataset_is_df or self._dataset_is_series:
            # Internally pandas inputs are handled through their underlying values
            dataset = dataset.values
        elif self._dataset_is_batch:
            dataset, features, size = _convert_batch_dataset_to_numpy(dataset)
            self._features = features
            self._batch_size = size
        self._dataset = dataset
        self._original_dataset = dataset
        self._summary_dataset = None
        self._column_indexer = None
        self._subset_taken = False
        self._summary_computed = False
        self._string_indexed = False
        self._one_hot_encoded = False
        self._one_hot_encoder = None
        self._timestamp_featurized = False
        self._timestamp_featurizer = None
        self._clear_references = clear_references

    @property
    def dataset(self):
        """Get the dataset.

        :return: The underlying dataset.
        :rtype: numpy.ndarray or scipy.sparse.csr_matrix
        """
        return self._dataset

    @property
    def typed_dataset(self):
        """Get the dataset in the original type, pandas DataFrame or Series.

        :return: The underlying dataset.
        :rtype: numpy.ndarray or pandas.DataFrame or pandas.Series or scipy.sparse matrix
        """
        return self.typed_wrapper_func(self._dataset)

    def typed_wrapper_func(self, dataset, keep_index_as_feature=False):
        """Convert the given dataset back to the type of the original dataset.

        :param dataset: The dataset to convert to original type.
        :type dataset: numpy.ndarray or scipy.sparse.csr_matrix
        :param keep_index_as_feature: Whether to keep the index as a feature when converting back.
            Off by default to convert it back to index.
        :type keep_index_as_feature: bool
        :return: The dataset converted to the original type.
        :rtype: numpy.ndarray or scipy.sparse.csr_matrix or pandas.DataFrame or pandas.Series
            or tensorflow batch dataset
        """
        if self._dataset_is_df:
            if len(dataset.shape) == 1:
                # A single row was passed in, promote it to a 1 x n matrix
                dataset = dataset.reshape(1, dataset.shape[0])
            original = self._original_dataset_with_type
            output_types = dict(original.dtypes)
            dataframe = pd.DataFrame(dataset, columns=self._features)
            if not self._default_index:
                if keep_index_as_feature:
                    # Add the dtype of every index level under the column name it has in the
                    # reset dataset. Unnamed levels have no usable name of their own, so the
                    # names recorded by reset_index are used instead of index.names.
                    for level, name in enumerate(self._default_index_cols):
                        output_types[name] = original.index.get_level_values(level).dtype
                else:
                    dataframe = dataframe.set_index(self._default_index_cols)
            return dataframe.astype(output_types)
        elif self._dataset_is_series:
            return pd.Series(dataset)
        elif self._dataset_is_batch:
            if len(dataset.shape) == 1:
                dataset = dataset.reshape(1, dataset.shape[0])
            df = pd.DataFrame(dataset, columns=self._features)
            tensor_slices = (dict(df), None)
            tf_dataset = tf.data.Dataset.from_tensor_slices(tensor_slices)
            return tf_dataset.batch(self._batch_size)
        else:
            return dataset

    @property
    def original_dataset(self):
        """Get the original dataset prior to performing any operations.

        Note: if the original dataset was a pandas dataframe, this will return the numpy version.

        :return: The original dataset.
        :rtype: numpy.ndarray or scipy.sparse matrix
        """
        return self._original_dataset

    @property
    def original_dataset_with_type(self):
        """Get the original typed dataset which could be a numpy array or pandas DataFrame or pandas Series.

        :return: The original dataset.
        :rtype: numpy.ndarray or pandas.DataFrame or pandas.Series or scipy.sparse matrix
        """
        return self._original_dataset_with_type

    @property
    def num_features(self):
        """Get the number of features (columns) on the dataset.

        A one-dimensional dataset is treated as a single example, so its length is returned.

        :return: The number of features (columns) in the dataset.
        :rtype: int
        """
        data = self._dataset
        if isinstance(data, pd.DataFrame):
            data = data.values
        shape = data.shape
        # Read the column count from the shape instead of len(data[0]) so that a
        # two-dimensional dataset with zero rows does not raise an IndexError.
        return shape[0] if len(shape) == 1 else shape[1]

    @property
    def summary_dataset(self):
        """Get the summary dataset without any subsetting.

        :return: The original dataset or None if summary was not computed.
        :rtype: numpy.ndarray or scipy.sparse.csr_matrix
        """
        return self._summary_dataset

    def _set_default_index_cols(self, dataset):
        """Record the index level names of the dataset as the index columns.

        Retained for backward compatibility; reset_index derives the index columns itself.
        Unnamed levels (None) are ignored because they cannot be used as column names.

        :param dataset: The dataset whose index names should be recorded.
        :type dataset: pandas.DataFrame
        """
        names = dataset.index.names
        if names is not None and all(name is not None for name in names):
            self._default_index_cols = names

    def set_index(self):
        """Undo reset_index. Set index as feature on internal dataset to be an index again."""
        if self._dataset_is_df:
            dataset = self.typed_dataset
            self._features = dataset.columns.values.tolist()
            self._dataset = dataset.values
        self._default_index = True

    def reset_index(self):
        """Reset index to be part of the features on the dataset.

        Has no effect unless the original dataset is a pandas DataFrame. If the DataFrame
        has a non-default index, its levels are appended as the last feature columns.
        """
        dataset = self._original_dataset_with_type
        if self._dataset_is_df:
            self._default_index = pd.Index(np.arange(0, len(dataset))).equals(dataset.index)
            reset_dataset = dataset
            if not self._default_index:
                reset_dataset = dataset.reset_index()
                # reset_index inserts one column per index level at the front of the frame.
                # Taking the names from there also covers unnamed levels, for which pandas
                # generates names such as 'index' or 'level_0'.
                all_cols = reset_dataset.columns.tolist()
                num_levels = dataset.index.nlevels
                index_cols = all_cols[:num_levels]
                feature_cols = all_cols[num_levels:]
                self._default_index_cols = index_cols
                # Move index columns to the end of the dataframe to ensure
                # index arguments are still valid to original dataset
                reset_dataset = reset_dataset.reindex(columns=feature_cols + index_cols)
            self._features = reset_dataset.columns.values.tolist()
            self._dataset = reset_dataset.values

    def get_features(self, features=None, explain_subset=None, **kwargs):
        """Get the features of the dataset if None on current kwargs.

        :param features: Optional feature names that take precedence over the names on the dataset.
        :type features: list
        :param explain_subset: Optional list of column indexes used to select a subset of the names.
        :type explain_subset: list[int]
        :return: The features of the dataset if currently None on kwargs.
        :rtype: list
        """
        if features is not None:
            if explain_subset is not None:
                return np.array(features)[explain_subset].tolist()
            return features
        if self._features is None:
            # No names are known, fall back to the column positions of the current dataset
            return list(range(self._dataset.shape[1]))
        if explain_subset is not None:
            return np.array(self._features)[explain_subset].tolist()
        return self._features

    def get_column_indexes(self, features, categorical_features):
        """Get the column indexes for the given column names.

        :param features: The full list of existing column names.
        :type features: list[str]
        :param categorical_features: The list of categorical feature names to get indexes for.
        :type categorical_features: list[str]
        :return: The list of column indexes.
        :rtype: list[int]
        """
        return [features.index(categorical_feature) for categorical_feature in categorical_features]

    def string_index(self, columns=None):
        """Indexes categorical string features on the dataset.

        :param columns: Optional parameter specifying the subset of columns that
            may need to be string indexed.
        :type columns: list
        :return: The transformation steps to index the given dataset, or None if nothing was indexed.
        :rtype: sklearn.compose.ColumnTransformer
        """
        if self._string_indexed:
            return self._column_indexer
        # Optimization so we don't redo this operation multiple times on the same dataset
        self._string_indexed = True
        # If the data was previously successfully summarized, then there are no
        # categorical columns as it must be numeric.
        # Also, if the dataset is sparse, we can assume there are no categorical strings
        if str(type(self._dataset)).endswith(".DenseData'>") or issparse(self._dataset):
            return None
        # If the user doesn't have a newer version of scikit-learn with OrdinalEncoder, don't do encoding
        try:
            from sklearn.compose import ColumnTransformer
            from sklearn.preprocessing import OrdinalEncoder
        except ImportError:
            return None
        # Temporarily convert to pandas for easier and uniform string handling,
        # only use top sampled rows to limit memory usage for string type test
        if isinstance(self._dataset, np.ndarray):
            tmp_dataset = pd.DataFrame(self._dataset[:SAMPLED_STRING_ROWS, :], dtype=self._dataset.dtype)
        else:
            tmp_dataset = self._dataset.iloc[:SAMPLED_STRING_ROWS]
        # DataFrame.applymap is deprecated since pandas 2.1 in favor of DataFrame.map,
        # which does not exist on older versions; use whichever is available.
        elementwise = tmp_dataset.map if hasattr(tmp_dataset, 'map') else tmp_dataset.applymap
        is_string_column = (elementwise(type) == str).all(0)
        categorical_col_names = list(np.array(list(tmp_dataset))[is_string_column])
        if categorical_col_names:
            all_columns = tmp_dataset.columns
            if columns is not None:
                categorical_col_names = [col_name for col_name in categorical_col_names if col_name in columns]
            categorical_col_indices = [all_columns.get_loc(col_name) for col_name in categorical_col_names]
            ordinal_enc = OrdinalEncoder()
            ct = ColumnTransformer([('ord', ordinal_enc, categorical_col_indices)], remainder='drop')
            string_indexes_dataset = ct.fit_transform(self._dataset)
            # Inplace replacement of columns
            # (danger: using remainder=passthrough with ColumnTransformer will change column order!)
            for idx, categorical_col_index in enumerate(categorical_col_indices):
                self._dataset[:, categorical_col_index] = string_indexes_dataset[:, idx]
            self._column_indexer = ct
        return self._column_indexer

    def one_hot_encode(self, columns):
        """One-hot-encodes the given categorical columns on the dataset.

        :param columns: Parameter specifying the subset of column indexes that
            may need to be one-hot-encoded.
        :type columns: list[int]
        :return: The transformation steps to one-hot-encode the given dataset,
            or None if no encoding was done.
        :rtype: sklearn.compose.ColumnTransformer
        """
        if self._one_hot_encoded:
            return self._one_hot_encoder
        # Optimization so we don't redo this operation multiple times on the same dataset
        self._one_hot_encoded = True
        # If the data was previously successfully summarized, then there are no
        # categorical columns as it must be numeric.
        # Also, if the dataset is sparse, we can assume there are no categorical strings
        if not columns or str(type(self._dataset)).endswith(".DenseData'>") or issparse(self._dataset):
            return None
        # If the user doesn't have a newer version of scikit-learn with OneHotEncoder, don't do encoding
        try:
            from sklearn.compose import ColumnTransformer
            from sklearn.preprocessing import OneHotEncoder
        except ImportError:
            return None
        # scikit-learn 1.2 renamed the `sparse` argument to `sparse_output` and 1.4 removed
        # the old name; versions before 1.2 reject the new name with a TypeError.
        try:
            one_hot_encoder = OneHotEncoder(handle_unknown='ignore', sparse_output=False)
        except TypeError:
            one_hot_encoder = OneHotEncoder(handle_unknown='ignore', sparse=False)
        self._one_hot_encoder = ColumnTransformer([('ord', one_hot_encoder, columns)], remainder='passthrough')
        # Note this will change column order, the one hot encoded columns will be at the start and the
        # rest of the columns at the end
        self._dataset = self._one_hot_encoder.fit_transform(self._dataset.astype(float)).astype(float)
        return self._one_hot_encoder

    def timestamp_featurizer(self):
        """Featurizes the timestamp columns.

        :return: The transformation steps to featurize the timestamp columns,
            or None if the dataset was summarized or is sparse.
        :rtype: CustomTimestampFeaturizer
        """
        if self._timestamp_featurized:
            return self._timestamp_featurizer
        # Optimization so we don't redo this operation multiple times on the same dataset
        self._timestamp_featurized = True
        # If the data was previously successfully summarized, then there are no
        # categorical columns as it must be numeric.
        # Also, if the dataset is sparse, we can assume there are no categorical strings
        if str(type(self._dataset)).endswith(".DenseData'>") or issparse(self._dataset):
            return None
        typed_dataset_without_index = self.typed_wrapper_func(self._dataset, keep_index_as_feature=True)
        self._timestamp_featurizer = CustomTimestampFeaturizer(self._features).fit(typed_dataset_without_index)
        self._dataset = self._timestamp_featurizer.transform(self._dataset)
        return self._timestamp_featurizer

    def apply_indexer(self, column_indexer, bucket_unknown=False):
        """Indexes categorical string features on the dataset.

        :param column_indexer: The transformation steps to index the given dataset.
        :type column_indexer: sklearn.compose.ColumnTransformer
        :param bucket_unknown: If true, buckets unknown values to separate categorical level.
        :type bucket_unknown: bool
        """
        if self._string_indexed or issparse(self._dataset):
            return
        name, ordinal_encoder, cols = column_indexer.transformers_[0]
        all_categories = ordinal_encoder.categories_

        def convert_cols(category_to_index, value, unknown):
            if value in category_to_index:
                return category_to_index[value]
            if bucket_unknown:
                # Put all unknown indexes into a separate 'unknown' bucket
                index = unknown
            else:
                # Add new index on the fly - note the background data does NOT need to
                # contain all possible category levels!
                index = len(category_to_index) + 1
            category_to_index[value] = index
            return index

        for idx, col in enumerate(cols):
            category_to_index = {category: position for position, category in enumerate(all_categories[idx])}
            unknown = len(category_to_index) + 1
            self._dataset[:, col] = [convert_cols(category_to_index, value, unknown)
                                     for value in self._dataset[:, col]]
        # Ensure element types are float and not object
        self._dataset = self._dataset.astype(float)
        self._string_indexed = True

    def apply_one_hot_encoder(self, one_hot_encoder):
        """One-hot-encode categorical string features on the dataset.

        :param one_hot_encoder: The transformation steps to one-hot-encode the given dataset.
        :type one_hot_encoder: sklearn.preprocessing.OneHotEncoder
        """
        if self._one_hot_encoded or issparse(self._dataset):
            return
        self._dataset = one_hot_encoder.transform(self._dataset).astype(float)
        self._one_hot_encoded = True

    def apply_timestamp_featurizer(self, timestamp_featurizer):
        """Apply timestamp featurization on the dataset.

        :param timestamp_featurizer: The transformation steps to featurize timestamps in the given dataset.
        :type timestamp_featurizer: CustomTimestampFeaturizer
        """
        if self._timestamp_featurized or issparse(self._dataset):
            return
        self._dataset = timestamp_featurizer.transform(self._dataset)
        self._timestamp_featurized = True

    def compute_summary(self, nclusters=10, use_gpu=False, **kwargs):
        """Summarizes the dataset if it hasn't been summarized yet.

        The summary replaces the current dataset and is also kept as the summary dataset.

        :param nclusters: Passed through to the summarization helper.
        :type nclusters: int
        :param use_gpu: Passed through to the summarization helper.
        :type use_gpu: bool
        """
        if self._summary_computed:
            return
        self._summary_dataset = _summarize_data(self._dataset, nclusters, use_gpu)
        self._dataset = self._summary_dataset
        self._summary_computed = True

    def augment_data(self, max_num_of_augmentations=np.inf):
        """Augment the current dataset.

        :param max_num_of_augmentations: number of times we stack permuted x to augment.
        :type max_num_of_augmentations: int
        """
        self._dataset = _generate_augmented_data(self._dataset, max_num_of_augmentations=max_num_of_augmentations)

    def take_subset(self, explain_subset):
        """Take a subset of the dataset if not done before.

        :param explain_subset: A list of column indexes to take from the original dataset.
        :type explain_subset: list
        """
        if self._subset_taken:
            return
        # Edge case: Take the subset of the summary in this case,
        # more optimal than recomputing the summary!
        explain_subset = np.array(explain_subset)
        self._dataset = self._dataset[:, explain_subset]
        self._subset_taken = True

    def _reduce_examples(self, max_dim_clustering=Defaults.MAX_DIM):
        """Reduces the dimensionality of the examples if dimensionality is higher than max_dim_clustering.

        If the dataset is sparse, we mean-scale the data and then run
        truncated SVD to reduce the number of features to max_dim_clustering. For dense
        dataset, we also scale the data and then run PCA to reduce the number of features to
        max_dim_clustering.
        This is used to get better clustering results in _find_k.

        :param max_dim_clustering: Dimensionality threshold for performing reduction.
        :type max_dim_clustering: int
        :return: The reduced examples, or the unchanged dataset if no reduction was needed.
        """
        from sklearn.decomposition import PCA, TruncatedSVD
        from sklearn.preprocessing import StandardScaler
        num_cols = self._dataset.shape[1]
        # Run PCA or SVD on input data and reduce to about MAX_DIM features prior to clustering
        components = min(max_dim_clustering, num_cols)
        if components == num_cols:
            return self._dataset
        if issparse(self._dataset):
            module_logger.debug('Reducing sparse data with StandardScaler and TruncatedSVD')
            normalized_examples = StandardScaler(with_mean=False).fit_transform(self._dataset)
            reducer = TruncatedSVD(n_components=components)
        else:
            module_logger.debug('Reducing normal data with StandardScaler and PCA')
            normalized_examples = StandardScaler().fit_transform(self._dataset)
            reducer = PCA(n_components=components)
        module_logger.info('reducing dimensionality to {0} components for clustering'.format(str(components)))
        return reducer.fit_transform(normalized_examples)

    def _find_k_kmeans(self, max_dim_clustering=Defaults.MAX_DIM):
        """Use k-means to downsample the examples.

        Starting from k_upper_bound, cuts k in half each time and run k-means
        clustering on the examples. After each run, computes the
        silhouette score and stores k with highest silhouette score.
        We use optimal k to determine how much to downsample the examples.

        :param max_dim_clustering: Dimensionality threshold for performing reduction.
        :type max_dim_clustering: int
        :return: The number of clusters with the highest silhouette score.
        :rtype: int
        """
        from math import ceil, isnan, log

        from sklearn.cluster import KMeans
        from sklearn.metrics import silhouette_score
        reduced_examples = self._reduce_examples(max_dim_clustering)
        num_rows = self._dataset.shape[0]
        k_upper_bound = 2000
        k_list = []
        k = min(num_rows / 2, k_upper_bound)
        for _ in range(int(ceil(log(num_rows, 2) - 7))):
            k_list.append(int(k))
            k /= 2
        prev_highest_score = -1
        prev_highest_index = 0
        # Fallback for the case where there are too few rows to produce any candidate k
        opt_k = int(k)
        for k_index, candidate_k in enumerate(k_list):
            module_logger.info('running KMeans with k: {}'.format(str(candidate_k)))
            km = KMeans(n_clusters=candidate_k).fit(reduced_examples)
            clusters = km.labels_
            num_clusters = len(set(clusters))
            k_too_big = num_clusters <= 1
            if k_too_big or num_clusters == reduced_examples.shape[0]:
                score = -1
            else:
                score = silhouette_score(reduced_examples, clusters)
            if isnan(score):
                score = -1
            module_logger.info('KMeans silhouette score: {}'.format(str(score)))
            # Find k with highest silhouette score for optimal clustering
            if score >= prev_highest_score and not k_too_big:
                prev_highest_score = score
                prev_highest_index = k_index
        if k_list:
            opt_k = k_list[prev_highest_index]
        module_logger.info('best silhouette score: {}'.format(str(prev_highest_score)))
        module_logger.info('optimal k for KMeans: {}'.format(str(opt_k)))
        return opt_k

    def _find_k_hdbscan(self, max_dim_clustering=Defaults.MAX_DIM):
        """Use hdbscan to downsample the examples.

        We use optimal k to determine how much to downsample the examples.

        :param max_dim_clustering: Dimensionality threshold for performing reduction.
        :type max_dim_clustering: int
        :return: The number of rows to sample, at most the number of rows in the dataset.
        :rtype: int
        """
        import hdbscan
        num_rows = self._dataset.shape[0]
        reduced_examples = self._reduce_examples(max_dim_clustering)
        hdbs = hdbscan.HDBSCAN(min_cluster_size=2).fit(reduced_examples)
        clusters = hdbs.labels_
        opt_k = len(set(clusters))
        clustering_threshold = 5
        samples = opt_k * clustering_threshold
        module_logger.info(('found optimal k for hdbscan: {},'
                            ' will use clustering_threshold * k for sampling: {}').format(str(opt_k), str(samples)))
        return min(samples, num_rows)

    def sample(self, max_dim_clustering=Defaults.MAX_DIM, sampling_method=Defaults.HDBSCAN):
        """Sample the examples.

        First does random downsampling to upper_bound rows,
        then tries to find the optimal downsample based on how many clusters can be constructed
        from the data.  If sampling_method is hdbscan, uses hdbscan to cluster the
        data and then downsamples to that number of clusters.  If sampling_method is k-means,
        uses different values of k, cutting in half each time, and chooses the k with highest
        silhouette score to determine how much to downsample the data.
        The danger of using only random downsampling is that we might downsample too much
        or too little, so the clustering approach is a heuristic to give us some idea of
        how much we should downsample to.

        :param max_dim_clustering: Dimensionality threshold for performing reduction.
        :type max_dim_clustering: int
        :param sampling_method: Method to use for sampling, can be 'hdbscan' or 'kmeans'.
        :type sampling_method: str
        :return: The sampled dataset, which also replaces the dataset on the wrapper.
        :rtype: numpy.ndarray or scipy.sparse.csr_matrix
        """
        from sklearn.utils import resample

        # bounds are rough estimates that came from manual investigation
        lower_bound = 200
        upper_bound = 10000
        num_rows = self._dataset.shape[0]
        module_logger.info('sampling examples')
        # If less than lower_bound rows, just return the full dataset
        if num_rows < lower_bound:
            return self._dataset
        # If more than upper_bound rows, sample randomly
        elif num_rows > upper_bound:
            module_logger.info('randomly sampling to 10k rows')
            # NOTE(review): resample is called without `replace`; confirm that the
            # library default is the intended sampling mode for downsampling.
            self._dataset = resample(self._dataset, n_samples=upper_bound, random_state=7)
            num_rows = upper_bound
        if sampling_method == Defaults.HDBSCAN:
            try:
                opt_k = self._find_k_hdbscan(max_dim_clustering)
            except Exception as ex:
                # Deliberate best effort: fall back to k-means on any hdbscan failure
                module_logger.warning(('Failed to use hdbscan due to error: {}'
                                       '\nEnsure hdbscan is installed with: pip install hdbscan').format(str(ex)))
                opt_k = self._find_k_kmeans(max_dim_clustering)
        else:
            opt_k = self._find_k_kmeans(max_dim_clustering)
        # Resample based on optimal number of clusters
        if opt_k < num_rows:
            self._dataset = resample(self._dataset, n_samples=opt_k, random_state=7)
        return self._dataset

    def _clear(self):
        """Optimization for memory usage.

        Clears all internal references so they can be garbage collected.
        Only takes effect if clear_references was set on the wrapper.
        """
        if self._clear_references:
            self._features = None
            self._original_dataset_with_type = None
            self._dataset_is_df = None
            self._dataset_is_series = None
            self._dataset_is_batch = None
            self._default_index_cols = None
            self._default_index = None
            self._dataset = None
            self._original_dataset = None
            self._summary_dataset = None
            self._column_indexer = None
            self._subset_taken = False
            self._summary_computed = False
            self._string_indexed = False
            self._one_hot_encoded = False
            self._one_hot_encoder = None
            self._timestamp_featurized = False
            self._timestamp_featurizer = None