* Adds PyTorch examples

* Adds pytorch to cookiecutter.json

* Adds pytorch instructions to readme

* Adds modifications for PyTorch
Mat 2019-10-09 21:24:58 +01:00 committed by GitHub
Parent db218be2df
Commit 092a17372e
No known key found for this signature
GPG key ID: 4AEE18F83AFDEB23
25 changed files with 1775 additions and 20 deletions


@ -5,10 +5,15 @@ The project contains the following:
#### Tensorflow Benchmark
This is a demo template that allows you to easily run [tf_cnn_benchmarks](https://github.com/tensorflow/benchmarks/tree/master/scripts/tf_cnn_benchmarks) on Azure ML. This is a great way to test performance as well as compare to other platforms
#### Tensorflow Imagenet
This is another demo template that shows you how to train a ResNet50 model using Imagenet on Azure. We include scripts for processing the Imagenet data, transforming them to TF Records as well as leveraging AzCopy to quickly upload the data to the cloud.
#### Tensorflow Template
This is a blank template you can use for your own distributed training projects. It allows you to leverage all the tooling built around the previous two demos to speed up the time it takes to run your model in a distributed fashion on Azure.
#### PyTorch Benchmark
This is a demo template that allows you to easily run a simple PyTorch benchmarking script on Azure ML. This is a great way to test performance as well as compare to other platforms
#### PyTorch Imagenet
This is another demo template that shows you how to train a ResNet50 model using Imagenet on Azure. We include scripts for processing the Imagenet data as well as leveraging AzCopy to quickly upload the data to the cloud.
#### PyTorch Template
This is a blank template you can use for your own distributed training projects. It allows you to leverage all the tooling built around the previous two demos to speed up the time it takes to run your model in a distributed fashion on Azure.
# Prerequisites
Before you get started you need a PC running Ubuntu and the following installed:
@ -92,7 +97,7 @@ make run
This will put you in an environment inside your container in a tmux session (for a tutorial on tmux see [here](https://www.hamvocke.com/blog/a-quick-and-easy-guide-to-tmux/)). The tmux control key has been mapped to **ctrl+a** rather than the standard ctrl+b so as not to interfere with the outer tmux session if you are already a tmux user. You can alter this in the tmux.conf file in the Docker folder. The docker container will map the location you launched it from to /workspace inside the container. Therefore you can edit files outside of the container in the project folder and the changes will be reflected inside the container.
## Imagenet data
If you have selected **all** or **imagenet** in the type question during cookiecutter invocation then you will need to have **ILSVRC2012_img_train.tar** and **ILSVRC2012_img_val.tar** present in the directory you specified as your data directory. Go to the [download page](http://www.image-net.org/download-images) (you may need to register an account), and find the page for ILSVRC2012. You will need to download the two files mentioned earlier.
If you have selected **all**, **tensorflow_imagenet** or **pytorch_imagenet** in the type question during cookiecutter invocation then you will need to have **ILSVRC2012_img_train.tar** and **ILSVRC2012_img_val.tar** present in the directory you specified as your data directory. Go to the [download page](http://www.image-net.org/download-images) (you may need to register an account), and find the page for ILSVRC2012. You will need to download the two files mentioned earlier.
## Template selection
Based on the option you selected for **type** during the cookiecutter invocation you will get all or one of the options below. Cookiecutter will create your project folder which will contain the template folders. When inside your project folder make sure you have run the **make build** and **make run** commands as mentioned in the _building environment_ section above. Once you run the run command you will be greeted by a prompt; this is now your control plane. First you will need to set everything up. To do this run
@ -116,7 +121,7 @@ inv tf-benchmark.submit.remote.synthetic
Note that this will create the cluster if it wasn't created earlier and create the appropriate environment.
#### Tensorflow Imagenet
This is the second demo template that will train a ResNet50 model on imagenet. It allows the options of using synthetic data, image data as well as tfrecords. To use this you must either select **imagenet** or **all** when cookiecutter asks what type of project you want to create.
This is the second demo template that will train a ResNet50 model on imagenet. It allows the options of using synthetic data, image data as well as tfrecords. To use this you must either select **tensorflow_imagenet** or **all** when cookiecutter asks what type of project you want to create.
To run things locally using synthetic data simply run:
```
inv tf-imagenet.submit.local.synthetic
@ -131,6 +136,35 @@ This only covers a small number of commands, to see the full list of commands si
#### Tensorflow Experiment
This is the option that you should use if you want to run your own training script. It is up to you to add the appropriate training scripts and modify the tensorflow_experiment.py file to run the appropriate commands. If you want to see how to invoke things simply look at the other examples.
#### PyTorch Benchmark
This is a demo template that allows you to easily run a simple PyTorch benchmarking script on Azure ML. To use this you must select either **pytorch_benchmark** or **all** when invoking cookiecutter.
Once setup is complete then simply run:
```bash
inv pytorch-benchmark.submit.local.synthetic
```
to run things locally on a single GPU. Note that the first time you run things you will have to build the environment.
To run things on a cluster simply run:
```bash
inv pytorch-benchmark.submit.remote.synthetic
```
Note that this will create the cluster if it wasn't created earlier and create the appropriate environment.
#### PyTorch Imagenet
This is the second demo template that will train a ResNet50 model on imagenet. It allows the options of using synthetic data or image data. To use this you must either select **pytorch_imagenet** or **all** when cookiecutter asks what type of project you want to create.
To run things locally using synthetic data simply run:
```
inv pytorch-imagenet.submit.local.synthetic
```
To run things on a remote cluster with real imagenet data simply run:
```
inv pytorch-imagenet.submit.remote.images
```
#### PyTorch Experiment
This is the option that you should use if you want to run your own training script. It is up to you to add your training scripts and modify the pytorch_experiment.py file to run the appropriate commands. If you want to see how to invoke things, look at the other examples or the sketch below.
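As an illustration (not part of the generated template), a filled-in submit_local task in pytorch_experiment.py could look roughly like the sketch below, assuming your training script is src/train_model.py and accepts an --epochs argument; the remote submit task follows the same pattern.
```python
from invoke import task
import os

_BASE_PATH = os.path.dirname(os.path.abspath(__file__))


@task
def submit_local(c, epochs=1):
    """Submit your PyTorch training job for local execution."""
    from aml_compute import PyTorchExperimentCLI

    exp = PyTorchExperimentCLI("my_pytorch_experiment")  # hypothetical experiment name
    run = exp.submit_local(
        os.path.join(_BASE_PATH, "src"),
        "train_model.py",             # your training script in the src folder
        {"--epochs": epochs},         # assumes your script accepts an --epochs argument
        dependencies_file=os.path.join(_BASE_PATH, "environment_gpu.yml"),
        wait_for_completion=True,
    )
    print(run)
```
Everything here mirrors the calls already used by the benchmark and imagenet templates, so those remain the reference for the full set of options.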
# Architecture
Below is a diagram that shows how the project is set up.
@ -175,6 +209,26 @@ The original project structure is as shown below.
│ ├── storage.py <-- Invoke module for using Azure storage
│ └── tfrecords.py <-- Invoke module for working with tf records
├── tasks.py <-- Main invoke module
├── PyTorch_benchmark <-- Template for running PyTorch benchmarks
│ ├── environment_cpu.yml
│ ├── environment_gpu.yml <-- Conda specification file used by Azure ML to create environment to run project in
│ ├── pytorch_benchmark.py <-- Invoke module for running benchmarks
│ └── src
│ └── pytorch_synthetic_benchmark.py
├── PyTorch_imagenet <-- Template for training ResNet50 on imagenet data
│ ├── environment_cpu.yml
│ ├── environment_gpu.yml <-- Conda specification file used by Azure ML to create environment to run project in
│ ├── pytorch_imagenet.py <-- Invoke module for running imagenet training
│ └── src
│ ├── imagenet_pytorch_horovod.py
│ ├── logging.conf
│ └── timer.py
├── PyTorch_experiment <-- PyTorch distributed training template [Put your code here]
│ ├── environment_cpu.yml
│ ├── environment_gpu.yml <-- Conda specification file used by Azure ML to create environment to run project in
│ ├── pytorch_experiment.py <-- Invoke module for running your experiments
│ └── src
│ └── train_model.py
├── TensorFlow_benchmark <-- Template for running Tensorflow benchmarks
│ ├── environment_cpu.yml
│ ├── environment_gpu.yml <-- Conda specification file used by Azure ML to create environment to run project in
@ -223,6 +277,12 @@ inv --list
select-subscription Select Azure subscription to use
setup Setup the environment and process the imagenet data
tensorboard Runs tensorboard in a separate tmux session
pytorch-benchmark.submit.local.synthetic Submit PyTorch training job using synthetic data for local execution
pytorch-benchmark.submit.remote.synthetic Submit PyTorch training job using synthetic data to remote cluster
pytorch-imagenet.submit.local.images Submit PyTorch training job using real imagenet data for local execution
pytorch-imagenet.submit.local.synthetic Submit PyTorch training job using synthetic imagenet data for local execution
pytorch-imagenet.submit.remote.images Submit PyTorch training job using real imagenet data to remote cluster
pytorch-imagenet.submit.remote.synthetic Submit PyTorch training job using synthetic imagenet data to remote cluster
storage.create-resource-group
storage.store-key Retrieves premium storage account key from Azure and stores it in .env file
storage.image.create-container Creates container based on the parameters found in the .env file


@ -17,9 +17,12 @@
"container_registry": "dockerhub",
"type": [
"all",
"template",
"benchmark",
"imagenet"
"tesnorflow_template",
"tesnorflow_benchmark",
"tesnorflow_imagenet",
"pytorch_benchmark",
"pytorch_imagenet",
"pytorch_template"
],
"region": [
"eastus",


@ -16,11 +16,15 @@ def _remove_directories(*directories):
def _copy_env_file():
shutil.move("_dotenv_template", ".env")
_ALL_DIRECTORIES = "TensorFlow_benchmark", "TensorFlow_imagenet", "TensorFlow_experiment", "PyTorch_benchmark", "PyTorch_imagenet", "PyTorch_experiment"
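# For each selected project type, the template directories that get removed (only the folder backing that choice is kept).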
_CHOICES_DICT = {
"template": ("TensorFlow_benchmark", "TensorFlow_imagenet"),
"benchmark": ("TensorFlow_experiment", "TensorFlow_imagenet"),
"imagenet": ("TensorFlow_benchmark", "TensorFlow_experiment")
"tensorflow_template": filter(lambda x: x.lower()!="tensorflow_experiment", _ALL_DIRECTORIES),
"tensorflow_benchmark": filter(lambda x: x.lower()!="tensorflow_benchmark", _ALL_DIRECTORIES),
"tensorflow_imagenet": filter(lambda x: x.lower()!="tensorflow_imagenet", _ALL_DIRECTORIES),
"pytorch_imagenet": filter(lambda x: x.lower()!="pytorch_imagenet", _ALL_DIRECTORIES),
"pytorch_benchmark": filter(lambda x: x.lower()!="pytorch_benchmark", _ALL_DIRECTORIES),
"pytorch_template": filter(lambda x: x.lower()!="pytorch_experiment", _ALL_DIRECTORIES),
}
if __name__ == "__main__":


@ -0,0 +1,15 @@
name: project_environment
dependencies:
# The python interpreter version.
# Currently Azure ML only supports 3.5.2 and later.
- python=3.6.2
- pandas
- numpy
- pip:
# Required packages for AzureML execution, history, and data preparation.
- azureml-defaults
- torch-cpu==1.0.0
- torchvision-cpu
- horovod==0.15.2
- pillow
- tensorboardX


@ -0,0 +1,15 @@
name: project_environment
dependencies:
# The python interpreter version.
# Currently Azure ML only supports 3.5.2 and later.
- python=3.6.2
- pandas
- numpy
- pip:
# Required packages for AzureML execution, history, and data preparation.
- azureml-defaults
- torch==1.0.0
- torchvision==0.2.1
- horovod==0.15.2
- pillow
- tensorboardX


@ -0,0 +1,59 @@
"""Module for running PyTorch benchmark using synthetic data
"""
from invoke import task, Collection
import os
from config import load_config
_BASE_PATH = os.path.dirname(os.path.abspath(__file__))
env_values = load_config()
@task
def submit_benchmark_remote(c, node_count=int(env_values["CLUSTER_MAX_NODES"])):
"""Submit PyTorch training job using synthetic data to remote cluster
Args:
node_count (int, optional): The number of nodes to use in cluster. Defaults to env_values['CLUSTER_MAX_NODES'].
"""
from aml_compute import PyTorchExperimentCLI
exp = PyTorchExperimentCLI("synthetic_benchmark_remote")
run = exp.submit(
os.path.join(_BASE_PATH, "src"),
"pytorch_synthetic_benchmark.py",
{"--model": "resnet50", "--batch-size": 64},
node_count=node_count,
dependencies_file=os.path.join(_BASE_PATH, "environment_gpu.yml"),
wait_for_completion=True,
)
print(run)
@task
def submit_benchmark_local(c):
"""Submit PyTorch training job using synthetic data for local execution
"""
from aml_compute import PyTorchExperimentCLI
exp = PyTorchExperimentCLI("synthetic_images_local")
run = exp.submit_local(
os.path.join(_BASE_PATH, "src"),
"pytorch_synthetic_benchmark.py",
{"--model": "resnet50", "--batch-size": 64},
dependencies_file=os.path.join(_BASE_PATH, "environment_gpu.yml"),
wait_for_completion=True,
)
print(run)
remote_collection = Collection("remote")
remote_collection.add_task(submit_benchmark_remote, "synthetic")
local_collection = Collection("local")
local_collection.add_task(submit_benchmark_local, "synthetic")
submit_collection = Collection("submit", local_collection, remote_collection)
namespace = Collection("pytorch_benchmark", submit_collection)


@ -0,0 +1,126 @@
from __future__ import print_function
import argparse
import torch.backends.cudnn as cudnn
import torch.nn.functional as F
import torch.optim as optim
import torch.utils.data.distributed
from torchvision import models
import horovod.torch as hvd
import timeit
import numpy as np
# Benchmark settings
parser = argparse.ArgumentParser(
description="PyTorch Synthetic Benchmark",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument(
"--fp16-allreduce",
action="store_true",
default=False,
help="use fp16 compression during allreduce",
)
parser.add_argument("--model", type=str, default="resnet50", help="model to benchmark")
parser.add_argument("--batch-size", type=int, default=32, help="input batch size")
parser.add_argument(
"--num-warmup-batches",
type=int,
default=10,
help="number of warm-up batches that don't count towards benchmark",
)
parser.add_argument(
"--num-batches-per-iter",
type=int,
default=10,
help="number of batches per benchmark iteration",
)
parser.add_argument(
"--num-iters", type=int, default=10, help="number of benchmark iterations"
)
parser.add_argument(
"--no-cuda", action="store_true", default=False, help="disables CUDA training"
)
args = parser.parse_args()
args.cuda = not args.no_cuda and torch.cuda.is_available()
hvd.init()
if args.cuda:
# Horovod: pin GPU to local rank.
torch.cuda.set_device(hvd.local_rank())
cudnn.benchmark = True
# Set up standard model.
model = getattr(models, args.model)()
if args.cuda:
# Move model to GPU.
model.cuda()
optimizer = optim.SGD(model.parameters(), lr=0.01)
# Horovod: (optional) compression algorithm.
compression = hvd.Compression.fp16 if args.fp16_allreduce else hvd.Compression.none
# Horovod: wrap optimizer with DistributedOptimizer.
optimizer = hvd.DistributedOptimizer(
optimizer, named_parameters=model.named_parameters(), compression=compression
)
# Horovod: broadcast parameters & optimizer state.
hvd.broadcast_parameters(model.state_dict(), root_rank=0)
hvd.broadcast_optimizer_state(optimizer, root_rank=0)
# Set up fixed fake data
data = torch.randn(args.batch_size, 3, 224, 224)
target = torch.LongTensor(args.batch_size).random_() % 1000
if args.cuda:
data, target = data.cuda(), target.cuda()
def benchmark_step():
optimizer.zero_grad()
output = model(data)
loss = F.cross_entropy(output, target)
loss.backward()
optimizer.step()
def log(s, nl=True):
if hvd.rank() != 0:
return
print(s, end="\n" if nl else "")
log("Model: %s" % args.model)
log("Batch size: %d" % args.batch_size)
device = "GPU" if args.cuda else "CPU"
log("Number of %ss: %d" % (device, hvd.size()))
# Warm-up
log("Running warmup...")
timeit.timeit(benchmark_step, number=args.num_warmup_batches)
# Benchmark
log("Running benchmark...")
img_secs = []
for x in range(args.num_iters):
time = timeit.timeit(benchmark_step, number=args.num_batches_per_iter)
img_sec = args.batch_size * args.num_batches_per_iter / time
log("Iter #%d: %.1f img/sec per %s" % (x, img_sec, device))
img_secs.append(img_sec)
# Results
img_sec_mean = np.mean(img_secs)
img_sec_conf = 1.96 * np.std(img_secs)
log("Img/sec per %s: %.1f +-%.1f" % (device, img_sec_mean, img_sec_conf))
log(
"Total img/sec on %d %s(s): %.1f +-%.1f"
% (hvd.size(), device, hvd.size() * img_sec_mean, hvd.size() * img_sec_conf)
)
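As a worked example of the throughput summary this script prints (the timings below are illustrative numbers, not measured results):
```python
import numpy as np

# Illustrative per-iteration timings (seconds) for 10 batches of 64 images each.
batch_size, num_batches_per_iter = 64, 10
times = np.array([1.95, 2.05, 2.00, 1.98, 2.02])

img_secs = batch_size * num_batches_per_iter / times   # images/sec for each iteration
img_sec_mean = np.mean(img_secs)
img_sec_conf = 1.96 * np.std(img_secs)                 # ~95% confidence interval
print("Img/sec per GPU: %.1f +-%.1f" % (img_sec_mean, img_sec_conf))
```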


@ -0,0 +1,16 @@
name: project_environment
dependencies:
# The python interpreter version.
# Currently Azure ML only supports 3.5.2 and later.
- python=3.6.2
- pandas
- numpy
- pip:
# Required packages for AzureML execution, history, and data preparation.
- azureml-defaults
- torch-cpu==1.0.0
- torchvision-cpu
- horovod==0.15.2
- pillow
- fire
- tensorboardX


@ -0,0 +1,17 @@
name: project_environment
dependencies:
# The python interpreter version.
# Currently Azure ML only supports 3.5.2 and later.
- python=3.6.2
- pandas
- numpy
- pip:
# Required packages for AzureML execution, history, and data preparation.
- azureml-defaults
- torch==1.0.0
- torchvision
- horovod==0.15.2
- pillow
- fire
- tensorboardX
- tqdm


@ -0,0 +1,257 @@
from __future__ import print_function
import argparse
import torch.backends.cudnn as cudnn
import torch.nn.functional as F
import torch.optim as optim
import torch.utils.data.distributed
from torchvision import datasets, transforms, models
import horovod.torch as hvd
import tensorboardX
import os
from tqdm import tqdm
# Training settings
parser = argparse.ArgumentParser(description='PyTorch ImageNet Example',
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('--train-dir', default=os.path.expanduser('~/imagenet/train'),
help='path to training data')
parser.add_argument('--val-dir', default=os.path.expanduser('~/imagenet/validation'),
help='path to validation data')
parser.add_argument('--log-dir', default='./logs',
help='tensorboard log directory')
parser.add_argument('--checkpoint-format', default='./checkpoint-{epoch}.pth.tar',
help='checkpoint file format')
parser.add_argument('--fp16-allreduce', action='store_true', default=False,
help='use fp16 compression during allreduce')
# Default settings from https://arxiv.org/abs/1706.02677.
parser.add_argument('--batch-size', type=int, default=32,
help='input batch size for training')
parser.add_argument('--val-batch-size', type=int, default=32,
help='input batch size for validation')
parser.add_argument('--epochs', type=int, default=90,
help='number of epochs to train')
parser.add_argument('--base-lr', type=float, default=0.0125,
help='learning rate for a single GPU')
parser.add_argument('--warmup-epochs', type=float, default=5,
help='number of warmup epochs')
parser.add_argument('--momentum', type=float, default=0.9,
help='SGD momentum')
parser.add_argument('--wd', type=float, default=0.00005,
help='weight decay')
parser.add_argument('--no-cuda', action='store_true', default=False,
help='disables CUDA training')
parser.add_argument('--seed', type=int, default=42,
help='random seed')
args = parser.parse_args()
args.cuda = not args.no_cuda and torch.cuda.is_available()
hvd.init()
torch.manual_seed(args.seed)
if args.cuda:
# Horovod: pin GPU to local rank.
torch.cuda.set_device(hvd.local_rank())
torch.cuda.manual_seed(args.seed)
cudnn.benchmark = True
# If set > 0, will resume training from a given checkpoint.
resume_from_epoch = 0
for try_epoch in range(args.epochs, 0, -1):
if os.path.exists(args.checkpoint_format.format(epoch=try_epoch)):
resume_from_epoch = try_epoch
break
# Horovod: broadcast resume_from_epoch from rank 0 (which will have
# checkpoints) to other ranks.
resume_from_epoch = hvd.broadcast(torch.tensor(resume_from_epoch), root_rank=0,
name='resume_from_epoch').item()
# Horovod: print logs on the first worker.
verbose = 1 if hvd.rank() == 0 else 0
# Horovod: write TensorBoard logs on first worker.
log_writer = tensorboardX.SummaryWriter(args.log_dir) if hvd.rank() == 0 else None
# kwargs = {'num_workers': 4, 'pin_memory': True} if args.cuda else {}
kwargs={'num_workers': 4}
train_dataset = \
datasets.ImageFolder(args.train_dir,
transform=transforms.Compose([
transforms.RandomResizedCrop(224),
transforms.RandomHorizontalFlip(),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225])
]))
# Horovod: use DistributedSampler to partition data among workers. Manually specify
# `num_replicas=hvd.size()` and `rank=hvd.rank()`.
train_sampler = torch.utils.data.distributed.DistributedSampler(
train_dataset, num_replicas=hvd.size(), rank=hvd.rank())
train_loader = torch.utils.data.DataLoader(
train_dataset, batch_size=args.batch_size, sampler=train_sampler, **kwargs)
val_dataset = \
datasets.ImageFolder(args.val_dir,
transform=transforms.Compose([
transforms.Resize(256),
transforms.CenterCrop(224),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225])
]))
val_sampler = torch.utils.data.distributed.DistributedSampler(
val_dataset, num_replicas=hvd.size(), rank=hvd.rank())
val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=args.val_batch_size,
sampler=val_sampler, **kwargs)
print('Model')
# Set up standard ResNet-50 model.
model = models.resnet50()
if args.cuda:
# Move model to GPU.
model.cuda()
# Horovod: scale learning rate by the number of GPUs.
optimizer = optim.SGD(model.parameters(), lr=args.base_lr * hvd.size(),
momentum=args.momentum, weight_decay=args.wd)
# Horovod: (optional) compression algorithm.
compression = hvd.Compression.fp16 if args.fp16_allreduce else hvd.Compression.none
# Horovod: wrap optimizer with DistributedOptimizer.
optimizer = hvd.DistributedOptimizer(optimizer,
named_parameters=model.named_parameters(),
compression=compression)
# Restore from a previous checkpoint, if initial_epoch is specified.
# Horovod: restore on the first worker which will broadcast weights to other workers.
if resume_from_epoch > 0 and hvd.rank() == 0:
filepath = args.checkpoint_format.format(epoch=resume_from_epoch)
checkpoint = torch.load(filepath)
model.load_state_dict(checkpoint['model'])
optimizer.load_state_dict(checkpoint['optimizer'])
print('Broadcasting')
# Horovod: broadcast parameters & optimizer state.
hvd.broadcast_parameters(model.state_dict(), root_rank=0)
hvd.broadcast_optimizer_state(optimizer, root_rank=0)
def train(epoch):
model.train()
train_sampler.set_epoch(epoch)
train_loss = Metric('train_loss')
train_accuracy = Metric('train_accuracy')
with tqdm(total=len(train_loader),
desc='Train Epoch #{}'.format(epoch + 1),
disable=not verbose) as t:
for batch_idx, (data, target) in enumerate(train_loader):
adjust_learning_rate(epoch, batch_idx)
if args.cuda:
data, target = data.cuda(), target.cuda()
optimizer.zero_grad()
output = model(data)
loss = F.cross_entropy(output, target)
loss.backward()
optimizer.step()
train_loss.update(loss)
train_accuracy.update(accuracy(output, target))
t.set_postfix({'loss': train_loss.avg.item(),
'accuracy': 100. * train_accuracy.avg.item()})
t.update(1)
if log_writer:
log_writer.add_scalar('train/loss', train_loss.avg, epoch)
log_writer.add_scalar('train/accuracy', train_accuracy.avg, epoch)
def validate(epoch):
model.eval()
val_loss = Metric('val_loss')
val_accuracy = Metric('val_accuracy')
with tqdm(total=len(val_loader),
desc='Validate Epoch #{}'.format(epoch + 1),
disable=not verbose) as t:
with torch.no_grad():
for data, target in val_loader:
if args.cuda:
data, target = data.cuda(), target.cuda()
output = model(data)
val_loss.update(F.cross_entropy(output, target))
val_accuracy.update(accuracy(output, target))
t.set_postfix({'loss': val_loss.avg.item(),
'accuracy': 100. * val_accuracy.avg.item()})
t.update(1)
if log_writer:
log_writer.add_scalar('val/loss', val_loss.avg, epoch)
log_writer.add_scalar('val/accuracy', val_accuracy.avg, epoch)
# Horovod: using `lr = base_lr * hvd.size()` from the very beginning leads to worse final
# accuracy. Scale the learning rate `lr = base_lr` ---> `lr = base_lr * hvd.size()` during
# the first five epochs. See https://arxiv.org/abs/1706.02677 for details.
# After the warmup reduce learning rate by 10 on the 30th, 60th and 80th epochs.
def adjust_learning_rate(epoch, batch_idx):
if epoch < args.warmup_epochs:
epoch += float(batch_idx + 1) / len(train_loader)
lr_adj = 1. / hvd.size() * (epoch * (hvd.size() - 1) / args.warmup_epochs + 1)
elif epoch < 30:
lr_adj = 1.
elif epoch < 60:
lr_adj = 1e-1
elif epoch < 80:
lr_adj = 1e-2
else:
lr_adj = 1e-3
for param_group in optimizer.param_groups:
param_group['lr'] = args.base_lr * hvd.size() * lr_adj
def accuracy(output, target):
# get the index of the max log-probability
pred = output.max(1, keepdim=True)[1]
return pred.eq(target.view_as(pred)).cpu().float().mean()
def save_checkpoint(epoch):
if hvd.rank() == 0:
filepath = args.checkpoint_format.format(epoch=epoch + 1)
state = {
'model': model.state_dict(),
'optimizer': optimizer.state_dict(),
}
torch.save(state, filepath)
# Horovod: average metrics from distributed training.
class Metric(object):
def __init__(self, name):
self.name = name
self.sum = torch.tensor(0.)
self.n = torch.tensor(0.)
def update(self, val):
self.sum += hvd.allreduce(val.detach().cpu(), name=self.name)
self.n += 1
@property
def avg(self):
return self.sum / self.n
print('Training')
for epoch in range(resume_from_epoch, args.epochs):
train(epoch)
validate(epoch)
save_checkpoint(epoch)
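For reference, a standalone sketch of the warmup-plus-step-decay learning-rate schedule implemented by adjust_learning_rate above, assuming base_lr=0.0125, four workers and five warmup epochs (the per-batch warmup within an epoch is ignored here):
```python
def lr_at_epoch(epoch, base_lr=0.0125, size=4, warmup_epochs=5):
    """Learning rate at the start of an epoch under the schedule used above."""
    if epoch < warmup_epochs:
        lr_adj = 1.0 / size * (epoch * (size - 1) / warmup_epochs + 1)
    elif epoch < 30:
        lr_adj = 1.0
    elif epoch < 60:
        lr_adj = 1e-1
    elif epoch < 80:
        lr_adj = 1e-2
    else:
        lr_adj = 1e-3
    return base_lr * size * lr_adj

for e in [0, 2, 5, 29, 30, 60, 80]:
    # Ramps from ~base_lr up to base_lr * size, then decays by 10x at epochs 30, 60 and 80.
    print(e, round(lr_at_epoch(e), 5))
```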


@ -0,0 +1,27 @@
[loggers]
keys=root,__main__
[handlers]
keys=consoleHandler
[formatters]
keys=simpleFormatter
[logger_root]
level=WARNING
handlers=consoleHandler
[logger___main__]
level=DEBUG
handlers=consoleHandler
qualname=__main__
propagate=0
[handler_consoleHandler]
class=StreamHandler
level=DEBUG
formatter=simpleFormatter
args=(sys.stdout,)
[formatter_simpleFormatter]
format=%(asctime)s - %(name)s - %(levelname)s - %(message)s
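A minimal sketch of how this configuration is consumed (the training scripts do the same via the LOG_CONFIG environment variable): the __main__ logger emits DEBUG and above to stdout, while everything else inherits the WARNING-level root logger.
```python
import logging
import logging.config

logging.config.fileConfig("logging.conf")  # assumes the file above is in the working directory

logging.getLogger(__name__).debug("visible when this snippet runs as __main__")
logging.getLogger("some.library").debug("suppressed: root logger level is WARNING")
```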


@ -0,0 +1,105 @@
import collections
import functools
import logging
from timeit import default_timer
class Timer(object):
"""
Keyword arguments:
output: if True, print output after exiting context.
if callable, pass output to callable.
format: str.format string to be used for output; default "took {} seconds"
prefix: string to prepend (plus a space) to output
For convenience, if you only specify this, output defaults to True.
"""
def __init__(self,
timer=default_timer,
factor=1,
output=None,
fmt="took {:.3f} seconds",
prefix=""):
self._timer = timer
self._factor = factor
self._output = output
self._fmt = fmt
self._prefix = prefix
self._end = None
self._start = None
def start(self):
self._start = self()
def stop(self):
self._end = self()
def __call__(self):
""" Return the current time """
return self._timer()
def __enter__(self):
""" Set the start time """
self.start()
return self
def __exit__(self, exc_type, exc_value, exc_traceback):
""" Set the end time """
self.stop()
if self._output is True or (self._output is None and self._prefix):
self._output = print
if callable(self._output):
output = " ".join([self._prefix, self._fmt.format(self.elapsed)])
self._output(output)
def __str__(self):
return '%.3f' % (self.elapsed)
@property
def elapsed(self):
""" Return the elapsed time
"""
if self._end is None:
# if elapsed is called in the context manager scope
return (self() - self._start) * self._factor
else:
# if elapsed is called out of the context manager scope
return (self._end - self._start) * self._factor
def timer(logger=None,
level=logging.INFO,
fmt="function %(function_name)s execution time: %(execution_time).3f",
*func_or_func_args,
**timer_kwargs):
""" Function decorator displaying the function execution time
"""
def wrapped_f(f):
@functools.wraps(f)
def wrapped(*args, **kwargs):
with Timer(**timer_kwargs) as t:
out = f(*args, **kwargs)
context = {
'function_name': f.__name__,
'execution_time': t.elapsed,
}
if logger:
logger.log(
level,
fmt % context,
extra=context)
else:
print(fmt % context)
return out
return wrapped
if (len(func_or_func_args) == 1
and isinstance(func_or_func_args[0], collections.Callable)):
return wrapped_f(func_or_func_args[0])
else:
return wrapped_f
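A short usage sketch for the Timer context manager and the timer decorator above, assuming this module is importable as timer (it sits in the template's src folder):
```python
import logging
import time

from timer import Timer, timer

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Context-manager form: logs roughly "sleep step took 0.200 seconds" on exit.
with Timer(output=logger.info, prefix="sleep step") as t:
    time.sleep(0.2)
print("elapsed inside context: %.3f s" % t.elapsed)

# Decorator form: logs roughly "function slow_add execution time: 0.100".
@timer(logger=logger)
def slow_add(a, b):
    time.sleep(0.1)
    return a + b

slow_add(1, 2)
```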


@ -0,0 +1,16 @@
name: project_environment
dependencies:
# The python interpreter version.
# Currently Azure ML only supports 3.5.2 and later.
- python=3.6.2
- pandas
- numpy
- pip:
# Required packages for AzureML execution, history, and data preparation.
- azureml-defaults
- torch-cpu==1.0.0
- torchvision-cpu
- horovod==0.15.2
- pillow
- fire
- tensorboardX


@ -0,0 +1,16 @@
name: project_environment
dependencies:
# The python interpreter version.
# Currently Azure ML only supports 3.5.2 and later.
- python=3.6.2
- pandas
- numpy
- pip:
# Required packages for AzureML execution, history, and data preparation.
- azureml-defaults
- torch==1.0.0
- torchvision==0.2.1
- horovod==0.15.2
- pillow
- fire
- tensorboardX


@ -0,0 +1,119 @@
"""Module for running PyTorch training on Imagenet data
"""
from invoke import task, Collection
import os
from config import load_config
_BASE_PATH = os.path.dirname(os.path.abspath(__file__))
env_values = load_config()
@task
def submit_synthetic(c, node_count=int(env_values["CLUSTER_MAX_NODES"]), epochs=1):
"""Submit PyTorch training job using synthetic imagenet data to remote cluster
Args:
node_count (int, optional): The number of nodes to use in cluster. Defaults to env_values['CLUSTER_MAX_NODES'].
epochs (int, optional): Number of epochs to run training for. Defaults to 1.
"""
from aml_compute import PyTorchExperimentCLI
exp = PyTorchExperimentCLI("pytorch_synthetic_images_remote")
run = exp.submit(
os.path.join(_BASE_PATH, "src"),
"imagenet_pytorch_horovod.py",
{"--epochs": epochs, "--use_gpu":True},
node_count=node_count,
dependencies_file=os.path.join(_BASE_PATH, "environment_gpu.yml"),
wait_for_completion=True,
)
print(run)
@task
def submit_synthetic_local(c, epochs=1):
"""Submit PyTorch training job using synthetic imagenet data for local execution
Args:
epochs (int, optional): Number of epochs to run training for. Defaults to 1.
"""
from aml_compute import PyTorchExperimentCLI
exp = PyTorchExperimentCLI("pytorch_synthetic_images_local")
run = exp.submit_local(
os.path.join(_BASE_PATH, "src"),
"imagenet_pytorch_horovod.py",
{"--epochs": epochs, "--use_gpu":True},
dependencies_file=os.path.join(_BASE_PATH, "environment_gpu.yml"),
wait_for_completion=True,
)
print(run)
@task
def submit_images(c, node_count=int(env_values["CLUSTER_MAX_NODES"]), epochs=1):
"""Submit PyTorch training job using real imagenet data to remote cluster
Args:
node_count (int, optional): The number of nodes to use in cluster. Defaults to env_values['CLUSTER_MAX_NODES'].
epochs (int, optional): Number of epochs to run training for. Defaults to 1.
"""
from aml_compute import PyTorchExperimentCLI
exp = PyTorchExperimentCLI("pytorch_real_images_remote")
run = exp.submit(
os.path.join(_BASE_PATH, "src"),
"imagenet_pytorch_horovod.py",
{
"--use_gpu":True,
"--epochs":epochs,
"--training_data_path": "{datastore}/train",
"--validation_data_path": "{datastore}/validation",
},
node_count=node_count,
dependencies_file=os.path.join(_BASE_PATH, "environment_gpu.yml"),
wait_for_completion=True,
)
print(run)
@task
def submit_images_local(c, epochs=1):
"""Submit PyTorch training job using real imagenet data for local execution
Args:
epochs (int, optional): Number of epochs to run training for. Defaults to 1.
"""
from aml_compute import PyTorchExperimentCLI
exp = PyTorchExperimentCLI("pytorch_real_images_local")
run = exp.submit_local(
os.path.join(_BASE_PATH, "src"),
"imagenet_pytorch_horovod.py",
{
"--epochs": epochs,
"--use_gpu":True,
"--training_data_path": "/data/train",
"--validation_data_path": "/data/validation",
},
dependencies_file=os.path.join(_BASE_PATH, "environment_gpu.yml"),
docker_args=["-v", f"{env_values['DATA']}:/data"],
wait_for_completion=True,
)
print(run)
remote_collection = Collection("remote")
remote_collection.add_task(submit_images, "images")
remote_collection.add_task(submit_synthetic, "synthetic")
local_collection = Collection("local")
local_collection.add_task(submit_images_local, "images")
local_collection.add_task(submit_synthetic_local, "synthetic")
submit_collection = Collection("submit", local_collection, remote_collection)
namespace = Collection("pytorch_imagenet", submit_collection)


@ -0,0 +1,446 @@
""" Trains ResNet50 in PyTorch using Horovod.
"""
import logging
import logging.config
import os
import shutil
from os import path
import fire
import numpy as np
import torch.backends.cudnn as cudnn
import torch.nn.functional as F
import torch.optim as optim
import torch.utils.data.distributed
import torchvision.models as models
from azureml.core.run import Run
from tensorboardX import SummaryWriter
from torch.utils.data import Dataset
from torchvision import transforms, datasets
from timer import Timer
def _str_to_bool(in_str):
if "t" in in_str.lower():
return True
else:
return False
_WIDTH = 224
_HEIGHT = 224
_CHANNELS = 3
_LR = 0.001
_EPOCHS = os.getenv("EPOCHS", 5)
_BATCHSIZE = 64
_RGB_MEAN = [0.485, 0.456, 0.406]
_RGB_SD = [0.229, 0.224, 0.225]
_SEED = 42
# Settings from https://arxiv.org/abs/1706.02677.
_WARMUP_EPOCHS = 5
_WEIGHT_DECAY = 0.00005
_DATA_LENGTH = int(
os.getenv("FAKE_DATA_LENGTH", 1281167) # 1281167
) # How much fake data to simulate, default to size of imagenet dataset
_DISTRIBUTED = _str_to_bool(os.getenv("DISTRIBUTED", "False"))
if _DISTRIBUTED:
import horovod.torch as hvd
hvd.init()
def _get_rank():
if _DISTRIBUTED:
try:
return hvd.rank()
except:
return 0
else:
return 0
def _append_path_to(data_path, data_series):
return data_series.apply(lambda x: path.join(data_path, x))
def _create_data(batch_size, num_batches, dim, channels, seed=42):
np.random.seed(seed)
return np.random.rand(batch_size * num_batches, channels, dim[0], dim[1]).astype(
np.float32
)
def _create_labels(batch_size, num_batches, n_classes):
return np.random.choice(n_classes, batch_size * num_batches)
class FakeData(Dataset):
def __init__(
self,
batch_size=32,
num_batches=20,
dim=(224, 224),
n_channels=3,
n_classes=10,
length=_DATA_LENGTH,
data_transform=None,
):
self.dim = dim
self.n_channels = n_channels
self.n_classes = n_classes
self.num_batches = num_batches
self._data = _create_data(
batch_size, self.num_batches, self.dim, self.n_channels
)
self._labels = _create_labels(batch_size, self.num_batches, self.n_classes)
self.translation_index = np.random.choice(len(self._labels), length)
self._length = length
self._data_transform = data_transform
logger = logging.getLogger(__name__)
logger.info(
"Creating fake data {} labels and {} images".format(
n_classes, len(self._data)
)
)
def __getitem__(self, idx):
logger = logging.getLogger(__name__)
logger.debug("Retrieving samples")
logger.debug(str(idx))
tr_index_array = self.translation_index[idx]
if self._data_transform is not None:
data = self._data_transform(self._data[tr_index_array])
else:
data = self._data[tr_index_array]
return data, self._labels[tr_index_array]
def __len__(self):
return self._length
class AverageMeter(object):
"""Computes and stores the average and current value"""
def __init__(self):
self.reset()
def reset(self):
self._val = 0
self._sum = 0
self._count = 0
def update(self, val, n=1):
self._val = val
self._sum += val * n
self._count += n
@property
def avg(self):
return self._sum / self._count
def accuracy(output, target, topk=(1,)):
"""Computes the accuracy over the k top predictions for the specified values of k"""
with torch.no_grad():
maxk = max(topk)
batch_size = target.size(0)
_, pred = output.topk(maxk, 1, True, True)
pred = pred.t()
correct = pred.eq(target.view(1, -1).expand_as(pred))
res = []
for k in topk:
correct_k = correct[:k].view(-1).float().sum(0, keepdim=True)
res.append(correct_k.mul_(100.0 / batch_size))
return res
def train(train_loader, model, criterion, optimizer, base_lr, warmup_epochs, epoch):
logger = logging.getLogger(__name__)
batch_time = AverageMeter()
losses = AverageMeter()
top1 = AverageMeter()
top5 = AverageMeter()
msg = " duration({}) loss:{} total-samples: {}"
t = Timer()
t.start()
for i, (data, target) in enumerate(train_loader):
adjust_learning_rate(optimizer, base_lr, warmup_epochs, train_loader, epoch, i)
data, target = data.cuda(), target.cuda()
optimizer.zero_grad()
# compute output
output = model(data)
loss = criterion(output, target)
# compute gradient and do SGD step
loss.backward()
optimizer.step()
losses.update(loss.item(), data.size(0))
acc1, acc5 = accuracy(output, target, topk=(1, 5))
top1.update(acc1.item(), data.size(0))
top5.update(acc5.item(), data.size(0))
if i % 100 == 0:
t.stop()
batch_time.update(t.elapsed, n=100)
logger.info(msg.format(t.elapsed, loss.item(), i * len(data)))
t.start()
return {"acc": top1.avg, "loss": losses.avg, "batch_time": batch_time.avg}
def validate(val_loader, model, criterion, device):
losses = AverageMeter()
top1 = AverageMeter()
top5 = AverageMeter()
logger = logging.getLogger(__name__)
msg = " duration({}) loss:{} total-samples: {}"
t = Timer()
t.start()
for i, (data, target) in enumerate(val_loader):
logger.debug("Validating batch")
data, target = (
data.to(device, non_blocking=True),
target.to(device, non_blocking=True),
)
# compute output
output = model(data)
loss = criterion(output, target)
losses.update(loss.item(), data.size(0))
acc1, acc5 = accuracy(output, target, topk=(1, 5))
top1.update(acc1.item(), data.size(0))
top5.update(acc5.item(), data.size(0))
if i % 100 == 0:
logger.info(msg.format(t.elapsed, loss.item(), i * len(data)))
t.start()
return {"acc": top1.avg, "loss": losses.avg}
def _log_summary(data_length, duration, batch_size):
logger = logging.getLogger(__name__)
images_per_second = data_length / duration
logger.info("Data length: {}".format(data_length))
logger.info("Total duration: {:.3f}".format(duration))
logger.info("Total images/sec: {:.3f}".format(images_per_second))
logger.info(
"Batch size: (Per GPU {}: Total {})".format(
batch_size, hvd.size() * batch_size if _DISTRIBUTED else batch_size
)
)
logger.info("Distributed: {}".format("True" if _DISTRIBUTED else "False"))
logger.info("Num GPUs: {:.3f}".format(hvd.size() if _DISTRIBUTED else 1))
def _get_sampler(dataset, is_distributed=_DISTRIBUTED):
if is_distributed:
return torch.utils.data.distributed.DistributedSampler(
dataset, num_replicas=hvd.size(), rank=hvd.rank()
)
else:
return torch.utils.data.sampler.RandomSampler(dataset)
def save_checkpoint(model, optimizer, filepath):
if hvd.rank() == 0:
state = {"model": model.state_dict(), "optimizer": optimizer.state_dict()}
torch.save(state, filepath)
# Horovod: using `lr = base_lr * hvd.size()` from the very beginning leads to worse final
# accuracy. Scale the learning rate `lr = base_lr` ---> `lr = base_lr * hvd.size()` during
# the first five epochs. See https://arxiv.org/abs/1706.02677 for details.
# After the warmup reduce learning rate by 10 on the 30th, 60th and 80th epochs.
def adjust_learning_rate(
optimizer, base_lr, warmup_epochs, data_loader, epoch, batch_idx
):
logger = logging.getLogger(__name__)
size = hvd.size() if _DISTRIBUTED else 1
if epoch < warmup_epochs:
epoch += float(batch_idx + 1) / len(data_loader)
lr_adj = 1.0 / size * (epoch * (size - 1) / warmup_epochs + 1)
elif epoch < 30:
lr_adj = 1.0
elif epoch < 60:
lr_adj = 1e-1
elif epoch < 80:
lr_adj = 1e-2
else:
lr_adj = 1e-3
for param_group in optimizer.param_groups:
new_lr = base_lr * size * lr_adj
if param_group["lr"]!=new_lr:
param_group["lr"] = new_lr
if _get_rank()==0:
logger.info(f"setting lr to {param_group['lr']}")
def main(
training_data_path=None,
validation_data_path=None,
use_gpu=False,
save_filepath=None,
model="resnet50",
epochs=_EPOCHS,
batch_size=_BATCHSIZE,
fp16_allreduce=False,
base_lr=0.0125,
warmup_epochs=5,
):
logger = logging.getLogger(__name__)
device = torch.device("cuda" if use_gpu else "cpu")
logger.info(f"Running on {device}")
if _DISTRIBUTED:
# Horovod: initialize Horovod.
logger.info("Running Distributed")
torch.manual_seed(_SEED)
if use_gpu:
# Horovod: pin GPU to local rank.
torch.cuda.set_device(hvd.local_rank())
torch.cuda.manual_seed(_SEED)
logger.info("PyTorch version {}".format(torch.__version__))
# Horovod: write TensorBoard logs on first worker.
if (_DISTRIBUTED and hvd.rank() == 0) or not _DISTRIBUTED:
run = Run.get_context()
run.tag("model", value=model)
logs_dir = os.path.join(os.curdir, "logs")
if os.path.exists(logs_dir):
logger.debug(f"Log directory {logs_dir} found | Deleting")
shutil.rmtree(logs_dir)
summary_writer = SummaryWriter(logdir=logs_dir)
if training_data_path is None:
logger.info("Setting up fake loaders")
train_dataset = FakeData(n_classes=1000, data_transform=torch.FloatTensor)
validation_dataset = None
else:
normalize = transforms.Normalize(_RGB_MEAN, _RGB_SD)
logger.info("Setting up loaders")
logger.info(f"Loading training from {training_data_path}")
train_dataset = datasets.ImageFolder(
training_data_path,
transforms.Compose(
[
transforms.RandomResizedCrop(_WIDTH),
transforms.RandomHorizontalFlip(),
transforms.ToTensor(),
normalize,
]
),
)
if validation_data_path is not None:
logger.info(f"Loading validation from {validation_data_path}")
validation_dataset = datasets.ImageFolder(
validation_data_path,
transforms.Compose(
[
transforms.Resize(256),
transforms.CenterCrop(224),
transforms.ToTensor(),
normalize,
]
),
)
train_sampler = _get_sampler(train_dataset)
kwargs = {"num_workers": 5, "pin_memory": True}
train_loader = torch.utils.data.DataLoader(
train_dataset, batch_size=batch_size, sampler=train_sampler, **kwargs
)
if validation_data_path is not None:
val_sampler = _get_sampler(validation_dataset)
val_loader = torch.utils.data.DataLoader(
validation_dataset, batch_size=batch_size, sampler=val_sampler, **kwargs
)
# Autotune
cudnn.benchmark = True
logger.info("Loading model")
# Load symbol
model = models.__dict__[model](pretrained=False)
# model.to(device)
if use_gpu:
# Move model to GPU.
model.cuda()
# # Horovod: (optional) compression algorithm.
# compression = hvd.Compression.fp16 if args.fp16_allreduce else hvd.Compression.none
num_gpus = hvd.size() if _DISTRIBUTED else 1
# Horovod: scale learning rate by the number of GPUs.
optimizer = optim.SGD(model.parameters(), lr=_LR * num_gpus, momentum=0.9)
if _DISTRIBUTED:
compression = hvd.Compression.fp16 if fp16_allreduce else hvd.Compression.none
# Horovod: wrap optimizer with DistributedOptimizer.
optimizer = hvd.DistributedOptimizer(
optimizer,
named_parameters=model.named_parameters(),
compression=compression,
)
# Horovod: broadcast parameters & optimizer state.
hvd.broadcast_parameters(model.state_dict(), root_rank=0)
hvd.broadcast_optimizer_state(optimizer, root_rank=0)
criterion = F.cross_entropy
# Main training-loop
logger.info("Training ...")
for epoch in range(epochs):
with Timer(output=logger.info, prefix=f"Training epoch {epoch} ") as t:
model.train()
if _DISTRIBUTED:
train_sampler.set_epoch(epoch)
metrics = train(
train_loader, model, criterion, optimizer, base_lr, warmup_epochs, epoch
)
if (_DISTRIBUTED and hvd.rank() == 0) or not _DISTRIBUTED:
run.log_row("Training metrics", epoch=epoch, **metrics)
summary_writer.add_scalar("Train/Loss", metrics["loss"], epoch)
summary_writer.add_scalar("Train/Acc", metrics["acc"], epoch)
summary_writer.add_scalar("Train/BatchTime", metrics["batch_time"], epoch)
if validation_data_path is not None:
model.eval()
metrics = validate(val_loader, model, criterion, device)
if (_DISTRIBUTED and hvd.rank() == 0) or not _DISTRIBUTED:
run.log_row("Validation metrics", epoch=epoch, **metrics)
summary_writer.add_scalar("Validation/Loss", metrics["loss"], epoch)
summary_writer.add_scalar("Validation/Acc", metrics["acc"], epoch)
if save_filepath is not None:
save_checkpoint(model, optimizer, save_filepath)
_log_summary(epochs * len(train_dataset), t.elapsed, batch_size)
if __name__ == "__main__":
logging.config.fileConfig(os.getenv("LOG_CONFIG", "logging.conf"))
fire.Fire(main)
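A small sketch exercising the FakeData dataset defined above with a plain DataLoader, assuming this file is importable as imagenet_pytorch_horovod and its dependencies (torch, torchvision, tensorboardX, fire, azureml-defaults) are installed:
```python
import torch
from torch.utils.data import DataLoader

from imagenet_pytorch_horovod import FakeData

# Tiny synthetic dataset: 32 samples drawn from 2 batches of 8 fake 3x224x224 images, 10 classes.
dataset = FakeData(batch_size=8, num_batches=2, n_classes=10, length=32,
                   data_transform=torch.FloatTensor)
loader = DataLoader(dataset, batch_size=8, shuffle=True)

images, labels = next(iter(loader))
print(images.shape, labels.shape)  # torch.Size([8, 3, 224, 224]) torch.Size([8])
```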


@ -0,0 +1,27 @@
[loggers]
keys=root,__main__
[handlers]
keys=consoleHandler
[formatters]
keys=simpleFormatter
[logger_root]
level=WARNING
handlers=consoleHandler
[logger___main__]
level=INFO
handlers=consoleHandler
qualname=__main__
propagate=0
[handler_consoleHandler]
class=StreamHandler
level=INFO
formatter=simpleFormatter
args=(sys.stdout,)
[formatter_simpleFormatter]
format=%(asctime)s - %(name)s - %(levelname)s - %(message)s


@ -0,0 +1,105 @@
import collections
import functools
import logging
from timeit import default_timer
class Timer(object):
"""
Keyword arguments:
output: if True, print output after exiting context.
if callable, pass output to callable.
format: str.format string to be used for output; default "took {} seconds"
prefix: string to prepend (plus a space) to output
For convenience, if you only specify this, output defaults to True.
"""
def __init__(self,
timer=default_timer,
factor=1,
output=None,
fmt="took {:.3f} seconds",
prefix=""):
self._timer = timer
self._factor = factor
self._output = output
self._fmt = fmt
self._prefix = prefix
self._end = None
self._start = None
def start(self):
self._start = self()
def stop(self):
self._end = self()
def __call__(self):
""" Return the current time """
return self._timer()
def __enter__(self):
""" Set the start time """
self.start()
return self
def __exit__(self, exc_type, exc_value, exc_traceback):
""" Set the end time """
self.stop()
if self._output is True or (self._output is None and self._prefix):
self._output = print
if callable(self._output):
output = " ".join([self._prefix, self._fmt.format(self.elapsed)])
self._output(output)
def __str__(self):
return '%.3f' % (self.elapsed)
@property
def elapsed(self):
""" Return the elapsed time
"""
if self._end is None:
# if elapsed is called in the context manager scope
return (self() - self._start) * self._factor
else:
# if elapsed is called out of the context manager scope
return (self._end - self._start) * self._factor
def timer(logger=None,
level=logging.INFO,
fmt="function %(function_name)s execution time: %(execution_time).3f",
*func_or_func_args,
**timer_kwargs):
""" Function decorator displaying the function execution time
"""
def wrapped_f(f):
@functools.wraps(f)
def wrapped(*args, **kwargs):
with Timer(**timer_kwargs) as t:
out = f(*args, **kwargs)
context = {
'function_name': f.__name__,
'execution_time': t.elapsed,
}
if logger:
logger.log(
level,
fmt % context,
extra=context)
else:
print(fmt % context)
return out
return wrapped
if (len(func_or_func_args) == 1
and isinstance(func_or_func_args[0], collections.Callable)):
return wrapped_f(func_or_func_args[0])
else:
return wrapped_f


@ -0,0 +1,15 @@
name: project_environment
dependencies:
# The python interpreter version.
# Currently Azure ML only supports 3.5.2 and later.
- python=3.6.2
- pandas
- numpy
- pip:
# Required packages for AzureML execution, history, and data preparation.
- azureml-defaults
- torch-cpu==1.0.0
- torchvision-cpu
- horovod==0.15.2
- pillow
- tensorboardX


@ -0,0 +1,15 @@
name: project_environment
dependencies:
# The python interpreter version.
# Currently Azure ML only supports 3.5.2 and later.
- python=3.6.2
- pandas
- numpy
- pip:
# Required packages for AzureML execution, history, and data preparation.
- azureml-defaults
- torch==1.0.0
- torchvision
- horovod==0.15.2
- pillow
- tensorboardX


@ -0,0 +1,126 @@
""" This is an example template that you can use to create functions that you can call with invoke
"""
from invoke import task, Collection
import os
from config import load_config
_BASE_PATH = os.path.dirname(os.path.abspath( __file__ ))
env_values = load_config()
@task
def submit_local(c):
"""This command isn't implemented please modify to use.
The call below will work for submitting jobs to execute locally on a GPU.
"""
raise NotImplementedError(
"You need to modify this call before being able to use it"
)
from aml_compute import PyTorchExperimentCLI
exp = PyTorchExperimentCLI("<YOUR-EXPERIMENT-NAME>")
run = exp.submit_local(
os.path.join(_BASE_PATH, "src"),
"<YOUR-TRAINING-SCRIPT>",
{"YOUR": "ARGS"},
dependencies_file=os.path.join(_BASE_PATH, "environment_gpu.yml"),
wait_for_completion=True,
)
print(run)
@task
def submit_remote(c, node_count=int(env_values["CLUSTER_MAX_NODES"])):
"""This command isn't implemented please modify to use.
The call below will work for submitting jobs to execute on a remote cluster using GPUs.
"""
raise NotImplementedError(
"You need to modify this call before being able to use it"
)
from aml_compute import PyTorchExperimentCLI
exp = PyTorchExperimentCLI("<YOUR-EXPERIMENT-NAME>")
run = exp.submit(
os.path.join(_BASE_PATH, "src"),
"<YOUR-TRAINING-SCRIPT>",
{"YOUR": "ARGS"},
node_count=node_count,
dependencies_file=os.path.join(_BASE_PATH, "environment_gpu.yml"),
wait_for_completion=True,
)
print(run)
@task
def submit_images_remote(c, node_count=int(env_values["CLUSTER_MAX_NODES"])):
"""This command isn't implemented please modify to use.
The call below will work for submitting jobs to execute on a remote cluster using GPUs.
Notice that we are passing in a {datastore} parameter to the path. This tells the submit
method that we want the location as mapped by the datastore to be inserted here. Upon
execution the appropriate path will be prepended to the training_data_path and validation_data_path.
"""
raise NotImplementedError(
"You need to modify this call before being able to use it"
)
from aml_compute import PyTorchExperimentCLI
exp = PyTorchExperimentCLI("<YOUR-EXPERIMENT-NAME>")
run = exp.submit(
os.path.join(_BASE_PATH, "src"),
"<YOUR-TRAINING-SCRIPT>",
{
"--training_data_path": "{datastore}/train",
"--validation_data_path": "{datastore}/validation",
"--epochs": "1",
"--data_type": "images",
"--data-format": "channels_first",
},
node_count=node_count,
dependencies_file=os.path.join(_BASE_PATH, "environment_gpu.yml"),
wait_for_completion=True,
)
print(run)
@task
def submit_images_local(c):
"""This command isn't implemented please modify to use.
The call below will work for submitting jobs to execute locally on a GPU.
Here we also map a volume to the docker container executing locally. This is the
location we tell our script to look for our training and validation data. Feel free to
adjust the other arguments as required by your training script.
"""
raise NotImplementedError(
"You need to modify this call before being able to use it"
)
from aml_compute import PyTorchExperimentCLI
exp = PyTorchExperimentCLI("<YOUR-EXPERIMENT-NAME>")
run = exp.submit_local(
os.path.join(_BASE_PATH, "src"),
"<YOUR-TRAINING-SCRIPT>",
{
"--training_data_path": "/data/train",
"--validation_data_path": "/data/validation",
"--epochs": "1",
"--data_type": "images",
"--data-format": "channels_first",
},
dependencies_file=os.path.join(_BASE_PATH, "environment_gpu.yml"),
docker_args=["-v", f"{env_values['data']}:/data"],
wait_for_completion=True,
)
print(run)
remote_collection = Collection("remote")
remote_collection.add_task(submit_images_remote, "images")
remote_collection.add_task(submit_remote, "synthetic")
local_collection = Collection("local")
local_collection.add_task(submit_images_local, "images")
local_collection.add_task(submit_local, "synthetic")
submit_collection = Collection("submit", local_collection, remote_collection)
namespace = Collection("pytorch_experiment", submit_collection)


@ -0,0 +1,3 @@
"""
Place Script to train your PyTorch model here
"""


@ -87,6 +87,12 @@ ENV PYTHONPATH /workspace/TensorFlow_benchmark:$PYTHONPATH
# imagenet {% if cookiecutter.type == "imagenet" or cookiecutter.type == "all"%}
ENV PYTHONPATH /workspace/TensorFlow_imagenet:$PYTHONPATH
# ------- {% endif %}
# pytorch benchmark {% if cookiecutter.type == "pytorch_benchmark" or cookiecutter.type == "all"%}
ENV PYTHONPATH /workspace/PyTorch_benchmark:$PYTHONPATH
# ------- {% endif %}
# pytorch imagenet {% if cookiecutter.type == "pytorch_imagenet" or cookiecutter.type == "all"%}
ENV PYTHONPATH /workspace/PyTorch_imagenet:$PYTHONPATH
# ------- {% endif %}
# Completion script
COPY bash.completion /etc/bash_completion.d/
RUN echo "source /etc/bash_completion.d/bash.completion" >> /root/.bashrc


@ -15,7 +15,7 @@ from azureml.core.conda_dependencies import (
)
from azureml.core.runconfig import EnvironmentDefinition
from azureml.tensorboard import Tensorboard
from azureml.train.dnn import TensorFlow
from azureml.train.dnn import TensorFlow, PyTorch
from config import load_config
from toolz import curry, pipe
from pprint import pformat
@ -71,7 +71,7 @@ def _create_cluster(
return compute_target
def _prepare_environment_definition(dependencies_file, distributed):
def _prepare_environment_definition(base_image, dependencies_file, distributed):
logger = logging.getLogger(__name__)
env_def = EnvironmentDefinition()
conda_dep = CondaDependencies(conda_dependencies_file_path=dependencies_file)
@ -79,7 +79,7 @@ def _prepare_environment_definition(dependencies_file, distributed):
env_def.python.conda_dependencies = conda_dep
env_def.docker.enabled = True
env_def.docker.gpu_support = True
env_def.docker.base_image = "mcr.microsoft.com/azureml/base-gpu:intelmpi2018.3-cuda9.0-cudnn7-ubuntu16.04"
env_def.docker.base_image = base_image
env_def.docker.shm_size = "8g"
env_def.environment_variables["NCCL_SOCKET_IFNAME"] = "eth0"
env_def.environment_variables["NCCL_IB_DISABLE"] = 1
@ -104,16 +104,18 @@ def _create_estimator(
entry_script,
compute_target,
script_params,
base_image,
node_count=_CLUSTER_MAX_NODES,
process_count_per_node=4,
docker_args=(),
):
logger = logging.getLogger(__name__)
logger.debug(f"Base image {base_image}")
logger.debug(f"Loading dependencies from {dependencies_file}")
# If the compute target is "local" then don't run distributed
distributed = not (isinstance(compute_target, str) and compute_target == "local")
env_def = _prepare_environment_definition(dependencies_file, distributed)
env_def = _prepare_environment_definition(base_image, dependencies_file, distributed)
env_def.docker.arguments.extend(list(docker_args))
estimator = estimator_class(
@ -362,6 +364,8 @@ class TFExperimentCLI(ExperimentCLI):
wait_for_completion,
):
self._logger.debug(script_params)
base_image = "mcr.microsoft.com/azureml/base-gpu:intelmpi2018.3-cuda9.0-cudnn7-ubuntu16.04"
estimator = _create_estimator(
TensorFlow,
dependencies_file,
@ -369,6 +373,7 @@ class TFExperimentCLI(ExperimentCLI):
entry_script,
cluster,
script_params,
base_image,
node_count=node_count,
process_count_per_node=process_count_per_node,
docker_args=docker_args
@ -398,6 +403,139 @@ class TFExperimentCLI(ExperimentCLI):
return {key: _replace(value) for key, value in script_params.items()}
class PyTorchExperimentCLI(ExperimentCLI):
"""Creates Experiment object that can be used to create clusters and submit experiments
Returns:
PyTorchExperimentCLI: Experiment object
"""
def submit_local(
self,
project_folder,
entry_script,
script_params,
dependencies_file=_DEPENDENCIES_FILE,
wait_for_completion=True,
docker_args=(),
):
"""Submit experiment for local execution
Args:
project_folder (string): Path of your source files for the experiment
entry_script (string): The filename of your script to run. Must be found in your project_folder
script_params (dict): Dictionary of script parameters
dependencies_file (string, optional): The location of your environment.yml to use to create the
environment your training script requires.
Defaults to _DEPENDENCIES_FILE.
wait_for_completion (bool, optional): Whether to block until experiment is done. Defaults to True.
docker_args (tuple, optional): Docker arguments to pass. Defaults to ().
"""
self._logger.info("Running in local mode")
self._submit(
dependencies_file,
project_folder,
entry_script,
"local",
script_params,
1,
1,
docker_args,
wait_for_completion,
)
def submit(
self,
project_folder,
entry_script,
script_params,
dependencies_file=_DEPENDENCIES_FILE,
node_count=_CLUSTER_MAX_NODES,
process_count_per_node=4,
wait_for_completion=True,
docker_args=(),
):
"""Submit experiment for remote execution on AzureML clusters
Args:
project_folder (string): Path of your source files for the experiment
entry_script (string): The filename of your script to run. Must be found in your project_folder
script_params (dict): Dictionary of script parameters
dependencies_file (string, optional): The location of your environment.yml to use to
create the environment your training script requires.
Defaults to _DEPENDENCIES_FILE.
node_count (int, optional): The number of nodes to use in the cluster. Defaults to _CLUSTER_MAX_NODES.
process_count_per_node (int, optional): Number of processes to run on each node.
Usually should be the same as the number of GPUs for GPU execution.
Defaults to 4.
wait_for_completion (bool, optional): Whether to block until experiment is done. Defaults to True.
docker_args (tuple, optional): Docker arguments to pass. Defaults to ().
Returns:
azureml.core.Run: AzureML Run object
"""
self._logger.debug(script_params)
transformed_params = self._complete_datastore(script_params)
self._logger.debug("Transformed script params")
self._logger.debug(transformed_params)
return self._submit(
dependencies_file,
project_folder,
entry_script,
self.cluster,
transformed_params,
node_count,
process_count_per_node,
docker_args,
wait_for_completion,
)
def _submit(
self,
dependencies_file,
project_folder,
entry_script,
cluster,
script_params,
node_count,
process_count_per_node,
docker_args,
wait_for_completion,
):
self._logger.debug(script_params)
base_image = "mcr.microsoft.com/azureml/base-gpu:openmpi3.1.2-cuda9.0-cudnn7-ubuntu16.04"
estimator = _create_estimator(
PyTorch,
dependencies_file,
project_folder,
entry_script,
cluster,
script_params,
base_image,
node_count=node_count,
process_count_per_node=process_count_per_node,
docker_args=docker_args,
)
self._logger.debug(estimator.conda_dependencies.__dict__)
run = self._experiment.submit(estimator)
if wait_for_completion:
run.wait_for_completion(show_output=True)
return run
def _complete_datastore(self, script_params):
def _replace(value):
if isinstance(value, str) and "{datastore}" in value:
data_path = value.replace("{datastore}/", "")
return self.datastore.path(data_path).as_mount()
else:
return value
return {key: _replace(value) for key, value in script_params.items()}
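To illustrate what _complete_datastore does to script parameters, here is a self-contained sketch with a hypothetical stand-in for the datastore object; on Azure ML the real self.datastore.path(...).as_mount() call returns a data reference that is mounted on the compute target:
```python
class FakeDatastore:
    """Hypothetical stand-in for the AzureML datastore used by the CLI classes above."""

    def path(self, data_path):
        # The real object returns a DataReference; a plain mount-like path is enough to show the idea.
        return "/mnt/datastore/" + data_path


def complete_datastore(script_params, datastore=FakeDatastore()):
    def _replace(value):
        if isinstance(value, str) and "{datastore}" in value:
            return datastore.path(value.replace("{datastore}/", ""))
        return value
    return {key: _replace(value) for key, value in script_params.items()}


params = {"--training_data_path": "{datastore}/train", "--epochs": 1}
print(complete_datastore(params))
# {'--training_data_path': '/mnt/datastore/train', '--epochs': 1}
```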
def workspace_for_user(
workspace_name=_WORKSPACE,
resource_group=_RESOURCE_GROUP,


@ -7,11 +7,14 @@ from config import load_config
import os
# Experiment imports {% if cookiecutter.type == "template" or cookiecutter.type == "all"%}
import tensorflow_experiment # {%- endif -%} Template {% if cookiecutter.type == "benchmark" or cookiecutter.type == "all"%}
import tensorflow_benchmark # {%- endif -%} Benchmark{% if cookiecutter.type == "imagenet" or cookiecutter.type == "all"%}
import storage
import image
import tensorflow_benchmark # {%- endif -%} Benchmark {% if cookiecutter.type == "imagenet" or cookiecutter.type == "all"%}
import tfrecords
import tensorflow_imagenet # Imagenet {% endif %}
import tensorflow_imagenet # {%- endif -%} Imagenet {% if cookiecutter.type == "pytorch_imagenet" or cookiecutter.type == "all"%}
import pytorch_imagenet # {%- endif -%} PyTorch Imagenet {% if cookiecutter.type == "imagenet" or cookiecutter.type == "pytorch_imagenet" or cookiecutter.type == "all"%}
import storage
import image # {%- endif -%} {% if cookiecutter.type == "pytorch_experiment" or cookiecutter.type == "all"%}
import pytorch_experiment # {%- endif -%} {% if cookiecutter.type == "pytorch_benchmark" or cookiecutter.type == "all"%}
import pytorch_benchmark # {% endif %} PyTorch Benchmark
from invoke.executor import Executor
logging.config.fileConfig(os.getenv("LOG_CONFIG", "logging.conf"))
@ -37,7 +40,8 @@ def _prompt_sub_id_selection(c):
from prompt_toolkit import prompt
results = c.run(f"az account list", pty=True, hide="out")
sub_dict = json.loads(results.stdout)
parsestr = "["+results.stdout[1:-7]+"]" #TODO: Figure why this is necessary
sub_dict = json.loads(parsestr)
sub_list = [
{"Index": i, "Name": sub["name"], "id": sub["id"]}
for i, sub in enumerate(sub_dict)
@ -192,7 +196,7 @@ namespace.add_collection(tf_exp_collection)
tf_bench_collection = Collection.from_module(tensorflow_benchmark)
namespace.add_collection(tf_bench_collection)
#{%- endif -%}
# Imagenet{% if cookiecutter.type == "imagenet" or cookiecutter.type == "all"%}
# Imagenet {% if cookiecutter.type == "imagenet" or cookiecutter.type == "all"%}
storage_collection = Collection.from_module(storage)
storage_collection.add_collection(Collection.from_module(image))
storage_collection.add_collection(Collection.from_module(tfrecords))
@ -200,6 +204,21 @@ tf_collection = Collection.from_module(tensorflow_imagenet)
namespace.add_collection(tf_collection)
namespace.add_collection(storage_collection) # {% endif %}
# PyTorch Benchmark {% if cookiecutter.type == "pytorch_benchmark" or cookiecutter.type == "all"%}
pytorch_bench_collection = Collection.from_module(pytorch_benchmark)
namespace.add_collection(pytorch_bench_collection)
#{%- endif %}
# PyTorch Imagenet {% if cookiecutter.type == "pytorch_imagenet" or cookiecutter.type == "all"%}
pytorch_imagenet_collection = Collection.from_module(pytorch_imagenet)
namespace.add_collection(pytorch_imagenet_collection)
#{%- endif %}
# PyTorch Experiment {% if cookiecutter.type == "pytorch_experiment" or cookiecutter.type == "all"%}
pytorch_exp_collection = Collection.from_module(pytorch_experiment)
namespace.add_collection(pytorch_exp_collection)
#{%- endif %}
namespace.configure({
'root_namespace': namespace,
'invoke_execute': invoke_execute,