Merge pull request #107 from Azure/jfomhover/nlpsdk

NLP SDK implementation
Cindy Weng 2022-12-08 11:42:33 +00:00 committed by GitHub
Parents: 3e1c300d10 d313155dce
Commit: fdc8462212
No key found matching this signature
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 1177 additions and 1 deletion

View File

@@ -16,3 +16,13 @@ psutil==5.9.0
# for unit testing
pytest==7.1.2
# for azure ml SDK v2
azure-ai-ml==1.1.0
azure-common==1.1.28
azure-core==1.26.1
azure-identity==1.10.0
azure-mgmt-core==1.3.0
azure-storage-blob==12.14.1
azure-storage-file-datalake==12.9.1
azure-storage-file-share==12.7.0

View File

@@ -0,0 +1,24 @@
name: nlp_inference_conda_env
channels:
- pytorch
- anaconda
- defaults
- conda-forge
dependencies:
- python=3.8
- pip=21.2.4
- pytorch=1.10.0
- torchvision=0.11.1
- torchaudio=0.10.0
- cudatoolkit=11.1.1
- nvidia-apex=0.1.0
- gxx_linux-64=8.5.0
- pip:
- azureml-defaults==1.39.0
- azureml-mlflow==1.39.0
- azureml-telemetry==1.39.0
- azureml-train-core==1.39.0
- mlflow==1.24.0
- transformers==4.17.0
- 'inference-schema[numpy-support]==1.3.0'
- applicationinsights==0.11.10

View File

@@ -0,0 +1,7 @@
# check release notes https://docs.nvidia.com/deeplearning/frameworks/pytorch-release-notes/index.html
FROM nvcr.io/nvidia/pytorch:22.04-py3
# Install dependencies missing in this container
# NOTE: container already has matplotlib==3.5.1 tqdm==4.62.0
COPY requirements.txt ./
RUN pip install -r requirements.txt

View File

@@ -0,0 +1,21 @@
# data science requirements
# torchvision==0.12.0
# torch==1.11.0
pytorch_lightning==1.6.4
transformers==4.18.0
datasets==2.3.2
rouge_score==0.0.4
sentencepiece==0.1.96
# for metrics reporting/plotting
mlflow==1.25.1
azureml-mlflow==1.41.0
# matplotlib==3.5.2
# tqdm==4.64.0
psutil==5.9.0
# for unit testing
pytest==7.1.2
# for azure ml SDK v2
azure-ai-ml==1.1.0

View File

@@ -0,0 +1,88 @@
import os
import argparse
import logging
import mlflow
import json
from distutils.util import strtobool
def main():
"""Main function of the script."""
# initialize root logger
logger = logging.getLogger()
logger.setLevel(logging.INFO)
console_handler = logging.StreamHandler()
formatter = logging.Formatter(
"%(asctime)s : %(levelname)s : %(name)s : %(message)s"
)
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
# input and output arguments
parser = argparse.ArgumentParser()
parser.add_argument(
"--baseline_metrics",
type=str,
required=True,
help="path to baseline metrics folder containing all_results.json",
)
parser.add_argument(
"--candidate_metrics",
type=str,
required=True,
help="path to candidate metrics folder containing all_results.json",
)
parser.add_argument(
"--reference_metric",
type=str,
default="predict_rougeLsum",
help="name of reference metric for shipping flag (default: predict_rougeLsum)",
)
parser.add_argument(
"--force_comparison", type=strtobool, default=False, help="set to True to bypass comparison and set --deploy_flag to True"
)
parser.add_argument(
"--deploy_flag", type=str, help="a deploy flag whether to deploy or not"
)
args = parser.parse_args()
# Start Logging
mlflow.start_run()
logger.info(f"Running with arguments: {args}")
# open metrics on both sides
with open(os.path.join(args.baseline_metrics, "all_results.json")) as in_file:
baseline_metrics = json.loads(in_file.read())
with open(os.path.join(args.candidate_metrics, "all_results.json")) as in_file:
candidate_metrics = json.loads(in_file.read())
# should we ship or not?
if args.force_comparison:
deploy_flag = True
else:
deploy_flag = (
candidate_metrics[args.reference_metric]
> baseline_metrics[args.reference_metric]
)
logger.info("baseline_metrics[{}]={}, candidate_metrics[{}]={}, deploy_flag={} (force_comparison={})".format(
args.reference_metric,
baseline_metrics[args.reference_metric],
args.reference_metric,
candidate_metrics[args.reference_metric],
deploy_flag,
args.force_comparison
))
# save deploy_flag as a file
os.makedirs(args.deploy_flag, exist_ok=True)
with open(os.path.join(args.deploy_flag, "deploy_flag"), "w") as out_file:
out_file.write("%d" % int(deploy_flag))
# Stop Logging
mlflow.end_run()
if __name__ == "__main__":
main()
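
For orientation, a minimal worked example of the shipping decision above, assuming all_results.json is a flat dict keyed by metric name (as produced by trainer.save_metrics("predict", ...) in the training script); the values below are made up:

baseline_metrics = {"predict_rougeLsum": 20.1}   # hypothetical baseline scores
candidate_metrics = {"predict_rougeLsum": 22.4}  # hypothetical fine-tuned scores
deploy_flag = candidate_metrics["predict_rougeLsum"] > baseline_metrics["predict_rougeLsum"]
print(int(deploy_flag))  # 1 -> the candidate model ships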

View File

@@ -0,0 +1,151 @@
"""
This script prepares a HuggingFace dataset for fine-tuning. It tokenizes
the train/val/test splits and saves the encoded datasets to disk.
"""
import os
import argparse
import logging
from datasets import load_dataset, DatasetDict
from transformers import AutoTokenizer
import mlflow
def main():
"""Main function of the script."""
# initialize root logger
logger = logging.getLogger()
logger.setLevel(logging.INFO)
console_handler = logging.StreamHandler()
formatter = logging.Formatter(
"%(asctime)s : %(levelname)s : %(name)s : %(message)s"
)
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
# input and output arguments
parser = argparse.ArgumentParser()
parser.add_argument(
"--dataset_name", type=str, help="name of dataset or path to input dataset_name"
)
parser.add_argument(
"--dataset_config", type=str, help="config for huggingface dataset"
)
parser.add_argument("--text_column", type=str, help="name of text_column")
parser.add_argument("--summary_column", type=str, help="name of summary_column")
parser.add_argument(
"--max_input_length", type=int, default=512, help="max_input_length"
)
parser.add_argument(
"--max_target_length", type=int, default=40, help="max_target_length"
)
parser.add_argument(
"--padding", type=str, default="max_length", help="padding type"
)
parser.add_argument(
"--model_arch",
type=str,
help="name of the model to prepare for in HF model library",
)
parser.add_argument(
"--limit_samples", type=int, default=-1, help="sample size from input dataset"
)
parser.add_argument("--encodings", type=str, help="path to tokenized dataset")
parser.add_argument(
"--source_prefix",
type=str,
help="A prefix to add before every source text (useful for T5 models).",
)
args = parser.parse_args()
# Start Logging
mlflow.start_run()
logger.info(f"Running with arguments: {args}")
# get tokenizer ready
tokenizer = AutoTokenizer.from_pretrained(args.model_arch)
logger.info(f"tokenizer: {tokenizer}")
prefix = args.source_prefix if args.source_prefix is not None else ""
if args.source_prefix is None and "t5" in args.model_arch.lower():
logger.warning(
"You're running a t5 model but didn't provide a source prefix, which is the expected, e.g. with "
"`--source_prefix 'summarize: ' `"
)
# Load HuggingFace dataset
raw_dataset = load_dataset(args.dataset_name, args.dataset_config)
logger.info(f"raw dataset length: {raw_dataset.num_rows}")
mlflow.log_metric("train_samples", raw_dataset["train"].shape[0])
mlflow.log_metric("test_samples", raw_dataset["test"].shape[0])
mlflow.log_metric("validation_samples", raw_dataset["validation"].shape[0])
if args.limit_samples > 0:
sample_sizes = {
k: min(len(raw_dataset[k]), args.limit_samples) for k in raw_dataset.keys()
}
raw_dataset = DatasetDict(
{
k: raw_dataset[k].select([i for i in range(sample_sizes[k])])
for k in raw_dataset.keys()
}
)
logger.info("sampled raw dataset:")
logger.info(raw_dataset.num_rows)
def preprocess_function(examples):
# remove pairs where at least one record is None
inputs, targets = [], []
for i in range(len(examples[args.text_column])):
if (
examples[args.text_column][i] is not None
and examples[args.summary_column][i] is not None
):
inputs.append(examples[args.text_column][i])
targets.append(examples[args.summary_column][i])
inputs = [prefix + inp for inp in inputs]
model_inputs = tokenizer(
inputs,
max_length=args.max_input_length,
padding=args.padding,
truncation=True,
)
# Set up the tokenizer for targets
with tokenizer.as_target_tokenizer():
labels = tokenizer(
targets,  # use the filtered targets so labels stay aligned with model_inputs
max_length=args.max_target_length,
padding=args.padding,
truncation=True,
)
# replace all tokenizer.pad_token_id in the labels by -100 to ignore padding in the loss.
if args.padding == "max_length":
labels["input_ids"] = [
[(l if l != tokenizer.pad_token_id else -100) for l in label]
for label in labels["input_ids"]
]
model_inputs["labels"] = labels["input_ids"]
return model_inputs
preprocessed_datasets = raw_dataset.map(preprocess_function, batched=True)
logger.info(f"preprocessed_datasets: {preprocessed_datasets}")
output_path = os.path.join(args.encodings)
os.makedirs(output_path, exist_ok=True)
preprocessed_datasets.save_to_disk(output_path)
logger.info(f"tokenized data is saved to {output_path}")
# Stop Logging
mlflow.end_run()
if __name__ == "__main__":
main()
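
As a rough sketch of what preprocess_function produces for a single record, assuming the t5-small model and the pipeline defaults of max_input_length=512 and padding="max_length" (the article text here is made up):

from transformers import AutoTokenizer

tok = AutoTokenizer.from_pretrained("t5-small")
enc = tok(
    "summarize: " + "Background: we measured the effect of ...",  # prefix + article
    max_length=512,
    padding="max_length",
    truncation=True,
)
print(len(enc["input_ids"]))  # 512, padded to max_input_length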

View File

@@ -0,0 +1,82 @@
from azureml.core import Run
from azureml.core.model import Model
import os
import argparse
import logging
import mlflow
def main():
"""Main function of the script."""
# initialize root logger
logger = logging.getLogger()
logger.setLevel(logging.INFO)
console_handler = logging.StreamHandler()
formatter = logging.Formatter(
"%(asctime)s : %(levelname)s : %(name)s : %(message)s"
)
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
# input and output arguments
parser = argparse.ArgumentParser()
parser.add_argument(
"--model_folder",
type=str,
required=True,
help="folder containing model",
)
parser.add_argument(
"--register_as",
type=str,
required=True,
help="name to use for model registration in AzureML",
)
parser.add_argument(
"--deploy_flag", type=str, required=True, help="a deploy flag whether to deploy or not"
)
args = parser.parse_args()
logger.info(f"Running with arguments: {args}")
# Start Logging
mlflow.start_run()
if os.path.isfile(args.deploy_flag):
deploy_flag_file_path = args.deploy_flag
else:
deploy_flag_file_path = os.path.join(args.deploy_flag, "deploy_flag")
logger.info(f"Opening deploy_flag file from {deploy_flag_file_path}")
with open(deploy_flag_file_path, 'rb') as in_file:
deploy_flag = bool(int(in_file.read()))
if deploy_flag:
logger.info(f"Deploy flag is True, registering model as {args.register_as}...")
run = Run.get_context()
# if we're running locally, raise an exception
if run.__class__.__name__ == "_OfflineRun":
raise Exception("You can't run this script locally, you will need to run it as an AzureML job.")
_ = Model.register(
run.experiment.workspace,
model_name=args.register_as,
model_path=args.model_folder,
tags={
"type": "huggingface",
"task": "summarization"
},
description="Huggingface model finetuned for summarization",
)
else:
logger.info(f"Deploy flag is False, pass.")
# Stop Logging
mlflow.end_run()
if __name__ == "__main__":
main()
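
A small sketch of the deploy_flag handshake between compare.py and this registration script; the file name matches the one used above, the folder layout and value are illustrative:

# compare.py writes the flag as "0" or "1" into a file named deploy_flag
with open("deploy_flag", "w") as out_file:
    out_file.write("%d" % int(True))

# register.py reads it back and turns it into a boolean
with open("deploy_flag", "rb") as in_file:
    deploy_flag = bool(int(in_file.read()))
print(deploy_flag)  # True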

View File

@@ -0,0 +1,269 @@
import logging
import os
from datasets import load_metric, load_from_disk
from transformers import (
AutoModelForSeq2SeqLM,
AutoTokenizer,
DataCollatorForSeq2Seq,
Seq2SeqTrainingArguments,
Seq2SeqTrainer,
HfArgumentParser,
IntervalStrategy,
)
from transformers.trainer_callback import TrainerCallback
import torch
import nltk
from dataclasses import dataclass, field
from typing import Optional
import numpy as np
import mlflow
from pynvml import nvmlInit, nvmlDeviceGetHandleByIndex, nvmlDeviceGetMemoryInfo
import time
# Input arguments are defined with dataclasses. The HuggingFace library stores the default training args in the TrainingArguments dataclass;
# user args are also defined as dataclasses, and arguments are then parsed from a tuple of user-defined and built-in dataclasses.
@dataclass
class DataArgs:
# Inputs
preprocessed_datasets: str = field(
default=None, metadata={"help": "path to preprocesed datasets"}
)
# Processing parameters
max_target_length: Optional[int] = field(
default=128,
metadata={"help": "maxi sequence length for target text after tokenization."},
)
limit_samples: Optional[int] = field(
default=-1,
metadata={"help": "limit the number of samples for faster run."},
)
@dataclass
class ModelArgs:
model_name: Optional[str] = field(default=None, metadata={"help": "model name"})
model_path: Optional[str] = field(
default=None, metadata={"help": "path to existing model file to load"}
)
model_output: Optional[str] = field(
default=None, metadata={"help": "path to save the model"}
)
nltk.download("punkt")
def print_gpu_utilization():
nvmlInit()
handle = nvmlDeviceGetHandleByIndex(0)
info = nvmlDeviceGetMemoryInfo(handle)
print(f"GPU memory occupied: {info.used//1024**2} MB.")
def print_summary(result):
print(f"Time: {result.metrics['train_runtime']:.2f}")
print(f"Samples/second: {result.metrics['train_samples_per_second']:.2f}")
print_gpu_utilization()
def postprocess_text(preds, labels):
"""Postprocess output for computing metrics"""
preds = [pred.strip() for pred in preds]
labels = [label.strip() for label in labels]
# rougeLSum expects newline after each sentence
preds = ["\n".join(nltk.sent_tokenize(pred)) for pred in preds]
labels = ["\n".join(nltk.sent_tokenize(label)) for label in labels]
return preds, labels
def compute_metrics(eval_preds, tokenizer, metric):
"""Compute metric based on predictions from evaluation"""
preds, labels = eval_preds
if isinstance(preds, tuple):
preds = preds[0]
decoded_preds = tokenizer.batch_decode(preds, skip_special_tokens=True)
# Replace -100 in the labels as we can't decode them.
labels = np.where(labels != -100, labels, tokenizer.pad_token_id)
decoded_labels = tokenizer.batch_decode(labels, skip_special_tokens=True)
# Some simple post-processing
decoded_preds, decoded_labels = postprocess_text(decoded_preds, decoded_labels)
result = metric.compute(
predictions=decoded_preds, references=decoded_labels, use_stemmer=True
)
# Extract a few results from ROUGE
result = {key: value.mid.fmeasure * 100 for key, value in result.items()}
prediction_lens = [
np.count_nonzero(pred != tokenizer.pad_token_id) for pred in preds
]
result["gen_len"] = np.mean(prediction_lens)
result = {k: round(v, 4) for k, v in result.items()}
return result
class CustomCallback(TrainerCallback):
"""A [`TrainerCallback`] that sends the logs to [AzureML](https://pypi.org/project/azureml-sdk/).
This is a hotfix for the issue raised here:
https://github.com/huggingface/transformers/issues/18870
"""
def on_log(self, args, state, control, logs=None, **kwargs):
if state.is_world_process_zero:
metrics = {}
for k, v in logs.items():
if isinstance(v, (int, float)):
metrics[k] = v
mlflow.log_metrics(metrics=metrics, step=state.global_step)
def main():
# Setup logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
console_handler = logging.StreamHandler()
formatter = logging.Formatter(
"%(asctime)s : %(levelname)s : %(name)s : %(message)s"
)
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
# initialize the mlflow session
mlflow.start_run()
parser = HfArgumentParser((ModelArgs, DataArgs, Seq2SeqTrainingArguments))
model_args, data_args, training_args = parser.parse_args_into_dataclasses()
logger.info(f"Running with arguments: {model_args}, {data_args}, {training_args}")
# Check if this is the main node
is_this_main_node = int(os.environ.get("RANK", "0")) == 0
if is_this_main_node:
logger.info("This is the main Node")
input_datasets = load_from_disk(data_args.preprocessed_datasets)
logger.info(f"preprocessed dataset is loaded")
if model_args.model_path:
logger.info("using a saved model")
model = AutoModelForSeq2SeqLM.from_pretrained(model_args.model_path)
tokenizer = AutoTokenizer.from_pretrained(model_args.model_path)
else:
logger.info("using a model from model library")
model = AutoModelForSeq2SeqLM.from_pretrained(model_args.model_name)
tokenizer = AutoTokenizer.from_pretrained(model_args.model_name)
# Artificially limit the number of samples (for testing)
if training_args.do_train: # if using --do_train from Seq2SeqTrainingArguments
if data_args.limit_samples > 0:
max_train_samples = min(len(input_datasets["train"]), data_args.limit_samples)
train_dataset = input_datasets["train"].select(range(max_train_samples))
logger.info(f"train: making a {max_train_samples} sample of the data")
else:
train_dataset = input_datasets["train"]
if training_args.do_eval:
if data_args.limit_samples > 0:
max_eval_samples = min(
len(input_datasets["validation"]), data_args.limit_samples
)
eval_dataset = input_datasets["validation"].select(range(max_eval_samples))
logger.info(f"eval: making a {max_eval_samples} sample of the data")
else:
eval_dataset = input_datasets["validation"]
if training_args.do_predict:
if data_args.limit_samples > 0:
max_predict_samples = min(
len(input_datasets["test"]), data_args.limit_samples
)
predict_dataset = input_datasets["test"].select(range(max_predict_samples))
logger.info(f"predict: making a {max_predict_samples} sample of the data")
else:
predict_dataset = input_datasets["test"]
# Data collator
label_pad_token_id = -100
data_collator = DataCollatorForSeq2Seq(
tokenizer,
model=model,
label_pad_token_id=label_pad_token_id,
)
# Metric
metric = load_metric("rouge")
if training_args.do_train:
logging_steps = len(train_dataset) // training_args.per_device_train_batch_size
training_args.logging_steps = logging_steps
#training_args.output_dir = "outputs"
training_args.save_strategy = "epoch"
training_args.evaluation_strategy = IntervalStrategy.EPOCH
training_args.predict_with_generate = True
training_args.report_to = [] # use our own callback
logger.info(f"training args: {training_args}")
# Initialize our Trainer
trainer = Seq2SeqTrainer(
model=model,
args=training_args,
train_dataset=train_dataset if training_args.do_train else None,
eval_dataset=eval_dataset if training_args.do_eval else None,
tokenizer=tokenizer,
data_collator=data_collator,
compute_metrics=lambda preds : compute_metrics(preds, tokenizer, metric),
callbacks=[CustomCallback]
)
# Start the actual training (to include evaluation use --do_eval)
if training_args.do_train:
logger.info("Start training")
start = time.time()
train_result = trainer.train()
mlflow.log_metric(
"time/epoch", (time.time() - start) / 60 / training_args.num_train_epochs
)
logger.info(
"training is done"
) # Only print gpu utilization if gpu is available
if torch.cuda.is_available():
print_summary(train_result)
# Save the model as an output
if model_args.model_output and is_this_main_node:
logger.info(f"Saving the model at {model_args.model_output}")
os.makedirs(model_args.model_output, exist_ok=True)
trainer.save_model(model_args.model_output)
# Just run the predictions
if training_args.do_predict:
logger.info("*** Predict ***")
max_length = (
training_args.generation_max_length
if training_args.generation_max_length is not None
else data_args.max_target_length
)
predict_results = trainer.predict(
predict_dataset, metric_key_prefix="predict", max_length=max_length
)
metrics = predict_results.metrics
metrics["predict_samples"] = len(predict_dataset)
trainer.log_metrics("predict", metrics)
trainer.save_metrics("predict", metrics)
# Stop Logging
mlflow.end_run()
if __name__ == "__main__":
main()
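
For reference, a minimal sketch of how HfArgumentParser maps command-line flags onto the built-in Seq2SeqTrainingArguments dataclass; the flag values mirror what the pipeline passes to summarization/run.py, and parsing the custom ModelArgs/DataArgs works the same way:

from transformers import HfArgumentParser, Seq2SeqTrainingArguments

parser = HfArgumentParser(Seq2SeqTrainingArguments)
(training_args,) = parser.parse_args_into_dataclasses(
    args=[
        "--output_dir", "outputs",
        "--do_train", "--do_eval",
        "--num_train_epochs", "5",
        "--learning_rate", "5e-5",
        "--per_device_train_batch_size", "8",
    ]
)
print(training_args.num_train_epochs, training_args.learning_rate)  # 5.0 5e-05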

View File

@@ -0,0 +1,44 @@
import os
import logging
import json
from transformers import (
AutoModelForSeq2SeqLM,
AutoTokenizer,
)
def init():
"""
This function is called when the container is initialized/started, typically after create/update of the deployment.
You can write the logic here to perform init operations like caching the model in memory
"""
global model, tokenizer
# AZUREML_MODEL_DIR is an environment variable created during deployment.
# It is the path to the model folder (./azureml-models/$MODEL_NAME/$VERSION)
model_path = os.path.join(os.getenv("AZUREML_MODEL_DIR"), os.listdir(os.getenv("AZUREML_MODEL_DIR"))[0])
print("model_path")
print(os.listdir(model_path))
model = AutoModelForSeq2SeqLM.from_pretrained(model_path)
tokenizer = AutoTokenizer.from_pretrained(model_path)
print("Init complete")
def run(raw_data):
"""
This function is called for every invocation of the endpoint to perform the actual scoring/prediction.
In this example we extract the article text from the json input, call the HuggingFace model's generate()
method and return the decoded summary.
"""
global model, tokenizer
logging.info("Request received")
article = json.loads(raw_data)["data"]
if "t5" in model.config.architectures[0].lower():
article= "summarize:" + article
inputs = tokenizer(article, return_tensors="pt", max_length=512, truncation=True)
outputs = model.generate(
inputs["input_ids"], max_length=150, min_length=40, length_penalty=2.0, num_beams=4, early_stopping=True
)
result = tokenizer.decode(outputs[0])
print(result)
logging.info("Request processed")
return result
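
The payload this scoring script expects is a JSON body with a single "data" field holding the article text. A hypothetical client-side sketch (the endpoint itself calls init() once and then run() per request; the article text is made up):

import json

payload = json.dumps({"data": "Background: this trial evaluated ..."})  # illustrative article
# summary = run(payload)  # what the online endpoint executes after init()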

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,405 @@
"""MLOps v2 NLP Python SDK training submission script."""
import os
import argparse
# Azure ML sdk v2 imports
from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential
from azure.ai.ml import MLClient, command, dsl, Input, Output
def get_config_parser(parser: argparse.ArgumentParser = None):
"""Builds the argument parser for the script."""
if parser is None:
parser = argparse.ArgumentParser(description=__doc__)
group = parser.add_argument_group("Azure ML references")
group.add_argument(
"--config_location",
type=str,
required=False,
help="Subscription ID",
)
group.add_argument(
"--subscription_id",
type=str,
required=False,
help="Subscription ID",
)
group.add_argument(
"--resource_group",
type=str,
required=False,
help="Resource group name",
)
group.add_argument(
"--workspace_name",
type=str,
required=False,
help="Workspace name",
)
# Experiment Name
group.add_argument(
"-n",
type=str,
required=True,
default="nlp_summarization_train",
help="Experiment name",
)
parser.add_argument(
"--wait",
default=False,
action="store_true",
help="wait for the job to finish",
)
group = parser.add_argument_group("Training parameters")
group.add_argument(
"--limit_samples",
type=int,
default=1000,
)
group.add_argument(
"--pretrained_model_name",
type=str,
default="t5-small",
)
group.add_argument(
"--num_train_epochs",
type=int,
default=5,
)
group.add_argument(
"--batch_size",
type=int,
default=8,
)
group.add_argument(
"--learning_rate",
type=float,
default=0.00005,
)
group.add_argument(
"--model_registration_name",
type=str,
default="pubmed-summarization",
)
group = parser.add_argument_group("Compute parameters")
group.add_argument(
"--cpu_compute",
type=str,
default="cpu-cluster",
)
group.add_argument(
"--cpu_compute_large",
type=str,
default="cpu-cluster-lg",
)
group.add_argument(
"--gpu_compute",
type=str,
default="gpu-cluster",
)
group.add_argument(
"--training_nodes",
type=int,
default=1,
)
group.add_argument(
"--gpus_per_node",
type=int,
default=1,
)
return parser
def connect_to_aml(args):
"""Connect to Azure ML workspace using provided cli arguments."""
try:
credential = DefaultAzureCredential()
# Check if given credential can get token successfully.
credential.get_token("https://management.azure.com/.default")
except Exception as ex:
# Fall back to InteractiveBrowserCredential in case DefaultAzureCredential does not work
credential = InteractiveBrowserCredential()
# Get a handle to workspace
try:
# ml_client to connect using local config.json
ml_client = MLClient.from_config(credential, path='config.json')
except Exception as ex:
print(
"Could not find config.json, using config.yaml refs to Azure ML workspace instead."
)
# tries to connect using cli args if provided else using config.yaml
ml_client = MLClient(
subscription_id=args.subscription_id,
resource_group_name=args.resource_group,
workspace_name=args.workspace_name,
credential=credential,
)
return ml_client
def build_components(args):
"""Builds the components for the pipeline."""
DATA_SCIENCE_FOLDER = os.path.join(
os.path.dirname(os.path.abspath(__file__)), "..","..", "..", "data-science", "src"
)
prep_finetuning_dataset = command(
name="prep_finetuning_dataset",
display_name="Prepare dataset for training",
inputs={
"dataset_name": Input(type="string"),
"dataset_config": Input(type="string"),
"text_column": Input(type="string"),
"summary_column": Input(type="string"),
"limit_samples": Input(type="integer"),
"max_input_length": Input(type="integer"),
"max_target_length": Input(type="integer"),
"padding": Input(type="string"),
"pretrained_model_name": Input(type="string"),
},
outputs=dict(
encodings=Output(type="uri_folder", mode="rw_mount"),
),
code=DATA_SCIENCE_FOLDER,
command="""python summarization/prepare.py \
--dataset_name ${{inputs.dataset_name}} \
--dataset_config ${{inputs.dataset_config}} \
--text_column ${{inputs.text_column}} \
--summary_column ${{inputs.summary_column}} \
--limit_samples ${{inputs.limit_samples}} \
--model_arch ${{inputs.pretrained_model_name}} \
--max_input_length ${{inputs.max_input_length}} \
--max_target_length ${{inputs.max_target_length}} \
--padding ${{inputs.padding}} \
--encodings ${{outputs.encodings}}\
""",
environment="nlp_summarization_train@latest",
)
finetune_model = command(
name="finetune_model",
display_name="Fine-tune summarization model",
inputs={
"preprocessed_datasets": Input(type="uri_folder"),
"pretrained_model_name": Input(type="string"),
"limit_samples": Input(type="integer"),
"learning_rate": Input(type="number"),
"num_train_epochs": Input(type="integer"),
"per_device_train_batch_size": Input(type="integer"),
"per_device_eval_batch_size": Input(type="integer"),
},
outputs=dict(
finetuned_model=Output(type="uri_folder", mode="rw_mount"),
),
code=DATA_SCIENCE_FOLDER,
command="""python summarization/run.py \
--preprocessed_datasets ${{inputs.preprocessed_datasets}} \
--learning_rate ${{inputs.learning_rate}} \
--per_device_train_batch_size ${{inputs.per_device_train_batch_size}} \
--per_device_eval_batch_size ${{inputs.per_device_eval_batch_size}} \
--limit_samples ${{inputs.limit_samples}} \
--model_name ${{inputs.pretrained_model_name}} \
--model_output ${{outputs.finetuned_model}}\
--output_dir outputs \
--num_train_epochs ${{inputs.num_train_epochs}} \
--do_train --do_eval \
""",
environment="nlp_summarization_train@latest",
distribution={
"type": "PyTorch",
# set process count to the number of gpus on the node
"process_count_per_instance": args.gpus_per_node,
},
# set instance count to the number of nodes you want to use
instance_count=args.training_nodes,
)
evaluate_model = command(
name="evaluate_model",
display_name="Run eval on a model",
inputs={
"preprocessed_datasets": Input(type="uri_folder"),
"model_path": Input(type="uri_folder", optional=True),
"model_name": Input(type="string", optional=True),
"limit_samples": Input(type="integer"),
"max_target_length": Input(type="integer"),
},
outputs=dict(
metrics=Output(type="uri_folder", mode="rw_mount"),
),
code=DATA_SCIENCE_FOLDER,
command="""python summarization/run.py \
--preprocessed_datasets ${{inputs.preprocessed_datasets}} \
--limit_samples ${{inputs.limit_samples}} \
--output_dir ${{outputs.metrics}} \
$[[--model_path ${{inputs.model_path}}]] \
$[[--model_name ${{inputs.model_name}}]] \
--max_target_length ${{inputs.max_target_length}} \
--do_predict \
""",
environment="nlp_summarization_train@latest",
)
compare_models = command(
name="compare_models",
display_name="Compare finetuned to baseline",
inputs={
"baseline_metrics": Input(type="uri_folder"),
"candidate_metrics": Input(type="uri_folder"),
"reference_metric": Input(type="string"),
},
outputs=dict(
deploy_flag=Output(type="uri_folder", mode="rw_mount"),
),
code=DATA_SCIENCE_FOLDER,
command="""python summarization/compare.py \
--baseline_metrics ${{inputs.baseline_metrics}} \
--candidate_metrics ${{inputs.candidate_metrics}} \
--reference_metric ${{inputs.reference_metric}} \
--deploy_flag ${{outputs.deploy_flag}} \
--force_comparison True\
""",
environment="AzureML-sklearn-1.0-ubuntu20.04-py38-cpu@latest",
)
register_model = command(
name="register_model",
display_name="Register model",
inputs={
"model": Input(type="uri_folder"),
"deploy_flag": Input(type="uri_folder"),
"model_registration_name": Input(type="string"),
},
code=DATA_SCIENCE_FOLDER,
command="""python summarization/register.py \
--model_folder ${{inputs.model}} \
--deploy_flag ${{inputs.deploy_flag}} \
--register_as ${{inputs.model_registration_name}} \
""",
environment="AzureML-sklearn-1.0-ubuntu20.04-py38-cpu@latest",
)
return {
"prep_finetuning_dataset": prep_finetuning_dataset,
"finetune_model": finetune_model,
"evaluate_model": evaluate_model,
"compare_models": compare_models,
"register_model": register_model,
}
def main():
"""Main entry point for the script."""
parser = get_config_parser()
args, _ = parser.parse_known_args()
ml_client = connect_to_aml(args)
# get components from build function
components_dict = build_components(args)
prep_finetuning_dataset = components_dict["prep_finetuning_dataset"]
finetune_model = components_dict["finetune_model"]
evaluate_model = components_dict["evaluate_model"]
compare_models = components_dict["compare_models"]
register_model = components_dict["register_model"]
# build the pipeline using Azure ML SDK v2
@dsl.pipeline(
name="NLP Training Pipeline",
description="NLP Training Pipeline",
)
def nlp_training_pipeline(
limit_samples: int,
pretrained_model_name: str,
num_train_epochs: int,
batch_size: int,
learning_rate: float,
model_registration_name: str,
):
prep_finetuning_dataset_step = prep_finetuning_dataset(
dataset_name="ccdv/pubmed-summarization",
dataset_config="section",
text_column="article",
summary_column="abstract",
limit_samples=limit_samples,
max_input_length=512,
max_target_length=40,
padding="max_length",
pretrained_model_name=pretrained_model_name,
)
prep_finetuning_dataset_step.compute = args.cpu_compute_large
finetune_model_step = finetune_model(
preprocessed_datasets=prep_finetuning_dataset_step.outputs.encodings,
pretrained_model_name=pretrained_model_name,
limit_samples=limit_samples,
learning_rate=learning_rate,
num_train_epochs=num_train_epochs,
per_device_train_batch_size=batch_size,
per_device_eval_batch_size=batch_size,
)
finetune_model_step.compute = args.gpu_compute
evaluate_finetuned_model_step = evaluate_model(
preprocessed_datasets=prep_finetuning_dataset_step.outputs.encodings,
model_path=finetune_model_step.outputs.finetuned_model,
limit_samples=limit_samples,
max_target_length=40,
)
evaluate_finetuned_model_step.compute = args.gpu_compute
evaluate_baseline_model_step = evaluate_model(
preprocessed_datasets=prep_finetuning_dataset_step.outputs.encodings,
model_name=pretrained_model_name,
limit_samples=limit_samples,
max_target_length=40,
)
evaluate_baseline_model_step.compute = args.gpu_compute
compare_models_step = compare_models(
baseline_metrics=evaluate_baseline_model_step.outputs.metrics,
candidate_metrics=evaluate_finetuned_model_step.outputs.metrics,
reference_metric="predict_rougeLsum",
)
compare_models_step.compute = args.cpu_compute
register_model_step = register_model(
model=finetune_model_step.outputs.finetuned_model,
deploy_flag=compare_models_step.outputs.deploy_flag,
model_registration_name=model_registration_name,
)
register_model_step.compute = args.cpu_compute
# instantiate the job
pipeline_job = nlp_training_pipeline(
limit_samples=args.limit_samples,
pretrained_model_name=args.pretrained_model_name,
num_train_epochs=args.num_train_epochs,
batch_size=args.batch_size,
learning_rate=args.learning_rate,
model_registration_name=args.model_registration_name,
)
# submits the job
print("Submitting the pipeline job to your AzureML workspace...")
pipeline_job = ml_client.jobs.create_or_update(
pipeline_job, experiment_name=args.n
)
print("The url to see your live job running is returned by the sdk:")
print(pipeline_job.services["Studio"].endpoint)
if args.wait:
ml_client.jobs.stream(pipeline_job.name)
if __name__ == "__main__":
main()
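
A hypothetical local invocation of this submission script, assuming a config.json for the target workspace is available so MLClient.from_config can connect; the script path matches the pipeline_path used in the Azure DevOps pipeline below, and only -n is strictly required:

import subprocess
import sys

subprocess.run(
    [
        sys.executable, "mlops/azureml/train/pipeline-train.py",
        "-n", "nlp_summarization_train",
        "--limit_samples", "1000",
        "--wait",
    ],
    check=True,
)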

View File

@@ -0,0 +1,73 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
variables:
- ${{ if eq(variables['Build.SourceBranchName'], 'main') }}:
# 'main' branch: PRD environment
- template: ../../config-infra-prod.yml
- ${{ if ne(variables['Build.SourceBranchName'], 'main') }}:
# 'develop' or feature branches: DEV environment
- template: ../../config-infra-dev.yml
- name: version
value: python-sdk-v2
trigger:
- none
pool:
vmImage: ubuntu-20.04
resources:
repositories:
- repository: mlops-templates # Template Repo
name: Azure/mlops-templates # need to change org name from "Azure" to your own org
endpoint: github-connection # need to set up and hardcode
type: github
ref: main-dec31
stages:
- stage: DeployTrainingPipeline
displayName: Deploy Training Pipeline
jobs:
- job: DeployTrainingPipeline
steps:
- checkout: self
path: s/
- checkout: mlops-templates
path: s/templates/
- template: templates/aml-cli-v2/install-az-cli.yml@mlops-templates
- template: templates/aml-cli-v2/install-aml-cli.yml@mlops-templates
- template: templates/aml-cli-v2/install-requirements.yml@mlops-templates
- template: templates/aml-cli-v2/connect-to-workspace.yml@mlops-templates
- template: templates/${{ variables.version }}/create-compute.yml@mlops-templates
parameters:
cluster_name: cpu-cluster
size: STANDARD_DS3_V2
min_instances: 0
max_instances: 1
cluster_tier: dedicated
- template: templates/${{ variables.version }}/create-compute.yml@mlops-templates
parameters:
cluster_name: cpu-cluster-lg
size: Standard_D14_v2
min_instances: 0
max_instances: 1
cluster_tier: dedicated
- template: templates/${{ variables.version }}/create-compute.yml@mlops-templates
parameters:
cluster_name: gpu-cluster
size: Standard_NV6
min_instances: 0
max_instances: 1
cluster_tier: dedicated
- template: templates/${{ variables.version }}/register-environment.yml@mlops-templates
parameters:
build_type: docker
environment_name: nlp_summarization_train
environment_path: data-science/environments/training
- template: templates/${{ variables.version }}/run-pipeline.yml@mlops-templates
parameters:
pipeline_path: mlops/azureml/train/pipeline-train.py
experiment_name: $(environment)_nlp_summarization_$(Build.SourceBranchName)

View File

@@ -1 +0,0 @@
# placeholder for sdk-v2 implementation of nlp