Merge pull request #106 from Azure/fix/reinstate-mlflow
Fix/reinstate mlflow
Commit
d07ac0fe34
@@ -12,8 +12,5 @@ dependencies:
- pandas==1.2.1
- joblib==1.0.0
- matplotlib==3.3.3
- fairlearn==0.7.0
- azureml-contrib-fairness==1.38.0
- interpret-community==0.24.1
- interpret-core==0.2.7
- azureml-interpret==1.38.0
- git+https://github.com/microsoft/AzureML-Observability#subdirectory=aml-obs-client
- git+https://github.com/microsoft/AzureML-Observability#subdirectory=aml-obs-collector
@@ -12,8 +12,5 @@ dependencies:
- pandas==1.2.1
- joblib==1.0.0
- matplotlib==3.3.3
- fairlearn==0.7.0
- azureml-contrib-fairness==1.38.0
- interpret-community==0.24.1
- interpret-core==0.2.7
- azureml-interpret==1.38.0
- git+https://github.com/microsoft/AzureML-Observability#subdirectory=aml-obs-client
- git+https://github.com/microsoft/AzureML-Observability#subdirectory=aml-obs-collector
@@ -1,7 +1,12 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
"""
Evaluates trained ML model using test dataset.
Saves predictions, evaluation results and deploy flag.
"""

import argparse
from pathlib import Path
import os
import pickle

import numpy as np
import pandas as pd
@@ -9,20 +14,10 @@ from matplotlib import pyplot as plt

from sklearn.metrics import r2_score, mean_absolute_error, mean_squared_error

from azureml.core import Run, Model

from fairlearn.metrics._group_metric_set import _create_group_metric_set
from azureml.contrib.fairness import upload_dashboard_dictionary, download_dashboard_by_upload_id

from interpret_community import TabularExplainer
from azureml.interpret import ExplanationClient

import mlflow
import mlflow.sklearn

# current run
run = Run.get_context()
ws = run.experiment.workspace
import mlflow.pyfunc
from mlflow.tracking import MlflowClient

TARGET_COL = "cost"
@@ -55,56 +50,43 @@ CAT_NOM_COLS = [
CAT_ORD_COLS = [
]

SENSITIVE_COLS = ["vendor"] # for fairlearn dashboard


def parse_args():

'''Parse input arguments'''

parser = argparse.ArgumentParser("predict")
parser.add_argument("--model_name", type=str, help="Name of registered model")
parser.add_argument("--model_input", type=str, help="Path of input model")
parser.add_argument("--prepared_data", type=str, help="Path to transformed data")
parser.add_argument("--predictions", type=str, help="Path of predictions")
parser.add_argument("--score_report", type=str, help="Path to score report")
parser.add_argument('--deploy_flag', type=str, help='A deploy flag whether to deploy or not')
parser.add_argument("--test_data", type=str, help="Path to test dataset")
parser.add_argument("--evaluation_output", type=str, help="Path of eval results")
parser.add_argument("--runner", type=str, help="Local or Cloud Runner", default="CloudRunner")

args = parser.parse_args()

return args


def main():

args = parse_args()

lines = [
f"Model path: {args.model_input}",
f"Test data path: {args.prepared_data}",
f"Predictions path: {args.predictions}",
f"Scoring output path: {args.score_report}",
]

for line in lines:
print(line)

# ---------------- Model Evaluation ---------------- #
def main(args):
'''Read trained model and test dataset, evaluate model and save result'''

# Load the test data
test_data = pd.read_parquet(Path(args.test_data))

print("mounted_path files: ")
arr = os.listdir(args.prepared_data)

train_data = pd.read_csv((Path(args.prepared_data) / "train.csv"))
test_data = pd.read_csv((Path(args.prepared_data) / "test.csv"))

y_train = train_data[TARGET_COL]
X_train = train_data[NUMERIC_COLS + CAT_NOM_COLS + CAT_ORD_COLS]

# Split the data into inputs and outputs
y_test = test_data[TARGET_COL]
X_test = test_data[NUMERIC_COLS + CAT_NOM_COLS + CAT_ORD_COLS]

# Load the model from input port
model = pickle.load(open((Path(args.model_input) / "model.pkl"), "rb"))
model = mlflow.sklearn.load_model(args.model_input)

# ---------------- Model Evaluation ---------------- #
yhat_test, score = model_evaluation(X_test, y_test, model, args.evaluation_output)

# ----------------- Model Promotion ---------------- #
if args.runner == "CloudRunner":
predictions, deploy_flag = model_promotion(args.model_name, args.evaluation_output, X_test, y_test, yhat_test, score)


def model_evaluation(X_test, y_test, model, evaluation_output):

# Get predictions to y_test (y_test)
yhat_test = model.predict(X_test)
@@ -113,7 +95,7 @@ def main():
output_data = X_test.copy()
output_data["real_label"] = y_test
output_data["predicted_label"] = yhat_test
output_data.to_csv((Path(args.predictions) / "predictions.csv"))
output_data.to_csv((Path(evaluation_output) / "predictions.csv"))

# Evaluate Model performance with the test set
r2 = r2_score(y_test, yhat_test)
@@ -122,15 +104,14 @@ def main():
mae = mean_absolute_error(y_test, yhat_test)

# Print score report to a text file
(Path(args.score_report) / "score.txt").write_text(
"Scored with the following model:\n{}".format(model)
(Path(evaluation_output) / "score.txt").write_text(
f"Scored with the following model:\n{format(model)}"
)
with open((Path(args.score_report) / "score.txt"), "a") as f:
f.write("Mean squared error: %.2f \n" % mse)
f.write("Root mean squared error: %.2f \n" % rmse)
f.write("Mean absolute error: %.2f \n" % mae)
f.write("Coefficient of determination: %.2f \n" % r2)

with open((Path(evaluation_output) / "score.txt"), "a") as outfile:
outfile.write(f"Mean squared error: {mse:.2f} \n")
outfile.write(f"Root mean squared error: {rmse:.2f} \n")
outfile.write(f"Mean absolute error: {mae:.2f} \n")
outfile.write(f"Coefficient of determination: {r2:.2f} \n")

mlflow.log_metric("test r2", r2)
mlflow.log_metric("test mse", mse)
@@ -146,18 +127,23 @@ def main():
plt.savefig("predictions.png")
mlflow.log_artifact("predictions.png")

# -------------------- Promotion ------------------- #
return yhat_test, r2

def model_promotion(model_name, evaluation_output, X_test, y_test, yhat_test, score):

scores = {}
predictions = {}
score = r2_score(y_test, yhat_test) # current model
for model_run in Model.list(ws):
if model_run.name == args.model_name:
model_path = Model.download(model_run, exist_ok=True)
mdl = pickle.load(open((Path(model_path) / "model.pkl"), "rb"))
predictions[model_run.id] = mdl.predict(X_test)
scores[model_run.id] = r2_score(y_test, predictions[model_run.id])

print(scores)

client = MlflowClient()

for model_run in client.search_model_versions(f"name='{model_name}'"):
model_version = model_run.version
mdl = mlflow.pyfunc.load_model(
model_uri=f"models:/{model_name}/{model_version}")
predictions[f"{model_name}:{model_version}"] = mdl.predict(X_test)
scores[f"{model_name}:{model_version}"] = r2_score(
y_test, predictions[f"{model_name}:{model_version}"])

if scores:
if score >= max(list(scores.values())):
deploy_flag = 1
@@ -165,69 +151,41 @@ def main():
deploy_flag = 0
else:
deploy_flag = 1
print("Deploy flag: ",deploy_flag)
print(f"Deploy flag: {deploy_flag}")

with open((Path(args.deploy_flag) / "deploy_flag"), 'w') as f:
f.write('%d' % int(deploy_flag))

with open((Path(evaluation_output) / "deploy_flag"), 'w') as outfile:
outfile.write(f"{int(deploy_flag)}")

# add current model score and predictions
scores["current model"] = score
perf_comparison_plot = pd.DataFrame(scores, index=["r2 score"]).plot(kind='bar', figsize=(15, 10))
predictions["current model"] = yhat_test

perf_comparison_plot = pd.DataFrame(
scores, index=["r2 score"]).plot(kind='bar', figsize=(15, 10))
perf_comparison_plot.figure.savefig("perf_comparison.png")
perf_comparison_plot.figure.savefig(Path(args.score_report) / "perf_comparison.png")

perf_comparison_plot.figure.savefig(Path(evaluation_output) / "perf_comparison.png")

mlflow.log_metric("deploy flag", bool(deploy_flag))
mlflow.log_artifact("perf_comparison.png")


# -------------------- FAIRNESS ------------------- #
# Calculate Fairness Metrics over Sensitive Features
# Create a dictionary of model(s) you want to assess for fairness

sf = { col: X_test[[col]] for col in SENSITIVE_COLS }
predictions["current model"] = [x for x in model.predict(X_test)]

dash_dict_all = _create_group_metric_set(y_true=y_test,
predictions=predictions,
sensitive_features=sf,
prediction_type='regression',
)

# Upload the dashboard to Azure Machine Learning
dashboard_title = "Fairness insights Comparison of Models"

# Set validate_model_ids parameter of upload_dashboard_dictionary to False
# if you have not registered your model(s)
upload_id = upload_dashboard_dictionary(run,
dash_dict_all,
dashboard_name=dashboard_title,
validate_model_ids=False)
print("\nUploaded to id: {0}\n".format(upload_id))


# -------------------- Explainability ------------------- #
tabular_explainer = TabularExplainer(model,
initialization_examples=X_train,
features=X_train.columns)

# save explainer
#joblib.dump(tabular_explainer, os.path.join(tabular_explainer, "explainer"))

# find global explanations for feature importance
# you can use the training data or the test data here,
# but test data would allow you to use Explanation Exploration
global_explanation = tabular_explainer.explain_global(X_test)

# sorted feature importance values and feature names
sorted_global_importance_values = global_explanation.get_ranked_global_values()
sorted_global_importance_names = global_explanation.get_ranked_global_names()

print("Explainability feature importance:")
# alternatively, you can print out a dictionary that holds the top K feature names and values
global_explanation.get_feature_importance_dict()

client = ExplanationClient.from_run(run)
client.upload_model_explanation(global_explanation, comment='global explanation: all features')

return predictions, deploy_flag

if __name__ == "__main__":
main()

mlflow.start_run()

args = parse_args()

lines = [
f"Model name: {args.model_name}",
f"Model path: {args.model_input}",
f"Test data path: {args.test_data}",
f"Evaluation output path: {args.evaluation_output}",
]

for line in lines:
print(line)

main(args)

mlflow.end_run()
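The promotion block above replaces the azureml-core Model.list/pickle loop with MLflow's model registry: every registered version of the model is reloaded and scored on the same test split, and the deploy flag is set only if the candidate's R2 is at least as good. A condensed sketch of that check, assuming a configured MLflow tracking/registry backend (the helper name should_deploy is illustrative, not part of this PR):

# Illustrative sketch only -- mirrors the model_promotion() logic reinstated above.
import mlflow
import mlflow.pyfunc
from mlflow.tracking import MlflowClient
from sklearn.metrics import r2_score

def should_deploy(model_name, X_test, y_test, candidate_r2):
    """Return 1 if the candidate model beats (or ties) every registered version."""
    client = MlflowClient()
    scores = {}
    for version in client.search_model_versions(f"name='{model_name}'"):
        registered = mlflow.pyfunc.load_model(f"models:/{model_name}/{version.version}")
        scores[version.version] = r2_score(y_test, registered.predict(X_test))
    # No previous versions means there is nothing to compare against, so deploy.
    return 1 if not scores or candidate_r2 >= max(scores.values()) else 0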
@@ -1,3 +1,9 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
"""
Prepares raw data and provides training, validation and test datasets
"""

import argparse

from pathlib import Path
@@ -7,14 +13,49 @@ import pandas as pd

import mlflow

TARGET_COL = "cost"

NUMERIC_COLS = [
"distance",
"dropoff_latitude",
"dropoff_longitude",
"passengers",
"pickup_latitude",
"pickup_longitude",
"pickup_weekday",
"pickup_month",
"pickup_monthday",
"pickup_hour",
"pickup_minute",
"pickup_second",
"dropoff_weekday",
"dropoff_month",
"dropoff_monthday",
"dropoff_hour",
"dropoff_minute",
"dropoff_second",
]

CAT_NOM_COLS = [
"store_forward",
"vendor",
]

CAT_ORD_COLS = [
]

def parse_args():
'''Parse input arguments'''

parser = argparse.ArgumentParser("prep")
parser.add_argument("--raw_data", type=str, help="Path to raw data")
parser.add_argument("--prepared_data", type=str, help="Path of prepared data")
parser.add_argument("--train_data", type=str, help="Path to train dataset")
parser.add_argument("--val_data", type=str, help="Path to val dataset")
parser.add_argument("--test_data", type=str, help="Path to test dataset")

parser.add_argument("--enable_monitoring", type=str, help="enable logging to ADX")
parser.add_argument("--table_name", type=str, default="mlmonitoring", help="Table name in ADX for logging")

args = parser.parse_args()

return args
@@ -24,20 +65,8 @@ def log_training_data(df, table_name):
collector = Online_Collector(table_name)
collector.batch_collect(df)

def main():

# ---------- Parse Arguments ----------- #
# -------------------------------------- #

args = parse_args()

lines = [
f"Raw data path: {args.raw_data}",
f"Data output path: {args.prepared_data}",
]

for line in lines:
print(line)
def main(args):
'''Read, split, and save datasets'''

# ------------ Reading Data ------------ #
# -------------------------------------- #
@@ -47,6 +76,7 @@ def main():
print(arr)

data = pd.read_csv((Path(args.raw_data) / 'taxi-data.csv'))
data = data[NUMERIC_COLS + CAT_NOM_COLS + CAT_ORD_COLS + [TARGET_COL]]

# ------------- Split Data ------------- #
# -------------------------------------- #
@@ -61,18 +91,42 @@ def main():

train = data[msk_train]
val = data[msk_val]
test = data[msk_test]
test = data[msk_test]

mlflow.log_metric('train size', train.shape[0])
mlflow.log_metric('val size', val.shape[0])
mlflow.log_metric('test size', test.shape[0])

train.to_csv((Path(args.prepared_data) / "train.csv"))
val.to_csv((Path(args.prepared_data) / "val.csv"))
test.to_csv((Path(args.prepared_data) / "test.csv"))
train.to_parquet((Path(args.train_data) / "train.parquet"))
val.to_parquet((Path(args.val_data) / "val.parquet"))
test.to_parquet((Path(args.test_data) / "test.parquet"))

if (args.enable_monitoring.lower() == 'true' or args.enable_monitoring == '1' or args.enable_monitoring.lower() == 'yes'):
log_training_data(data, args.table_name)


if __name__ == "__main__":
main()

mlflow.start_run()

# ---------- Parse Arguments ----------- #
# -------------------------------------- #

args = parse_args()

lines = [
f"Raw data path: {args.raw_data}",
f"Train dataset output path: {args.train_data}",
f"Val dataset output path: {args.val_data}",
f"Test dataset path: {args.test_data}",
]

for line in lines:
print(line)

main(args)

mlflow.end_run()
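Note that --enable_monitoring reaches prep.py as a string (the pipeline passes 'false', '1', 'yes', and similar), so the condition above has to normalize it before comparing. A small helper along these lines keeps that check in one place (str_to_bool is an illustrative name, not part of the PR):

# Illustrative sketch only -- equivalent to the enable_monitoring check in prep.py.
def str_to_bool(value: str) -> bool:
    """Interpret the truthy spellings commonly passed in from pipeline inputs."""
    return value.strip().lower() in ("true", "1", "yes")

# if str_to_bool(args.enable_monitoring):
#     log_training_data(data, args.table_name)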
@@ -1,56 +1,85 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
"""
Registers trained ML model if deploy flag is True.
"""

import argparse
from pathlib import Path
import pickle

import mlflow

from azureml.core import Run

# Get run
run = Run.get_context()
run_id = run.get_details()["runId"]
print(run_id)
import os
import json

def parse_args():
'''Parse input arguments'''

parser = argparse.ArgumentParser()
parser.add_argument('--model_name', type=str, help='Name under which model will be registered')
parser.add_argument('--model_path', type=str, help='Model directory')
parser.add_argument('--deploy_flag', type=str, help='A deploy flag whether to deploy or not')

parser.add_argument('--evaluation_output', type=str, help='Path of eval results')
parser.add_argument(
"--model_info_output_path", type=str, help="Path to write model info JSON"
)
args, _ = parser.parse_known_args()
print(f'Arguments: {args}')

return args


def main():

args = parse_args()

model_name = args.model_name
model_path = args.model_path

with open((Path(args.deploy_flag) / "deploy_flag"), 'rb') as f:
deploy_flag = int(f.read())
def main(args):
'''Loads model, registers it if deploy flag is True'''

with open((Path(args.evaluation_output) / "deploy_flag"), 'rb') as infile:
deploy_flag = int(infile.read())

mlflow.log_metric("deploy flag", int(deploy_flag))
deploy_flag=1
if deploy_flag==1:

print("Registering ", model_name)

model = pickle.load(open((Path(model_path) / "model.pkl"), "rb"))
# log model using mlflow
mlflow.sklearn.log_model(model, model_name)
print("Registering ", args.model_name)

# register model using mlflow model
# load model
model = mlflow.sklearn.load_model(args.model_path)

# log model using mlflow
mlflow.sklearn.log_model(model, args.model_name)

# register logged model using mlflow
run_id = mlflow.active_run().info.run_id
model_uri = f'runs:/{run_id}/{args.model_name}'
mlflow.register_model(model_uri, model_name)

mlflow_model = mlflow.register_model(model_uri, args.model_name)
model_version = mlflow_model.version

# write model info
print("Writing JSON")
dict = {"id": "{0}:{1}".format(args.model_name, model_version)}
output_path = os.path.join(args.model_info_output_path, "model_info.json")
with open(output_path, "w") as of:
json.dump(dict, fp=of)

else:
print("Model will not be registered!")

if __name__ == "__main__":
main()

mlflow.start_run()

# ---------- Parse Arguments ----------- #
# -------------------------------------- #

args = parse_args()

lines = [
f"Model name: {args.model_name}",
f"Model path: {args.model_path}",
f"Evaluation output path: {args.evaluation_output}",
]

for line in lines:
print(line)

main(args)

mlflow.end_run()
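register.py now writes model_info.json with a "name:version" id so later deployment steps know which registered version to pull. A sketch of how a downstream step might resolve that entry back to the registry (the file path and printed field are illustrative, not part of this PR):

# Illustrative sketch only -- consuming the model_info.json written by register.py.
import json
from mlflow.tracking import MlflowClient

with open("model_info.json") as infile:
    model_id = json.load(infile)["id"]        # e.g. "taxi-model:3"
name, version = model_id.split(":")

client = MlflowClient()
model_version = client.get_model_version(name=name, version=version)
print(model_version.source)                   # artifact location of that registered version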
@@ -1,20 +1,18 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
"""
Trains ML model using training dataset. Saves trained model.
"""

import argparse

from pathlib import Path
import os
import pickle

import numpy as np
import pandas as pd
from matplotlib import pyplot as plt

from sklearn.pipeline import Pipeline
from sklearn.ensemble import RandomForestRegressor
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.preprocessing import OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.metrics import r2_score, mean_absolute_error, mean_squared_error

import mlflow
@@ -53,22 +51,23 @@ CAT_ORD_COLS = [


def parse_args():
'''Parse input arguments'''

parser = argparse.ArgumentParser("train")
parser.add_argument("--prepared_data", type=str, help="Path to training data")
parser.add_argument("--train_data", type=str, help="Path to train dataset")
parser.add_argument("--model_output", type=str, help="Path of output model")

# regressor specific arguments
parser.add_argument('--regressor__n_estimators', type=int, default=500,
help='Number of trees')
parser.add_argument('--regressor__bootstrap', type=int, default=1,
help='Method of selecting samples for training each tree')
help='Method of selecting samples for training each tree')
parser.add_argument('--regressor__max_depth', type=int, default=10,
help=' Maximum number of levels in tree')
parser.add_argument('--regressor__max_features', type=str, default='auto',
help='Number of features to consider at every split')
help='Number of features to consider at every split')
parser.add_argument('--regressor__min_samples_leaf', type=int, default=4,
help='Minimum number of samples required at each leaf node')
help='Minimum number of samples required at each leaf node')
parser.add_argument('--regressor__min_samples_split', type=int, default=5,
help='Minimum number of samples required to split a node')
@@ -76,73 +75,17 @@ def parse_args():

return args

def main():
def main(args):
'''Read train dataset, train model, save trained model'''

args = parse_args()

lines = [
f"Training data path: {args.prepared_data}",
f"Model output path: {args.model_output}",
]

for line in lines:
print(line)

print("mounted_path files: ")
arr = os.listdir(args.prepared_data)
print(arr)

train_data = pd.read_csv((Path(args.prepared_data) / "train.csv"))
# Read train data
train_data = pd.read_parquet(Path(args.train_data))

# Split the data into input(X) and output(y)
y_train = train_data[TARGET_COL]
X_train = train_data[NUMERIC_COLS + CAT_NOM_COLS + CAT_ORD_COLS]

# Train a Linear Regression Model with the train set

# numerical features
numeric_transformer = Pipeline(steps=[
('standardscaler', StandardScaler())])

# ordinal features transformer
ordinal_transformer = Pipeline(steps=[
('imputer', SimpleImputer(missing_values=np.nan, strategy="most_frequent")),
('minmaxscaler', MinMaxScaler())
])

# nominal features transformer
nominal_transformer = Pipeline(steps=[
('imputer', SimpleImputer(missing_values=np.nan, strategy="most_frequent")),
('onehot', OneHotEncoder(sparse=False))
])

# imputer only for all other features
imputer_transformer = Pipeline(steps=[
('imputer', SimpleImputer(missing_values=np.nan, strategy="most_frequent"))
])

# preprocessing pipeline
preprocessor = ColumnTransformer(
transformers=[
('numeric', numeric_transformer, NUMERIC_COLS),
#('ordinal', ordinal_transformer, CAT_ORD_COLS),
('nominal', nominal_transformer, CAT_NOM_COLS)], # other features are already binary
remainder="drop")

# append regressor to preprocessing pipeline.
# now we have a full prediction pipeline.

#model = Pipeline(steps=[('preprocessor', preprocessor),
# ('regressor', RandomForestRegressor(
# n_estimators = args.regressor__n_estimators,
# bootstrap = args.regressor__bootstrap,
# max_depth = args.regressor__max_depth,
# max_features = args.regressor__max_features,
# min_samples_leaf = args.regressor__min_samples_leaf,
# min_samples_split = args.regressor__min_samples_split,
# random_state=0))])


# Train a Random Forest Regression Model with the training set
model = RandomForestRegressor(n_estimators = args.regressor__n_estimators,
bootstrap = args.regressor__bootstrap,
max_depth = args.regressor__max_depth,
@@ -151,6 +94,7 @@ def main():
min_samples_split = args.regressor__min_samples_split,
random_state=0)

# log model hyperparameters
mlflow.log_param("model", "RandomForestRegressor")
mlflow.log_param("n_estimators", args.regressor__n_estimators)
mlflow.log_param("bootstrap", args.regressor__bootstrap)
@@ -159,6 +103,7 @@ def main():
mlflow.log_param("min_samples_leaf", args.regressor__min_samples_leaf)
mlflow.log_param("min_samples_split", args.regressor__min_samples_split)

# Train model with the train set
model.fit(X_train, y_train)

# Predict using the Regression Model
@@ -169,7 +114,8 @@ def main():
mse = mean_squared_error(y_train, yhat_train)
rmse = np.sqrt(mse)
mae = mean_absolute_error(y_train, yhat_train)


# log model performance metrics
mlflow.log_metric("train r2", r2)
mlflow.log_metric("train mse", mse)
mlflow.log_metric("train rmse", rmse)
@@ -184,8 +130,33 @@ def main():
mlflow.log_artifact("regression_results.png")

# Save the model
pickle.dump(model, open((Path(args.model_output) / "model.pkl"), "wb"))
mlflow.sklearn.save_model(sk_model=model, path=args.model_output)


if __name__ == "__main__":
main()

mlflow.start_run()

# ---------- Parse Arguments ----------- #
# -------------------------------------- #

args = parse_args()

lines = [
f"Train dataset input path: {args.train_data}",
f"Model output path: {args.model_output}",
f"n_estimators: {args.regressor__n_estimators}",
f"bootstrap: {args.regressor__bootstrap}",
f"max_depth: {args.regressor__max_depth}",
f"max_features: {args.regressor__max_features}",
f"min_samples_leaf: {args.regressor__min_samples_leaf}",
f"min_samples_split: {args.regressor__min_samples_split}"
]

for line in lines:
print(line)

main(args)

mlflow.end_run()
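The key change in train.py is saving the model in MLflow format (mlflow.sklearn.save_model) instead of a bare pickle, which is what lets evaluate.py and register.py reload it with mlflow.sklearn.load_model. A self-contained round trip of that format, with toy data and an illustrative output directory:

# Illustrative sketch only -- the save/load round trip used by train.py and evaluate.py.
import numpy as np
import mlflow.sklearn
from sklearn.ensemble import RandomForestRegressor

X = np.random.rand(20, 3)
y = np.random.rand(20)

model = RandomForestRegressor(n_estimators=10, random_state=0)
model.fit(X, y)

# Writes an MLmodel file plus the serialized model and environment files; the
# target directory ("model_output" here) must not already exist.
mlflow.sklearn.save_model(sk_model=model, path="model_output")

reloaded = mlflow.sklearn.load_model("model_output")
print(reloaded.predict(X[:2]))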
@@ -5,17 +5,19 @@ description: Training Pipeline to train a model that predicts taxi fare price

# <inputs_and_outputs>
inputs:
input: #using local data, will crate an anonymous data asset
input: #using local data, will create an anonymous data asset
type: uri_folder
path: ../../../data/
enable_monitoring: 'false'
enable_monitoring: "true"
table_name: 'taximonitoring'

outputs:
prepared_data:
train_data:
val_data:
test_data:
trained_model:
predictions:
score_report:
deploy_flag:
evaluation_output:
model_info_output_path:
# </inputs_and_outputs>

# <jobs>
@@ -32,14 +34,20 @@ jobs:
command: >-
python prep.py
--raw_data ${{inputs.raw_data}}
--prepared_data ${{outputs.prepared_data}}
--train_data ${{outputs.train_data}}
--val_data ${{outputs.val_data}}
--test_data ${{outputs.test_data}}
--enable_monitoring ${{inputs.enable_monitoring}}
--table_name ${{inputs.table_name}}
environment: azureml:taxi-train-env@latest
inputs:
raw_data: ${{parent.inputs.input}}
enable_monitoring: ${{parent.inputs.enable_monitoring}}
table_name: ${{parent.inputs.table_name}}
outputs:
prepared_data: ${{parent.outputs.prepared_data}}
train_data: ${{parent.outputs.train_data}}
val_data: ${{parent.outputs.val_data}}
test_data: ${{parent.outputs.test_data}}

train_model:
name: train_model
@@ -47,11 +55,11 @@ jobs:
code: ../../../data-science/src
command: >-
python train.py
--prepared_data ${{inputs.prepared_data}}
--train_data ${{inputs.train_data}}
--model_output ${{outputs.model_output}}
environment: azureml:taxi-train-env@latest
inputs:
prepared_data: ${{parent.jobs.prep_data.outputs.prepared_data}}
train_data: ${{parent.jobs.prep_data.outputs.train_data}}
outputs:
model_output: ${{parent.outputs.trained_model}}
@@ -63,19 +71,15 @@ jobs:
python evaluate.py
--model_name ${{inputs.model_name}}
--model_input ${{inputs.model_input}}
--prepared_data ${{inputs.prepared_data}}
--predictions ${{outputs.predictions}}
--score_report ${{outputs.score_report}}
--deploy_flag ${{outputs.deploy_flag}}
--test_data ${{inputs.test_data}}
--evaluation_output ${{outputs.evaluation_output}}
environment: azureml:taxi-train-env@latest
inputs:
model_name: "taxi-model"
model_input: ${{parent.jobs.train_model.outputs.model_output}}
prepared_data: ${{parent.jobs.prep_data.outputs.prepared_data}}
test_data: ${{parent.jobs.prep_data.outputs.test_data}}
outputs:
predictions: ${{parent.outputs.predictions}}
score_report: ${{parent.outputs.score_report}}
deploy_flag: ${{parent.outputs.deploy_flag}}
evaluation_output: ${{parent.outputs.evaluation_output}}

register_model:
name: register_model
@@ -85,11 +89,13 @@ jobs:
python register.py
--model_name ${{inputs.model_name}}
--model_path ${{inputs.model_path}}
--deploy_flag ${{inputs.deploy_flag}}
--evaluation_output ${{inputs.evaluation_output}}
--model_info_output_path ${{outputs.model_info_output_path}}
environment: azureml:taxi-train-env@latest
inputs:
model_name: "taxi-model"
model_path: ${{parent.jobs.train_model.outputs.model_output}}
deploy_flag: ${{parent.jobs.evaluate_model.outputs.deploy_flag}}

# </jobs>
evaluation_output: ${{parent.jobs.evaluate_model.outputs.evaluation_output}}
outputs:
model_info_output_path: ${{parent.outputs.model_info_output_path}}
# </jobs>
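In this PR the pipeline YAML above is submitted by the Azure DevOps templates that follow, but for local debugging the same file can be submitted with the Azure ML Python SDK v2; a sketch, assuming the azure-ai-ml package is installed and using placeholder workspace values:

# Illustrative sketch only -- not part of this PR; workspace values are placeholders.
from azure.ai.ml import MLClient, load_job
from azure.identity import DefaultAzureCredential

ml_client = MLClient(
    credential=DefaultAzureCredential(),
    subscription_id="<subscription-id>",
    resource_group_name="<resource-group>",
    workspace_name="<workspace-name>",
)

pipeline_job = load_job(source="mlops/azureml/train/pipeline.yml")
submitted = ml_client.jobs.create_or_update(pipeline_job, experiment_name="taxi-fare-train")
print(submitted.studio_url)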
@@ -32,6 +32,7 @@ stages:
displayName: Deploy Training Pipeline
jobs:
- job: DeployTrainingPipeline
timeoutInMinutes: 120 # how long to run the job before automatically cancelling
steps:
- checkout: self
path: s/
@@ -51,4 +52,4 @@ stages:
pipeline_file: mlops/azureml/train/pipeline.yml
experiment_name: $(environment)_taxi_fare_train_$(Build.SourceBranchName)
display_name: $(environment)_taxi_fare_run_$(Build.BuildID)
enable_monitoring: $(enable_monitoring)
enable_monitoring: $(enable_monitoring)
@@ -43,6 +43,8 @@ stages:
path: s/
- checkout: mlops-templates
path: s/templates/
- checkout: rai-vnext-preview
path: s/
- template: templates/tests/unit-tests.yml@mlops-templates
- template: templates/${{ variables.version }}/install-az-cli.yml@mlops-templates
- template: templates/${{ variables.version }}/install-aml-cli.yml@mlops-templates
@@ -53,13 +55,7 @@ stages:
environment_name: taxi-train-env
environment_file: mlops/azureml/train/train-env.yml
enable_monitoring: $(enable_monitoring)
- checkout: rai-vnext-preview
path: s/
- template: register-rai-components.yml
- checkout: self
path: s/
- checkout: mlops-templates
path: s/templates/
- template: templates/${{ variables.version }}/run-pipeline.yml@mlops-templates
parameters:
pipeline_file: mlops/azureml/train/pipeline.yml