This commit is contained in:
Your Name 2021-11-05 08:20:58 +00:00
Parent ef3a461730
Commit b43da4c145
18 changed files with 1189 additions and 0 deletions

View file

@ -0,0 +1,19 @@
# LightGBM Distributed Training with DASK
This example shows how to use DASK to train LightGBM models in distributed mode on Azure Machine Learning.
## Prerequisites
- Azure Machine Learning Workspace
- Compute Clusters for DASK
- Compute Instance with Azure ML CLI 2.0 installed
## LightGBM DASK Distributed Training
LightGBM supports distributed training with DASK, a distributed computing framework for Python. See the documents in the reference section below for more details. A minimal local sketch follows the references.
## Reference
- [DASK](https://dask.org/)
- [LightGBM DASK](https://lightgbm.readthedocs.io/en/latest/Parallel-Learning-Guide.html#dask)
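For orientation, below is a minimal local sketch of the LightGBM Dask estimator used in this example. It assumes `lightgbm` (with Dask support) and `dask[distributed]` are installed as in `conda.yml` and uses synthetic data; the actual job runs the same estimator against the multi-node cluster started by `startDask.py`.

```python
# Minimal local sketch: train a LightGBM regressor on a throwaway Dask cluster.
# On Azure ML, startDask.py starts the scheduler/workers across the compute cluster instead.
import dask.array as da
from dask.distributed import Client, LocalCluster
import lightgbm as lgb

if __name__ == "__main__":
    cluster = LocalCluster(n_workers=2)   # local stand-in for the multi-node cluster
    client = Client(cluster)

    # synthetic regression data, partitioned into Dask chunks
    X = da.random.random((10_000, 20), chunks=(1_000, 20))
    y = da.random.random((10_000,), chunks=(1_000,))

    # DaskLGBMRegressor runs one LightGBM training process per Dask worker
    model = lgb.DaskLGBMRegressor(n_estimators=50)
    model.fit(X, y)
    print(model.predict(X[:5]).compute())

    client.close()
    cluster.close()
```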

View file

@ -0,0 +1,19 @@
name: dask
channels:
  - defaults
  - conda-forge
dependencies:
  - python=3.8
  - pip:
      - lightgbm
      - dask
      - dask-ml
      - bokeh
      - pandas
      - notebook
      - matplotlib
      - ipykernel
      - numpy
      - scikit-learn
      - azureml-sdk
      - azureml-mlflow

View file

@ -0,0 +1,37 @@
$schema: https://azuremlschemas.azureedge.net/latest/commandJob.schema.json
code:
  local_path: src
# This is the command that starts up the Dask cluster and runs the script `train-lgb-dask.py` with the parameters below.
# For an interactive session, just remove the `--script` argument: that only starts the cluster and mounts the dataset
# (a connection sketch for interactive use follows this file).
command: >-
  python startDask.py
  --script train-lgb-dask.py
  --dataset_path {inputs.nyc_taxi_dataset}
inputs:
  nyc_taxi_dataset:
    data:
      path: https://azuremlexamples.blob.core.windows.net/datasets/nyctaxi/
    mode: mount
environment:
  conda_file: file:conda.yml
  docker:
    image: mcr.microsoft.com/azureml/openmpi3.1.2-ubuntu18.04
compute:
  # use a SKU with plenty of disk space and memory
  target: azureml:daskclusters
  instance_count: 5
distribution:
  # `type: pytorch` is used only so that every node gets a launcher process with the usual
  # RANK / MASTER_ADDR environment variables set; no PyTorch is actually used in this job.
  type: pytorch
experiment_name: dask-nyctaxi-lgb-train
description: DASK LightGBM Job (Multiple Instances)
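As the comment above notes, dropping `--script` leaves the Dask cluster (and the Jupyter server that `startDask.py` starts on the head node) running for interactive use. A minimal, hedged sketch of connecting from a notebook; `<head-node-ip>` is a placeholder, and the real scheduler address (port 8786, dashboard on 8787) is logged by the head node as the `cluster` mlflow parameter:

```python
# Sketch only: connect to the Dask cluster started by the job above.
# "<head-node-ip>" is a placeholder; read the actual address from the run's
# "cluster" mlflow parameter logged by startDask.py.
from dask.distributed import Client

client = Client("<head-node-ip>:8786")  # scheduler started by startDask.py
print(client)                           # shows the registered workers

# quick smoke test: run a few tasks on the cluster
futures = client.map(lambda x: x ** 2, range(10))
print(client.gather(futures))
```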

View file

@ -0,0 +1,221 @@
import os
import argparse
import time
from dask.distributed import Client, get_task_stream
import sys, uuid
import threading
import subprocess
import socket
import mlflow
from bokeh.io import export_png  # for saving the dashboard task-stream plot
from notebook.notebookapp import list_running_servers
def flush(proc, proc_log):
    # Stream the subprocess's stdout to our stdout and to a log file.
    while True:
        proc_out = proc.stdout.readline()
        if proc_out == "" and proc.poll() is not None:
            proc_log.close()
            break
        elif proc_out:
            sys.stdout.write(proc_out)
            proc_log.write(proc_out)
            proc_log.flush()
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--jupyter_token", default=uuid.uuid1().hex)
    parser.add_argument("--script")
    args, unparsed = parser.parse_known_args()

    for k, v in os.environ.items():
        if k.startswith("MLFLOW"):
            print(k, v)
    MLFLOW_RUN_ID = os.getenv("MLFLOW_RUN_ID")

    # local client, used later to capture the dashboard task stream
    client = Client()

    # Read the information needed to start Dask from the environment variables
    print(
        "- env: AZ_BATCHAI_JOB_MASTER_NODE_IP: ",
        os.environ.get("AZ_BATCHAI_JOB_MASTER_NODE_IP"),
    )
    print(
        "- env: AZ_BATCHAI_IS_CURRENT_NODE_MASTER: ",
        os.environ.get("AZ_BATCHAI_IS_CURRENT_NODE_MASTER"),
    )
    print("- env: AZ_BATCHAI_NODE_IP: ", os.environ.get("AZ_BATCHAI_NODE_IP"))
    print("- env: AZ_BATCH_HOST_LIST: ", os.environ.get("AZ_BATCH_HOST_LIST"))
    print("- env: AZ_BATCH_NODE_LIST: ", os.environ.get("AZ_BATCH_NODE_LIST"))
    print("- env: MASTER_ADDR: ", os.environ.get("MASTER_ADDR"))
    print("- env: MASTER_PORT: ", os.environ.get("MASTER_PORT"))
    print("- env: RANK: ", os.environ.get("RANK"))
    print("- env: LOCAL_RANK: ", os.environ.get("LOCAL_RANK"))
    print("- env: NODE_RANK: ", os.environ.get("NODE_RANK"))
    print("- env: WORLD_SIZE: ", os.environ.get("WORLD_SIZE"))

    rank = os.environ.get("RANK")
    ip = socket.gethostbyname(socket.gethostname())
    master = os.environ.get("MASTER_ADDR")
    master_port = os.environ.get("MASTER_PORT")
    print("- my rank is ", rank)
    print("- my ip is ", ip)
    print("- master is ", master)
    print("- master port is ", master_port)

    scheduler = master + ":8786"
    dashboard = master + ":8787"
    print("- scheduler is ", scheduler)
    print("- dashboard is ", dashboard)
    print("args: ", args)
    print("unparsed: ", unparsed)
    print("- my rank is ", rank)
    print("- my ip is ", ip)

    if not os.path.exists("logs"):
        os.makedirs("logs")

    print("free disk space on /tmp")
    os.system(f"df -P /tmp")

    mlflow.log_param("WORLD_SIZE", os.environ.get("WORLD_SIZE"))
    # Processing on RANK 0 (head node): start Jupyter, the Dask scheduler, a worker, and the driver script
    if str(rank) == "0":
        mlflow.log_param("headnode", ip)
        mlflow.log_param(
            "cluster",
            "scheduler: {scheduler}, dashboard: {dashboard}".format(
                scheduler=scheduler, dashboard=dashboard
            ),
        )

        # start a Jupyter server for interactive use of the cluster
        cmd = (
            "jupyter lab --ip 0.0.0.0 --port 8888"
            + " --NotebookApp.token={token}"
            + " --allow-root --no-browser"
        ).format(token=args.jupyter_token)
        os.environ["MLFLOW_RUN_ID"] = MLFLOW_RUN_ID
        jupyter_log = open("logs/jupyter_log.txt", "w")
        jupyter_proc = subprocess.Popen(
            cmd.split(),
            universal_newlines=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
        )
        jupyter_flush = threading.Thread(target=flush, args=(jupyter_proc, jupyter_log))
        jupyter_flush.start()
        # while not list(list_running_servers()):
        #     time.sleep(5)
        # jupyter_servers = list(list_running_servers())
        # assert (len(jupyter_servers) == 1), "more than one jupyter server is running"
        mlflow.log_param(
            "jupyter", "ip: {ip_addr}, port: {port}".format(ip_addr=ip, port="8888")
        )
        mlflow.log_param("jupyter-token", args.jupyter_token)

        # start the Dask scheduler
        cmd = (
            "dask-scheduler "
            + "--port "
            + scheduler.split(":")[1]
            + " --dashboard-address "
            + dashboard
        )
        print(cmd)
        os.environ["MLFLOW_RUN_ID"] = MLFLOW_RUN_ID
        scheduler_log = open("logs/scheduler_log.txt", "w")
        scheduler_proc = subprocess.Popen(
            cmd.split(),
            universal_newlines=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
        )
        scheduler_flush = threading.Thread(
            target=flush, args=(scheduler_proc, scheduler_log)
        )
        scheduler_flush.start()

        # also start a worker on the head node
        cmd = "dask-worker " + scheduler
        print(cmd)
        os.environ["MLFLOW_RUN_ID"] = MLFLOW_RUN_ID
        worker_log = open("logs/worker_{rank}_log.txt".format(rank=rank), "w")
        worker_proc = subprocess.Popen(
            cmd.split(),
            universal_newlines=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
        )
        worker_flush = threading.Thread(target=flush, args=(worker_proc, worker_log))
        worker_flush.start()

        print("### OUTPUT STREAM ###")
        with get_task_stream(client, plot="save", filename="task_stream.html") as ts:
            futs = client.map(lambda x: time.sleep(x ** 2), range(5))
            results = client.gather(futs)

        if args.script:
            # run the training script as the driver process
            command_line = " ".join(["python", args.script] + unparsed)
            print("Launching:", command_line)
            os.environ["MLFLOW_RUN_ID"] = MLFLOW_RUN_ID
            driver_log = open("logs/driver_log.txt", "w")
            driver_proc = subprocess.Popen(
                command_line.split(),
                universal_newlines=True,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
            )
            driver_flush = threading.Thread(
                target=flush, args=(driver_proc, driver_log)
            )
            driver_flush.start()

            # Wait until the driver process terminates
            # while driver_proc.poll() is None:
            #     # Process hasn't exited yet, let's wait some
            #     time.sleep(0.5)
            print("waiting for driver process to terminate")
            driver_proc.wait()
            exit_code = driver_proc.returncode
            print("process ended with code", exit_code)

            # save the captured task-stream plot before shutting everything down
            export_png(ts.figure, filename="./outputs/plot_{rank}.png".format(rank=rank))

            print("killing scheduler, worker and jupyter")
            jupyter_proc.kill()
            scheduler_proc.kill()
            worker_proc.kill()
            exit(exit_code)
        else:
            flush(scheduler_proc, scheduler_log)
    # Processing on all ranks other than 0: start a Dask worker and block until it exits
    else:
        cmd = "dask-worker " + scheduler
        print(cmd)
        os.environ["MLFLOW_RUN_ID"] = MLFLOW_RUN_ID
        worker_log = open("logs/worker_{rank}_log.txt".format(rank=rank), "w")
        worker_proc = subprocess.Popen(
            cmd.split(),
            universal_newlines=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
        )
        flush(worker_proc, worker_log)

View file

@ -0,0 +1,66 @@
import argparse
import pickle
import time

import dask.dataframe as dd
import joblib
import lightgbm as lgb
import mlflow
from dask.distributed import Client, LocalCluster, performance_report, wait
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--dataset_path")
    args = parser.parse_args()
    dataset_path = args.dataset_path
    OUTPUT_DIR = "./outputs"

    print("loading data")
    df = dd.read_csv(f"{dataset_path}/*.csv", parse_dates=["tpep_pickup_datetime", "tpep_dropoff_datetime"])
    df["total"] = df["total_amount"] + df["tolls_amount"] + df["tip_amount"] + df["extra"]
    dX = df.drop(["store_and_fwd_flag", "tpep_pickup_datetime", "tpep_dropoff_datetime", "total", "total_amount", "tolls_amount", "tip_amount", "extra"], axis=1)
    dy = df["total"]

    print("connecting to the Dask cluster")
    # cluster = LocalCluster(dashboard_address=':9999')  # port 8787 is already used by RStudio Server
    # client = Client(cluster)
    client = Client("localhost:8786")
    print("connected to the Dask scheduler at localhost:8786")

    # persist the training data on the cluster workers and wait until it is in memory
    dX = dX.persist()
    dy = dy.persist()
    wait(dX)
    wait(dy)
    print("distributed training data on the Dask cluster")

    print("beginning training")
    dask_model = lgb.DaskLGBMRegressor(n_estimators=100)
    with performance_report(filename="./outputs/dask-report.html"):
        start = time.time()
        dask_model.fit(dX, dy, verbose=5)
        elapsed_time = time.time() - start
        print("elapsed time: {}".format(elapsed_time))
        mlflow.log_metric("training_time", elapsed_time)
    assert dask_model.fitted_

    # Save the model as a local scikit-learn estimator
    sklearn_model = dask_model.to_local()
    joblib.dump(sklearn_model, "./outputs/dask-sklearn-model.joblib")

    # Save the Dask LightGBM model
    with open("./outputs/dask-model.pkl", "wb") as f:
        pickle.dump(dask_model, f)
    print("done training")

View file

@ -0,0 +1,18 @@
# Hyperparameter Tuning with HyperBand and NNI
This example shows how to use NNI to perform hyperparameter tuning with HyperBand on Azure Machine Learning.
## Prerequisites
- Azure Machine Learning Workspace
- Compute Clusters for parallel training
- Compute Instance with Azure ML CLI 2.0 and NNI library installed
## HPO with NNI
Neural Network Intelligence (NNI) is a library that provides a unified interface for hyperparameter optimization and ships with many tuning algorithms, including HyperBand. See the link in the reference section below for more details.
## Reference
- [Neural Network Intelligence (NNI)](https://github.com/microsoft/nni)

View file

@ -0,0 +1,22 @@
searchSpaceFile: search_space.json  # not shown in this diff; an illustrative sketch follows this file
trialCommand: python3 mnist.py
trialCodeDirectory: src  # path of the trial code; defaults to ".", the directory of this config file
trialGpuNumber: 0
trialConcurrency: 10
maxExperimentDuration: 10h
maxTrialNumber: 1000
advisor:
  name: Hyperband
  classArgs:
    # R is the maximum budget (e.g. number of mini-batches or epochs) that can be allocated
    # to a single trial; each trial should use its budget to control how long it runs.
    R: 100
    # eta controls the proportion of trials discarded at each round of successive halving.
    eta: 3
    optimize_mode: maximize  # maximize or minimize
    exec_mode: parallelism  # serial or parallelism
trainingService:
  platform: aml
  dockerImage: msranni/nni
  subscriptionId: 82a5d8d3-5322-4c49-b9d6-da6e00be5d57
  resourceGroup: azureml-automl
  workspaceName: azureml-automl
  computeTarget: cpuclusters
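The `search_space.json` referenced above is not shown in this diff. Purely as an illustration (the value ranges below are assumptions, not the project's actual file), a search space covering the hyper-parameters read in `src/mnist.py` could be generated like this:

```python
# Illustrative sketch of an NNI search space for the hyper-parameters used in src/mnist.py.
# The ranges are assumptions; adjust them to the experiment's needs.
import json

search_space = {
    "dropout_rate":  {"_type": "uniform",    "_value": [0.1, 0.9]},
    "conv_size":     {"_type": "choice",     "_value": [2, 3, 5, 7]},
    "hidden_size":   {"_type": "choice",     "_value": [128, 512, 1024]},
    "batch_size":    {"_type": "choice",     "_value": [16, 32]},
    "learning_rate": {"_type": "loguniform", "_value": [0.0001, 0.1]},
}

with open("search_space.json", "w") as f:
    json.dump(search_space, f, indent=2)
```

(NNI's HyperBand advisor also passes the allocated budget to each trial, see the NNI documentation, which the trial code is expected to use to cap how long it runs.)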

View file

@ -0,0 +1,146 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
"""
NNI example trial code.
- Experiment type: Hyper-parameter Optimization
- Trial framework: Tensorflow v2.x (Keras API)
- Model: LeNet-5
- Dataset: MNIST
"""
import logging
import tensorflow as tf
from tensorflow.keras import Model
from tensorflow.keras.callbacks import Callback
from tensorflow.keras.layers import (Conv2D, Dense, Dropout, Flatten, MaxPool2D)
from tensorflow.keras.optimizers import Adam
import nni
_logger = logging.getLogger('mnist_example')
_logger.setLevel(logging.INFO)
class MnistModel(Model):
    """
    LeNet-5 Model with customizable hyper-parameters
    """

    def __init__(self, conv_size, hidden_size, dropout_rate):
        """
        Initialize hyper-parameters.

        Parameters
        ----------
        conv_size : int
            Kernel size of convolutional layers.
        hidden_size : int
            Dimensionality of last hidden layer.
        dropout_rate : float
            Dropout rate between two fully connected (dense) layers, to prevent co-adaptation.
        """
        super().__init__()
        self.conv1 = Conv2D(filters=32, kernel_size=conv_size, activation='relu')
        self.pool1 = MaxPool2D(pool_size=2)
        self.conv2 = Conv2D(filters=64, kernel_size=conv_size, activation='relu')
        self.pool2 = MaxPool2D(pool_size=2)
        self.flatten = Flatten()
        self.fc1 = Dense(units=hidden_size, activation='relu')
        self.dropout = Dropout(rate=dropout_rate)
        self.fc2 = Dense(units=10, activation='softmax')

    def call(self, x):
        """Override ``Model.call`` to build LeNet-5 model."""
        x = self.conv1(x)
        x = self.pool1(x)
        x = self.conv2(x)
        x = self.pool2(x)
        x = self.flatten(x)
        x = self.fc1(x)
        x = self.dropout(x)
        return self.fc2(x)


class ReportIntermediates(Callback):
    """
    Callback class for reporting intermediate accuracy metrics.

    This callback sends accuracy to the NNI framework after every epoch,
    so you can view the learning curve on the web UI.
    If an assessor is configured in the experiment's YAML file,
    it will use these metrics for early stopping.
    """

    def on_epoch_end(self, epoch, logs=None):
        """Report intermediate accuracy to the NNI framework."""
        # The TensorFlow 2.0 API reference claims the key is `val_acc`, but in fact it's `val_accuracy`
        if 'val_acc' in logs:
            nni.report_intermediate_result(logs['val_acc'])
        else:
            nni.report_intermediate_result(logs['val_accuracy'])


def load_dataset():
    """Download and reformat the MNIST dataset."""
    mnist = tf.keras.datasets.mnist
    (x_train, y_train), (x_test, y_test) = mnist.load_data()
    x_train, x_test = x_train / 255.0, x_test / 255.0
    x_train = x_train[..., tf.newaxis]
    x_test = x_test[..., tf.newaxis]
    return (x_train, y_train), (x_test, y_test)


def main(params):
    """
    Main program:
    - Build network
    - Prepare dataset
    - Train the model
    - Report accuracy to tuner
    """
    model = MnistModel(
        conv_size=params['conv_size'],
        hidden_size=params['hidden_size'],
        dropout_rate=params['dropout_rate']
    )
    optimizer = Adam(learning_rate=params['learning_rate'])
    model.compile(optimizer=optimizer, loss='sparse_categorical_crossentropy', metrics=['accuracy'])
    _logger.info('Model built')

    (x_train, y_train), (x_test, y_test) = load_dataset()
    _logger.info('Dataset loaded')

    model.fit(
        x_train,
        y_train,
        batch_size=params['batch_size'],
        epochs=10,
        verbose=0,
        callbacks=[ReportIntermediates()],
        validation_data=(x_test, y_test)
    )
    _logger.info('Training completed')

    loss, accuracy = model.evaluate(x_test, y_test, verbose=0)
    nni.report_final_result(accuracy)  # send final accuracy to the NNI tuner and web UI
    _logger.info('Final accuracy reported: %s', accuracy)


if __name__ == '__main__':
    params = {
        'dropout_rate': 0.5,
        'conv_size': 5,
        'hidden_size': 1024,
        'batch_size': 32,
        'learning_rate': 1e-4,
    }

    # fetch hyper-parameters from the HPO tuner
    # (comment out the following two lines to run the code without the NNI framework)
    tuned_params = nni.get_next_parameter()
    params.update(tuned_params)

    _logger.info('Hyper-parameters: %s', params)
    main(params)

View file

@ -0,0 +1,13 @@
# PyTorch Distributed Data Parallel (DDP)
This example shows how to use Distributed Data Parallel (DDP) with PyTorch on Azure Machine Learning.
## Prerequisites
- Azure Machine Learning Workspace
- Compute Clusters with GPU for distributed training
- Compute Instance with Azure ML CLI 2.0 installed
## Reference
- [PyTorch Distributed Data Parallel (DDP)][1]
[1]: https://pytorch.org/docs/stable/distributed.html
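For orientation, a minimal DDP sketch is shown below. It assumes a single-node launch such as `torchrun --nproc_per_node=2 ddp_sketch.py` (the file name and the tiny model are purely illustrative); `src/train.py` in this example applies the same init-and-wrap pattern on an Azure ML compute cluster.

```python
# Minimal DDP sketch (illustrative only); launch with e.g.:
#   torchrun --nproc_per_node=2 ddp_sketch.py
import os
import torch
import torch.nn as nn
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP


def main():
    local_rank = int(os.environ["LOCAL_RANK"])  # set by torchrun / the Azure ML pytorch distribution
    use_cuda = torch.cuda.is_available()
    dist.init_process_group(backend="nccl" if use_cuda else "gloo")
    device = torch.device("cuda", local_rank) if use_cuda else torch.device("cpu")

    # wrap the model; gradients are all-reduced across ranks during backward()
    model = nn.Linear(10, 1).to(device)
    ddp_model = DDP(model, device_ids=[local_rank] if use_cuda else None)

    optimizer = torch.optim.SGD(ddp_model.parameters(), lr=0.01)
    x, y = torch.randn(32, 10, device=device), torch.randn(32, 1, device=device)
    loss = nn.functional.mse_loss(ddp_model(x), y)
    loss.backward()
    optimizer.step()

    dist.destroy_process_group()


if __name__ == "__main__":
    main()
```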

View file

@ -0,0 +1,16 @@
import urllib
import urllib.request
import tarfile
import os
url = 'https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz'
filename = 'cifar-10-python.tar.gz'
data_root = 'data'
filepath = os.path.join(data_root, filename)
if not os.path.isdir(data_root):
    os.makedirs(data_root, exist_ok=True)

urllib.request.urlretrieve(url, filepath)

with tarfile.open(filepath, "r:gz") as tar:
    tar.extractall(path=data_root)

os.remove(filepath)  # delete the tar.gz file after extraction

View file

@ -0,0 +1,26 @@
$schema: https://azuremlschemas.azureedge.net/latest/commandJob.schema.json
code:
  local_path: src
command: >-
  python train.py
  --epochs 1000
  --data-dir {inputs.cifar}
inputs:
  cifar:
    data:
      local_path: data
    mode: mount
environment: azureml:AzureML-pytorch-1.9-ubuntu18.04-py37-cuda11-gpu:3
compute:
  target: azureml:gpuclusters2
  instance_count: 2
distribution:
  type: pytorch
  process_count: 4
experiment_name: pytorch-cifar-distributed-example
description: Train a basic convolutional neural network (CNN) with PyTorch on the CIFAR-10 dataset, distributed via PyTorch.

View file

@ -0,0 +1,261 @@
# Copyright (c) 2017 Facebook, Inc. All rights reserved.
# BSD 3-Clause License
#
# Script adapted from: https://pytorch.org/tutorials/beginner/blitz/cifar10_tutorial.html#sphx-glr-beginner-blitz-cifar10-tutorial-py
# ==============================================================================
# imports
import os
import mlflow
import argparse
import torch
import torchvision
import torchvision.transforms as transforms
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
# define network architecture
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(3, 32, 3)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(32, 64, 3)
        self.conv3 = nn.Conv2d(64, 128, 3)
        self.fc1 = nn.Linear(128 * 6 * 6, 120)
        self.dropout = nn.Dropout(p=0.2)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 10)

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = self.pool(F.relu(self.conv2(x)))
        x = self.pool(F.relu(self.conv3(x)))
        x = x.view(-1, 128 * 6 * 6)
        x = self.dropout(F.relu(self.fc1(x)))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x
# define functions
def train(train_loader, model, criterion, optimizer, epoch, device, print_freq, rank):
    running_loss = 0.0
    for i, data in enumerate(train_loader, 0):
        # get the inputs; data is a list of [inputs, labels]
        inputs, labels = data[0].to(device), data[1].to(device)

        # zero the parameter gradients
        optimizer.zero_grad()

        # forward + backward + optimize
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # print statistics
        running_loss += loss.item()
        # mlflow loss logging
        mlflow.log_metric(f"train-loss-{rank}", running_loss)
def evaluate(test_loader, model, device):
    classes = (
        "plane",
        "car",
        "bird",
        "cat",
        "deer",
        "dog",
        "frog",
        "horse",
        "ship",
        "truck",
    )

    model.eval()

    correct = 0
    total = 0
    class_correct = list(0.0 for i in range(10))
    class_total = list(0.0 for i in range(10))
    with torch.no_grad():
        for data in test_loader:
            images, labels = data[0].to(device), data[1].to(device)
            outputs = model(images)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
            c = (predicted == labels).squeeze()
            for i in range(10):
                label = labels[i]
                class_correct[label] += c[i].item()
                class_total[label] += 1

    # print total test set accuracy
    print(
        "Accuracy of the network on the 10000 test images: %d %%"
        % (100 * correct / total)
    )
    mlflow.log_metric("Accuracy of test images", 100 * correct / total)

    # print test accuracy for each of the classes
    for i in range(10):
        print(
            "Accuracy of %5s : %2d %%"
            % (classes[i], 100 * class_correct[i] / class_total[i])
        )
        mlflow.log_metric(f"{classes[i]}", 100 * class_correct[i] / class_total[i])
def main(args):
    # get PyTorch environment variables
    world_size = int(os.environ["WORLD_SIZE"])
    rank = int(os.environ["RANK"])
    local_rank = int(os.environ["LOCAL_RANK"])

    distributed = world_size > 1

    # set device
    if distributed:
        device = torch.device("cuda", local_rank)
    else:
        device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

    # initialize distributed process group using default env:// method
    if distributed:
        torch.distributed.init_process_group(backend="nccl")

    # define train and test dataset DataLoaders
    transform = transforms.Compose(
        [transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]
    )

    train_set = torchvision.datasets.CIFAR10(
        root=args.data_dir, train=True, download=False, transform=transform
    )

    if distributed:
        train_sampler = torch.utils.data.distributed.DistributedSampler(train_set)
    else:
        train_sampler = None

    train_loader = torch.utils.data.DataLoader(
        train_set,
        batch_size=args.batch_size,
        shuffle=(train_sampler is None),
        num_workers=args.workers,
        sampler=train_sampler,
    )

    test_set = torchvision.datasets.CIFAR10(
        root=args.data_dir, train=False, download=False, transform=transform
    )
    test_loader = torch.utils.data.DataLoader(
        test_set, batch_size=args.batch_size, shuffle=False, num_workers=args.workers
    )

    model = Net().to(device)

    # wrap model with DDP
    if distributed:
        model = nn.parallel.DistributedDataParallel(
            model, device_ids=[local_rank], output_device=local_rank
        )

    # define loss function and optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(
        model.parameters(), lr=args.learning_rate, momentum=args.momentum
    )

    # train the model
    for epoch in range(args.epochs):
        print("Rank %d: Starting epoch %d" % (rank, epoch))
        if distributed:
            train_sampler.set_epoch(epoch)
        model.train()
        train(
            train_loader,
            model,
            criterion,
            optimizer,
            epoch,
            device,
            args.print_freq,
            rank,
        )

    print("Rank %d: Finished Training" % (rank))

    if not distributed or rank == 0:
        # log model
        mlflow.pytorch.log_model(model, "./model")
        # evaluate on full test dataset
        evaluate(test_loader, model, device)
def parse_args():
    # setup argparse
    parser = argparse.ArgumentParser()

    # add arguments
    parser.add_argument(
        "--data-dir", type=str, help="directory containing CIFAR-10 dataset"
    )
    parser.add_argument("--epochs", default=10, type=int, help="number of epochs")
    parser.add_argument(
        "--batch-size",
        default=16,
        type=int,
        help="mini batch size for each gpu/process",
    )
    parser.add_argument(
        "--workers",
        default=2,
        type=int,
        help="number of data loading workers for each gpu/process",
    )
    parser.add_argument(
        "--learning-rate", default=0.001, type=float, help="learning rate"
    )
    parser.add_argument("--momentum", default=0.9, type=float, help="momentum")
    parser.add_argument(
        "--print-freq",
        default=200,
        type=int,
        help="frequency of printing training statistics",
    )

    # parse args
    args = parser.parse_args()

    # mlflow logging
    mlflow.log_param("batch size", args.batch_size)
    mlflow.log_param("learning rate", args.learning_rate)
    mlflow.log_param("momentum", args.momentum)

    # return args
    return args
# run script
if __name__ == "__main__":
    # add space in logs
    print("*" * 60)
    print("\n\n")

    # parse args
    args = parse_args()

    # call main function
    main(args)

    # add space in logs
    print("*" * 60)
    print("\n\n")

View file

@ -0,0 +1,17 @@
# FLAML AutoML with RAY
This example shows how to use FLAML to train a model on a dataset using RAY on Azure Machine Learning.
## Prerequisites
- Azure Machine Learning Workspace
- Compute Clusters for Ray
- Compute Instance with Azure ML CLI 2.0 installed
## FLAML with RAY
FLAML is a lightweight Python library that finds accurate machine learning models automatically, efficiently, and economically. FLAML supports Ray Tune for distributed hyperparameter search. A minimal local sketch follows the reference below.
## Reference
- [FLAML: A Framework for Learning from Data](https://github.com/microsoft/FLAML)
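For orientation, a minimal local sketch of the FLAML API (no Ray cluster; it assumes only `flaml` and `scikit-learn` are installed). `src/train-automl-flaml.py` in this example runs the same kind of search with `n_concurrent_trials` on a Ray cluster.

```python
# Minimal local sketch of the FLAML API (no Ray cluster).
# src/train-automl-flaml.py runs the same kind of search with n_concurrent_trials on Ray.
from flaml import AutoML
from sklearn.datasets import load_diabetes

X, y = load_diabetes(return_X_y=True)

automl = AutoML()
automl.fit(
    X_train=X,
    y_train=y,
    task="regression",
    metric="mse",
    time_budget=60,  # seconds
)

print("best estimator:", automl.best_estimator)
print("best config:", automl.best_config)
print("best mse:", automl.best_loss)
```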

View file

@ -0,0 +1,91 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"from azureml.core import Workspace, Run\n",
"from azureml.tensorboard import Tensorboard\n",
"ws = Workspace.from_config()\n",
"run = Run.get(workspace=ws, run_id=\"cd0a70d1-aa17-4991-a9a5-98dfd0142663\")"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"https://automl-client-6006.japaneast.instances.azureml.ms\n"
]
},
{
"data": {
"text/plain": [
"'https://automl-client-6006.japaneast.instances.azureml.ms'"
]
},
"execution_count": 5,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"tb = Tensorboard([run], local_root=\"logs/azureml\", port=6006)\n",
"tb.start()"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"tb.stop()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"interpreter": {
"hash": "be3ccc1aacd5cd0ada9eab9372c3d7f901636bca88db42a566c3f238cceb324c"
},
"kernelspec": {
"display_name": "Python 3.6.13 64-bit ('ray': conda)",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.13"
},
"orig_nbformat": 4
},
"nbformat": 4,
"nbformat_minor": 2
}

View file

@ -0,0 +1,13 @@
name: ray
channels:
  - defaults
  - conda-forge
dependencies:
  - python=3.7
  - pip:
      - flaml[notebook, blendsearch, ray, azureml]==0.6.9
      - azureml-tensorboard
      - ipykernel
      - matplotlib
      - tensorboardX<=2.2
      - mpi4py

View file

@ -0,0 +1,28 @@
$schema: https://azuremlschemas.azureedge.net/latest/commandJob.schema.json
code:
  local_path: src
# This is the command that starts up the Ray cluster and runs the script `train-automl-flaml.py`.
command: >-
  python startRay.py
  --script train-automl-flaml.py
environment:
  conda_file: file:conda.yml
  docker:
    image: mcr.microsoft.com/azureml/openmpi3.1.2-ubuntu18.04
compute:
  target: azureml:cpuclusters
  instance_count: 3
distribution:
  # `type: pytorch` is used only so that every node gets a launcher process with the usual
  # RANK / MASTER_ADDR environment variables set; no PyTorch is actually used in this job.
  type: pytorch
experiment_name: train-automl-flaml
description: FLAML AutoML on Ray Cluster using Azure Machine Learning Compute Cluster

View file

@ -0,0 +1,100 @@
import threading
import sys
import subprocess
import os
import argparse
from mpi4py import MPI
def flush(proc, proc_log):
    # Stream the subprocess's stdout to our stdout and to a log file.
    while True:
        proc_out = proc.stdout.readline()
        if proc_out == "" and proc.poll() is not None:
            proc_log.close()
            break
        elif proc_out:
            sys.stdout.write(proc_out)
            proc_log.write(proc_out)
            proc_log.flush()
if __name__ == "__main__":
    comm = MPI.COMM_WORLD
    mpi_rank = comm.Get_rank()
    print("mpi rank:", mpi_rank)

    parser = argparse.ArgumentParser()
    parser.add_argument("--port", default="6379", type=int)
    parser.add_argument("--script", type=str)
    args = parser.parse_args()

    # the script to run as the FLAML AutoML training driver
    script = args.script
    # port for the Ray head node
    port = args.port
    print("head port is ", port)

    head_ip = os.environ.get("MASTER_ADDR")
    print("head address is ", head_ip)
    rank = os.environ.get("RANK")
    print("my rank is ", rank)

    # TODO: get the password from Azure Key Vault
    password = "password"

    # ensure the logs directory exists before opening log files
    os.makedirs("logs", exist_ok=True)

    # Ray head node
    if str(rank) == "0":
        head_log = open("logs/worker_{rank}_log.txt".format(rank=rank), "w")
        cmd = f"ray start --head --port={port} --redis-password={password} --dashboard-port=9999"
        print(cmd)
        head_proc = subprocess.Popen(
            cmd.split(),
            universal_newlines=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
        )
        head_flush = threading.Thread(
            target=flush, args=(head_proc, head_log)
        )
        head_flush.start()

        # run the training script as the driver process on the head node
        python_log = open("logs/python_{rank}_log.txt".format(rank=rank), "w")
        command_line = f"python {script} --redis-password={password}"
        driver_proc = subprocess.Popen(
            command_line.split(),
            universal_newlines=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
        )
        driver_flush = threading.Thread(
            target=flush, args=(driver_proc, python_log)
        )
        driver_flush.start()

        driver_proc.wait()
        head_proc.kill()
        driver_proc.kill()
        print("### Head Job Finished")
    # Ray worker node
    else:
        worker_log = open("logs/worker_{rank}_log.txt".format(rank=rank), "w")
        cmd = f"ray start --address={head_ip}:{port} --redis-password {password}"
        print(cmd)
        worker_proc = subprocess.Popen(
            cmd.split(),
            universal_newlines=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
        )
        flush(worker_proc, worker_log)
        # worker_flush = threading.Thread(
        #     target=flush, args=(worker_proc, worker_log)
        # )
        # worker_flush.start()
        # worker_proc.wait()

View file

@ -0,0 +1,76 @@
import argparse
import mlflow
import ray
from flaml import AutoML
from sklearn.datasets import load_diabetes
from tensorboardX import SummaryWriter
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--redis-password", default='password')
    args = parser.parse_args()
    password = args.redis_password

    # connect to the running Ray cluster started by startRay.py
    ray.init('auto', _redis_password=f'{password}')

    # Initialize an AutoML instance
    automl = AutoML()
    # Specify the automl goal and constraints
    automl_settings = {
        "time_budget": 300,  # in seconds
        "metric": 'mse',
        "task": 'regression',
        "n_concurrent_trials": 3,
        "log_file_name": "./outputs/diabetes.log",
        "log_training_metric": True,
        "log_type": 'all',
        "append_log": True,
    }
    X_train, y_train = load_diabetes(return_X_y=True)

    # Train with labeled input data
    # TODO: mlflow logging to Azure ML
    mlflow.log_param("n_concurrent_trials", automl_settings['n_concurrent_trials'])
    mlflow.log_param("task", automl_settings['task'])
    mlflow.log_param("metric", automl_settings['metric'])
    mlflow.log_param("time_budget", automl_settings['time_budget'])

    try:
        automl.fit(X_train=X_train, y_train=y_train, **automl_settings)
    except Exception as e:
        print(e)
    finally:
        print('Best ML learner:', automl.best_estimator)
        print('Best hyperparameter config:', automl.best_config)
        print('Best MSE: ', automl.best_loss)

        from flaml.data import get_output_from_log
        time_history, best_valid_loss_history, valid_loss_history, config_history, metric_history = \
            get_output_from_log(filename=automl_settings['log_file_name'], time_budget=240)
        print(time_history, best_valid_loss_history, valid_loss_history, config_history, metric_history)

        with SummaryWriter(comment='azureml', log_dir="logs/azureml/") as writer:
            for config, metric in zip(config_history, metric_history):
                hparam_dict_learner = {key: value for key, value in config.items() if key == 'Current Learner'}
                hparam_dict_param = config['Current Hyper-parameters']['ml']
                writer.add_hparams(hparam_dict=dict(**hparam_dict_learner, **hparam_dict_param), metric_dict=metric)
                mlflow.log_metric("mse", metric['train_loss'])

        import matplotlib.pyplot as plt
        import numpy as np

        fig = plt.figure()
        plt.title('Learning Curve')
        plt.xlabel('Wall Clock Time (s)')
        plt.ylabel('mse')
        plt.scatter(time_history, np.array(valid_loss_history))
        plt.step(time_history, np.array(best_valid_loss_history), where='post')
        plt.savefig("figure.png")
        mlflow.log_figure(fig, "figure.png")
        plt.show()

        ray.shutdown()