This commit is contained in:
Jeff Omhover 2022-11-07 08:36:00 -08:00
Parent bfb77dd965
Commit adbce63fe5
9 changed files: 686 additions and 0 deletions

View File

@@ -0,0 +1,24 @@
name: nlp_inference_conda_env
channels:
- pytorch
- anaconda
- defaults
- conda-forge
dependencies:
- python=3.8
- pip=21.2.4
- pytorch=1.10.0
- torchvision=0.11.1
- torchaudio=0.10.0
- cudatoolkit=11.1.1
- nvidia-apex=0.1.0
- gxx_linux-64=8.5.0
- pip:
- azureml-defaults==1.39.0
- azureml-mlflow==1.39.0
- azureml-telemetry==1.39.0
- azureml-train-core==1.39.0
- mlflow==1.24.0
- transformers==4.17.0
- 'inference-schema[numpy-support]==1.3.0'
- applicationinsights==0.11.10
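
This conda specification pins the inference stack (PyTorch 1.10 plus the AzureML 1.39 pip packages). As a minimal sketch, assuming the file above is saved locally as nlp_inference_conda_env.yml and that the azure-ai-ml (v2) SDK is used to register environments, it could be turned into an AzureML environment as follows; the workspace coordinates, base image and file path are illustrative, not taken from this commit:

from azure.ai.ml import MLClient
from azure.ai.ml.entities import Environment
from azure.identity import DefaultAzureCredential

# connect to the workspace (subscription/resource group/workspace names are placeholders)
ml_client = MLClient(
    DefaultAzureCredential(),
    subscription_id="<subscription-id>",
    resource_group_name="<resource-group>",
    workspace_name="<workspace>",
)

# wrap the conda file above around an assumed AzureML base image
inference_env = Environment(
    name="nlp_inference_conda_env",
    image="mcr.microsoft.com/azureml/openmpi4.1.0-cuda11.1-cudnn8-ubuntu20.04",  # assumed base image
    conda_file="nlp_inference_conda_env.yml",  # path to the file shown above
)
ml_client.environments.create_or_update(inference_env)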

View File

@@ -0,0 +1,7 @@
# check release notes https://docs.nvidia.com/deeplearning/frameworks/pytorch-release-notes/index.html
FROM nvcr.io/nvidia/pytorch:22.04-py3
# Install dependencies missing in this container
# NOTE: container already has matplotlib==3.5.1 tqdm==4.62.0
COPY requirements.txt ./
RUN pip install -r requirements.txt
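
The Dockerfile builds on the NGC PyTorch 22.04 container and layers the pip requirements shown in the next file on top of it. A hedged sketch of registering this Docker build context as a training environment with the azure-ai-ml (v2) SDK; the folder layout and environment name are assumptions, not part of this commit:

from azure.ai.ml import MLClient
from azure.ai.ml.entities import Environment, BuildContext
from azure.identity import DefaultAzureCredential

# workspace coordinates are placeholders; assumes the Dockerfile and requirements.txt
# above live together in ./environment/training/
ml_client = MLClient(
    DefaultAzureCredential(),
    subscription_id="<subscription-id>",
    resource_group_name="<resource-group>",
    workspace_name="<workspace>",
)
training_env = Environment(
    name="nlp_training_env",  # hypothetical name
    build=BuildContext(path="environment/training"),
    description="NGC PyTorch 22.04 image plus the fine-tuning requirements",
)
ml_client.environments.create_or_update(training_env)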

View File

@@ -0,0 +1,18 @@
# data science requirements
# torchvision==0.12.0
# torch==1.11.0
pytorch_lightning==1.6.4
transformers==4.18.0
datasets==2.3.2
rouge_score==0.0.4
sentencepiece==0.1.96
# for metrics reporting/plotting
mlflow==1.25.1
azureml-mlflow==1.41.0
# matplotlib==3.5.2
# tqdm==4.64.0
psutil==5.9.0
# for unit testing
pytest==7.1.2

View File

@@ -0,0 +1,88 @@
import os
import argparse
import logging
import mlflow
import json
from distutils.util import strtobool
def main():
"""Main function of the script."""
# initialize root logger
logger = logging.getLogger()
logger.setLevel(logging.INFO)
console_handler = logging.StreamHandler()
formatter = logging.Formatter(
"%(asctime)s : %(levelname)s : %(name)s : %(message)s"
)
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
# input and output arguments
parser = argparse.ArgumentParser()
parser.add_argument(
"--baseline_metrics",
type=str,
required=True,
help="path to baseline metrics folder containing all_results.json",
)
parser.add_argument(
"--candidate_metrics",
type=str,
required=True,
help="path to candidate metrics folder containing all_results.json",
)
parser.add_argument(
"--reference_metric",
type=str,
default="predict_rougeLsum",
help="name of reference metric for shipping flag (default: predict_rougeLsum)",
)
parser.add_argument(
"--force_comparison", type=strtobool, default=False, help="set to True to bypass comparison and set --deploy_flag to True"
)
parser.add_argument(
"--deploy_flag", type=str, help="a deploy flag whether to deploy or not"
)
args = parser.parse_args()
# Start Logging
mlflow.start_run()
logger.info(f"Running with arguments: {args}")
# open metrics on both sides
with open(os.path.join(args.baseline_metrics, "all_results.json")) as in_file:
baseline_metrics = json.loads(in_file.read())
with open(os.path.join(args.candidate_metrics, "all_results.json")) as in_file:
candidate_metrics = json.loads(in_file.read())
# should we ship or not?
if args.force_comparison:
deploy_flag = True
else:
deploy_flag = (
candidate_metrics[args.reference_metric]
> baseline_metrics[args.reference_metric]
)
logger.info("baseline_metrics[{}]={}, candidate_metrics[{}]={}, deploy_flag={} (force_comparison={})".format(
args.reference_metric,
baseline_metrics[args.reference_metric],
args.reference_metric,
candidate_metrics[args.reference_metric],
deploy_flag,
args.force_comparison
))
# save deploy_flag as a file
os.makedirs(args.deploy_flag, exist_ok=True)
with open(os.path.join(args.deploy_flag, "deploy_flag"), "w") as out_file:
out_file.write("%d" % int(deploy_flag))
# Stop Logging
mlflow.end_run()
if __name__ == "__main__":
main()
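
For a quick local check of the comparison logic above, here is a minimal sketch; the metric values and folder names are made up for illustration, and all_results.json stands in for the file the Hugging Face trainer writes via save_metrics in the training component:

import json, os, tempfile

def write_metrics(folder, value):
    os.makedirs(folder, exist_ok=True)
    with open(os.path.join(folder, "all_results.json"), "w") as f:
        json.dump({"predict_rougeLsum": value}, f)

with tempfile.TemporaryDirectory() as tmp:
    baseline, candidate = os.path.join(tmp, "baseline"), os.path.join(tmp, "candidate")
    write_metrics(baseline, 20.1)   # hypothetical baseline rougeLsum
    write_metrics(candidate, 21.7)  # hypothetical candidate rougeLsum
    with open(os.path.join(baseline, "all_results.json")) as f:
        baseline_metrics = json.load(f)
    with open(os.path.join(candidate, "all_results.json")) as f:
        candidate_metrics = json.load(f)
    # same rule as the script: ship only when the candidate strictly improves the metric
    deploy_flag = candidate_metrics["predict_rougeLsum"] > baseline_metrics["predict_rougeLsum"]
    print(deploy_flag)  # True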

View File

@@ -0,0 +1,151 @@
"""
This scripts prepares a HuggingFace dataset to be used
for fine-tuning. It encodes the train/val/test tests and
outputs as JSONL files.
"""
import os
import argparse
import logging
from datasets import load_dataset, DatasetDict
from transformers import AutoTokenizer
import mlflow
def main():
"""Main function of the script."""
# initialize root logger
logger = logging.getLogger()
logger.setLevel(logging.INFO)
console_handler = logging.StreamHandler()
formatter = logging.Formatter(
"%(asctime)s : %(levelname)s : %(name)s : %(message)s"
)
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
# input and output arguments
parser = argparse.ArgumentParser()
parser.add_argument(
"--dataset_name", type=str, help="name of dataset or path to input dataset_name"
)
parser.add_argument(
"--dataset_config", type=str, help="config for huggingface dataset"
)
parser.add_argument("--text_column", type=str, help="name of text_column")
parser.add_argument("--summary_column", type=str, help="name of summary_column")
parser.add_argument(
"--max_input_length", type=int, default=512, help="max_input_length"
)
parser.add_argument(
"--max_target_length", type=int, default=40, help="max_target_length"
)
parser.add_argument(
"--padding", type=str, default="max_length", help="padding type"
)
parser.add_argument(
"--model_arch",
type=str,
help="name of the model to prepare for in HF model library",
)
parser.add_argument(
"--limit_samples", type=int, default=-1, help="sample size from input dataset"
)
parser.add_argument("--encodings", type=str, help="path to tokenized dataset")
parser.add_argument(
"--source_prefix",
type=str,
help="A prefix to add before every source text (useful for T5 models).",
)
args = parser.parse_args()
# Start Logging
mlflow.start_run()
logger.info(f"Running with arguments: {args}")
# get tokenizer ready
tokenizer = AutoTokenizer.from_pretrained(args.model_arch)
logger.info(f"tokenizer: {tokenizer}")
prefix = args.source_prefix if args.source_prefix is not None else ""
if args.source_prefix is None and "t5" in args.model_arch.lower():
logger.warning(
"You're running a t5 model but didn't provide a source prefix, which is the expected, e.g. with "
"`--source_prefix 'summarize: ' `"
)
# Load HuggingFace dataset
raw_dataset = load_dataset(args.dataset_name, args.dataset_config)
logger.info(f"raw dataset length: {raw_dataset.num_rows}")
mlflow.log_metric("train_samples", raw_dataset["train"].shape[0])
mlflow.log_metric("test_samples", raw_dataset["test"].shape[0])
mlflow.log_metric("validation_samples", raw_dataset["validation"].shape[0])
if args.limit_samples > 0:
sample_sizes = {
k: min(len(raw_dataset[k]), args.limit_samples) for k in raw_dataset.keys()
}
raw_dataset = DatasetDict(
{
k: raw_dataset[k].select([i for i in range(sample_sizes[k])])
for k in raw_dataset.keys()
}
)
logger.info("sampled raw dataset:")
logger.info(raw_dataset.num_rows)
def preprocess_function(examples):
# remove pairs where at least one record is None
inputs, targets = [], []
for i in range(len(examples[args.text_column])):
if (
examples[args.text_column][i] is not None
and examples[args.summary_column][i] is not None
):
inputs.append(examples[args.text_column][i])
targets.append(examples[args.summary_column][i])
inputs = [prefix + inp for inp in inputs]
model_inputs = tokenizer(
inputs,
max_length=args.max_input_length,
padding=args.padding,
truncation=True,
)
# Set up the tokenizer for targets
with tokenizer.as_target_tokenizer():
labels = tokenizer(
examples[args.summary_column],
max_length=args.max_target_length,
padding=args.padding,
truncation=True,
)
# replace all tokenizer.pad_token_id in the labels by -100 to ignore padding in the loss.
if args.padding == "max_length":
labels["input_ids"] = [
[(l if l != tokenizer.pad_token_id else -100) for l in label]
for label in labels["input_ids"]
]
model_inputs["labels"] = labels["input_ids"]
return model_inputs
preprocessed_datasets = raw_dataset.map(preprocess_function, batched=True)
logger.info(f"preprocessed_datasets: {preprocessed_datasets}")
output_path = os.path.join(args.encodings)
os.makedirs(output_path, exist_ok=True)
preprocessed_datasets.save_to_disk(output_path)
logger.info(f"tokenized data is saved to {output_path}")
# Stop Logging
mlflow.end_run()
if __name__ == "__main__":
main()
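
The tokenized splits are written with save_to_disk, so downstream components (the training script below does exactly this) reload them with datasets.load_from_disk. A minimal sketch, assuming the --encodings output was written to ./encodings:

from datasets import load_from_disk

encoded = load_from_disk("./encodings")   # path passed as --encodings above
print(encoded)                            # DatasetDict with train/validation/test splits
print(encoded["train"][0].keys())         # input_ids, attention_mask, labels, plus the original columns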

View File

@@ -0,0 +1,82 @@
from azureml.core import Run
from azureml.core.model import Model
import os
import argparse
import logging
import mlflow
def main():
"""Main function of the script."""
# initialize root logger
logger = logging.getLogger()
logger.setLevel(logging.INFO)
console_handler = logging.StreamHandler()
formatter = logging.Formatter(
"%(asctime)s : %(levelname)s : %(name)s : %(message)s"
)
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
# input and output arguments
parser = argparse.ArgumentParser()
parser.add_argument(
"--model_folder",
type=str,
required=True,
help="folder containing model",
)
parser.add_argument(
"--register_as",
type=str,
required=True,
help="name to use for model registration in AzureML",
)
parser.add_argument(
"--deploy_flag", type=str, required=True, help="a deploy flag whether to deploy or not"
)
args = parser.parse_args()
logger.info(f"Running with arguments: {args}")
# Start Logging
mlflow.start_run()
if os.path.isfile(args.deploy_flag):
deploy_flag_file_path = args.deploy_flag
else:
deploy_flag_file_path = os.path.join(args.deploy_flag, "deploy_flag")
logger.info(f"Opening deploy_flag file from {deploy_flag_file_path}")
with open(deploy_flag_file_path, 'rb') as in_file:
deploy_flag = bool(int(in_file.read()))
if deploy_flag:
logger.info(f"Deploy flag is True, registering model as {args.register_as}...")
run = Run.get_context()
        # if we're running locally (offline run), raise an exception
if run.__class__.__name__ == "_OfflineRun":
raise Exception("You can't run this script locally, you will need to run it as an AzureML job.")
_ = Model.register(
run.experiment.workspace,
model_name=args.register_as,
model_path=args.model_folder,
tags={
"type": "huggingface",
"task": "summarization"
},
description="Huggingface model finetuned for summarization",
)
else:
logger.info(f"Deploy flag is False, pass.")
# Stop Logging
mlflow.end_run()
if __name__ == "__main__":
main()
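
A hedged sketch (azureml-core v1, matching the imports above) of retrieving the registered model from a later deployment step; the model name is a placeholder for whatever --register_as was set to:

from azureml.core import Workspace
from azureml.core.model import Model

ws = Workspace.from_config()                       # assumes a local config.json for the workspace
model = Model(ws, name="nlp_summarization_model")  # hypothetical --register_as value; latest version by default
print(model.name, model.version, model.tags)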

View File

@@ -0,0 +1,269 @@
import logging
import os
from datasets import load_metric, load_from_disk
from transformers import (
AutoModelForSeq2SeqLM,
AutoTokenizer,
DataCollatorForSeq2Seq,
Seq2SeqTrainingArguments,
Seq2SeqTrainer,
HfArgumentParser,
IntervalStrategy,
)
from transformers.trainer_callback import TrainerCallback
import torch
import nltk
from dataclasses import dataclass, field
from typing import Optional
import numpy as np
import mlflow
from pynvml import nvmlInit, nvmlDeviceGetHandleByIndex, nvmlDeviceGetMemoryInfo
import time
# Input arguments are defined with dataclasses. The HuggingFace library provides the built-in
# training arguments in the Seq2SeqTrainingArguments dataclass; user arguments are defined in the
# dataclasses below, and HfArgumentParser loads everything from this tuple of dataclasses.
@dataclass
class DataArgs:
# Inputs
preprocessed_datasets: str = field(
        default=None, metadata={"help": "path to preprocessed datasets"}
)
# Processing parameters
max_target_length: Optional[int] = field(
default=128,
metadata={"help": "maxi sequence length for target text after tokenization."},
)
limit_samples: Optional[int] = field(
default=-1,
metadata={"help": "limit the number of samples for faster run."},
)
@dataclass
class ModelArgs:
model_name: Optional[str] = field(default=None, metadata={"help": "model name"})
model_path: Optional[str] = field(
default=None, metadata={"help": "path to existing model file to load"}
)
model_output: Optional[str] = field(
default=None, metadata={"help": "path to save the model"}
)
nltk.download("punkt")
def print_gpu_utilization():
nvmlInit()
handle = nvmlDeviceGetHandleByIndex(0)
info = nvmlDeviceGetMemoryInfo(handle)
print(f"GPU memory occupied: {info.used//1024**2} MB.")
def print_summary(result):
print(f"Time: {result.metrics['train_runtime']:.2f}")
print(f"Samples/second: {result.metrics['train_samples_per_second']:.2f}")
print_gpu_utilization()
def postprocess_text(preds, labels):
"""Postprocess output for computing metrics"""
preds = [pred.strip() for pred in preds]
labels = [label.strip() for label in labels]
# rougeLSum expects newline after each sentence
preds = ["\n".join(nltk.sent_tokenize(pred)) for pred in preds]
labels = ["\n".join(nltk.sent_tokenize(label)) for label in labels]
return preds, labels
def compute_metrics(eval_preds, tokenizer, metric):
"""Compute metric based on predictions from evaluation"""
preds, labels = eval_preds
if isinstance(preds, tuple):
preds = preds[0]
decoded_preds = tokenizer.batch_decode(preds, skip_special_tokens=True)
# Replace -100 in the labels as we can't decode them.
labels = np.where(labels != -100, labels, tokenizer.pad_token_id)
decoded_labels = tokenizer.batch_decode(labels, skip_special_tokens=True)
# Some simple post-processing
decoded_preds, decoded_labels = postprocess_text(decoded_preds, decoded_labels)
result = metric.compute(
predictions=decoded_preds, references=decoded_labels, use_stemmer=True
)
# Extract a few results from ROUGE
result = {key: value.mid.fmeasure * 100 for key, value in result.items()}
prediction_lens = [
np.count_nonzero(pred != tokenizer.pad_token_id) for pred in preds
]
result["gen_len"] = np.mean(prediction_lens)
result = {k: round(v, 4) for k, v in result.items()}
return result
class CustomCallback(TrainerCallback):
"""A [`TrainerCallback`] that sends the logs to [AzureML](https://pypi.org/project/azureml-sdk/).
This is a hotfix for the issue raised here:
https://github.com/huggingface/transformers/issues/18870
"""
def on_log(self, args, state, control, logs=None, **kwargs):
if state.is_world_process_zero:
metrics = {}
for k, v in logs.items():
if isinstance(v, (int, float)):
metrics[k] = v
mlflow.log_metrics(metrics=metrics, step=state.global_step)
def main():
# Setup logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
console_handler = logging.StreamHandler()
formatter = logging.Formatter(
"%(asctime)s : %(levelname)s : %(name)s : %(message)s"
)
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
# initialize the mlflow session
mlflow.start_run()
parser = HfArgumentParser((ModelArgs, DataArgs, Seq2SeqTrainingArguments))
model_args, data_args, training_args = parser.parse_args_into_dataclasses()
logger.info(f"Running with arguments: {model_args}, {data_args}, {training_args}")
# Check if this is the main node
is_this_main_node = int(os.environ.get("RANK", "0")) == 0
if is_this_main_node:
logger.info("This is the main Node")
input_datasets = load_from_disk(data_args.preprocessed_datasets)
logger.info(f"preprocessed dataset is loaded")
if model_args.model_path:
logger.info("using a saved model")
model = AutoModelForSeq2SeqLM.from_pretrained(model_args.model_path)
tokenizer = AutoTokenizer.from_pretrained(model_args.model_path)
else:
logger.info("using a model from model library")
model = AutoModelForSeq2SeqLM.from_pretrained(model_args.model_name)
tokenizer = AutoTokenizer.from_pretrained(model_args.model_name)
# Artificially limit the number of samples (for testing)
    if training_args.do_train:  # if using --do_train from Seq2SeqTrainingArguments
if data_args.limit_samples > 0:
max_train_samples = min(len(input_datasets["train"]), data_args.limit_samples)
train_dataset = input_datasets["train"].select(range(max_train_samples))
logger.info(f"train: making a {max_train_samples} sample of the data")
else:
train_dataset = input_datasets["train"]
if training_args.do_eval:
if data_args.limit_samples > 0:
max_eval_samples = min(
len(input_datasets["validation"]), data_args.limit_samples
)
eval_dataset = input_datasets["validation"].select(range(max_eval_samples))
logger.info(f"eval: making a {max_eval_samples} sample of the data")
else:
eval_dataset = input_datasets["validation"]
if training_args.do_predict:
if data_args.limit_samples > 0:
max_predict_samples = min(
len(input_datasets["test"]), data_args.limit_samples
)
predict_dataset = input_datasets["test"].select(range(max_predict_samples))
logger.info(f"predict: making a {max_predict_samples} sample of the data")
else:
predict_dataset = input_datasets["test"]
# Data collator
label_pad_token_id = -100
data_collator = DataCollatorForSeq2Seq(
tokenizer,
model=model,
label_pad_token_id=label_pad_token_id,
)
# Metric
metric = load_metric("rouge")
if training_args.do_train:
logging_steps = len(train_dataset) // training_args.per_device_train_batch_size
training_args.logging_steps = logging_steps
#training_args.output_dir = "outputs"
training_args.save_strategy = "epoch"
training_args.evaluation_strategy = IntervalStrategy.EPOCH
training_args.predict_with_generate = True
training_args.report_to = [] # use our own callback
logger.info(f"training args: {training_args}")
# Initialize our Trainer
trainer = Seq2SeqTrainer(
model=model,
args=training_args,
train_dataset=train_dataset if training_args.do_train else None,
eval_dataset=eval_dataset if training_args.do_eval else None,
tokenizer=tokenizer,
data_collator=data_collator,
compute_metrics=lambda preds : compute_metrics(preds, tokenizer, metric),
callbacks=[CustomCallback]
)
    # Start the actual training (to include evaluation use --do_eval)
if training_args.do_train:
logger.info("Start training")
start = time.time()
train_result = trainer.train()
mlflow.log_metric(
"time/epoch", (time.time() - start) / 60 / training_args.num_train_epochs
)
        logger.info("training is done")
        # only print GPU utilization if a GPU is available
        if torch.cuda.is_available():
print_summary(train_result)
# Save the model as an output
if model_args.model_output and is_this_main_node:
logger.info(f"Saving the model at {model_args.model_output}")
os.makedirs(model_args.model_output, exist_ok=True)
trainer.save_model(model_args.model_output)
# Just run the predictions
if training_args.do_predict:
logger.info("*** Predict ***")
max_length = (
training_args.generation_max_length
if training_args.generation_max_length is not None
else data_args.max_target_length
)
predict_results = trainer.predict(
predict_dataset, metric_key_prefix="predict", max_length=max_length
)
metrics = predict_results.metrics
metrics["predict_samples"] = len(predict_dataset)
trainer.log_metrics("predict", metrics)
trainer.save_metrics("predict", metrics)
# Stop Logging
mlflow.end_run()
if __name__ == "__main__":
main()
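
Because HfArgumentParser merges the user dataclasses with the built-in Seq2SeqTrainingArguments, this component accepts flags from all three dataclasses on the same command line. A minimal, self-contained sketch of how such arguments parse; the dataclasses are trimmed-down copies of the ones in the script above and all values are illustrative only:

from dataclasses import dataclass, field
from typing import Optional
from transformers import HfArgumentParser, Seq2SeqTrainingArguments

# trimmed-down copies of ModelArgs/DataArgs from the training script above,
# just to show how HfArgumentParser merges them with Seq2SeqTrainingArguments
@dataclass
class ModelArgs:
    model_name: Optional[str] = field(default=None)
    model_path: Optional[str] = field(default=None)
    model_output: Optional[str] = field(default=None)

@dataclass
class DataArgs:
    preprocessed_datasets: str = field(default=None)
    max_target_length: Optional[int] = field(default=128)
    limit_samples: Optional[int] = field(default=-1)

parser = HfArgumentParser((ModelArgs, DataArgs, Seq2SeqTrainingArguments))
model_args, data_args, training_args = parser.parse_args_into_dataclasses(args=[
    "--model_name", "t5-small",                  # hypothetical model
    "--preprocessed_datasets", "./encodings",    # output of the prep component above
    "--model_output", "./model",
    "--output_dir", "./outputs",                 # built-in Seq2SeqTrainingArguments flag
    "--do_train", "--do_eval",
    "--per_device_train_batch_size", "8",
    "--num_train_epochs", "1",
])
print(training_args.do_train, data_args.preprocessed_datasets)  # True ./encodings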

View File

@@ -0,0 +1,44 @@
import os
import logging
import json
from transformers import (
AutoModelForSeq2SeqLM,
AutoTokenizer,
)
def init():
"""
This function is called when the container is initialized/started, typically after create/update of the deployment.
You can write the logic here to perform init operations like caching the model in memory
"""
global model, tokenizer
# AZUREML_MODEL_DIR is an environment variable created during deployment.
# It is the path to the model folder (./azureml-models/$MODEL_NAME/$VERSION)
model_path = os.path.join(os.getenv("AZUREML_MODEL_DIR"), os.listdir(os.getenv("AZUREML_MODEL_DIR"))[0])
print("model_path")
print(os.listdir(model_path))
model = AutoModelForSeq2SeqLM.from_pretrained(model_path)
tokenizer = AutoTokenizer.from_pretrained(model_path)
print("Init complete")
def run(raw_data):
    """
    This function is called for every invocation of the endpoint to perform the actual scoring/prediction.
    In this example we parse the input text from the json payload, run the HuggingFace summarization
    model and return the generated summary.
    """
    global model, tokenizer
logging.info("Request received")
article = json.loads(raw_data)["data"]
if "t5" in model.config.architectures[0].lower():
article= "summarize:" + article
inputs = tokenizer(article, return_tensors="pt", max_length=512, truncation=True)
outputs = model.generate(
inputs["input_ids"], max_length=150, min_length=40, length_penalty=2.0, num_beams=4, early_stopping=True
)
result = tokenizer.decode(outputs[0])
print(result)
logging.info("Request processed")
return result
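
A minimal local smoke test for this scoring script; the paths and payload are hypothetical, and it assumes the file above is saved as score.py and that AZUREML_MODEL_DIR points at a folder whose first subfolder contains the fine-tuned model, mirroring the layout used in init():

import os, json
os.environ["AZUREML_MODEL_DIR"] = "./azureml-models/nlp-summarization/1"  # hypothetical local model dir
import score

score.init()
payload = json.dumps({"data": "The quick brown fox jumps over the lazy dog. " * 20})
print(score.run(payload))   # prints the generated summary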

File diff suppressed because one or more lines are too long