Add classifier and FileDataStream unit tests to test_pipeline_combining. (#212)

Add classifier and FileDataStream unit tests to test_pipeline_combining.
This commit is contained in:
pieths 2019-08-05 16:28:38 -07:00 committed by GitHub
Parent 51bdff2bb3
Commit c655aad93b
No key found matching this signature
GPG key ID: 4AEE18F83AFDEB23
1 changed file with 84 additions and 1 deletion
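
For context, a minimal sketch of the pattern the new tests exercise: train a transform and a classifier as two separate steps, then merge them with Pipeline.combine_models and predict through the merged pipeline. Everything in the sketch (class names, column names, the as_binary_data_stream and combine_models calls) is condensed from the test code in the diff below; no new API is assumed.

from nimbusml import Pipeline, FileDataStream
from nimbusml.datasets import get_dataset
from nimbusml.feature_extraction.categorical import OneHotVectorizer
from nimbusml.linear_model import LogisticRegressionBinaryClassifier

# Fit the transform and the classifier as two independent steps.
path = get_dataset('infert').as_filepath()
transform = OneHotVectorizer(columns={'edu': 'education'})
training_data = transform.fit_transform(FileDataStream.read_csv(path),
                                        as_binary_data_stream=True)
predictor = LogisticRegressionBinaryClassifier(
    feature=['parity', 'edu', 'age', 'induced', 'spontaneous',
             'stratum', 'pooled.stratum'],
    label='case')
predictor.fit(training_data)

# Merge the two trained steps into a single pipeline and score with it.
combined_pipeline = Pipeline.combine_models(transform, predictor)
predictions = combined_pipeline.predict(FileDataStream.read_csv(path))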


@@ -7,7 +7,7 @@ import unittest
 import numpy as np
 import pandas as pd
-from nimbusml import Pipeline
+from nimbusml import Pipeline, FileDataStream
 from nimbusml.datasets import get_dataset
 from nimbusml.feature_extraction.categorical import OneHotVectorizer
 from nimbusml.linear_model import LogisticRegressionBinaryClassifier, OnlineGradientDescentRegressor
@@ -323,6 +323,89 @@ class TestPipelineCombining(unittest.TestCase):
         self.assertTrue(result_1.equals(result_2))
 
+    def test_combine_with_classifier_trained_with_y_arg(self):
+        """
+        Tests a sequence where the initial transform is computed
+        using both X and y input args. Note, any steps after the
+        initial transform will be operating on data where the X
+        and y have been combined into one dataset.
+        """
+        np.random.seed(0)
+
+        df = get_dataset("infert").as_df()
+        X = df.loc[:, df.columns != 'case']
+        y = df['case']
+
+        transform = OneHotVectorizer() << 'education_str'
+
+        # Passing in both X and y
+        df = transform.fit_transform(X, y, as_binary_data_stream=True)
+
+        # NOTE: need to specify the label column here because the
+        # feature and label data was joined in the last step.
+        predictor = LogisticRegressionBinaryClassifier(label='case', feature=list(X.columns))
+        predictor.fit(df)
+
+        df = transform.transform(X, as_binary_data_stream=True)
+        result_1 = predictor.predict(df)
+
+        # Combine the models and perform a prediction
+        combined_pipeline = Pipeline.combine_models(transform, predictor)
+        result_2 = combined_pipeline.predict(X)
+
+        result_2 = result_2['PredictedLabel'].astype(np.float64)
+        self.assertTrue(result_1.equals(result_2))
+
+    def test_combine_with_classifier_trained_with_joined_X_and_y(self):
+        np.random.seed(0)
+
+        infert_df = get_dataset("infert").as_df()
+        feature_cols = [c for c in infert_df.columns if c != 'case']
+
+        transform = OneHotVectorizer() << 'education_str'
+        df = transform.fit_transform(infert_df, as_binary_data_stream=True)
+
+        predictor = LogisticRegressionBinaryClassifier(label='case', feature=feature_cols)
+        predictor.fit(df)
+
+        df = transform.transform(infert_df, as_binary_data_stream=True)
+        result_1 = predictor.predict(df)
+
+        # Combine the models and perform a prediction
+        combined_pipeline = Pipeline.combine_models(transform, predictor)
+        result_2 = combined_pipeline.predict(infert_df)
+
+        result_2 = result_2['PredictedLabel'].astype(np.float64)
+        self.assertTrue(result_1.equals(result_2))
+
+    def test_combine_with_classifier_trained_with_filedatastream(self):
+        path = get_dataset('infert').as_filepath()
+
+        data = FileDataStream.read_csv(path)
+        transform = OneHotVectorizer(columns={'edu': 'education'})
+        df = transform.fit_transform(data, as_binary_data_stream=True)
+
+        feature_cols = ['parity', 'edu', 'age', 'induced', 'spontaneous', 'stratum', 'pooled.stratum']
+        predictor = LogisticRegressionBinaryClassifier(feature=feature_cols, label='case')
+        predictor.fit(df)
+
+        data = FileDataStream.read_csv(path)
+        df = transform.transform(data, as_binary_data_stream=True)
+        result_1 = predictor.predict(df)
+
+        data = FileDataStream.read_csv(path)
+        combined_pipeline = Pipeline.combine_models(transform, predictor)
+        result_2 = combined_pipeline.predict(data)
+
+        result_1 = result_1.astype(np.int32)
+        result_2 = result_2['PredictedLabel'].astype(np.int32)
+        self.assertTrue(result_1.equals(result_2))
 
 
 if __name__ == '__main__':
     unittest.main()
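
To run only the cases added here, the standard unittest loader can be used; the module, class, and method names below come straight from the diff, while the assumption (hypothetical) is that test_pipeline_combining.py is importable from the working directory.

import unittest

# Load and run only the three tests added in this commit.
names = [
    'test_pipeline_combining.TestPipelineCombining.'
    'test_combine_with_classifier_trained_with_y_arg',
    'test_pipeline_combining.TestPipelineCombining.'
    'test_combine_with_classifier_trained_with_joined_X_and_y',
    'test_pipeline_combining.TestPipelineCombining.'
    'test_combine_with_classifier_trained_with_filedatastream',
]
suite = unittest.defaultTestLoader.loadTestsFromNames(names)
unittest.TextTestRunner(verbosity=2).run(suite)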