Source code for streamline.dataprep.data_process

import os
import time
import pickle
import random
import logging
import numpy as np
import pandas as pd
from sklearn.experimental import enable_iterative_imputer
from sklearn.impute import IterativeImputer
from sklearn.preprocessing import StandardScaler
from streamline.utils.job import Job


[docs]class DataProcessing(Job): """ Data Processing Job Class for Scaling and Imputation of CV Datasets """ def __init__(self, cv_train_path, cv_test_path, experiment_path, scale_data=True, impute_data=True, multi_impute=True, overwrite_cv=True, class_label="Class", instance_label=None, random_state=None): """ Args: cv_train_path: cv_test_path: experiment_path: scale_data: impute_data: multi_impute: overwrite_cv: class_label: instance_label: random_state: """ super().__init__() self.cv_train_path = cv_train_path self.cv_test_path = cv_test_path self.experiment_path = experiment_path self.scale_data = scale_data self.impute_data = impute_data self.multi_impute = multi_impute self.overwrite_cv = overwrite_cv self.class_label = class_label self.instance_label = instance_label self.categorical_variables = None self.dataset_name = None self.cv_count = None self.random_state = random_state
[docs] def run(self): """ Run all elements of the data preprocessing: data scaling and missing value imputation (mode imputation for categorical features and MICE-based iterative imputing for quantitative features) """ # Set random seeds for repeatability random.seed(self.random_state) np.random.seed(self.random_state) # Load target training and testing datasets data_train, data_test = self.load_data() # Grab header labels for features only header = data_train.columns.values.tolist() header.remove(self.class_label) if not (self.instance_label is None): header.remove(self.instance_label) logging.info('Preparing Train and Test for: ' + str(self.dataset_name) + "_CV_" + str(self.cv_count)) # Temporarily separate class column to be merged back into training and testing datasets later y_train = data_train[self.class_label] y_test = data_test[self.class_label] # If present, temporarily separate instance label to be merged back into training and testing datasets later if not (self.instance_label is None): i_train = data_train[self.instance_label] i_test = data_test[self.instance_label] # Create features-only version of training and testing datasets for scaling and imputation if self.instance_label is None: x_train = data_train.drop([self.class_label], axis=1) # exclude class column x_test = data_test.drop([self.class_label], axis=1) # exclude class column else: x_train = data_train.drop([self.class_label, self.instance_label], axis=1) # exclude class column x_test = data_test.drop([self.class_label, self.instance_label], axis=1) # exclude class column del data_train # memory cleanup del data_test # memory cleanup # Load previously identified list of categorical variables # and create an index list to identify respective columns file = open(self.experiment_path + '/' + self.dataset_name + '/exploratory/categorical_variables.pickle', 'rb') self.categorical_variables = pickle.load(file) # Impute Missing Values in training and testing data if specified by user if self.impute_data: logging.info('Imputing Missing Values...') # Confirm that there are missing values in original dataset to bother with imputation data_counts = pd.read_csv(self.experiment_path + '/' + self.dataset_name + '/exploratory/DataCounts.csv', na_values='NA', sep=',') missing_values = int(data_counts['Count'].values[4]) if missing_values != 0: x_train, x_test = self.impute_cv_data(x_train, x_test) x_train = pd.DataFrame(x_train, columns=header) x_test = pd.DataFrame(x_test, columns=header) else: # No missing data found in dataset logging.info('Notice: No missing values found. Imputation skipped.') # Scale training and testing datasets if specified by user if self.scale_data: logging.info('Scaling Data Values...') x_train, x_test = self.data_scaling(x_train, x_test) # Remerge features with class and instance label in training and testing data if self.instance_label is None: data_train = pd.concat([ pd.DataFrame(y_train, columns=[self.class_label]), pd.DataFrame(x_train, columns=header) ], axis=1, sort=False) data_test = pd.concat([ pd.DataFrame(y_test, columns=[self.class_label]), pd.DataFrame(x_test, columns=header) ], axis=1, sort=False) else: data_train = pd.concat([ pd.DataFrame(y_train, columns=[self.class_label]), pd.DataFrame(i_train, columns=[self.instance_label]), pd.DataFrame(x_train, columns=header) ], axis=1, sort=False) data_test = pd.concat([ pd.DataFrame(y_test, columns=[self.class_label]), pd.DataFrame(i_test, columns=[self.instance_label]), pd.DataFrame(x_test, columns=header) ], axis=1, sort=False) del x_train # memory cleanup del x_test # memory cleanup # Export imputed and/or scaled cv data logging.info('Saving Processed Train and Test Data...') if self.impute_data or self.scale_data: self.write_cv_files(data_train, data_test) # Save phase runtime self.save_runtime() # Print phase completion logging.info(self.dataset_name + " Phase 2 complete") job_file = open( self.experiment_path + '/jobsCompleted/job_preprocessing_' + self.dataset_name + '_' + str(self.cv_count) + '.txt', 'w') job_file.write('complete') job_file.close()
[docs] def load_data(self): """ Load the target training and testing datasets and return respective dataframes, feature header labels, dataset name, and specific cv partition number for this dataset pair. """ # Grab path name components self.dataset_name = self.cv_train_path.split('/')[-3] self.cv_count = self.cv_train_path.split('/')[-1].split("_")[-2] # Load training and testing datasets data_train = pd.read_csv(self.cv_train_path, na_values='NA', sep=',') data_test = pd.read_csv(self.cv_test_path, na_values='NA', sep=',') return data_train, data_test
[docs] def impute_cv_data(self, x_train, x_test): """ Begin by imputing categorical variables with simple 'mode' imputation Args: x_train: pandas dataframe with train set data x_test: pandas dataframe with test set data Returns: Imputed x_train and x_test """ mode_dict = {} for c in x_train.columns: if c in self.categorical_variables: train_mode = x_train[c].mode().iloc[0] x_train[c].fillna(train_mode, inplace=True) mode_dict[c] = train_mode for c in x_test.columns: if c in self.categorical_variables: x_test[c].fillna(mode_dict[c], inplace=True) # Save impute map for downstream use. outfile = open( self.experiment_path + '/' + self.dataset_name + "/scale_impute/categorical_imputer_cv" + str(self.cv_count) + '.pickle', "wb") pickle.dump(mode_dict, outfile) outfile.close() if self.multi_impute: # Impute quantitative features (x) using iterative imputer (multiple imputation) imputer = IterativeImputer(random_state=self.random_state, max_iter=30) imputer = imputer.fit(x_train) x_train = imputer.transform(x_train) x_test = imputer.transform(x_test) # Save impute map for downstream use. outfile = open( self.experiment_path + '/' + self.dataset_name + '/scale_impute/ordinal_imputer_cv' + str(self.cv_count) + '.pickle', 'wb') pickle.dump(imputer, outfile) outfile.close() else: # Impute quantitative features (x) with simple mean imputation median_dict = {} for c in x_train.columns: if not (c in self.categorical_variables): train_median = x_train[c].median() x_train[c].fillna(train_median, inplace=True) median_dict[c] = train_median for c in x_test.columns: if not (c in self.categorical_variables): x_test[c].fillna(median_dict[c], inplace=True) # Save impute map for downstream use. outfile = open( self.experiment_path + '/' + self.dataset_name + '/scale_impute/ordinal_imputer_cv' + str(self.cv_count) + '.pickle', 'wb') pickle.dump(median_dict, outfile) outfile.close() return x_train, x_test
[docs] def data_scaling(self, x_train, x_test): """ Conducts data scaling using scikit-learn StandardScalar method which standardizes featuers by removing the mean and scaling to unit variance. This scaling transformation is determined (i.e. fit) based on the training dataset alone then the same scaling is applied (i.e. transform) to both the training and testing datasets. The fit scaling is pickled so that it can be applied identically to data in the future for model application. Args: x_train: pandas dataframe with train set data x_test: pandas dataframe with test set data Returns: Scaled x_train and x_test """ # number of decimal places to round scaled values to # (Important to avoid float round errors, and thus pipeline reproducibility) decimal_places = 7 # Scale features (x) using training data scaler = StandardScaler() scaler.fit(x_train) x_train = pd.DataFrame(scaler.transform(x_train).round(decimal_places), columns=x_train.columns) # Avoid float value rounding errors with large numbers of decimal places. # Important for pipeline reproducibility # x_train = x_train.round(decimal_places) # Scale features (x) using fit scalar in corresponding testing dataset x_test = pd.DataFrame(scaler.transform(x_test).round(decimal_places), columns=x_test.columns) # Avoid float value rounding errors with large numbers of decimal places. # Important for pipeline reproducibility # x_test = x_test.round(decimal_places) # Save scalar for future use outfile = open(self.experiment_path + '/' + self.dataset_name + '/scale_impute/scaler_cv' + str(self.cv_count) + '.pickle', 'wb') pickle.dump(scaler, outfile) outfile.close() return x_train, x_test
[docs] def write_cv_files(self, data_train, data_test): """ Exports new training and testing datasets following imputation and/or scaling. Includes option to overwrite original dataset (to save space) or preserve a copy of training and testing dataset with CVOnly (for comparison and quality control). Args: data_train: pandas dataframe with train set data data_test: pandas dataframe with test set data Returns: None """ if self.overwrite_cv: # Remove old CV files os.remove(self.cv_train_path) os.remove(self.cv_test_path) else: # Rename old CV files os.rename(self.cv_train_path, self.experiment_path + '/' + self.dataset_name + '/CVDatasets/' + self.dataset_name + '_CVOnly_' + str(self.cv_count) + "_Train.csv") os.rename(self.cv_test_path, self.experiment_path + '/' + self.dataset_name + '/CVDatasets/' + self.dataset_name + '_CVOnly_' + str(self.cv_count) + "_Test.csv") # Write new CV files data_train.to_csv(self.cv_train_path, index=False) data_test.to_csv(self.cv_test_path, index=False)
[docs] def save_runtime(self): """ Save runtime for this phase """ if not os.path.exists(self.experiment_path + '/' + self.dataset_name + '/runtime/'): os.mkdir(self.experiment_path + '/' + self.dataset_name + '/runtime/') runtime_file = open(self.experiment_path + '/' + self.dataset_name + '/runtime/runtime_preprocessing' + self.cv_count + '.txt', 'w+') runtime_file.write(str(time.time() - self.job_start_time)) runtime_file.close()