From b43da4c1459444b3d538318402e43f7e9e7e5ebb Mon Sep 17 00:00:00 2001 From: Your Name Date: Fri, 5 Nov 2021 08:20:58 +0000 Subject: [PATCH] init --- examples/train/dask-lightgbm/README.md | 19 ++ examples/train/dask-lightgbm/conda.yml | 19 ++ examples/train/dask-lightgbm/job.yml | 37 +++ examples/train/dask-lightgbm/src/startDask.py | 221 +++++++++++++++ .../train/dask-lightgbm/src/train-lgb-dask.py | 66 +++++ examples/train/nni-hyperband/README.md | 18 ++ .../train/nni-hyperband/config_hyperband.yml | 22 ++ examples/train/nni-hyperband/src/mnist.py | 146 ++++++++++ examples/train/pytorch-ddp/README.md | 13 + examples/train/pytorch-ddp/dataprep.py | 16 ++ examples/train/pytorch-ddp/job.yml | 26 ++ examples/train/pytorch-ddp/src/train.py | 261 ++++++++++++++++++ examples/train/ray-flaml/README.md | 17 ++ .../train/ray-flaml/azureml-tensorboard.ipynb | 91 ++++++ examples/train/ray-flaml/conda.yml | 13 + examples/train/ray-flaml/job.yml | 28 ++ examples/train/ray-flaml/src/startRay.py | 100 +++++++ .../train/ray-flaml/src/train-automl-flaml.py | 76 +++++ 18 files changed, 1189 insertions(+) create mode 100644 examples/train/dask-lightgbm/README.md create mode 100644 examples/train/dask-lightgbm/conda.yml create mode 100644 examples/train/dask-lightgbm/job.yml create mode 100644 examples/train/dask-lightgbm/src/startDask.py create mode 100644 examples/train/dask-lightgbm/src/train-lgb-dask.py create mode 100644 examples/train/nni-hyperband/README.md create mode 100644 examples/train/nni-hyperband/config_hyperband.yml create mode 100644 examples/train/nni-hyperband/src/mnist.py create mode 100644 examples/train/pytorch-ddp/README.md create mode 100644 examples/train/pytorch-ddp/dataprep.py create mode 100644 examples/train/pytorch-ddp/job.yml create mode 100644 examples/train/pytorch-ddp/src/train.py create mode 100644 examples/train/ray-flaml/README.md create mode 100644 examples/train/ray-flaml/azureml-tensorboard.ipynb create mode 100644 examples/train/ray-flaml/conda.yml create mode 100644 examples/train/ray-flaml/job.yml create mode 100644 examples/train/ray-flaml/src/startRay.py create mode 100644 examples/train/ray-flaml/src/train-automl-flaml.py diff --git a/examples/train/dask-lightgbm/README.md b/examples/train/dask-lightgbm/README.md new file mode 100644 index 0000000..68f688e --- /dev/null +++ b/examples/train/dask-lightgbm/README.md @@ -0,0 +1,19 @@ +# LightGBM Distributed Training with DASK + +This example shows how to use DASK to train LightGBM models in distributed mode on Azure Machine Learning. + +## Prerequisites + +- Azure Machine Learning Workspace + - Compute Clusters for DASK + - Compute Instance with Azure ML CLI 2.0 installed + +## LightGBM DASK Distributed Training + +LightGBM supports distributed training with DASK. DASK is a distributed computing framework for Python. See the following documents in reference section for more details. 
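+
+For illustration, a minimal sketch of the pattern used in `src/train-lgb-dask.py`, assuming the Dask scheduler started by `src/startDask.py` is reachable at `localhost:8786` (as it is on the head node); the dataset path and column name below are placeholders:
+
+```python
+import dask.dataframe as dd
+import lightgbm as lgb
+from dask.distributed import Client
+
+# connect to the Dask scheduler started by startDask.py
+client = Client("localhost:8786")
+
+# read the mounted CSV files as a distributed dataframe
+df = dd.read_csv("<dataset_path>/*.csv")
+dX = df.drop("target", axis=1)  # "target" is a placeholder label column
+dy = df["target"]
+
+# LightGBM distributes the fit across all Dask workers
+model = lgb.DaskLGBMRegressor(n_estimators=100)
+model.fit(dX, dy)
+```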
+ + +## Reference + +- [DASK](https://dask.org/) +- [LightGBM DASK](https://lightgbm.readthedocs.io/en/latest/Parallel-Learning-Guide.html#dask) diff --git a/examples/train/dask-lightgbm/conda.yml b/examples/train/dask-lightgbm/conda.yml new file mode 100644 index 0000000..4972119 --- /dev/null +++ b/examples/train/dask-lightgbm/conda.yml @@ -0,0 +1,19 @@ +name: dask +channels: + - defaults + - conda-forge +dependencies: + - python=3.8 + - pip: + - lightgbm + - dask + - dask-ml + - bokeh + - pandas + - notebook + - matplotlib + - ipykernel + - numpy + - scikit-learn + - azureml-sdk + - azureml-mlflow diff --git a/examples/train/dask-lightgbm/job.yml b/examples/train/dask-lightgbm/job.yml new file mode 100644 index 0000000..703a362 --- /dev/null +++ b/examples/train/dask-lightgbm/job.yml @@ -0,0 +1,37 @@ +$schema: https://azuremlschemas.azureedge.net/latest/commandJob.schema.json + +code: + local_path: src + +# This is the command that will start up the dask cluster and run the script `prep-nyctaxi.py` with the following parameters. +# For an interactive session, just remove the --script. That will just start the cluster and mount the dataset. +command: >- + python startDask.py + --script train-lgb-dask.py + --dataset_path {inputs.nyc_taxi_dataset} + +inputs: + nyc_taxi_dataset: + data: + path: https://azuremlexamples.blob.core.windows.net/datasets/nyctaxi/ + mode: mount + +environment: + conda_file: file:conda.yml + docker: + image: mcr.microsoft.com/azureml/openmpi3.1.2-ubuntu18.04 + +compute: + # use a sku with lots of disk space and memory + target: azureml:daskclusters + instance_count: 5 + +distribution: + # The job below is currently launched with `type: pytorch` since that + # gives the full flexibility of assigning the work to the + # no pytorch is actually used in this job + type: pytorch + +experiment_name: dask-nyctaxi-lgb-train + +description: DASK LightGBM Job (Multiple Instances) diff --git a/examples/train/dask-lightgbm/src/startDask.py b/examples/train/dask-lightgbm/src/startDask.py new file mode 100644 index 0000000..4e747b2 --- /dev/null +++ b/examples/train/dask-lightgbm/src/startDask.py @@ -0,0 +1,221 @@ +import os +import argparse +import time +from dask.distributed import Client, get_task_stream +import sys, uuid +import threading +import subprocess +import socket +import mlflow +from bokeh.io import export_png # dashboard 保存 + + +from notebook.notebookapp import list_running_servers + + +def flush(proc, proc_log): + while True: + proc_out = proc.stdout.readline() + if proc_out == "" and proc.poll() is not None: + proc_log.close() + break + elif proc_out: + sys.stdout.write(proc_out) + proc_log.write(proc_out) + proc_log.flush() + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("--jupyter_token", default=uuid.uuid1().hex) + parser.add_argument("--script") + + args, unparsed = parser.parse_known_args() + + for k, v in os.environ.items(): + if k.startswith("MLFLOW"): + print(k, v) + MLFLOW_RUN_ID = os.getenv("MLFLOW_RUN_ID") + + # Dashboard の保存  + client = Client() + + + # 環境変数から、Daskの起動時に必要な情報を取得する + print( + "- env: AZ_BATCHAI_JOB_MASTER_NODE_IP: ", + os.environ.get("AZ_BATCHAI_JOB_MASTER_NODE_IP"), + ) + print( + "- env: AZ_BATCHAI_IS_CURRENT_NODE_MASTER: ", + os.environ.get("AZ_BATCHAI_IS_CURRENT_NODE_MASTER"), + ) + print("- env: AZ_BATCHAI_NODE_IP: ", os.environ.get("AZ_BATCHAI_NODE_IP")) + print("- env: AZ_BATCH_HOST_LIST: ", os.environ.get("AZ_BATCH_HOST_LIST")) + print("- env: AZ_BATCH_NODE_LIST: ", 
os.environ.get("AZ_BATCH_NODE_LIST")) + print("- env: MASTER_ADDR: ", os.environ.get("MASTER_ADDR")) + print("- env: MASTER_PORT: ", os.environ.get("MASTER_PORT")) + print("- env: RANK: ", os.environ.get("RANK")) + print("- env: LOCAL_RANK: ", os.environ.get("LOCAL_RANK")) + print("- env: NODE_RANK: ", os.environ.get("NODE_RANK")) + print("- env: WORLD_SIZE: ", os.environ.get("WORLD_SIZE")) + + rank = os.environ.get("RANK") + ip = socket.gethostbyname(socket.gethostname()) + master = os.environ.get("MASTER_ADDR") + master_port = os.environ.get("MASTER_PORT") + + print("- my rank is ", rank) + print("- my ip is ", ip) + print("- master is ", master) + print("- master port is ", master_port) + + scheduler = master + ":8786" + dashboard = master + ":8787" + print("- scheduler is ", scheduler) + print("- dashboard is ", dashboard) + + print("args: ", args) + print("unparsed: ", unparsed) + print("- my rank is ", rank) + print("- my ip is ", ip) + + if not os.path.exists("logs"): + os.makedirs("logs") + + print("free disk space on /tmp") + os.system(f"df -P /tmp") + + mlflow.log_param("WORLD_SIZE", os.environ.get("WORLD_SIZE")) + + # RANK 0 での処理 + if str(rank) == "0": + mlflow.log_param("headnode", ip) + mlflow.log_param( + "cluster", + "scheduler: {scheduler}, dashboard: {dashboard}".format( + scheduler=scheduler, dashboard=dashboard + ), + ) + + cmd = ( + "jupyter lab --ip 0.0.0.0 --port 8888" + + " --NotebookApp.token={token}" + + " --allow-root --no-browser" + ).format(token=args.jupyter_token) + os.environ["MLFLOW_RUN_ID"] = MLFLOW_RUN_ID + jupyter_log = open("logs/jupyter_log.txt", "w") + jupyter_proc = subprocess.Popen( + cmd.split(), + universal_newlines=True, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + ) + jupyter_flush = threading.Thread(target=flush, args=(jupyter_proc, jupyter_log)) + jupyter_flush.start() + + # while not list(list_running_servers()): + # time.sleep(5) + + # jupyter_servers = list(list_running_servers()) + # assert (len(jupyter_servers) == 1), "more than one jupyter server is running" + + mlflow.log_param( + "jupyter", "ip: {ip_addr}, port: {port}".format(ip_addr=ip, port="8888") + ) + mlflow.log_param("jupyter-token", args.jupyter_token) + + cmd = ( + "dask-scheduler " + + "--port " + + scheduler.split(":")[1] + + " --dashboard-address " + + dashboard + ) + print(cmd) + os.environ["MLFLOW_RUN_ID"] = MLFLOW_RUN_ID + scheduler_log = open("logs/scheduler_log.txt", "w") + scheduler_proc = subprocess.Popen( + cmd.split(), + universal_newlines=True, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + ) + scheduler_flush = threading.Thread( + target=flush, args=(scheduler_proc, scheduler_log) + ) + scheduler_flush.start() + + cmd = "dask-worker " + scheduler + print(cmd) + os.environ["MLFLOW_RUN_ID"] = MLFLOW_RUN_ID + worker_log = open("logs/worker_{rank}_log.txt".format(rank=rank), "w") + worker_proc = subprocess.Popen( + cmd.split(), + universal_newlines=True, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + ) + worker_flush = threading.Thread(target=flush, args=(worker_proc, worker_log)) + worker_flush.start() + + + + + print("### OUTPUT STREAM ###") + with get_task_stream(client, plot='save', filename='task_stream.html') as ts: + futs = client.map(lambda x: time.sleep(x**2), range(5)) + results = client.gather(futs) + + if args.script: + command_line = " ".join(["python", args.script] + unparsed) + print("Launching:", command_line) + + os.environ["MLFLOW_RUN_ID"] = MLFLOW_RUN_ID + driver_log = open("logs/driver_log.txt", "w") + driver_proc = 
subprocess.Popen( + command_line.split(), + universal_newlines=True, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + ) + driver_flush = threading.Thread( + target=flush, args=(driver_proc, driver_log) + ) + driver_flush.start() + + # Wait until process terminates (without using p.wait()) + # while driver_proc.poll() is None: + # # Process hasn't exited yet, let's wait some + # time.sleep(0.5) + + print("waiting for driver process to terminate") + driver_proc.wait() + + exit_code = driver_proc.returncode + print("process ended with code", exit_code) + print("killing scheduler, worker and jupyter") + + jupyter_proc.kill() + scheduler_proc.kill() + worker_proc.kill() + exit(exit_code) + export_png(ts.figure, filename="./outputs/plot_{rank}.png") + + else: + flush(scheduler_proc, scheduler_log) + # RANK 0 以外の処理 + else: + cmd = "dask-worker " + scheduler + print(cmd) + os.environ["MLFLOW_RUN_ID"] = MLFLOW_RUN_ID + worker_log = open("logs/worker_{rank}_log.txt".format(rank=rank), "w") + worker_proc = subprocess.Popen( + cmd.split(), + universal_newlines=True, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + ) + flush(worker_proc, worker_log) + + diff --git a/examples/train/dask-lightgbm/src/train-lgb-dask.py b/examples/train/dask-lightgbm/src/train-lgb-dask.py new file mode 100644 index 0000000..61c4eac --- /dev/null +++ b/examples/train/dask-lightgbm/src/train-lgb-dask.py @@ -0,0 +1,66 @@ +import argparse +import pickle +from re import VERBOSE +import time + +import dask.dataframe as dd +import joblib +import lightgbm as lgb +import mlflow +from dask.distributed import Client, LocalCluster, performance_report, wait + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("--dataset_path") + args = parser.parse_args() + dataset_path = args.dataset_path + + OUTOUT_DIR = './outputs' + + print("loading data") + df = dd.read_csv(f"{dataset_path}/*.csv", parse_dates=["tpep_pickup_datetime", "tpep_dropoff_datetime"]) + + df["total"] = df["total_amount"] + df["tolls_amount"] + df["tip_amount"] + df["extra"] + + dX = df.drop(["store_and_fwd_flag", "tpep_pickup_datetime", "tpep_dropoff_datetime", "total", "total_amount", "tolls_amount", "tip_amount", "extra"], axis=1) + dy = df["total"] + + print("initializing a Dask cluster") + + #cluster = LocalCluster(dashboard_address=':9999') # port 8787 is already used by R studio server + #client = Client(cluster) + + client = Client("localhost:8786") + + print("created a Dask LocalCluster") + + dX.persist() + dy.persist() + wait(dX) + wait(dy) + print("distributing training data on the Dask cluster") + + print("beginning training") + dask_model = lgb.DaskLGBMRegressor(n_estimators=100) + + with performance_report(filename="./outputs/dask-report.html"): + start = time.time() + dask_model.fit(dX, dy, verbose=5) + elapsed_time = time.time() - start + print("elapsed time: {}".format(elapsed_time)) + mlflow.log_metric("training_time", elapsed_time) + assert dask_model.fitted_ + + # Save sklearn Estimator Model + sklearn_model = dask_model.to_local() + joblib.dump(sklearn_model, "./outputs/dask-sklearn-model.joblib") + + # Save Dask LightGBM Model + with open("./outputs/dask-model.pkl", "wb") as f: + pickle.dump(dask_model, f) + + print("done training") + + + + diff --git a/examples/train/nni-hyperband/README.md b/examples/train/nni-hyperband/README.md new file mode 100644 index 0000000..7262cc2 --- /dev/null +++ b/examples/train/nni-hyperband/README.md @@ -0,0 +1,18 @@ +# HyperParameter Tuning HyperBand with NNI + +This 
example shows how to use NNI to perform hyperparameter tuning with HyperBand on Azure Machine Learning. + +## Prerequisites + +- Azure Machine Learning Workspace + - Compute Clusters for parallel training + - Compute Instance with Azure ML CLI 2.0 and NNI library installed + +## HPO with NNI + +Neural Network Intelligence (NNI) is a library that provides a unified interface for hyperparameter optimization. Many tuning algorithm is included. See the following link for more details in reference section. + +## Reference + +- [Neural Network Intelligence (NNI)](https://github.com/microsoft/nni) + diff --git a/examples/train/nni-hyperband/config_hyperband.yml b/examples/train/nni-hyperband/config_hyperband.yml new file mode 100644 index 0000000..333ed5f --- /dev/null +++ b/examples/train/nni-hyperband/config_hyperband.yml @@ -0,0 +1,22 @@ +searchSpaceFile: search_space.json +trialCommand: python3 mnist.py +trialCodeDirectory: src # The path of trial code. By default it's ".", which means the same directory of this config file. +trialGpuNumber: 0 +trialConcurrency: 10 +maxExperimentDuration: 10h +maxTrialNumber: 1000 +advisor: + name: Hyperband + classArgs: + R: 100 # the maximum trial budget (could be the number of mini-batches or epochs) can be + # allocated to a trial. Each trial should use trial budget to control how long it runs. + eta: 3 # proportion of discarded trials + optimize_mode: maximize # maximize or minimize + exec_mode: parallelism # serial or parallelism +TrainingService: + platform: aml + dockerImage: msranni/nni + subscriptionId: 82a5d8d3-5322-4c49-b9d6-da6e00be5d57 + resourceGroup: azureml-automl + workspaceName: azureml-automl + computeTarget: cpuclusters \ No newline at end of file diff --git a/examples/train/nni-hyperband/src/mnist.py b/examples/train/nni-hyperband/src/mnist.py new file mode 100644 index 0000000..b7f811b --- /dev/null +++ b/examples/train/nni-hyperband/src/mnist.py @@ -0,0 +1,146 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT license. + +""" +NNI example trial code. + +- Experiment type: Hyper-parameter Optimization +- Trial framework: Tensorflow v2.x (Keras API) +- Model: LeNet-5 +- Dataset: MNIST +""" + +import logging + +import tensorflow as tf +from tensorflow.keras import Model +from tensorflow.keras.callbacks import Callback +from tensorflow.keras.layers import (Conv2D, Dense, Dropout, Flatten, MaxPool2D) +from tensorflow.keras.optimizers import Adam + +import nni + +_logger = logging.getLogger('mnist_example') +_logger.setLevel(logging.INFO) + + +class MnistModel(Model): + """ + LeNet-5 Model with customizable hyper-parameters + """ + def __init__(self, conv_size, hidden_size, dropout_rate): + """ + Initialize hyper-parameters. + + Parameters + ---------- + conv_size : int + Kernel size of convolutional layers. + hidden_size : int + Dimensionality of last hidden layer. + dropout_rate : float + Dropout rate between two fully connected (dense) layers, to prevent co-adaptation. 
+ """ + super().__init__() + self.conv1 = Conv2D(filters=32, kernel_size=conv_size, activation='relu') + self.pool1 = MaxPool2D(pool_size=2) + self.conv2 = Conv2D(filters=64, kernel_size=conv_size, activation='relu') + self.pool2 = MaxPool2D(pool_size=2) + self.flatten = Flatten() + self.fc1 = Dense(units=hidden_size, activation='relu') + self.dropout = Dropout(rate=dropout_rate) + self.fc2 = Dense(units=10, activation='softmax') + + def call(self, x): + """Override ``Model.call`` to build LeNet-5 model.""" + x = self.conv1(x) + x = self.pool1(x) + x = self.conv2(x) + x = self.pool2(x) + x = self.flatten(x) + x = self.fc1(x) + x = self.dropout(x) + return self.fc2(x) + + +class ReportIntermediates(Callback): + """ + Callback class for reporting intermediate accuracy metrics. + + This callback sends accuracy to NNI framework every 100 steps, + so you can view the learning curve on web UI. + + If an assessor is configured in experiment's YAML file, + it will use these metrics for early stopping. + """ + def on_epoch_end(self, epoch, logs=None): + """Reports intermediate accuracy to NNI framework""" + # TensorFlow 2.0 API reference claims the key is `val_acc`, but in fact it's `val_accuracy` + if 'val_acc' in logs: + nni.report_intermediate_result(logs['val_acc']) + else: + nni.report_intermediate_result(logs['val_accuracy']) + + +def load_dataset(): + """Download and reformat MNIST dataset""" + mnist = tf.keras.datasets.mnist + (x_train, y_train), (x_test, y_test) = mnist.load_data() + x_train, x_test = x_train / 255.0, x_test / 255.0 + x_train = x_train[..., tf.newaxis] + x_test = x_test[..., tf.newaxis] + return (x_train, y_train), (x_test, y_test) + + +def main(params): + """ + Main program: + - Build network + - Prepare dataset + - Train the model + - Report accuracy to tuner + """ + model = MnistModel( + conv_size=params['conv_size'], + hidden_size=params['hidden_size'], + dropout_rate=params['dropout_rate'] + ) + optimizer = Adam(learning_rate=params['learning_rate']) + model.compile(optimizer=optimizer, loss='sparse_categorical_crossentropy', metrics=['accuracy']) + _logger.info('Model built') + + (x_train, y_train), (x_test, y_test) = load_dataset() + _logger.info('Dataset loaded') + + model.fit( + x_train, + y_train, + batch_size=params['batch_size'], + epochs=10, + verbose=0, + callbacks=[ReportIntermediates()], + validation_data=(x_test, y_test) + ) + _logger.info('Training completed') + + loss, accuracy = model.evaluate(x_test, y_test, verbose=0) + nni.report_final_result(accuracy) # send final accuracy to NNI tuner and web UI + _logger.info('Final accuracy reported: %s', accuracy) + + +if __name__ == '__main__': + params = { + 'dropout_rate': 0.5, + 'conv_size': 5, + 'hidden_size': 1024, + 'batch_size': 32, + 'learning_rate': 1e-4, + } + + # fetch hyper-parameters from HPO tuner + # comment out following two lines to run the code without NNI framework + tuned_params = nni.get_next_parameter() + params.update(tuned_params) + + _logger.info('Hyper-parameters: %s', params) + main(params) \ No newline at end of file diff --git a/examples/train/pytorch-ddp/README.md b/examples/train/pytorch-ddp/README.md new file mode 100644 index 0000000..df30f00 --- /dev/null +++ b/examples/train/pytorch-ddp/README.md @@ -0,0 +1,13 @@ +# PyTorch Distributed Data Parallel (DDP) + +This example shows how to use Distributed Data Parallel (DDP) with PyTorch on Azure Machine Learning. 
+ + +## Prerequisites +- Azure Machine Learning Workspace + - Compute Clusters with GPU for distributed training + - Compute Instance with Azure ML CLI 2.0 installed + +## Reference +- [PyTorch Distributed Data Parallel (DDP)][1] +[1]: https://pytorch.org/docs/stable/distributed.html diff --git a/examples/train/pytorch-ddp/dataprep.py b/examples/train/pytorch-ddp/dataprep.py new file mode 100644 index 0000000..d537259 --- /dev/null +++ b/examples/train/pytorch-ddp/dataprep.py @@ -0,0 +1,16 @@ +import urllib +import urllib.request +import tarfile +import os + +url = 'https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz' +filename = 'cifar-10-python.tar.gz' +data_root = 'data' +filepath = os.path.join(data_root, filename) + +if not os.path.isdir(data_root): + os.makedirs(data_root, exist_ok=True) + urllib.request.urlretrieve(url, filepath) + with tarfile.open(filepath, "r:gz") as tar: + tar.extractall(path=data_root) + os.remove(filepath) # delete tar.gz file after extraction \ No newline at end of file diff --git a/examples/train/pytorch-ddp/job.yml b/examples/train/pytorch-ddp/job.yml new file mode 100644 index 0000000..f3a6b2f --- /dev/null +++ b/examples/train/pytorch-ddp/job.yml @@ -0,0 +1,26 @@ +$schema: https://azuremlschemas.azureedge.net/latest/commandJob.schema.json +code: + local_path: src +command: >- + python train.py + --epochs 1000 + --data-dir {inputs.cifar} + +inputs: + cifar: + data: + local_path: data + mode: mount + +environment: azureml:AzureML-pytorch-1.9-ubuntu18.04-py37-cuda11-gpu:3 + +compute: + target: azureml:gpuclusters2 + instance_count: 2 + +distribution: + type: pytorch + process_count: 4 + +experiment_name: pytorch-cifar-distributed-example +description: Train a basic convolutional neural network (CNN) with PyTorch on the CIFAR-10 dataset, distributed via PyTorch. diff --git a/examples/train/pytorch-ddp/src/train.py b/examples/train/pytorch-ddp/src/train.py new file mode 100644 index 0000000..7ed1855 --- /dev/null +++ b/examples/train/pytorch-ddp/src/train.py @@ -0,0 +1,261 @@ +# Copyright (c) 2017 Facebook, Inc. All rights reserved. 
+# BSD 3-Clause License +# +# Script adapted from: https://pytorch.org/tutorials/beginner/blitz/cifar10_tutorial.html#sphx-glr-beginner-blitz-cifar10-tutorial-py +# ============================================================================== + +# imports +import os +import mlflow +import argparse + +import torch +import torchvision +import torchvision.transforms as transforms +import torch.nn as nn +import torch.nn.functional as F +import torch.optim as optim + +# define network architecture +class Net(nn.Module): + def __init__(self): + super(Net, self).__init__() + self.conv1 = nn.Conv2d(3, 32, 3) + self.pool = nn.MaxPool2d(2, 2) + self.conv2 = nn.Conv2d(32, 64, 3) + self.conv3 = nn.Conv2d(64, 128, 3) + self.fc1 = nn.Linear(128 * 6 * 6, 120) + self.dropout = nn.Dropout(p=0.2) + self.fc2 = nn.Linear(120, 84) + self.fc3 = nn.Linear(84, 10) + + def forward(self, x): + x = F.relu(self.conv1(x)) + x = self.pool(F.relu(self.conv2(x))) + x = self.pool(F.relu(self.conv3(x))) + x = x.view(-1, 128 * 6 * 6) + x = self.dropout(F.relu(self.fc1(x))) + x = F.relu(self.fc2(x)) + x = self.fc3(x) + return x + + +# define functions +def train(train_loader, model, criterion, optimizer, epoch, device, print_freq, rank): + running_loss = 0.0 + for i, data in enumerate(train_loader, 0): + # get the inputs; data is a list of [inputs, labels] + inputs, labels = data[0].to(device), data[1].to(device) + + # zero the parameter gradients + optimizer.zero_grad() + + # forward + backward + optimize + outputs = model(inputs) + loss = criterion(outputs, labels) + loss.backward() + optimizer.step() + + # print statistics + running_loss += loss.item() + + # mlflow loss logging + mlflow.log_metric(f"train-loss-{rank}", running_loss) + + + +def evaluate(test_loader, model, device): + classes = ( + "plane", + "car", + "bird", + "cat", + "deer", + "dog", + "frog", + "horse", + "ship", + "truck", + ) + + model.eval() + + correct = 0 + total = 0 + class_correct = list(0.0 for i in range(10)) + class_total = list(0.0 for i in range(10)) + with torch.no_grad(): + for data in test_loader: + images, labels = data[0].to(device), data[1].to(device) + outputs = model(images) + _, predicted = torch.max(outputs.data, 1) + total += labels.size(0) + correct += (predicted == labels).sum().item() + c = (predicted == labels).squeeze() + for i in range(10): + label = labels[i] + class_correct[label] += c[i].item() + class_total[label] += 1 + + # print total test set accuracy + print( + "Accuracy of the network on the 10000 test images: %d %%" + % (100 * correct / total) + ) + mlflow.log_metric("Accuracy of test images", 100 * correct / total) + + # print test accuracy for each of the classes + for i in range(10): + print( + "Accuracy of %5s : %2d %%" + % (classes[i], 100 * class_correct[i] / class_total[i]) + ) + mlflow.log_metric(f"{classes[i]}", 100 * class_correct[i] / class_total[i]) + + +def main(args): + # get PyTorch environment variables + world_size = int(os.environ["WORLD_SIZE"]) + rank = int(os.environ["RANK"]) + local_rank = int(os.environ["LOCAL_RANK"]) + + distributed = world_size > 1 + + # set device + if distributed: + device = torch.device("cuda", local_rank) + else: + device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") + + # initialize distributed process group using default env:// method + if distributed: + torch.distributed.init_process_group(backend="nccl") + + # define train and test dataset DataLoaders + transform = transforms.Compose( + [transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5), 
(0.5, 0.5, 0.5))] + ) + + train_set = torchvision.datasets.CIFAR10( + root=args.data_dir, train=True, download=False, transform=transform + ) + + if distributed: + train_sampler = torch.utils.data.distributed.DistributedSampler(train_set) + else: + train_sampler = None + + train_loader = torch.utils.data.DataLoader( + train_set, + batch_size=args.batch_size, + shuffle=(train_sampler is None), + num_workers=args.workers, + sampler=train_sampler, + ) + + test_set = torchvision.datasets.CIFAR10( + root=args.data_dir, train=False, download=False, transform=transform + ) + test_loader = torch.utils.data.DataLoader( + test_set, batch_size=args.batch_size, shuffle=False, num_workers=args.workers + ) + + model = Net().to(device) + + # wrap model with DDP + if distributed: + model = nn.parallel.DistributedDataParallel( + model, device_ids=[local_rank], output_device=local_rank + ) + + # define loss function and optimizer + criterion = nn.CrossEntropyLoss() + optimizer = optim.SGD( + model.parameters(), lr=args.learning_rate, momentum=args.momentum + ) + + # train the model + for epoch in range(args.epochs): + print("Rank %d: Starting epoch %d" % (rank, epoch)) + if distributed: + train_sampler.set_epoch(epoch) + model.train() + train( + train_loader, + model, + criterion, + optimizer, + epoch, + device, + args.print_freq, + rank, + ) + + print("Rank %d: Finished Training" % (rank)) + + if not distributed or rank == 0: + # log model + mlflow.pytorch.log_model(model, "./model") + + # evaluate on full test dataset + evaluate(test_loader, model, device) + + +def parse_args(): + # setup argparse + parser = argparse.ArgumentParser() + + # add arguments + parser.add_argument( + "--data-dir", type=str, help="directory containing CIFAR-10 dataset" + ) + parser.add_argument("--epochs", default=10, type=int, help="number of epochs") + parser.add_argument( + "--batch-size", + default=16, + type=int, + help="mini batch size for each gpu/process", + ) + parser.add_argument( + "--workers", + default=2, + type=int, + help="number of data loading workers for each gpu/process", + ) + parser.add_argument( + "--learning-rate", default=0.001, type=float, help="learning rate" + ) + parser.add_argument("--momentum", default=0.9, type=float, help="momentum") + parser.add_argument( + "--print-freq", + default=200, + type=int, + help="frequency of printing training statistics", + ) + + # parse args + args = parser.parse_args() + + # mlflow logging + mlflow.log_param("batch size", args.batch_size) + mlflow.log_param("learning rate", args.learning_rate) + mlflow.log_param("momentum", args.momentum) + + # return args + return args + + +# run script +if __name__ == "__main__": + # add space in logs + print("*" * 60) + print("\n\n") + + # parse args + args = parse_args() + # call main function + main(args) + + # add space in logs + print("*" * 60) + print("\n\n") diff --git a/examples/train/ray-flaml/README.md b/examples/train/ray-flaml/README.md new file mode 100644 index 0000000..1916e58 --- /dev/null +++ b/examples/train/ray-flaml/README.md @@ -0,0 +1,17 @@ +# FLAML AutoML with RAY + +This example shows how to use FLAML to train a model on a dataset using RAY on Azure Machine Learning. + +## Prerequisites +- Azure Machine Learning Workspace + - Compute Clusters for Ray + - Compute Instance with Azure ML CLI 2.0 installed + +## FLAML with RAY +FLAML is a lightweight Python library that finds accurate machine learning models automatically, efficiently and economically. FLAML support Ray Tune for distributed search. 
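+
+As a rough sketch of what `src/train-automl-flaml.py` does once `src/startRay.py` has brought up the Ray cluster (the dataset and settings below are illustrative):
+
+```python
+import ray
+from flaml import AutoML
+from sklearn.datasets import load_diabetes
+
+# attach to the already-running Ray cluster started by startRay.py
+ray.init(address="auto")
+
+X_train, y_train = load_diabetes(return_X_y=True)
+
+automl = AutoML()
+automl.fit(
+    X_train=X_train,
+    y_train=y_train,
+    task="regression",
+    metric="mse",
+    time_budget=300,        # seconds
+    n_concurrent_trials=3,  # trials run in parallel across the Ray workers
+)
+print(automl.best_estimator, automl.best_config)
+```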
+ + + +## Reference + +- [FLAML: A Framework for Learning from Data](https://github.com/microsoft/FLAML) \ No newline at end of file diff --git a/examples/train/ray-flaml/azureml-tensorboard.ipynb b/examples/train/ray-flaml/azureml-tensorboard.ipynb new file mode 100644 index 0000000..1c0ab47 --- /dev/null +++ b/examples/train/ray-flaml/azureml-tensorboard.ipynb @@ -0,0 +1,91 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [], + "source": [ + "from azureml.core import Workspace, Run\n", + "from azureml.tensorboard import Tensorboard\n", + "ws = Workspace.from_config()\n", + "run = Run.get(workspace=ws, run_id=\"cd0a70d1-aa17-4991-a9a5-98dfd0142663\")" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "https://automl-client-6006.japaneast.instances.azureml.ms\n" + ] + }, + { + "data": { + "text/plain": [ + "'https://automl-client-6006.japaneast.instances.azureml.ms'" + ] + }, + "execution_count": 5, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "tb = Tensorboard([run], local_root=\"logs/azureml\", port=6006)\n", + "tb.start()" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [], + "source": [ + "tb.stop()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "interpreter": { + "hash": "be3ccc1aacd5cd0ada9eab9372c3d7f901636bca88db42a566c3f238cceb324c" + }, + "kernelspec": { + "display_name": "Python 3.6.13 64-bit ('ray': conda)", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.6.13" + }, + "orig_nbformat": 4 + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/examples/train/ray-flaml/conda.yml b/examples/train/ray-flaml/conda.yml new file mode 100644 index 0000000..79f827a --- /dev/null +++ b/examples/train/ray-flaml/conda.yml @@ -0,0 +1,13 @@ +name: ray +channels: + - defaults + - conda-forge +dependencies: + - python=3.7 + - pip: + - flaml[notebook, blendsearch, ray, azureml]==0.6.9 + - azureml-tensorboard + - ipykernel + - matplotlib + - tensorboardX<=2.2 + - mpi4py \ No newline at end of file diff --git a/examples/train/ray-flaml/job.yml b/examples/train/ray-flaml/job.yml new file mode 100644 index 0000000..ead44dd --- /dev/null +++ b/examples/train/ray-flaml/job.yml @@ -0,0 +1,28 @@ +$schema: https://azuremlschemas.azureedge.net/latest/commandJob.schema.json + +code: + local_path: src + +# This is the command that will start up the ray cluster and run the script `train-automl-flaml.py` with the following parameters. 
+command: >- + python startRay.py + --script train-automl-flaml.py + +environment: + conda_file: file:conda.yml + docker: + image: mcr.microsoft.com/azureml/openmpi3.1.2-ubuntu18.04 + +compute: + target: azureml:cpuclusters + instance_count: 3 + +distribution: + # The job below is currently launched with `type: pytorch` since that + # gives the full flexibility of assigning the work to the + # no pytorch is actually used in this job + type: pytorch + +experiment_name: train-automl-flaml + +description: FLAML AutoML on Ray Cluster using Azure Machine Learning Compute Cluster diff --git a/examples/train/ray-flaml/src/startRay.py b/examples/train/ray-flaml/src/startRay.py new file mode 100644 index 0000000..dc65dc9 --- /dev/null +++ b/examples/train/ray-flaml/src/startRay.py @@ -0,0 +1,100 @@ +import threading +import sys +import subprocess +import os +import argparse +from mpi4py import MPI + +def flush(proc, proc_log): + while True: + proc_out = proc.stdout.readline() + if proc_out == "" and proc.poll() is not None: + proc_log.close() + break + elif proc_out: + sys.stdout.write(proc_out) + proc_log.write(proc_out) + proc_log.flush() + +if __name__ == "__main__": + comm = MPI.COMM_WORLD + mpi_rank = comm.Get_rank() + print("mpi rank:", mpi_rank) + + parser = argparse.ArgumentParser() + parser.add_argument("--port", default="6379", type=int) + parser.add_argument("--script", type=str) + args = parser.parse_args() + + # this scirpt is for flaml automl training + script = args.script + + # port for ray head node + port = args.port + print("head port is ", port) + + head_ip = os.environ.get("MASTER_ADDR") + print("head address is ", head_ip) + + rank = os.environ.get("RANK") + print("my rank is ", rank) + + # TODO:Get Password from Azure KeyVault + password = "password" + + # Ray Head Node + if str(rank) == "0": + head_log = open("logs/worker_{rank}_log.txt".format(rank=rank), "w") + + cmd = f"ray start --head --port={port} --redis-password={password} --dashboard-port=9999" + print(cmd) + + head_proc = subprocess.Popen( + cmd.split(), + universal_newlines=True, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + ) + head_flush = threading.Thread( + target=flush, args=(head_proc, head_log) + ) + head_flush.start() + + python_log = open("logs/python_{rank}_log.txt".format(rank=rank), "w") + command_line = f"python {script} --redis-password={password}" + driver_proc = subprocess.Popen( + command_line.split(), + universal_newlines=True, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + ) + + driver_flush = threading.Thread( + target=flush, args=(driver_proc, python_log) + ) + driver_flush.start() + driver_proc.wait() + + head_proc.kill() + driver_proc.kill() + print("### Head Job Finished") + + + # Ray Worker Node + else: + worker_log = open("logs/worker_{rank}_log.txt".format(rank=rank), "w") + cmd = f"ray start --address={head_ip}:{port} --redis-password {password}" + print(cmd) + worker_proc = subprocess.Popen( + cmd.split(), + universal_newlines=True, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + ) + flush(worker_proc, worker_log) + # worker_flush = threading.Thread( + # target=flush, args=(worker_proc, worker_log) + # ) + # worker_flush.start() + # worker_proc.wait() + diff --git a/examples/train/ray-flaml/src/train-automl-flaml.py b/examples/train/ray-flaml/src/train-automl-flaml.py new file mode 100644 index 0000000..e6da4b8 --- /dev/null +++ b/examples/train/ray-flaml/src/train-automl-flaml.py @@ -0,0 +1,76 @@ +import argparse + +import mlflow +import ray +from flaml 
import AutoML +from sklearn.datasets import load_diabetes +from tensorboardX import SummaryWriter + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("--redis-password", default='password') + args = parser.parse_args() + password = args.redis_password + + ray.init('auto', _redis_password=f'{password}') + # Initialize an AutoML instance + automl = AutoML() + # Specify automl goal and constraint + automl_settings = { + "time_budget": 300, # in seconds + "metric": 'mse', + "task": 'regression', + "log_type": 'all', + "n_concurrent_trials": 3, + "log_file_name": "./outputs/diabetes.log", + "log_training_metric": True, + "append_log": True, + } + + X_train, y_train = load_diabetes(return_X_y=True) + + # Train with labeled input data + # TODO: mlflow logging to Azure ML + mlflow.log_param("n_concurrent_trials", automl_settings['n_concurrent_trials']) + mlflow.log_param("task", automl_settings['task']) + mlflow.log_param("metric", automl_settings['metric']) + mlflow.log_param("time_budget", automl_settings['time_budget']) + + + try: + automl.fit(X_train=X_train, y_train=y_train, **automl_settings) + except Exception as e: + print(e) + finally: + print('Best ML learner:', automl.best_estimator) + print('Best hyperparameter config:', automl.best_config) + print('Best MSE: ', automl.best_loss) + + from flaml.data import get_output_from_log + time_history, best_valid_loss_history, valid_loss_history, config_history, metric_history = \ + get_output_from_log(filename=automl_settings['log_file_name'], time_budget=240) + print(time_history, best_valid_loss_history, valid_loss_history, config_history, metric_history) + + with SummaryWriter(comment='azureml', log_dir="logs/azureml/") as writer: + for config, metric in zip(config_history, metric_history): + hparam_dict_learner = {key: value for key, value in config.items() if key == 'Current Learner'} + hparam_dict_param = config['Current Hyper-parameters']['ml'] + writer.add_hparams(hparam_dict=dict(**hparam_dict_learner, **hparam_dict_param), metric_dict=metric) + mlflow.log_metric("mse", metric['train_loss']) + + import matplotlib.pyplot as plt + import numpy as np + + fig = plt.figure() + plt.title('Learning Curve') + plt.xlabel('Wall Clock Time (s)') + plt.ylabel('mse') + plt.scatter(time_history, np.array(valid_loss_history)) + plt.step(time_history, np.array(best_valid_loss_history), where='post') + plt.savefig("figure.png") + mlflow.log_figure(fig, "figure.png") + plt.show() + + ray.shutdown() +