* update tutorials

* edit workflows

* fix

* fix upl
Cody 2020-11-15 15:50:16 -08:00 committed by GitHub
Parent c10174170f
Commit e800ed93f4
52 changed files with 4721 additions and 5156 deletions

9
.github/workflows/run-tutorial-ai.yml vendored
View File

@ -28,8 +28,11 @@ jobs:
- name: attach to workspace
run: az ml folder attach -w default -g azureml-examples
- name: run 1.hello-world.ipynb
run: papermill tutorials/an-introduction/1.hello-world.ipynb - -k python
run: papermill 1.hello-world.ipynb - -k python
working-directory: tutorials/an-introduction
- name: run 2.pytorch-model.ipynb
run: papermill tutorials/an-introduction/2.pytorch-model.ipynb - -k python
run: papermill 2.pytorch-model.ipynb - -k python
working-directory: tutorials/an-introduction
- name: run 3.pytorch-model-cloud-data.ipynb
run: papermill tutorials/an-introduction/3.pytorch-model-cloud-data.ipynb - -k python
run: papermill 3.pytorch-model-cloud-data.ipynb - -k python
working-directory: tutorials/an-introduction
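The workflow steps in this commit follow one pattern: the notebook's directory moves out of the papermill argument and into the step's `working-directory`, so relative paths used inside the notebook resolve from the tutorial folder. A minimal sketch of that rewrite, where `papermill_step` is a hypothetical helper for illustration and not part of the repository:

```python
from pathlib import PurePosixPath


def papermill_step(notebook_path: str) -> dict:
    """Build a workflow step that runs a notebook from its own directory.

    Hypothetical helper: it mirrors the shape of the edited steps but is
    not part of the repository.
    """
    path = PurePosixPath(notebook_path)
    return {
        "name": f"run {path.name}",
        # Only the file name is passed; the directory moves to working-directory.
        "run": f"papermill {path.name} - -k python",
        "working-directory": str(path.parent),
    }


step = papermill_step("tutorials/an-introduction/1.hello-world.ipynb")
print(step["run"])
print(step["working-directory"])
```

Keeping the directory in one field means the notebook and the files it references are addressed the same way in CI as when the notebook is opened from its own folder.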

3
.github/workflows/run-tutorial-awp.yml vendored
View File

@ -28,4 +28,5 @@ jobs:
- name: attach to workspace
run: az ml folder attach -w default -g azureml-examples
- name: run 1.classification.ipynb
run: papermill tutorials/automl-with-pycaret/1.classification.ipynb - -k python
run: papermill 1.classification.ipynb - -k python
working-directory: tutorials/automl-with-pycaret

3
.github/workflows/run-tutorial-de.yml vendored
View File

@ -28,4 +28,5 @@ jobs:
- name: attach to workspace
run: az ml folder attach -w default -g azureml-examples
- name: run ase-gpu.ipynb
run: papermill tutorials/deploy-edge/ase-gpu.ipynb - -k python
run: papermill ase-gpu.ipynb - -k python
working-directory: tutorials/deploy-edge

8
.github/workflows/run-tutorial-dt.yml vendored
View File

@ -27,11 +27,11 @@ jobs:
run: az extension add -s https://azurecliext.blob.core.windows.net/release/azure_cli_ml-1.15.0-py3-none-any.whl -y
- name: attach to workspace
run: az ml folder attach -w default -g azureml-examples
- name: run 0.write-files.ipynb
run: papermill tutorials/deploy-triton/0.write-files.ipynb - -k python
- name: run 1.densenet-local.ipynb
run: echo "fix me :("
# run: papermill tutorials/deploy-triton/1.densenet-local.ipynb - -k python
# run: papermill 1.densenet-local.ipynb - -k python
working-directory: tutorials/deploy-triton
- name: run 2.bidaf-aks-v100.ipynb
run: echo "fix me :("
# run: papermill tutorials/deploy-triton/2.bidaf-aks-v100.ipynb - -k python
# run: papermill 2.bidaf-aks-v100.ipynb - -k python
working-directory: tutorials/deploy-triton

6
.github/workflows/run-tutorial-ud.yml vendored
View File

@ -30,6 +30,8 @@ jobs:
- name: attach to workspace
run: az ml folder attach -w default -g azureml-examples
- name: run 1.intro-to-dask.ipynb
run: papermill tutorials/using-dask/1.intro-to-dask.ipynb - -k python
run: papermill 1.intro-to-dask.ipynb - -k python
working-directory: tutorials/using-dask
- name: run 2.eds-at-scale.ipynb
run: papermill tutorials/using-dask/2.eds-at-scale.ipynb - -k python
run: papermill 2.eds-at-scale.ipynb - -k python
working-directory: tutorials/using-dask

14
.github/workflows/run-tutorial-upl.yml vendored
View File

@ -27,13 +27,15 @@ jobs:
run: az extension add -s https://azurecliext.blob.core.windows.net/release/azure_cli_ml-1.15.0-py3-none-any.whl -y
- name: attach to workspace
run: az ml folder attach -w default -g azureml-examples
- name: run 0.write-files.ipynb
run: papermill tutorials/using-pytorch-lightning/0.write-files.ipynb - -k python
- name: run 1.train-single-node.ipynb
run: papermill tutorials/using-pytorch-lightning/1.train-single-node.ipynb - -k python
run: papermill 1.train-single-node.ipynb - -k python
working-directory: tutorials/using-pytorch-lightning
- name: run 2.log-with-tensorboard.ipynb
run: papermill tutorials/using-pytorch-lightning/2.log-with-tensorboard.ipynb - -k python
run: papermill 2.log-with-tensorboard.ipynb - -k python
working-directory: tutorials/using-pytorch-lightning
- name: run 3.log-with-mlflow.ipynb
run: papermill tutorials/using-pytorch-lightning/3.log-with-mlflow.ipynb - -k python
run: papermill 3.log-with-mlflow.ipynb - -k python
working-directory: tutorials/using-pytorch-lightning
- name: run 4.train-multi-node-ddp.ipynb
run: papermill tutorials/using-pytorch-lightning/4.train-multi-node-ddp.ipynb - -k python
run: papermill 4.train-multi-node-ddp.ipynb - -k python
working-directory: tutorials/using-pytorch-lightning

8
.github/workflows/run-tutorial-ur.yml vendored
View File

@ -29,9 +29,9 @@ jobs:
run: az extension add -s https://azurecliext.blob.core.windows.net/release/azure_cli_ml-1.15.0-py3-none-any.whl -y
- name: attach to workspace
run: az ml folder attach -w default -g azureml-examples
- name: run 0.write-files.ipynb
run: papermill tutorials/using-rapids/0.write-files.ipynb - -k python
- name: run 1.train-and-hpo.ipynb
run: papermill tutorials/using-rapids/1.train-and-hpo.ipynb - -k python
run: papermill 1.train-and-hpo.ipynb - -k python
working-directory: tutorials/using-rapids
- name: run 2.train-multi-gpu.ipynb
run: papermill tutorials/using-rapids/2.train-multi-gpu.ipynb - -k python
run: papermill 2.train-multi-gpu.ipynb - -k python
working-directory: tutorials/using-rapids

View File

@ -63,10 +63,10 @@ path|status|notebooks|description
[an-introduction](tutorials/an-introduction)|[![an-introduction](https://github.com/Azure/azureml-examples/workflows/run-tutorial-ai/badge.svg)](https://github.com/Azure/azureml-examples/actions?query=workflow%3Arun-tutorial-ai)|[1.hello-world.ipynb](tutorials/an-introduction/1.hello-world.ipynb)<br>[2.pytorch-model.ipynb](tutorials/an-introduction/2.pytorch-model.ipynb)<br>[3.pytorch-model-cloud-data.ipynb](tutorials/an-introduction/3.pytorch-model-cloud-data.ipynb)|learn the basics of Azure Machine Learning
[automl-with-pycaret](tutorials/automl-with-pycaret)|[![automl-with-pycaret](https://github.com/Azure/azureml-examples/workflows/run-tutorial-awp/badge.svg)](https://github.com/Azure/azureml-examples/actions?query=workflow%3Arun-tutorial-awp)|[1.classification.ipynb](tutorials/automl-with-pycaret/1.classification.ipynb)|learn how to automate ML with [pycaret](https://github.com/pycaret/pycaret)
[deploy-edge](tutorials/deploy-edge)|[![deploy-edge](https://github.com/Azure/azureml-examples/workflows/run-tutorial-de/badge.svg)](https://github.com/Azure/azureml-examples/actions?query=workflow%3Arun-tutorial-de)|[ase-gpu.ipynb](tutorials/deploy-edge/ase-gpu.ipynb)|learn how to use Edge device for model deployment and scoring
[deploy-triton](tutorials/deploy-triton)|[![deploy-triton](https://github.com/Azure/azureml-examples/workflows/run-tutorial-dt/badge.svg)](https://github.com/Azure/azureml-examples/actions?query=workflow%3Arun-tutorial-dt)|[0.write-files.ipynb](tutorials/deploy-triton/0.write-files.ipynb)<br>[1.densenet-local.ipynb](tutorials/deploy-triton/1.densenet-local.ipynb)<br>[2.bidaf-aks-v100.ipynb](tutorials/deploy-triton/2.bidaf-aks-v100.ipynb)|learn how to efficiently deploy to GPUs using [triton inference server](https://github.com/triton-inference-server/server)
[deploy-triton](tutorials/deploy-triton)|[![deploy-triton](https://github.com/Azure/azureml-examples/workflows/run-tutorial-dt/badge.svg)](https://github.com/Azure/azureml-examples/actions?query=workflow%3Arun-tutorial-dt)|[1.densenet-local.ipynb](tutorials/deploy-triton/1.densenet-local.ipynb)<br>[2.bidaf-aks-v100.ipynb](tutorials/deploy-triton/2.bidaf-aks-v100.ipynb)|learn how to efficiently deploy to GPUs using [triton inference server](https://github.com/triton-inference-server/server)
[using-dask](tutorials/using-dask)|[![using-dask](https://github.com/Azure/azureml-examples/workflows/run-tutorial-ud/badge.svg)](https://github.com/Azure/azureml-examples/actions?query=workflow%3Arun-tutorial-ud)|[1.intro-to-dask.ipynb](tutorials/using-dask/1.intro-to-dask.ipynb)<br>[2.eds-at-scale.ipynb](tutorials/using-dask/2.eds-at-scale.ipynb)|learn how to read from cloud data and scale PyData tools (numpy, pandas, scikit-learn, etc.) with [dask](https://github.com/dask/dask)
[using-pytorch-lightning](tutorials/using-pytorch-lightning)|[![using-pytorch-lightning](https://github.com/Azure/azureml-examples/workflows/run-tutorial-upl/badge.svg)](https://github.com/Azure/azureml-examples/actions?query=workflow%3Arun-tutorial-upl)|[0.write-files.ipynb](tutorials/using-pytorch-lightning/0.write-files.ipynb)<br>[1.train-single-node.ipynb](tutorials/using-pytorch-lightning/1.train-single-node.ipynb)<br>[2.log-with-tensorboard.ipynb](tutorials/using-pytorch-lightning/2.log-with-tensorboard.ipynb)<br>[3.log-with-mlflow.ipynb](tutorials/using-pytorch-lightning/3.log-with-mlflow.ipynb)<br>[4.train-multi-node-ddp.ipynb](tutorials/using-pytorch-lightning/4.train-multi-node-ddp.ipynb)|learn how to train and log metrics with [PyTorch Lightning](https://github.com/PyTorchLightning/pytorch-lightning)
[using-rapids](tutorials/using-rapids)|[![using-rapids](https://github.com/Azure/azureml-examples/workflows/run-tutorial-ur/badge.svg)](https://github.com/Azure/azureml-examples/actions?query=workflow%3Arun-tutorial-ur)|[0.write-files.ipynb](tutorials/using-rapids/0.write-files.ipynb)<br>[1.train-and-hpo.ipynb](tutorials/using-rapids/1.train-and-hpo.ipynb)<br>[2.train-multi-gpu.ipynb](tutorials/using-rapids/2.train-multi-gpu.ipynb)|learn how to accelerate PyData tools (numpy, pandas, scikit-learn, etc) on NVIDIA GPUs with [rapids](https://github.com/rapidsai)
[using-pytorch-lightning](tutorials/using-pytorch-lightning)|[![using-pytorch-lightning](https://github.com/Azure/azureml-examples/workflows/run-tutorial-upl/badge.svg)](https://github.com/Azure/azureml-examples/actions?query=workflow%3Arun-tutorial-upl)|[1.train-single-node.ipynb](tutorials/using-pytorch-lightning/1.train-single-node.ipynb)<br>[2.log-with-tensorboard.ipynb](tutorials/using-pytorch-lightning/2.log-with-tensorboard.ipynb)<br>[3.log-with-mlflow.ipynb](tutorials/using-pytorch-lightning/3.log-with-mlflow.ipynb)<br>[4.train-multi-node-ddp.ipynb](tutorials/using-pytorch-lightning/4.train-multi-node-ddp.ipynb)|learn how to train and log metrics with [PyTorch Lightning](https://github.com/PyTorchLightning/pytorch-lightning)
[using-rapids](tutorials/using-rapids)|[![using-rapids](https://github.com/Azure/azureml-examples/workflows/run-tutorial-ur/badge.svg)](https://github.com/Azure/azureml-examples/actions?query=workflow%3Arun-tutorial-ur)|[1.train-and-hpo.ipynb](tutorials/using-rapids/1.train-and-hpo.ipynb)<br>[2.train-multi-gpu.ipynb](tutorials/using-rapids/2.train-multi-gpu.ipynb)|learn how to accelerate PyData tools (numpy, pandas, scikit-learn, etc) on NVIDIA GPUs with [rapids](https://github.com/rapidsai)
**Notebooks**
path|description

View File

@ -16,23 +16,6 @@
"- Monitoring and recording runs in the Azure Machine Learning studio\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Write files "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%writefile hello.py\n",
"print(\"hello world!\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
@ -50,7 +33,7 @@
},
"outputs": [],
"source": [
"!python hello.py"
"!python src/hello.py"
]
},
{
@ -86,7 +69,7 @@
"ws = Workspace.from_config()\n",
"exp = Experiment(workspace=ws, name=\"an-introduction-hello-world-tutorial\")\n",
"src = ScriptRunConfig(\n",
" source_directory=\".\", script=\"hello.py\", compute_target=\"cpu-cluster\"\n",
" source_directory=\"src\", script=\"hello.py\", compute_target=\"cpu-cluster\"\n",
")\n",
"\n",
"run = exp.submit(src)\n",
@ -177,7 +160,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.3-final"
"version": "3.8.5-final"
},
"notice": "Copyright (c) Microsoft Corporation. All rights reserved. Licensed under the MIT License.",
"nteract": {

View File

@ -25,130 +25,6 @@
"Note the code is based on [this introductory example from PyTorch](https://pytorch.org/tutorials/beginner/blitz/cifar10_tutorial.html). "
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Write files"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%writefile model.py\n",
"import torch.nn as nn\n",
"import torch.nn.functional as F\n",
"\n",
"\n",
"class Net(nn.Module):\n",
" def __init__(self):\n",
" super(Net, self).__init__()\n",
" self.conv1 = nn.Conv2d(3, 6, 5)\n",
" self.pool = nn.MaxPool2d(2, 2)\n",
" self.conv2 = nn.Conv2d(6, 16, 5)\n",
" self.fc1 = nn.Linear(16 * 5 * 5, 120)\n",
" self.fc2 = nn.Linear(120, 84)\n",
" self.fc3 = nn.Linear(84, 10)\n",
"\n",
" def forward(self, x):\n",
" x = self.pool(F.relu(self.conv1(x)))\n",
" x = self.pool(F.relu(self.conv2(x)))\n",
" x = x.view(-1, 16 * 5 * 5)\n",
" x = F.relu(self.fc1(x))\n",
" x = F.relu(self.fc2(x))\n",
" x = self.fc3(x)\n",
" return x"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%writefile train.py\n",
"import torch\n",
"import torch.optim as optim\n",
"import torchvision\n",
"import torchvision.transforms as transforms\n",
"\n",
"from model import Net\n",
"\n",
"# download CIFAR 10 data\n",
"trainset = torchvision.datasets.CIFAR10(\n",
" root=\"./data\",\n",
" train=True,\n",
" download=True,\n",
" transform=torchvision.transforms.ToTensor(),\n",
")\n",
"trainloader = torch.utils.data.DataLoader(\n",
" trainset, batch_size=4, shuffle=True, num_workers=2\n",
")\n",
"\n",
"if __name__ == \"__main__\":\n",
"\n",
" # define convolutional network\n",
" net = Net()\n",
"\n",
" # set up pytorch loss / optimizer\n",
" criterion = torch.nn.CrossEntropyLoss()\n",
" optimizer = optim.SGD(net.parameters(), lr=0.001, momentum=0.9)\n",
"\n",
" # train the network\n",
" for epoch in range(2):\n",
"\n",
" running_loss = 0.0\n",
" for i, data in enumerate(trainloader, 0):\n",
" # unpack the data\n",
" inputs, labels = data\n",
"\n",
" # zero the parameter gradients\n",
" optimizer.zero_grad()\n",
"\n",
" # forward + backward + optimize\n",
" outputs = net(inputs)\n",
" loss = criterion(outputs, labels)\n",
" loss.backward()\n",
" optimizer.step()\n",
"\n",
" # print statistics\n",
" running_loss += loss.item()\n",
" if i % 2000 == 1999:\n",
" loss = running_loss / 2000\n",
" print(f\"epoch={epoch + 1}, batch={i + 1:5}: loss {loss:.2f}\")\n",
" running_loss = 0.0\n",
"\n",
" print(\"Finished Training\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%writefile pytorch.yml\n",
"name: an-introduction-tutorial\n",
"channels:\n",
" - defaults\n",
" - pytorch\n",
" - anaconda\n",
" - conda-forge\n",
"dependencies:\n",
" - python=3.6\n",
" - pip\n",
" - pip:\n",
" - torch\n",
" - torchvision\n",
" - mlflow\n",
" - matplotlib\n",
" - azureml-mlflow\n",
" - azureml-dataprep"
]
},
{
"cell_type": "markdown",
"metadata": {},
@ -179,11 +55,11 @@
"\n",
"ws = Workspace.from_config()\n",
"env = Environment.from_conda_specification(\n",
" name=\"pytorch-tutorial\", file_path=\"pytorch.yml\",\n",
" name=\"pytorch-tutorial\", file_path=\"environment.yml\",\n",
")\n",
"exp = Experiment(workspace=ws, name=\"an-introduction-train-tutorial\")\n",
"src = ScriptRunConfig(\n",
" source_directory=\".\",\n",
" source_directory=\"src\",\n",
" script=\"train.py\",\n",
" compute_target=\"cpu-cluster\",\n",
" environment=env,\n",
@ -257,7 +133,7 @@
"\n",
"### Machine learning code updates\n",
"\n",
"In the `../../code/train/pytorch/cifar10-cnn` directory you will notice the [train-with-logging.py](../../code/train/pytorch/cifar10-cnn/train-with-logging.py) script has been modified with one additional line that will log the loss to the Azure Machine Learning Studio:\n",
"In the `src` directory you will notice the [train-with-logging.py](src/train-with-logging.py) script has been modified with one additional line that will log the loss to the Azure Machine Learning Studio:\n",
"\n",
"```python\n",
"# in train.py\n",
@ -282,70 +158,6 @@
"Submit your code once more. This time the widget includes the metrics where you can now see live updates on the model training loss!"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%writefile train-with-logging.py\n",
"import mlflow\n",
"import torch\n",
"import torch.optim as optim\n",
"import torchvision\n",
"import torchvision.transforms as transforms\n",
"\n",
"from model import Net\n",
"\n",
"# download CIFAR 10 data\n",
"trainset = torchvision.datasets.CIFAR10(\n",
" root=\"./data\",\n",
" train=True,\n",
" download=True,\n",
" transform=torchvision.transforms.ToTensor(),\n",
")\n",
"trainloader = torch.utils.data.DataLoader(\n",
" trainset, batch_size=4, shuffle=True, num_workers=2\n",
")\n",
"\n",
"if __name__ == \"__main__\":\n",
"\n",
" # define convolutional network\n",
" net = Net()\n",
"\n",
" # set up pytorch loss / optimizer\n",
" criterion = torch.nn.CrossEntropyLoss()\n",
" optimizer = optim.SGD(net.parameters(), lr=0.001, momentum=0.9)\n",
"\n",
" # train the network\n",
" for epoch in range(2):\n",
"\n",
" running_loss = 0.0\n",
" for i, data in enumerate(trainloader, 0):\n",
" # unpack the data\n",
" inputs, labels = data\n",
"\n",
" # zero the parameter gradients\n",
" optimizer.zero_grad()\n",
"\n",
" # forward + backward + optimize\n",
" outputs = net(inputs)\n",
" loss = criterion(outputs, labels)\n",
" loss.backward()\n",
" optimizer.step()\n",
"\n",
" # print statistics\n",
" running_loss += loss.item()\n",
" if i % 2000 == 1999:\n",
" loss = running_loss / 2000\n",
" # ADDITIONAL CODE: log loss metric to AML\n",
" mlflow.log_metric(\"loss\", loss)\n",
" print(f\"epoch={epoch + 1}, batch={i + 1:5}: loss {loss:.2f}\")\n",
" running_loss = 0.0\n",
"\n",
" print(\"Finished Training\")"
]
},
{
"cell_type": "code",
"execution_count": null,
@ -365,13 +177,13 @@
"\n",
"ws = Workspace.from_config()\n",
"env = Environment.from_conda_specification(\n",
" name=\"pytorch-env-tutorial\", file_path=\"pytorch.yml\",\n",
" name=\"pytorch-env-tutorial\", file_path=\"environment.yml\",\n",
")\n",
"exp = Experiment(\n",
" workspace=ws, name=\"an-introduction-train-with-logging-tutorial\"\n",
")\n",
"src = ScriptRunConfig(\n",
" source_directory=\".\",\n",
" source_directory=\"src\",\n",
" script=\"train-with-logging.py\",\n",
" compute_target=\"cpu-cluster\",\n",
" environment=env,\n",

View File

@ -23,7 +23,7 @@
"By now you have your training script running in Azure Machine Learning, and can monitor the model performance. Let's _parametrize_ the training script by introducing\n",
"arguments. Using arguments will allow you to easily compare different hyperparameters.\n",
"\n",
"Presently our training script is set to download the CIFAR10 dataset on each run. The Python code in [train-with-cloud-data-and-logging.py](../../code/train/pytorch/cifar10-cnn/train-with-cloud-data-and-logging.py) now uses **`argparse` to parametrize the script.**"
"Presently our training script is set to download the CIFAR10 dataset on each run. The Python code in [train-with-cloud-data-and-logging.py](src/train-with-cloud-data-and-logging.py) now uses **`argparse` to parametrize the script.**"
]
},
{
@ -120,102 +120,7 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"The `target_path` specifies the path on the datastore where the CIFAR10 data will be uploaded.\n",
"\n",
"## Write files\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%writefile train-with-cloud-data-and-logging.py\n",
"import os\n",
"import argparse\n",
"import mlflow\n",
"import torch\n",
"import torch.optim as optim\n",
"import torchvision\n",
"import torchvision.transforms as transforms\n",
"\n",
"from model import Net\n",
"\n",
"if __name__ == \"__main__\":\n",
" parser = argparse.ArgumentParser()\n",
" parser.add_argument(\n",
" \"--data-path\", type=str, help=\"Path to the training data\"\n",
" )\n",
" parser.add_argument(\n",
" \"--learning-rate\",\n",
" type=float,\n",
" default=0.001,\n",
" help=\"Learning rate for SGD\",\n",
" )\n",
" parser.add_argument(\n",
" \"--momentum\", type=float, default=0.9, help=\"Momentum for SGD\"\n",
" )\n",
" parser.add_argument(\n",
" \"--epochs\", type=int, default=2, help=\"Number of epochs to train\"\n",
" )\n",
" args = parser.parse_args()\n",
"\n",
" print(\"===== DATA =====\")\n",
" print(\"DATA PATH: \" + args.data_path)\n",
" print(\"LIST FILES IN DATA PATH...\")\n",
" print(os.listdir(args.data_path))\n",
" print(\"================\")\n",
"\n",
" # prepare DataLoader for CIFAR10 data\n",
" transform = transforms.Compose(\n",
" [\n",
" transforms.ToTensor(),\n",
" transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),\n",
" ]\n",
" )\n",
" trainset = torchvision.datasets.CIFAR10(\n",
" root=args.data_path, train=True, download=False, transform=transform,\n",
" )\n",
" trainloader = torch.utils.data.DataLoader(\n",
" trainset, batch_size=4, shuffle=True, num_workers=2\n",
" )\n",
"\n",
" # define convolutional network\n",
" net = Net()\n",
"\n",
" # set up pytorch loss / optimizer\n",
" criterion = torch.nn.CrossEntropyLoss()\n",
" optimizer = optim.SGD(\n",
" net.parameters(), lr=args.learning_rate, momentum=args.momentum,\n",
" )\n",
"\n",
" # train the network\n",
" for epoch in range(args.epochs):\n",
"\n",
" running_loss = 0.0\n",
" for i, data in enumerate(trainloader, 0):\n",
" # unpack the data\n",
" inputs, labels = data\n",
"\n",
" # zero the parameter gradients\n",
" optimizer.zero_grad()\n",
"\n",
" # forward + backward + optimize\n",
" outputs = net(inputs)\n",
" loss = criterion(outputs, labels)\n",
" loss.backward()\n",
" optimizer.step()\n",
"\n",
" # print statistics\n",
" running_loss += loss.item()\n",
" if i % 2000 == 1999:\n",
" loss = running_loss / 2000\n",
" mlflow.log_metric(\"loss\", loss) # log loss metric to AML\n",
" print(f\"epoch={epoch + 1}, batch={i + 1:5}: loss {loss:.2f}\")\n",
" running_loss = 0.0\n",
"\n",
" print(\"Finished Training\")"
"The `target_path` specifies the path on the datastore where the CIFAR10 data will be uploaded."
]
},
{
@ -254,11 +159,11 @@
" (ws.get_default_datastore(), \"tutorials/datasets/\")\n",
")\n",
"env = Environment.from_conda_specification(\n",
" name=\"pytorch-env-tutorial\", file_path=\"pytorch.yml\",\n",
" name=\"pytorch-env-tutorial\", file_path=\"environment.yml\",\n",
")\n",
"\n",
"src = ScriptRunConfig(\n",
" source_directory=\".\",\n",
" source_directory=\"src\",\n",
" script=\"train-with-cloud-data-and-logging.py\",\n",
" compute_target=\"cpu-cluster\",\n",
" environment=env,\n",

View File

@ -1,3 +1,13 @@
# An introduction
description: learn the basics of Azure Machine Learning
This tutorial walks through the day 1 steps for getting started with Azure Machine Learning by running through a "hello world" and then basic PyTorch training on remote compute.
The tutorial consists of three notebooks:
1. [1.hello-world.ipynb](1.hello-world.ipynb)
1. [2.pytorch-model.ipynb](2.pytorch-model.ipynb)
1. [3.pytorch-model-cloud-data.ipynb](3.pytorch-model-cloud-data.ipynb)
After running through these, you will have seen the basic concepts used throughout this repository. You can then go through other tutorials to learn the specifics of Azure cloud integration with various ML tools, or run one of the many workflow examples. If you want to run your script regularly or on triggers, see the [template](https://github.com/Azure/azureml-template) setup guide.

View File

@ -0,0 +1,16 @@
name: an-introduction-tutorial
channels:
  - defaults
  - pytorch
  - anaconda
  - conda-forge
dependencies:
  - python=3.6
  - pip
  - pip:
    - torch
    - torchvision
    - mlflow
    - matplotlib
    - azureml-mlflow
    - azureml-dataprep

View File

@ -0,0 +1 @@
print("hello world!")
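Because `hello.py` now lives under `src`, the notebook calls `!python src/hello.py` locally while `ScriptRunConfig` keeps `script="hello.py"` and changes only `source_directory`. A small stdlib-only sketch of that path resolution, assuming the script path is interpreted relative to the source directory; `resolve_script` is a hypothetical helper and does not call the Azure ML SDK:

```python
import tempfile
from pathlib import Path


def resolve_script(source_directory: str, script: str, root: Path) -> Path:
    """Return the script path relative to `root`, or raise if it is missing."""
    relative = Path(source_directory) / script
    if not (root / relative).is_file():
        raise FileNotFoundError(f"{relative} not found under {root}")
    return relative


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "src").mkdir()
    (root / "src" / "hello.py").write_text('print("hello world!")\n')

    # New layout: the script is found under src/.
    new_path = resolve_script("src", "hello.py", root)
    print(new_path.as_posix())

    # Old layout: source_directory="." no longer finds hello.py.
    try:
        resolve_script(".", "hello.py", root)
        old_layout_found = True
    except FileNotFoundError:
        old_layout_found = False
    print(old_layout_found)
```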

View File

@ -0,0 +1,22 @@
import torch.nn as nn
import torch.nn.functional as F


class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(3, 6, 5)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(6, 16, 5)
        self.fc1 = nn.Linear(16 * 5 * 5, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 10)

    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = x.view(-1, 16 * 5 * 5)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x
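The `16 * 5 * 5` input size of `fc1` follows from the layer arithmetic, assuming the 32x32 RGB images of CIFAR10 and convolutions with stride 1 and no padding (the `Conv2d` defaults). A sketch of that calculation in plain Python; `conv_out` and `pool_out` are illustrative helpers, not PyTorch functions:

```python
def conv_out(size: int, kernel: int, stride: int = 1, padding: int = 0) -> int:
    """Spatial output size of a convolution along one dimension."""
    return (size + 2 * padding - kernel) // stride + 1


def pool_out(size: int, kernel: int, stride: int) -> int:
    """Spatial output size of a max-pool along one dimension."""
    return (size - kernel) // stride + 1


size = 32                    # CIFAR10 images are 32x32
size = conv_out(size, 5)     # conv1: 5x5 kernel
size = pool_out(size, 2, 2)  # pool: 2x2 window, stride 2
size = conv_out(size, 5)     # conv2: 5x5 kernel
size = pool_out(size, 2, 2)  # pool again
flat_features = 16 * size * size  # 16 channels come out of conv2
print(size, flat_features)
```

If the input resolution or kernel sizes change, the `view` call and the first `Linear` layer in `Net` have to change with them.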

View File

@ -0,0 +1,74 @@
import os
import argparse
import mlflow
import torch
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms

from model import Net

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--data-path", type=str, help="Path to the training data")
    parser.add_argument(
        "--learning-rate", type=float, default=0.001, help="Learning rate for SGD",
    )
    parser.add_argument("--momentum", type=float, default=0.9, help="Momentum for SGD")
    parser.add_argument(
        "--epochs", type=int, default=2, help="Number of epochs to train"
    )
    args = parser.parse_args()

    print("===== DATA =====")
    print("DATA PATH: " + args.data_path)
    print("LIST FILES IN DATA PATH...")
    print(os.listdir(args.data_path))
    print("================")

    # prepare DataLoader for CIFAR10 data
    transform = transforms.Compose(
        [transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),]
    )
    trainset = torchvision.datasets.CIFAR10(
        root=args.data_path, train=True, download=False, transform=transform,
    )
    trainloader = torch.utils.data.DataLoader(
        trainset, batch_size=4, shuffle=True, num_workers=2
    )

    # define convolutional network
    net = Net()

    # set up pytorch loss / optimizer
    criterion = torch.nn.CrossEntropyLoss()
    optimizer = optim.SGD(
        net.parameters(), lr=args.learning_rate, momentum=args.momentum,
    )

    # train the network
    for epoch in range(args.epochs):

        running_loss = 0.0
        for i, data in enumerate(trainloader, 0):
            # unpack the data
            inputs, labels = data

            # zero the parameter gradients
            optimizer.zero_grad()

            # forward + backward + optimize
            outputs = net(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            # print statistics
            running_loss += loss.item()
            if i % 2000 == 1999:
                loss = running_loss / 2000
                mlflow.log_metric("loss", loss)  # log loss metric to AML
                print(f"epoch={epoch + 1}, batch={i + 1:5}: loss {loss:.2f}")
                running_loss = 0.0

    print("Finished Training")
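`argparse` exposes each dashed flag as an attribute with underscores, so `--data-path` is read as `args.data_path`, and omitted flags fall back to their defaults. A minimal sketch reusing the parser definition from the script; the argument values passed to `parse_args` are made up for illustration:

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """Recreate the script's parser so it can be exercised without training."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--data-path", type=str, help="Path to the training data")
    parser.add_argument(
        "--learning-rate", type=float, default=0.001, help="Learning rate for SGD"
    )
    parser.add_argument("--momentum", type=float, default=0.9, help="Momentum for SGD")
    parser.add_argument(
        "--epochs", type=int, default=2, help="Number of epochs to train"
    )
    return parser


# Illustrative values only; momentum and epochs are left at their defaults.
args = build_parser().parse_args(
    ["--data-path", "/tmp/cifar10", "--learning-rate", "0.003"]
)
print(args.data_path, args.learning_rate, args.momentum, args.epochs)
```

Note that `--data-path` has no default, so omitting it leaves `args.data_path` as `None` and the script's `"DATA PATH: " + args.data_path` line would raise a `TypeError`; passing `required=True` is one way to fail earlier with a clearer message.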

View File

@ -0,0 +1,55 @@
import mlflow
import torch
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms

from model import Net

# download CIFAR 10 data
trainset = torchvision.datasets.CIFAR10(
    root="./data",
    train=True,
    download=True,
    transform=torchvision.transforms.ToTensor(),
)
trainloader = torch.utils.data.DataLoader(
    trainset, batch_size=4, shuffle=True, num_workers=2
)

if __name__ == "__main__":

    # define convolutional network
    net = Net()

    # set up pytorch loss / optimizer
    criterion = torch.nn.CrossEntropyLoss()
    optimizer = optim.SGD(net.parameters(), lr=0.001, momentum=0.9)

    # train the network
    for epoch in range(2):

        running_loss = 0.0
        for i, data in enumerate(trainloader, 0):
            # unpack the data
            inputs, labels = data

            # zero the parameter gradients
            optimizer.zero_grad()

            # forward + backward + optimize
            outputs = net(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            # print statistics
            running_loss += loss.item()
            if i % 2000 == 1999:
                loss = running_loss / 2000
                # ADDITIONAL CODE: log loss metric to AML
                mlflow.log_metric("loss", loss)
                print(f"epoch={epoch + 1}, batch={i + 1:5}: loss {loss:.2f}")
                running_loss = 0.0

    print("Finished Training")
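The logging line in this script reports a running average every 2000 mini-batches rather than every step. The sketch below isolates that pattern with a stand-in `record` function (an assumption that replaces `mlflow.log_metric` so the example runs without MLflow) and synthetic loss values:

```python
from typing import Callable, Iterable, List, Tuple


def log_running_loss(
    losses: Iterable[float],
    log_metric: Callable[[str, float], None],
    every: int = 2000,
) -> None:
    """Log the mean loss of each complete window of `every` batches."""
    running_loss = 0.0
    for i, loss in enumerate(losses):
        running_loss += loss
        if i % every == every - 1:
            log_metric("loss", running_loss / every)
            running_loss = 0.0


logged: List[Tuple[str, float]] = []


def record(name: str, value: float) -> None:
    """Stand-in for mlflow.log_metric that just collects the values."""
    logged.append((name, value))


# 12500 batches corresponds to 50000 CIFAR10 training images at batch_size=4.
log_running_loss([1.0] * 12500, record)
print(len(logged), logged[0])
```

With 12500 batches per epoch the metric is logged six times, and the last 500 batches never complete a window, so their loss is not reported.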

View File

@ -0,0 +1,52 @@
import torch
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms

from model import Net

# download CIFAR 10 data
trainset = torchvision.datasets.CIFAR10(
    root="./data",
    train=True,
    download=True,
    transform=torchvision.transforms.ToTensor(),
)
trainloader = torch.utils.data.DataLoader(
    trainset, batch_size=4, shuffle=True, num_workers=2
)

if __name__ == "__main__":

    # define convolutional network
    net = Net()

    # set up pytorch loss / optimizer
    criterion = torch.nn.CrossEntropyLoss()
    optimizer = optim.SGD(net.parameters(), lr=0.001, momentum=0.9)

    # train the network
    for epoch in range(2):

        running_loss = 0.0
        for i, data in enumerate(trainloader, 0):
            # unpack the data
            inputs, labels = data

            # zero the parameter gradients
            optimizer.zero_grad()

            # forward + backward + optimize
            outputs = net(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            # print statistics
            running_loss += loss.item()
            if i % 2000 == 1999:
                loss = running_loss / 2000
                print(f"epoch={epoch + 1}, batch={i + 1:5}: loss {loss:.2f}")
                running_loss = 0.0

    print("Finished Training")

View File

@ -108,70 +108,7 @@
"source": [
"# Deploy the model as a web service to Edge\n",
"\n",
"We begin by writing a score.py file that will be invoked by the web service call. The init() function is called once when the container is started so we load the model using the Tensorflow session. The run() function is called when the webservice is invoked for inferencing. After running the code below you should see a score.py file in the same folder as this notebook."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%writefile score.py\n",
"import tensorflow as tf\n",
"import numpy as np\n",
"import json\n",
"import os\n",
"from azureml.contrib.services.aml_request import AMLRequest, rawhttp\n",
"from azureml.contrib.services.aml_response import AMLResponse\n",
"\n",
"\n",
"def init():\n",
" global session\n",
" global input_name\n",
" global output_name\n",
"\n",
" session = tf.Session()\n",
"\n",
" # AZUREML_MODEL_DIR is an environment variable created during deployment.\n",
" # It is the path to the model folder (./azureml-models/$MODEL_NAME/$VERSION)\n",
" # For multiple models, it points to the folder containing all deployed models (./azureml-models)\n",
" model_path = os.path.join(os.getenv(\"AZUREML_MODEL_DIR\"), \"resnet50\")\n",
" model = tf.saved_model.loader.load(session, [\"serve\"], model_path)\n",
" if len(model.signature_def[\"serving_default\"].inputs) > 1:\n",
" raise ValueError(\"This score.py only supports one input\")\n",
" input_name = [\n",
" tensor.name\n",
" for tensor in model.signature_def[\"serving_default\"].inputs.values()\n",
" ][0]\n",
" output_name = [\n",
" tensor.name\n",
" for tensor in model.signature_def[\"serving_default\"].outputs.values()\n",
" ]\n",
"\n",
"\n",
"@rawhttp\n",
"def run(request):\n",
" if request.method == \"POST\":\n",
" reqBody = request.get_data(False)\n",
" resp = score(reqBody)\n",
" return AMLResponse(resp, 200)\n",
" if request.method == \"GET\":\n",
" respBody = str.encode(\"GET is not supported\")\n",
" return AMLResponse(respBody, 405)\n",
" return AMLResponse(\"bad request\", 500)\n",
"\n",
"\n",
"def score(data):\n",
" result = session.run(output_name, {input_name: [data]})\n",
" return json.dumps(result[1].tolist())\n",
"\n",
"\n",
"if __name__ == \"__main__\":\n",
" init()\n",
" with open(\"test_image.jpg\", \"rb\") as f:\n",
" content = f.read()\n",
" print(score(content))"
"We begin by writing a score.py file that will be invoked by the web service call. The init() function is called once when the container is started so we load the model using the Tensorflow session. The run() function is called when the webservice is invoked for inferencing. See [score.py](src/score.py)."
]
},
{
@ -203,7 +140,9 @@
" pip_packages=[\"azureml-contrib-services\", \"azureml-defaults\"],\n",
")\n",
"\n",
"inference_config = InferenceConfig(entry_script=\"score.py\", environment=env)"
"inference_config = InferenceConfig(\n",
" source_directory=\"src\", entry_script=\"score.py\", environment=env\n",
")"
]
},
{
@ -311,80 +250,6 @@
"Create a deployment.json file using the template json. Then push the deployment json file to the IoT Hub, which will then send it to the IoT Edge device. The IoT Edge agent will then pull the Docker images and run them."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%writefile iotedge-tf-template-gpu.json\n",
"{\n",
" \"modulesContent\": {\n",
" \"$edgeAgent\": {\n",
" \"properties.desired\": {\n",
" \"schemaVersion\": \"1.0\",\n",
" \"runtime\": {\n",
" \"type\": \"docker\",\n",
" \"settings\": {\n",
" \"minDockerVersion\": \"v1.25\",\n",
" \"loggingOptions\": \"\",\n",
" \"registryCredentials\": {\n",
" \"__REGISTRY_NAME\": {\n",
" \"username\": \"__REGISTRY_USER_NAME\",\n",
" \"password\": \"__REGISTRY_PASSWORD\",\n",
" \"address\": \"__REGISTRY_NAME.azurecr.io\",\n",
" }\n",
" },\n",
" },\n",
" },\n",
" \"systemModules\": {\n",
" \"edgeAgent\": {\n",
" \"type\": \"docker\",\n",
" \"settings\": {\n",
" \"image\": \"mcr.microsoft.com/azureiotedge-agent:1.0\",\n",
" \"createOptions\": \"{}\",\n",
" \"env\": {\"UpstreamProtocol\": {\"value\": \"MQTT\"}},\n",
" },\n",
" },\n",
" \"edgeHub\": {\n",
" \"type\": \"docker\",\n",
" \"status\": \"running\",\n",
" \"restartPolicy\": \"always\",\n",
" \"settings\": {\n",
" \"image\": \"mcr.microsoft.com/azureiotedge-hub:1.0\",\n",
" \"createOptions\": '{\"User\":\"root\",\"HostConfig\":{\"PortBindings\":{\"5671/tcp\":[{\"HostPort\":\"5671\"}], \"8883/tcp\":[{\"HostPort\":\"8883\"}],\"443/tcp\":[{\"HostPort\":\"443\"}]}}}',\n",
" \"env\": {\"UpstreamProtocol\": {\"value\": \"MQTT \"}},\n",
" },\n",
" },\n",
" },\n",
" \"modules\": {\n",
" \"__MODULE_NAME\": {\n",
" \"version\": \"1.0\",\n",
" \"type\": \"docker\",\n",
" \"status\": \"running\",\n",
" \"restartPolicy\": \"always\",\n",
" \"settings\": {\n",
" \"image\": \"__REGISTRY_IMAGE_LOCATION\",\n",
" \"createOptions\": '{\"HostConfig\":{\"PortBindings\":{\"5001/tcp\":[{\"HostPort\":\"5001\"}], \"8883/tcp\":[{\"HostPort\":\"5002\"}],},\"runtime\":\"nvidia\"},\"WorkingDir\":\"/var/azureml-app\"}',\n",
" },\n",
" }\n",
" },\n",
" }\n",
" },\n",
" \"$edgeHub\": {\n",
" \"properties.desired\": {\n",
" \"schemaVersion\": \"1.0\",\n",
" \"routes\": {\n",
" \"machineLearningToIoTHub\": \"FROM /messages/modules/__MODULE_NAME/outputs/amlOutput INTO $upstream\"\n",
" },\n",
" \"storeAndForwardConfiguration\": {\"timeToLiveSecs\": 7200},\n",
" }\n",
" },\n",
" \"__MODULE_NAME\": {\"properties.desired\": {}},\n",
" }\n",
"}"
]
},
{
"cell_type": "code",
"execution_count": null,
@ -393,7 +258,7 @@
"source": [
"module_name = \"tfgpu\"\n",
"\n",
"file = open(\"iotedge-tf-template-gpu.json\")\n",
"file = open(\"src/iotedge-tf-template-gpu.json\")\n",
"contents = file.read()\n",
"contents = contents.replace(\"__MODULE_NAME\", module_name)\n",
"contents = contents.replace(\"__REGISTRY_NAME\", reg_name)\n",
@ -503,7 +368,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.9"
"version": "3.8.5-final"
}
},
"nbformat": 4,


@ -0,0 +1,65 @@
{
  "modulesContent": {
    "$edgeAgent": {
      "properties.desired": {
        "schemaVersion": "1.0",
        "runtime": {
          "type": "docker",
          "settings": {
            "minDockerVersion": "v1.25",
            "loggingOptions": "",
            "registryCredentials": {
              "__REGISTRY_NAME": {
                "username": "__REGISTRY_USER_NAME",
                "password": "__REGISTRY_PASSWORD",
                "address": "__REGISTRY_NAME.azurecr.io"
              }
            }
          }
        },
        "systemModules": {
          "edgeAgent": {
            "type": "docker",
            "settings": {
              "image": "mcr.microsoft.com/azureiotedge-agent:1.0",
              "createOptions": "{}",
              "env": {"UpstreamProtocol": {"value": "MQTT"}}
            }
          },
          "edgeHub": {
            "type": "docker",
            "status": "running",
            "restartPolicy": "always",
            "settings": {
              "image": "mcr.microsoft.com/azureiotedge-hub:1.0",
              "createOptions": "{\"User\":\"root\",\"HostConfig\":{\"PortBindings\":{\"5671/tcp\":[{\"HostPort\":\"5671\"}], \"8883/tcp\":[{\"HostPort\":\"8883\"}],\"443/tcp\":[{\"HostPort\":\"443\"}]}}}",
              "env": {"UpstreamProtocol": {"value": "MQTT "}}
            }
          }
        },
        "modules": {
          "__MODULE_NAME": {
            "version": "1.0",
            "type": "docker",
            "status": "running",
            "restartPolicy": "always",
            "settings": {
              "image": "__REGISTRY_IMAGE_LOCATION",
              "createOptions": "{\"HostConfig\":{\"PortBindings\":{\"5001/tcp\":[{\"HostPort\":\"5001\"}], \"8883/tcp\":[{\"HostPort\":\"5002\"}]},\"runtime\":\"nvidia\"},\"WorkingDir\":\"/var/azureml-app\"}"
            }
          }
        }
      }
    },
    "$edgeHub": {
      "properties.desired": {
        "schemaVersion": "1.0",
        "routes": {
          "machineLearningToIoTHub": "FROM /messages/modules/__MODULE_NAME/outputs/amlOutput INTO $upstream"
        },
        "storeAndForwardConfiguration": {"timeToLiveSecs": 7200}
      }
    },
    "__MODULE_NAME": {"properties.desired": {}}
  }
}
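
The notebook fills this template by plain string replacement of the `__…` placeholders before pushing it to the IoT Hub. A minimal sketch of that step follows, under stated assumptions: `TEMPLATE` is a shortened, hypothetical stand-in for the real file, `fill_template` is an illustrative helper that is not part of the commit, and the registry and image values are made up. Parsing the result with `json.loads` is one way to catch syntax problems (trailing commas, single-quoted strings) before deployment.

```python
import json

# Shortened, hypothetical template; the real file has many more keys.
TEMPLATE = """
{
  "modules": {
    "__MODULE_NAME": {
      "settings": {"image": "__REGISTRY_IMAGE_LOCATION"}
    }
  },
  "registry": "__REGISTRY_NAME.azurecr.io"
}
"""


def fill_template(template, values):
    """Replace each placeholder with its value, then parse the result as JSON."""
    # Substitute longer placeholders first so that a placeholder which is a
    # prefix of another one cannot clobber it.
    for placeholder in sorted(values, key=len, reverse=True):
        template = template.replace(placeholder, values[placeholder])
    return json.loads(template)  # raises json.JSONDecodeError on bad syntax


manifest = fill_template(
    TEMPLATE,
    {
        "__MODULE_NAME": "tfgpu",
        "__REGISTRY_NAME": "myregistry",
        "__REGISTRY_IMAGE_LOCATION": "myregistry.azurecr.io/tfgpu:1",
    },
)
print(json.dumps(manifest, indent=2))
```

Validating after substitution rather than before matters here because the placeholders also appear as object keys, so the filled document is what the IoT Edge agent will actually read.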


@ -0,0 +1,53 @@
import tensorflow as tf
import numpy as np
import json
import os
from azureml.contrib.services.aml_request import AMLRequest, rawhttp
from azureml.contrib.services.aml_response import AMLResponse


def init():
    global session
    global input_name
    global output_name
    session = tf.Session()

    # AZUREML_MODEL_DIR is an environment variable created during deployment.
    # It is the path to the model folder (./azureml-models/$MODEL_NAME/$VERSION)
    # For multiple models, it points to the folder containing all deployed models (./azureml-models)
    model_path = os.path.join(os.getenv("AZUREML_MODEL_DIR"), "resnet50")
    model = tf.saved_model.loader.load(session, ["serve"], model_path)
    if len(model.signature_def["serving_default"].inputs) > 1:
        raise ValueError("This score.py only supports one input")
    input_name = [
        tensor.name for tensor in model.signature_def["serving_default"].inputs.values()
    ][0]
    output_name = [
        tensor.name
        for tensor in model.signature_def["serving_default"].outputs.values()
    ]


@rawhttp
def run(request):
    if request.method == "POST":
        reqBody = request.get_data(False)
        resp = score(reqBody)
        return AMLResponse(resp, 200)
    if request.method == "GET":
        respBody = str.encode("GET is not supported")
        return AMLResponse(respBody, 405)
    return AMLResponse("bad request", 500)


def score(data):
    result = session.run(output_name, {input_name: [data]})
    return json.dumps(result[1].tolist())


if __name__ == "__main__":
    init()
    with open("test_image.jpg", "rb") as f:
        content = f.read()
        print(score(content))
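
The `run()` handler in score.py branches on the HTTP method of the raw request. The sketch below reproduces only that branching so it can be exercised without a deployed service. `StubRequest` and `StubResponse` are hypothetical stand-ins for the Azure ML request object and `AMLResponse`, and the default `score` callable is a placeholder rather than the TensorFlow scoring function.

```python
from dataclasses import dataclass


@dataclass
class StubRequest:
    """Hypothetical stand-in for the request object passed to run()."""

    method: str
    body: bytes = b""

    def get_data(self, as_text=False):
        return self.body


@dataclass
class StubResponse:
    """Hypothetical stand-in for AMLResponse(body, status_code)."""

    body: object
    status_code: int


def run(request, score=lambda data: f"{len(data)} bytes scored"):
    # Same branching as score.py: POST is scored, GET is rejected with 405,
    # and anything else falls through to a generic error response.
    if request.method == "POST":
        return StubResponse(score(request.get_data(False)), 200)
    if request.method == "GET":
        return StubResponse(b"GET is not supported", 405)
    return StubResponse("bad request", 500)


for method in ("POST", "GET", "PUT"):
    print(method, run(StubRequest(method, b"abc")).status_code)
```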

File diff suppressed because it is too large.


@ -22,7 +22,7 @@
"metadata": {},
"outputs": [],
"source": [
"!pip install azureml-core==1.17.0"
"!pip install --upgrade https://aka.ms/triton/packages/tritonclientutils-2.2.0-py3-none-any.whl"
]
},
{
@ -62,7 +62,7 @@
"from pathlib import Path\n",
"\n",
"# Enables us to import helper functions as Python modules\n",
"path_to_insert = \".\"\n",
"path_to_insert = \"src\"\n",
"if path_to_insert not in sys.path:\n",
" sys.path.insert(1, path_to_insert)\n",
"\n",
@ -131,7 +131,7 @@
"\n",
"env = Environment(\"triton-tutorial\")\n",
"env.docker.base_image = None\n",
"env.docker.base_dockerfile = \"triton.dockerfile\"\n",
"env.docker.base_dockerfile = \"DOCKERFILE\"\n",
"env.python.user_managed_dependencies = True\n",
"env.python.interpreter_path = \"/opt/miniconda/bin/python\"\n",
"\n",
@ -139,7 +139,7 @@
"\n",
"inference_config = InferenceConfig(\n",
" # this entry script is where we dispatch a call to the Triton server\n",
" source_directory=\".\",\n",
" source_directory=\"src\",\n",
" entry_script=\"score_densenet.py\",\n",
" environment=env,\n",
")\n",


@ -22,7 +22,7 @@
"metadata": {},
"outputs": [],
"source": [
"!pip install --upgrade https://aka.ms/triton/packages/tritonclientutils-2.2.0-py3-none-any.whl gitpython"
"!pip install --upgrade https://aka.ms/triton/packages/tritonclientutils-2.2.0-py3-none-any.whl"
]
},
{
@ -56,15 +56,10 @@
},
"outputs": [],
"source": [
"import git\n",
"import sys\n",
"from pathlib import Path\n",
"\n",
"# get the root of the repo\n",
"prefix = Path(git.Repo(\".\", search_parent_directories=True).working_tree_dir)\n",
"\n",
"# Enables us to import helper functions as Python modules\n",
"path_to_insert = prefix.joinpath(\"code\", \"deploy\", \"triton\").__str__()\n",
"path_to_insert = \"src\"\n",
"if path_to_insert not in sys.path:\n",
" sys.path.insert(1, path_to_insert)\n",
"\n",
@ -266,7 +261,7 @@
"\n",
"result = postprocess(context_words=cw, answer=res)\n",
"\n",
"print(result)"
"result"
]
},
{


@ -0,0 +1,8 @@
FROM mcr.microsoft.com/azureml/aml-triton
RUN pip install azureml-defaults
# RUN pip install numpy inference-schema[numpy-support]
RUN pip install pillow
RUN pip install nvidia-pyindex
RUN pip install tritonclient[http]
# RUN apt-get update && apt-get install -y libcurl4-openssl-dev


@ -0,0 +1,68 @@
"""score_bidaf.py
Scoring script for use with the Bi-directional Attention Flow model from the ONNX model zoo.
https://github.com/onnx/models/tree/master/text/machine_comprehension/bidirectional_attention_flow
"""
import json
import nltk
import numpy as np
import os

from nltk import word_tokenize
from utils import get_model_info, parse_model_http, triton_init, triton_infer
from tritonclientutils import triton_to_np_dtype


def preprocess(text, dtype):
    """Tokenizes text for use in the bidirectional attention flow model

    Parameters
    ---------
    text : str
        Text to be tokenized

    dtype : numpy datatype
        Datatype of the resulting array

    Returns
    ---------
    (np.array(), np.array())
        Tuple containing two numpy arrays with the tokenized
        words and chars, respectively.

    From: https://github.com/onnx/models/tree/master/text/machine_comprehension/bidirectional_attention_flow  # noqa
    """
    nltk.download("punkt")
    tokens = word_tokenize(text)
    # split into lower-case word tokens, in numpy array with shape of (seq, 1)
    words = np.array([w.lower() for w in tokens], dtype=dtype).reshape(-1, 1)
    # split words into chars, in numpy array with shape of (seq, 1, 1, 16)
    chars = [[c for c in t][:16] for t in tokens]
    chars = [cs + [""] * (16 - len(cs)) for cs in chars]
    chars = np.array(chars, dtype=dtype).reshape(-1, 1, 1, 16)
    return words, chars


def postprocess(context_words, answer):
    """Post-process results to show the chosen result

    Parameters
    --------
    context_words : str
        Original context

    answer : InferResult
        Triton inference result containing start and
        end positions of desired answer

    Returns
    --------
    Numpy array containing the words from the context that
    answer the given query.
    """
    start = answer.as_numpy("start_pos")[0]
    end = answer.as_numpy("end_pos")[0]
    print(f"start is {start}, end is {end}")
    return [w.encode() for w in context_words[start : end + 1].reshape(-1)]
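
The character step in `preprocess` truncates each token to 16 characters and pads shorter tokens with empty strings. A small pure-Python sketch of just that step is shown here, assuming whitespace splitting in place of `word_tokenize` and plain lists in place of NumPy arrays; `pad_chars` is an illustrative name, not a function from the commit.

```python
def pad_chars(tokens, width=16):
    """Split each token into characters, truncated or padded to `width`."""
    rows = [list(token)[:width] for token in tokens]
    return [row + [""] * (width - len(row)) for row in rows]


# Assumption: whitespace splitting stands in for nltk's word_tokenize.
tokens = "A quick brown fox".split()
words = [[t.lower()] for t in tokens]  # one lower-cased word per row: (seq, 1)
chars = pad_chars(tokens)  # one fixed-width row per token: (seq, 16)

print(words)
print(chars[1])
```

In the scoring script the padded rows are then reshaped to `(seq, 1, 1, 16)` before being sent to the model.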

File diff suppressed because it is too large.


@ -0,0 +1,5 @@
{
  "computeType": "aks",
  "containerResourceRequirements": {"cpu": 1, "memoryInGB": 4},
  "gpuCores": 1
}


@ -0,0 +1,47 @@
"""download_models
Downloads models needed for Triton example notebooks.
"""
import os
import urllib.request  # "import urllib" alone does not guarantee urllib.request is loaded

from azure.storage.blob import BlobClient

model_names = ["densenet_onnx", "bidaf-9"]


def download_triton_models(prefix):
    for model in model_names:
        folder_path, model_file_path = _generate_paths(model, prefix)
        url = f"https://aka.ms/{model}-model"
        _download_model(model_file_path, folder_path, url)
        print(f"successfully downloaded model: {model}")


def delete_triton_models(prefix):
    for model in model_names:
        _, model_file_path = _generate_paths(model, prefix)
        try:
            os.remove(model_file_path)
            print(f"successfully deleted model: {model}")
        except FileNotFoundError:
            print(f"model: {model} was already deleted")


def _download_model(model_file_path, folder_path, url):
    # follow the aka.ms redirect to obtain the underlying blob URL
    response = urllib.request.urlopen(url)

    blob_client = BlobClient.from_blob_url(response.url)

    # save the model if it does not already exist
    if not os.path.exists(model_file_path):
        os.makedirs(folder_path, exist_ok=True)
        with open(model_file_path, "wb") as my_blob:
            download_stream = blob_client.download_blob()
            my_blob.write(download_stream.readall())


def _generate_paths(model, prefix):
    folder_path = prefix.joinpath("models", "triton", model, "1")
    # folder_path already includes prefix, so join the file name onto it directly
    model_file_path = folder_path.joinpath("model.onnx")
    return folder_path, model_file_path
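
`_generate_paths` places each model at `models/triton/<model>/1/model.onnx` under the given prefix, which follows the model-repository layout of one numbered version folder per model. The sketch below shows the resulting paths, assuming a made-up prefix of `/repo`; `model_paths` is an illustrative helper and `PurePosixPath` is used only so the output is the same on every platform.

```python
from pathlib import PurePosixPath


def model_paths(prefix, model, version="1"):
    """Return (version folder, model file) for one model under prefix."""
    folder = PurePosixPath(prefix) / "models" / "triton" / model / version
    return folder, folder / "model.onnx"


for name in ("densenet_onnx", "bidaf-9"):
    folder, model_file = model_paths("/repo", name)
    print(model_file)
```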


@ -0,0 +1,70 @@
"""
onnxruntimetriton
Offers the class InferenceSession which can be used as a drop-in replacement for the ONNX Runtime
session object.
"""
import tritonclient.http as tritonhttpclient
import numpy as np


class NodeArg:
    def __init__(self, name, shape):
        self.name = name
        self.shape = shape


class InferenceSession:
    def __init__(self, path_or_bytes, sess_options=None, providers=[]):
        self.client = tritonhttpclient.InferenceServerClient("localhost:8000")
        model_metadata = self.client.get_model_metadata(model_name=path_or_bytes)

        self.request_count = 0
        self.model_name = path_or_bytes
        self.inputs = []
        self.outputs = []
        self.dtype_mapping = {}

        for (src, dest) in (
            (model_metadata["inputs"], self.inputs),
            (model_metadata["outputs"], self.outputs),
        ):
            for element in src:
                dest.append(NodeArg(element["name"], element["shape"]))
                self.dtype_mapping[element["name"]] = element["datatype"]

        self.triton_enabled = True

    def get_inputs(self):
        return self.inputs

    def get_outputs(self):
        return self.outputs

    def run(self, output_names, input_feed, run_options=None):
        inputs = []
        for key, val in input_feed.items():
            val = np.expand_dims(val, axis=0)
            input = tritonhttpclient.InferInput(key, val.shape, self.dtype_mapping[key])
            input.set_data_from_numpy(val)
            inputs.append(input)

        outputs = []
        for output_name in output_names:
            output = tritonhttpclient.InferRequestedOutput(output_name)
            outputs.append(output)

        res = self.client.infer(
            self.model_name,
            inputs,
            request_id=str(self.request_count),
            outputs=outputs,
        )
        results = []
        for output_name in output_names:
            results.append(res.as_numpy(output_name))
        return results
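
The constructor of `InferenceSession` turns the model metadata returned by the Triton server into `NodeArg` lists plus a name-to-datatype mapping. The following sketch isolates that loop so it runs without a server. The `metadata` dictionary, including the tensor names `data_0` and `fc6_1`, is a hypothetical example of the expected structure, and `split_metadata` is an illustrative helper rather than part of the module.

```python
class NodeArg:
    def __init__(self, name, shape):
        self.name = name
        self.shape = shape


def split_metadata(model_metadata):
    """Build input/output NodeArg lists and a name -> datatype mapping."""
    inputs, outputs, dtype_mapping = [], [], {}
    for src, dest in (
        (model_metadata["inputs"], inputs),
        (model_metadata["outputs"], outputs),
    ):
        for element in src:
            dest.append(NodeArg(element["name"], element["shape"]))
            dtype_mapping[element["name"]] = element["datatype"]
    return inputs, outputs, dtype_mapping


# Hypothetical metadata in the shape the loop expects.
metadata = {
    "inputs": [{"name": "data_0", "datatype": "FP32", "shape": [3, 224, 224]}],
    "outputs": [{"name": "fc6_1", "datatype": "FP32", "shape": [1000]}],
}

inputs, outputs, dtypes = split_metadata(metadata)
print([arg.name for arg in inputs], [arg.name for arg in outputs], dtypes)
```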


@ -0,0 +1,108 @@
import io
import numpy as np
import os

from azureml.core import Model
from azureml.contrib.services.aml_request import rawhttp
from azureml.contrib.services.aml_response import AMLResponse
from PIL import Image
from utils import get_model_info, parse_model_http, triton_init, triton_infer
from onnxruntimetriton import InferenceSession


def preprocess(img, scaling):  # , dtype):
    """Pre-process an image to meet the size, type and format
    requirements specified by the parameters.
    """
    c = 3
    h = 224
    w = 224
    format = "FORMAT_NCHW"

    if c == 1:
        sample_img = img.convert("L")
    else:
        sample_img = img.convert("RGB")

    resized_img = sample_img.resize((w, h), Image.BILINEAR)
    resized = np.array(resized_img)
    if resized.ndim == 2:
        resized = resized[:, :, np.newaxis]

    # npdtype = triton_to_np_dtype(dtype)
    typed = resized.astype(np.float32)
    # typed = resized

    if scaling == "INCEPTION":
        scaled = (typed / 128) - 1
    elif scaling == "VGG":
        # npdtype is not defined in this script (its assignment above is
        # commented out), so use np.float32 to match `typed`
        if c == 1:
            scaled = typed - np.asarray((128,), dtype=np.float32)
        else:
            scaled = typed - np.asarray((123, 117, 104), dtype=np.float32)
    else:
        scaled = typed

    # Swap to CHW if necessary
    if format == "FORMAT_NCHW":
        ordered = np.transpose(scaled, (2, 0, 1))
    else:
        ordered = scaled

    # Channels are in RGB order. Currently model configuration data
    # doesn't provide any information as to other channel orderings
    # (like BGR) so we just assume RGB.
    return ordered


def postprocess(output_array):
    """Post-process results to show the predicted label."""
    output_array = output_array[0]
    max_label = np.argmax(output_array)
    final_label = label_dict[max_label]
    return f"{max_label} : {final_label}"


def init():
    global session, label_dict
    session = InferenceSession(path_or_bytes="densenet_onnx")

    model_dir = os.path.join(os.environ["AZUREML_MODEL_DIR"], "models")
    folder_path = os.path.join(model_dir, "triton", "densenet_onnx")
    label_path = os.path.join(
        model_dir, "triton", "densenet_onnx", "densenet_labels.txt"
    )
    # read the labels and close the file once done
    with open(label_path, "r") as label_file:
        labels = label_file.read().split("\n")
    label_dict = dict(enumerate(labels))


@rawhttp
def run(request):
    """This function is called every time your webservice receives a request.

    Notice you need to know the names and data types of the model inputs and
    outputs. You can get these values by reading the model configuration file
    or by querying the model metadata endpoint.
    """
    if request.method == "POST":
        outputs = []
        for output in session.get_outputs():
            outputs.append(output.name)
        input_name = session.get_inputs()[0].name

        reqBody = request.get_data(False)
        img = Image.open(io.BytesIO(reqBody))
        image_data = preprocess(img, scaling="INCEPTION")

        res = session.run(outputs, {input_name: image_data})
        result = postprocess(output_array=res)
        return AMLResponse(result, 200)
    else:
        return AMLResponse("bad request", 500)
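
Two small pieces of arithmetic in this script are easy to check in isolation: the `INCEPTION` scaling `(x / 128) - 1`, which maps 8-bit pixel values into roughly `[-1, 1)`, and the label lookup in `postprocess`, which takes the index of the largest score and looks it up in a dictionary built from a newline-separated label file. The sketch below uses plain Python instead of NumPy; `inception_scale` and `top_label` are illustrative names, and the three-line label text is made up.

```python
def inception_scale(value):
    """Map an 8-bit pixel value to the range used by the INCEPTION option."""
    return (value / 128) - 1


def top_label(scores, labels_text):
    """Return 'index : label' for the highest score, as postprocess() does."""
    label_dict = dict(enumerate(labels_text.split("\n")))
    best = max(range(len(scores)), key=scores.__getitem__)
    return f"{best} : {label_dict[best]}"


print([inception_scale(v) for v in (0, 128, 255)])  # [-1.0, 0.0, 0.9921875]
print(top_label([0.1, 0.7, 0.2], "cat\npeacock\ndog"))  # 1 : peacock
```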


@ -0,0 +1,47 @@
"""test_service.py
Sends a specified image from the data directory to a deployed ML model
and returns the result.
"""
import argparse
import os
import requests

from azureml.core.webservice import AksWebservice
from azureml.core.workspace import Workspace

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Test a deployed endpoint.")
    parser.add_argument(
        "--endpoint_name",
        type=str,
        default="triton-densenet-onnx",
        help="name of the endpoint to test",
    )
    parser.add_argument(
        "--data_file",
        type=str,
        default="../../data/raw/triton/peacock.jpg",
        help="filename to run through the classifier",
    )
    args = parser.parse_args()

    ws = Workspace.from_config()
    aks_service = AksWebservice(ws, args.endpoint_name)

    # if (key) auth is enabled, fetch keys and include in the request
    key1, _ = aks_service.get_keys()

    headers = {
        "Content-Type": "application/octet-stream",
        "Authorization": "Bearer " + key1,
    }

    file_name = os.path.join(
        os.path.abspath(os.path.dirname(__file__)), "..", "data", args.data_file,
    )
    # read the image bytes and close the file before sending the request
    with open(file_name, "rb") as f:
        test_sample = f.read()
    resp = requests.post(aks_service.scoring_uri, test_sample, headers=headers)
    print(resp.text)
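
The request that test_service.py sends is a POST with the raw image bytes as the body, an `application/octet-stream` content type, and a bearer key when key authentication is enabled. The sketch below only constructs such a request and does not send it. It swaps in the standard library's `urllib.request.Request` for `requests.post`, and the URL, payload, and key are placeholders; `build_scoring_request` is an illustrative helper.

```python
import urllib.request


def build_scoring_request(scoring_uri, payload, key=None):
    """Build (but do not send) a POST request carrying raw bytes."""
    headers = {"Content-Type": "application/octet-stream"}
    if key:
        # only attach the key when key authentication is enabled
        headers["Authorization"] = "Bearer " + key
    return urllib.request.Request(
        scoring_uri, data=payload, headers=headers, method="POST"
    )


# Placeholder URL, payload and key, for illustration only.
req = build_scoring_request("https://example.invalid/score", b"\xff\xd8", key="secret")
print(req.get_method(), req.get_header("Authorization"))
```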

File diff suppressed because it is too large.


@ -0,0 +1,121 @@
import tritonhttpclient


def triton_init(url="localhost:8000"):
    """Initializes the triton client to point at the specified URL

    Parameter
    ----------
    url : str
        The URL on which to address the Triton server, defaults to
        localhost:8000
    """
    global triton_client
    triton_client = tritonhttpclient.InferenceServerClient(url)
    return triton_client


def get_model_info():
    """Gets metadata for all models hosted behind the Triton endpoint.
    Useful for confirming that your models were loaded into memory.

    Prints the data to STDOUT.
    """
    repo_index = triton_client.get_model_repository_index()
    for model in repo_index:
        model_name = model["name"]
        model_version = model["version"]
        (input_meta, input_config, output_meta, output_config,) = parse_model_http(
            model_name=model_name, model_version=model_version
        )
        print(
            f"Found model: {model_name}, version: {model_version}, \
              input meta: {input_meta}, input config: {input_config}, \
              output_meta: {output_meta}, output config: {output_config}"
        )


def parse_model_http(model_name, model_version=""):
    """Check the configuration of a model to make sure it meets the
    requirements for an image classification network (as expected by
    this client)

    Arguments
    --------
    model_name : str
        Name of the model whose metadata you want to fetch

    model_version : str
        Optional, the version of the model, defaults to empty string.

    From https://github.com/triton-inference-server/server/blob/master/src/clients/python/examples/image_client.py  # noqa
    """
    model_metadata = triton_client.get_model_metadata(
        model_name=model_name, model_version=model_version
    )
    model_config = triton_client.get_model_config(
        model_name=model_name, model_version=model_version
    )

    return (
        model_metadata["inputs"],
        model_config["input"],
        model_metadata["outputs"],
        model_config["output"],
    )


def triton_infer(
    input_mapping, model_name, binary_data=False, binary_output=False, class_count=0,
):
    """Helper function for setting Triton inputs and executing a request

    Arguments
    ----------
    input_mapping : dict
        A dictionary mapping strings to numpy arrays. The keys should
        be the names of the model inputs, and the values should be the
        inputs themselves.

    model_name : str
        The name of the model on which you are running inference.

    binary_data : bool
        Whether you are expecting binary input and output. Defaults to False

    class_count : int
        If the model is a classification model, the number of output classes.
        Defaults to 0, indicating this is not a classification model.

    Returns
    ----------
    res : InferResult
        Triton inference result containing output from running prediction
    """
    input_meta, _, output_meta, _ = parse_model_http(model_name)

    inputs = []
    outputs = []

    # Populate the inputs array
    for in_meta in input_meta:
        input_name = in_meta["name"]
        data = input_mapping[input_name]

        input = tritonhttpclient.InferInput(input_name, data.shape, in_meta["datatype"])

        input.set_data_from_numpy(data, binary_data=binary_data)
        inputs.append(input)

    # Populate the outputs array
    for out_meta in output_meta:
        output_name = out_meta["name"]
        output = tritonhttpclient.InferRequestedOutput(
            output_name, binary_data=binary_output, class_count=class_count
        )
        outputs.append(output)

    # Run inference
    res = triton_client.infer(model_name, inputs, request_id="0", outputs=outputs)

    return res
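
`triton_infer` looks up every model input by name in `input_mapping`, so a missing key surfaces as a `KeyError` partway through building the request. One possible pre-check is sketched below. It is not part of utils.py: `missing_inputs` is a hypothetical helper, and the metadata entries with the names `context_word` and `query_word` are illustrative values in the same shape as the `input_meta` list returned by `parse_model_http`.

```python
def missing_inputs(input_meta, input_mapping):
    """Return the names of model inputs that input_mapping does not provide."""
    return [meta["name"] for meta in input_meta if meta["name"] not in input_mapping]


# Hypothetical metadata in the shape returned by the model metadata endpoint.
input_meta = [
    {"name": "context_word", "datatype": "BYTES", "shape": [-1, 1]},
    {"name": "query_word", "datatype": "BYTES", "shape": [-1, 1]},
]

print(missing_inputs(input_meta, {"context_word": ["a"]}))  # ['query_word']
print(missing_inputs(input_meta, {"context_word": ["a"], "query_word": ["b"]}))  # []
```

Calling such a check before `triton_infer` would let a caller report all missing inputs at once instead of failing on the first one.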


@ -1,584 +0,0 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%writefile pytorch-lightning.yml\n",
"channels:\n",
" - conda-forge\n",
"dependencies:\n",
" - python=3.7\n",
" - pip\n",
" - pip:\n",
" - azureml-defaults\n",
" - torch==1.6.0\n",
" - torchvision==0.7.0\n",
" - pytorch-lightning==1.0.4\n",
" - mlflow\n",
" - azureml-mlflow"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%writefile azureml_env_adapter.py\n",
"import os\n",
"\n",
"\n",
"def set_environment_variables(single_node=False, master_port=6105):\n",
" # os.environ[\"RANK\"] = os.environ[\"OMPI_COMM_WORLD_RANK\"]\n",
" # os.environ[\"WORLD_SIZE\"] = os.environ[\"OMPI_COMM_WORLD_SIZE\"]\n",
"\n",
" if not single_node:\n",
" master_node_params = os.environ[\"AZ_BATCH_MASTER_NODE\"].split(\":\")\n",
" os.environ[\"MASTER_ADDR\"] = master_node_params[0]\n",
"\n",
" # Do not overwrite master port with that defined in AZ_BATCH_MASTER_NODE\n",
" if \"MASTER_PORT\" not in os.environ:\n",
" os.environ[\"MASTER_PORT\"] = str(master_port)\n",
" else:\n",
" os.environ[\"MASTER_ADDR\"] = os.environ[\"AZ_BATCHAI_MPI_MASTER_NODE\"]\n",
" os.environ[\"MASTER_PORT\"] = \"54965\"\n",
" print(\n",
" \"NCCL_SOCKET_IFNAME original value = {}\".format(\n",
" os.environ[\"NCCL_SOCKET_IFNAME\"]\n",
" )\n",
" )\n",
"\n",
" os.environ[\"NCCL_SOCKET_IFNAME\"] = \"^docker0,lo\"\n",
" os.environ[\"NODE_RANK\"] = os.environ[\n",
" \"OMPI_COMM_WORLD_RANK\"\n",
" ] # node rank is the world_rank from mpi run\n",
" # print(\"RANK = {}\".format(os.environ[\"RANK\"]))\n",
" # print(\"WORLD_SIZE = {}\".format(os.environ[\"WORLD_SIZE\"]))\n",
" print(\"MASTER_ADDR = {}\".format(os.environ[\"MASTER_ADDR\"]))\n",
" print(\"MASTER_PORT = {}\".format(os.environ[\"MASTER_PORT\"]))\n",
" print(\"NODE_RANK = {}\".format(os.environ[\"NODE_RANK\"]))\n",
" print(\n",
" \"NCCL_SOCKET_IFNAME new value = {}\".format(\n",
" os.environ[\"NCCL_SOCKET_IFNAME\"]\n",
" )\n",
" )"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%writefile train.py\n",
"# Copyright The PyTorch Lightning team.\n",
"#\n",
"# Licensed under the Apache License, Version 2.0 (the \"License\");\n",
"# you may not use this file except in compliance with the License.\n",
"# You may obtain a copy of the License at\n",
"#\n",
"# http://www.apache.org/licenses/LICENSE-2.0\n",
"#\n",
"# Unless required by applicable law or agreed to in writing, software\n",
"# distributed under the License is distributed on an \"AS IS\" BASIS,\n",
"# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n",
"# See the License for the specific language governing permissions and\n",
"# limitations under the License.\n",
"#\n",
"# Script from: https://github.com/PyTorchLightning/pytorch-lightning/blob/1.0.0rc2/pl_examples/basic_examples/autoencoder.py\n",
"\n",
"from argparse import ArgumentParser\n",
"import torch\n",
"from torch import nn\n",
"import torch.nn.functional as F\n",
"from torch.utils.data import DataLoader\n",
"import pytorch_lightning as pl\n",
"from torch.utils.data import random_split\n",
"\n",
"try:\n",
" from torchvision.datasets.mnist import MNIST\n",
" from torchvision import transforms\n",
"except ModuleNotFoundError:\n",
" from tests.base.datasets import MNIST\n",
"\n",
"\n",
"class LitAutoEncoder(pl.LightningModule):\n",
" def __init__(self):\n",
" super().__init__()\n",
" self.encoder = nn.Sequential(\n",
" nn.Linear(28 * 28, 64), nn.ReLU(), nn.Linear(64, 3)\n",
" )\n",
" self.decoder = nn.Sequential(\n",
" nn.Linear(3, 64), nn.ReLU(), nn.Linear(64, 28 * 28)\n",
" )\n",
"\n",
" def forward(self, x):\n",
" # in lightning, forward defines the prediction/inference actions\n",
" embedding = self.encoder(x)\n",
" return embedding\n",
"\n",
" def training_step(self, batch, batch_idx):\n",
" x, y = batch\n",
" x = x.view(x.size(0), -1)\n",
" z = self.encoder(x)\n",
" x_hat = self.decoder(z)\n",
" loss = F.mse_loss(x_hat, x)\n",
" return loss\n",
"\n",
" def configure_optimizers(self):\n",
" optimizer = torch.optim.Adam(self.parameters(), lr=1e-3)\n",
" return optimizer\n",
"\n",
"\n",
"def cli_main():\n",
" pl.seed_everything(1234)\n",
"\n",
" # ------------\n",
" # args\n",
" # ------------\n",
" parser = ArgumentParser()\n",
" parser.add_argument(\"--batch_size\", default=32, type=int)\n",
" parser.add_argument(\"--hidden_dim\", type=int, default=128)\n",
" parser = pl.Trainer.add_argparse_args(parser)\n",
" args = parser.parse_args()\n",
"\n",
" # ------------\n",
" # data\n",
" # ------------\n",
" dataset = MNIST(\n",
" \"\", train=True, download=True, transform=transforms.ToTensor()\n",
" )\n",
" mnist_test = MNIST(\n",
" \"\", train=False, download=True, transform=transforms.ToTensor()\n",
" )\n",
" mnist_train, mnist_val = random_split(dataset, [55000, 5000])\n",
"\n",
" train_loader = DataLoader(mnist_train, batch_size=args.batch_size)\n",
" val_loader = DataLoader(mnist_val, batch_size=args.batch_size)\n",
" test_loader = DataLoader(mnist_test, batch_size=args.batch_size)\n",
"\n",
" # ------------\n",
" # model\n",
" # ------------\n",
" model = LitAutoEncoder()\n",
"\n",
" # ------------\n",
" # training\n",
" # ------------\n",
" trainer = pl.Trainer.from_argparse_args(args)\n",
" trainer.fit(model, train_loader, val_loader)\n",
"\n",
" # ------------\n",
" # testing\n",
" # ------------\n",
" result = trainer.test(test_dataloaders=test_loader)\n",
" print(result)\n",
"\n",
"\n",
"if __name__ == \"__main__\":\n",
" cli_main()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%writefile train-multi-node.py\n",
"# Copyright The PyTorch Lightning team.\n",
"#\n",
"# Licensed under the Apache License, Version 2.0 (the \"License\");\n",
"# you may not use this file except in compliance with the License.\n",
"# You may obtain a copy of the License at\n",
"#\n",
"# http://www.apache.org/licenses/LICENSE-2.0\n",
"#\n",
"# Unless required by applicable law or agreed to in writing, software\n",
"# distributed under the License is distributed on an \"AS IS\" BASIS,\n",
"# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n",
"# See the License for the specific language governing permissions and\n",
"# limitations under the License.\n",
"#\n",
"# Script from: https://github.com/PyTorchLightning/pytorch-lightning/blob/1.0.0rc2/pl_examples/basic_examples/autoencoder.py\n",
"\n",
"from argparse import ArgumentParser\n",
"import torch\n",
"from torch import nn\n",
"import torch.nn.functional as F\n",
"from torch.utils.data import DataLoader\n",
"import pytorch_lightning as pl\n",
"from torch.utils.data import random_split\n",
"from azureml_env_adapter import set_environment_variables\n",
"\n",
"try:\n",
" from torchvision.datasets.mnist import MNIST\n",
" from torchvision import transforms\n",
"except ModuleNotFoundError:\n",
" from tests.base.datasets import MNIST\n",
"\n",
"\n",
"class LitAutoEncoder(pl.LightningModule):\n",
" def __init__(self):\n",
" super().__init__()\n",
" self.encoder = nn.Sequential(\n",
" nn.Linear(28 * 28, 64), nn.ReLU(), nn.Linear(64, 3)\n",
" )\n",
" self.decoder = nn.Sequential(\n",
" nn.Linear(3, 64), nn.ReLU(), nn.Linear(64, 28 * 28)\n",
" )\n",
"\n",
" def forward(self, x):\n",
" # in lightning, forward defines the prediction/inference actions\n",
" embedding = self.encoder(x)\n",
" return embedding\n",
"\n",
" def training_step(self, batch, batch_idx):\n",
" x, y = batch\n",
" x = x.view(x.size(0), -1)\n",
" z = self.encoder(x)\n",
" x_hat = self.decoder(z)\n",
" loss = F.mse_loss(x_hat, x)\n",
" return loss\n",
"\n",
" def configure_optimizers(self):\n",
" optimizer = torch.optim.Adam(self.parameters(), lr=1e-3)\n",
" return optimizer\n",
"\n",
"\n",
"def cli_main():\n",
" pl.seed_everything(1234)\n",
"\n",
" # ------------\n",
" # args\n",
" # ------------\n",
" parser = ArgumentParser()\n",
" parser.add_argument(\"--batch_size\", default=32, type=int)\n",
" parser.add_argument(\"--hidden_dim\", type=int, default=128)\n",
" parser = pl.Trainer.add_argparse_args(parser)\n",
" args = parser.parse_args()\n",
"\n",
" # set azureml env vars for multi-node ddp\n",
" set_environment_variables(single_node=int(args.num_nodes) > 1)\n",
"\n",
" # ------------\n",
" # data\n",
" # ------------\n",
" dataset = MNIST(\n",
" \"\", train=True, download=True, transform=transforms.ToTensor()\n",
" )\n",
" mnist_test = MNIST(\n",
" \"\", train=False, download=True, transform=transforms.ToTensor()\n",
" )\n",
" mnist_train, mnist_val = random_split(dataset, [55000, 5000])\n",
"\n",
" train_loader = DataLoader(mnist_train, batch_size=args.batch_size)\n",
" val_loader = DataLoader(mnist_val, batch_size=args.batch_size)\n",
" test_loader = DataLoader(mnist_test, batch_size=args.batch_size)\n",
"\n",
" # ------------\n",
" # model\n",
" # ------------\n",
" model = LitAutoEncoder()\n",
"\n",
" # ------------\n",
" # training\n",
" # ------------\n",
" trainer = pl.Trainer.from_argparse_args(args)\n",
" trainer.fit(model, train_loader, val_loader)\n",
"\n",
" # ------------\n",
" # testing\n",
" # ------------\n",
" result = trainer.test(test_dataloaders=test_loader)\n",
" print(result)\n",
"\n",
"\n",
"if __name__ == \"__main__\":\n",
" cli_main()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%writefile train-with-mlflow-logging.py\n",
"# Copyright The PyTorch Lightning team.\n",
"#\n",
"# Licensed under the Apache License, Version 2.0 (the \"License\");\n",
"# you may not use this file except in compliance with the License.\n",
"# You may obtain a copy of the License at\n",
"#\n",
"# http://www.apache.org/licenses/LICENSE-2.0\n",
"#\n",
"# Unless required by applicable law or agreed to in writing, software\n",
"# distributed under the License is distributed on an \"AS IS\" BASIS,\n",
"# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n",
"# See the License for the specific language governing permissions and\n",
"# limitations under the License.\n",
"#\n",
"# Script from: https://github.com/PyTorchLightning/pytorch-lightning/blob/1.0.0rc2/pl_examples/basic_examples/autoencoder.py\n",
"\n",
"from argparse import ArgumentParser\n",
"import torch\n",
"from torch import nn\n",
"import torch.nn.functional as F\n",
"from torch.utils.data import DataLoader\n",
"import pytorch_lightning as pl\n",
"from torch.utils.data import random_split\n",
"from pytorch_lightning.loggers import MLFlowLogger\n",
"import mlflow\n",
"from azureml.core import Run\n",
"\n",
"try:\n",
" from torchvision.datasets.mnist import MNIST\n",
" from torchvision import transforms\n",
"except ModuleNotFoundError:\n",
" from tests.base.datasets import MNIST\n",
"\n",
"\n",
"class LitAutoEncoder(pl.LightningModule):\n",
" def __init__(self):\n",
" super().__init__()\n",
" self.encoder = nn.Sequential(\n",
" nn.Linear(28 * 28, 64), nn.ReLU(), nn.Linear(64, 3)\n",
" )\n",
" self.decoder = nn.Sequential(\n",
" nn.Linear(3, 64), nn.ReLU(), nn.Linear(64, 28 * 28)\n",
" )\n",
"\n",
" def forward(self, x):\n",
" # in lightning, forward defines the prediction/inference actions\n",
" embedding = self.encoder(x)\n",
" return embedding\n",
"\n",
" def training_step(self, batch, batch_idx):\n",
" x, y = batch\n",
" x = x.view(x.size(0), -1)\n",
" z = self.encoder(x)\n",
" x_hat = self.decoder(z)\n",
" loss = F.mse_loss(x_hat, x)\n",
" self.log(\"loss\", loss, on_epoch=True, on_step=False)\n",
" return loss\n",
"\n",
" def configure_optimizers(self):\n",
" optimizer = torch.optim.Adam(self.parameters(), lr=1e-3)\n",
" return optimizer\n",
"\n",
"\n",
"def cli_main():\n",
" pl.seed_everything(1234)\n",
"\n",
" # ------------\n",
" # args\n",
" # ------------\n",
" parser = ArgumentParser()\n",
" parser.add_argument(\"--batch_size\", default=32, type=int)\n",
" parser.add_argument(\"--hidden_dim\", type=int, default=128)\n",
" parser = pl.Trainer.add_argparse_args(parser)\n",
" args = parser.parse_args()\n",
"\n",
" # ------------\n",
" # data\n",
" # ------------\n",
" dataset = MNIST(\n",
" \"\", train=True, download=True, transform=transforms.ToTensor()\n",
" )\n",
" mnist_test = MNIST(\n",
" \"\", train=False, download=True, transform=transforms.ToTensor()\n",
" )\n",
" mnist_train, mnist_val = random_split(dataset, [55000, 5000])\n",
"\n",
" train_loader = DataLoader(mnist_train, batch_size=args.batch_size)\n",
" val_loader = DataLoader(mnist_val, batch_size=args.batch_size)\n",
" test_loader = DataLoader(mnist_test, batch_size=args.batch_size)\n",
"\n",
" # ------------\n",
" # model\n",
" # ------------\n",
" model = LitAutoEncoder()\n",
"\n",
" # ------------\n",
" # logging\n",
" # ------------\n",
" # get azureml run object\n",
" run = Run.get_context()\n",
" # get the tracking uri for the azureml workspace\n",
" mlflow_uri = run.experiment.workspace.get_mlflow_tracking_uri()\n",
" # get the azureml experiment name\n",
" exp_name = run.experiment.name\n",
"\n",
" mlf_logger = MLFlowLogger(\n",
" experiment_name=exp_name, tracking_uri=mlflow_uri\n",
" )\n",
" # link the mlflowlogger run ID to the azureml run ID\n",
" mlf_logger._run_id = run.id\n",
"\n",
" # ------------\n",
" # training\n",
" # ------------\n",
" trainer = pl.Trainer.from_argparse_args(args, logger=mlf_logger)\n",
" trainer.fit(model, train_loader, val_loader)\n",
"\n",
" # ------------\n",
" # testing\n",
" # ------------\n",
" result = trainer.test(test_dataloaders=test_loader)\n",
" print(result)\n",
"\n",
"\n",
"if __name__ == \"__main__\":\n",
" cli_main()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%writefile train-with-tensorboard-logging.py\n",
"# Copyright The PyTorch Lightning team.\n",
"#\n",
"# Licensed under the Apache License, Version 2.0 (the \"License\");\n",
"# you may not use this file except in compliance with the License.\n",
"# You may obtain a copy of the License at\n",
"#\n",
"# http://www.apache.org/licenses/LICENSE-2.0\n",
"#\n",
"# Unless required by applicable law or agreed to in writing, software\n",
"# distributed under the License is distributed on an \"AS IS\" BASIS,\n",
"# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n",
"# See the License for the specific language governing permissions and\n",
"# limitations under the License.\n",
"#\n",
"# Script from: https://github.com/PyTorchLightning/pytorch-lightning/blob/1.0.0rc2/pl_examples/basic_examples/autoencoder.py\n",
"\n",
"from argparse import ArgumentParser\n",
"import torch\n",
"from torch import nn\n",
"import torch.nn.functional as F\n",
"from torch.utils.data import DataLoader\n",
"import pytorch_lightning as pl\n",
"from torch.utils.data import random_split\n",
"from pytorch_lightning.loggers import TensorBoardLogger\n",
"\n",
"try:\n",
" from torchvision.datasets.mnist import MNIST\n",
" from torchvision import transforms\n",
"except ModuleNotFoundError:\n",
" from tests.base.datasets import MNIST\n",
"\n",
"\n",
"class LitAutoEncoder(pl.LightningModule):\n",
" def __init__(self):\n",
" super().__init__()\n",
" self.encoder = nn.Sequential(\n",
" nn.Linear(28 * 28, 64), nn.ReLU(), nn.Linear(64, 3)\n",
" )\n",
" self.decoder = nn.Sequential(\n",
" nn.Linear(3, 64), nn.ReLU(), nn.Linear(64, 28 * 28)\n",
" )\n",
"\n",
" def forward(self, x):\n",
" # in lightning, forward defines the prediction/inference actions\n",
" embedding = self.encoder(x)\n",
" return embedding\n",
"\n",
" def training_step(self, batch, batch_idx):\n",
" x, y = batch\n",
" x = x.view(x.size(0), -1)\n",
" z = self.encoder(x)\n",
" x_hat = self.decoder(z)\n",
" loss = F.mse_loss(x_hat, x)\n",
" self.log(\"loss\", loss)\n",
" return loss\n",
"\n",
" def configure_optimizers(self):\n",
" optimizer = torch.optim.Adam(self.parameters(), lr=1e-3)\n",
" return optimizer\n",
"\n",
"\n",
"def cli_main():\n",
" pl.seed_everything(1234)\n",
"\n",
" # ------------\n",
" # args\n",
" # ------------\n",
" parser = ArgumentParser()\n",
" parser.add_argument(\"--batch_size\", default=32, type=int)\n",
" parser.add_argument(\"--hidden_dim\", type=int, default=128)\n",
" parser.add_argument(\"--logdir\", type=str, default=\"./logs\")\n",
" parser = pl.Trainer.add_argparse_args(parser)\n",
" args = parser.parse_args()\n",
"\n",
" # ------------\n",
" # data\n",
" # ------------\n",
" dataset = MNIST(\n",
" \"\", train=True, download=True, transform=transforms.ToTensor()\n",
" )\n",
" mnist_test = MNIST(\n",
" \"\", train=False, download=True, transform=transforms.ToTensor()\n",
" )\n",
" mnist_train, mnist_val = random_split(dataset, [55000, 5000])\n",
"\n",
" train_loader = DataLoader(mnist_train, batch_size=args.batch_size)\n",
" val_loader = DataLoader(mnist_val, batch_size=args.batch_size)\n",
" test_loader = DataLoader(mnist_test, batch_size=args.batch_size)\n",
"\n",
" # ------------\n",
" # model\n",
" # ------------\n",
" model = LitAutoEncoder()\n",
"\n",
" # ------------\n",
" # logging\n",
" # ------------\n",
" tb_logger = TensorBoardLogger(args.logdir)\n",
"\n",
" # ------------\n",
" # training\n",
" # ------------\n",
" trainer = pl.Trainer.from_argparse_args(args, logger=tb_logger)\n",
" trainer.fit(model, train_loader, val_loader)\n",
"\n",
" # ------------\n",
" # testing\n",
" # ------------\n",
" result = trainer.test(test_dataloaders=test_loader)\n",
" print(result)\n",
"\n",
"\n",
"if __name__ == \"__main__\":\n",
" cli_main()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3.8",
"language": "python",
"name": "python3.8"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.7"
}
},
"nbformat": 4,
"nbformat_minor": 4
}


@ -41,15 +41,15 @@
"outputs": [],
"source": [
"# training script\n",
"source_dir = \".\"\n",
"source_dir = \"src\"\n",
"script_name = \"train.py\"\n",
"\n",
"# environment file\n",
"environment_file = \"pytorch-lightning.yml\"\n",
"environment_file = \"environment.yml\"\n",
"\n",
"# azure ml settings\n",
"environment_name = \"pt-lightning\"\n",
"experiment_name = \"pt-lightning-example\"\n",
"experiment_name = \"pt-lightning-tutorial\"\n",
"compute_name = \"gpu-K80-2\""
]
},


@ -30,15 +30,15 @@
"outputs": [],
"source": [
"# training script\n",
"source_dir = \".\"\n",
"source_dir = \"src\"\n",
"script_name = \"train-with-tensorboard-logging.py\"\n",
"\n",
"# environment file\n",
"environment_file = \"pytorch-lightning.yml\"\n",
"environment_file = \"environment.yml\"\n",
"\n",
"# azure ml settings\n",
"environment_name = \"pt-lightning\"\n",
"experiment_name = \"pt-lightning-tensorboard-example\"\n",
"experiment_name = \"pt-lightning-tensorboard-tutorial\"\n",
"compute_name = \"gpu-K80-2\""
]
},


@ -30,15 +30,15 @@
"outputs": [],
"source": [
"# training script\n",
"source_dir = \".\"\n",
"source_dir = \"src\"\n",
"script_name = \"train-with-mlflow-logging.py\"\n",
"\n",
"# environment file\n",
"environment_file = \"pytorch-lightning.yml\"\n",
"environment_file = \"environment.yml\"\n",
"\n",
"# azure ml settings\n",
"environment_name = \"pt-lightning\"\n",
"experiment_name = \"pt-lightning-mlflow-example\"\n",
"experiment_name = \"pt-lightning-mlflow-tutorial\"\n",
"compute_name = \"gpu-K80-2\""
]
},


@ -28,15 +28,15 @@
"outputs": [],
"source": [
"# training script\n",
"source_dir = \".\"\n",
"source_dir = \"src\"\n",
"script_name = \"train-multi-node.py\"\n",
"\n",
"# environment file\n",
"environment_file = \"pytorch-lightning.yml\"\n",
"environment_file = \"environment.yml\"\n",
"\n",
"# azure ml settings\n",
"environment_name = \"pt-lightning\"\n",
"experiment_name = \"pt-lightning-ddp-example\"\n",
"experiment_name = \"pt-lightning-ddp-tutorial\"\n",
"compute_name = \"gpu-K80-2\""
]
},


@ -4,7 +4,7 @@ description: learn how to train and log metrics with [PyTorch Lightning](https:/
[PyTorch Lightning](https://github.com/PyTorchLightning/pytorch-lightning) is a lightweight open-source library that provides a high-level interface for PyTorch.
The model training code for this tutorial can be found [here](../../code/train/pytorch-lightning). This tutorial goes over the steps to run PyTorch Lightning on Azure ML, and it includes the following parts:
The model training code for this tutorial can be found in [`src`](src). This tutorial goes over the steps to run PyTorch Lightning on Azure ML, and it includes the following parts:
1. [train-single-node](1.train-single-node.ipynb): Train single-node and single-node, multi-GPU PyTorch Lightning on Azure ML.
2. [log-with-tensorboard](2.log-with-tensorboard.ipynb): Use Lightning's built-in TensorBoardLogger to log metrics and leverage Azure ML's TensorBoard integration.


@ -0,0 +1,12 @@
channels:
- conda-forge
dependencies:
- python=3.7
- pip
- pip:
- azureml-defaults
- torch==1.6.0
- torchvision==0.7.0
- pytorch-lightning==1.0.4
- mlflow
- azureml-mlflow


@ -0,0 +1,33 @@
import os


def set_environment_variables(single_node=False, master_port=6105):
    # os.environ["RANK"] = os.environ["OMPI_COMM_WORLD_RANK"]
    # os.environ["WORLD_SIZE"] = os.environ["OMPI_COMM_WORLD_SIZE"]

    if not single_node:
        master_node_params = os.environ["AZ_BATCH_MASTER_NODE"].split(":")
        os.environ["MASTER_ADDR"] = master_node_params[0]

        # Do not overwrite master port with that defined in AZ_BATCH_MASTER_NODE
        if "MASTER_PORT" not in os.environ:
            os.environ["MASTER_PORT"] = str(master_port)
    else:
        os.environ["MASTER_ADDR"] = os.environ["AZ_BATCHAI_MPI_MASTER_NODE"]
        os.environ["MASTER_PORT"] = "54965"
    print(
        "NCCL_SOCKET_IFNAME original value = {}".format(
            os.environ["NCCL_SOCKET_IFNAME"]
        )
    )

    os.environ["NCCL_SOCKET_IFNAME"] = "^docker0,lo"
    os.environ["NODE_RANK"] = os.environ[
        "OMPI_COMM_WORLD_RANK"
    ]  # node rank is the world_rank from mpi run

    # print("RANK = {}".format(os.environ["RANK"]))
    # print("WORLD_SIZE = {}".format(os.environ["WORLD_SIZE"]))
    print("MASTER_ADDR = {}".format(os.environ["MASTER_ADDR"]))
    print("MASTER_PORT = {}".format(os.environ["MASTER_PORT"]))
    print("NODE_RANK = {}".format(os.environ["NODE_RANK"]))
    print("NCCL_SOCKET_IFNAME new value = {}".format(os.environ["NCCL_SOCKET_IFNAME"]))
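
The adapter above reads and writes `os.environ` directly, which makes it awkward to try outside an Azure ML job. As a rough sketch of the same decision logic, the function below takes the environment as a plain mapping and returns the values the adapter would export. The name `resolve_ddp_env` and the sample addresses are hypothetical and not part of the commit; the variable names and the hard-coded port `54965` are taken from the adapter.

```python
from typing import Dict, Mapping


def resolve_ddp_env(
    env: Mapping[str, str], single_node: bool = False, master_port: int = 6105
) -> Dict[str, str]:
    """Return the DDP variables the adapter would set, without mutating os.environ."""
    resolved: Dict[str, str] = {}
    if not single_node:
        # AZ_BATCH_MASTER_NODE is split on ":" and only the first part (the address) is used.
        resolved["MASTER_ADDR"] = env["AZ_BATCH_MASTER_NODE"].split(":")[0]
        # A MASTER_PORT that is already set takes precedence over the default.
        resolved["MASTER_PORT"] = env.get("MASTER_PORT", str(master_port))
    else:
        resolved["MASTER_ADDR"] = env["AZ_BATCHAI_MPI_MASTER_NODE"]
        resolved["MASTER_PORT"] = "54965"
    resolved["NCCL_SOCKET_IFNAME"] = "^docker0,lo"
    # The node rank is the world rank reported by the MPI launcher.
    resolved["NODE_RANK"] = env["OMPI_COMM_WORLD_RANK"]
    return resolved


if __name__ == "__main__":
    # Hypothetical values, for illustration only.
    fake_env = {"AZ_BATCH_MASTER_NODE": "10.0.0.4:6000", "OMPI_COMM_WORLD_RANK": "1"}
    print(resolve_ddp_env(fake_env))
```

Keeping the lookup separate from the mutation makes it possible to check both branches locally with a fake mapping before submitting a multi-node run.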


@ -0,0 +1,106 @@
# Copyright The PyTorch Lightning team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Script from: https://github.com/PyTorchLightning/pytorch-lightning/blob/1.0.0rc2/pl_examples/basic_examples/autoencoder.py

from argparse import ArgumentParser
import torch
from torch import nn
import torch.nn.functional as F
from torch.utils.data import DataLoader
import pytorch_lightning as pl
from torch.utils.data import random_split

from azureml_env_adapter import set_environment_variables

try:
    from torchvision.datasets.mnist import MNIST
    from torchvision import transforms
except ModuleNotFoundError:
    from tests.base.datasets import MNIST


class LitAutoEncoder(pl.LightningModule):
    def __init__(self):
        super().__init__()
        self.encoder = nn.Sequential(
            nn.Linear(28 * 28, 64), nn.ReLU(), nn.Linear(64, 3)
        )
        self.decoder = nn.Sequential(
            nn.Linear(3, 64), nn.ReLU(), nn.Linear(64, 28 * 28)
        )

    def forward(self, x):
        # in lightning, forward defines the prediction/inference actions
        embedding = self.encoder(x)
        return embedding

    def training_step(self, batch, batch_idx):
        x, y = batch
        x = x.view(x.size(0), -1)
        z = self.encoder(x)
        x_hat = self.decoder(z)
        loss = F.mse_loss(x_hat, x)
        return loss

    def configure_optimizers(self):
        optimizer = torch.optim.Adam(self.parameters(), lr=1e-3)
        return optimizer


def cli_main():
    pl.seed_everything(1234)

    # ------------
    # args
    # ------------
    parser = ArgumentParser()
    parser.add_argument("--batch_size", default=32, type=int)
    parser.add_argument("--hidden_dim", type=int, default=128)
    parser = pl.Trainer.add_argparse_args(parser)
    args = parser.parse_args()

    # set azureml env vars for multi-node ddp
    set_environment_variables(single_node=int(args.num_nodes) <= 1)

    # ------------
    # data
    # ------------
    dataset = MNIST("", train=True, download=True, transform=transforms.ToTensor())
    mnist_test = MNIST("", train=False, download=True, transform=transforms.ToTensor())
    mnist_train, mnist_val = random_split(dataset, [55000, 5000])

    train_loader = DataLoader(mnist_train, batch_size=args.batch_size)
    val_loader = DataLoader(mnist_val, batch_size=args.batch_size)
    test_loader = DataLoader(mnist_test, batch_size=args.batch_size)

    # ------------
    # model
    # ------------
    model = LitAutoEncoder()

    # ------------
    # training
    # ------------
    trainer = pl.Trainer.from_argparse_args(args)
    trainer.fit(model, train_loader, val_loader)

    # ------------
    # testing
    # ------------
    result = trainer.test(test_dataloaders=test_loader)
    print(result)


if __name__ == "__main__":
    cli_main()
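
The `single_node` flag handed to `set_environment_variables` is derived from `--num_nodes`. The stand-alone sketch below shows one way to derive it, assuming the intent is that a run counts as single-node when `num_nodes` is 1 or less. The function `is_single_node` is hypothetical, and the `--num_nodes` argument is registered by hand here as a stand-in for the flag that `pl.Trainer.add_argparse_args` adds in the script.

```python
from argparse import ArgumentParser
from typing import List, Optional


def is_single_node(argv: Optional[List[str]] = None) -> bool:
    """Parse --num_nodes and report whether the run uses a single node."""
    parser = ArgumentParser()
    # Stand-in for the Trainer flag; the default of 1 is an assumption of this sketch.
    parser.add_argument("--num_nodes", type=int, default=1)
    # Other Trainer flags (for example --gpus) are ignored here.
    args, _unknown = parser.parse_known_args(argv)
    return args.num_nodes <= 1


if __name__ == "__main__":
    print(is_single_node(["--num_nodes", "2"]))
```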


@ -0,0 +1,120 @@
# Copyright The PyTorch Lightning team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Script from: https://github.com/PyTorchLightning/pytorch-lightning/blob/1.0.0rc2/pl_examples/basic_examples/autoencoder.py

from argparse import ArgumentParser
import torch
from torch import nn
import torch.nn.functional as F
from torch.utils.data import DataLoader
import pytorch_lightning as pl
from torch.utils.data import random_split
from pytorch_lightning.loggers import MLFlowLogger
import mlflow
from azureml.core import Run

try:
    from torchvision.datasets.mnist import MNIST
    from torchvision import transforms
except ModuleNotFoundError:
    from tests.base.datasets import MNIST


class LitAutoEncoder(pl.LightningModule):
    def __init__(self):
        super().__init__()
        self.encoder = nn.Sequential(
            nn.Linear(28 * 28, 64), nn.ReLU(), nn.Linear(64, 3)
        )
        self.decoder = nn.Sequential(
            nn.Linear(3, 64), nn.ReLU(), nn.Linear(64, 28 * 28)
        )

    def forward(self, x):
        # in lightning, forward defines the prediction/inference actions
        embedding = self.encoder(x)
        return embedding

    def training_step(self, batch, batch_idx):
        x, y = batch
        x = x.view(x.size(0), -1)
        z = self.encoder(x)
        x_hat = self.decoder(z)
        loss = F.mse_loss(x_hat, x)
        self.log("loss", loss, on_epoch=True, on_step=False)
        return loss

    def configure_optimizers(self):
        optimizer = torch.optim.Adam(self.parameters(), lr=1e-3)
        return optimizer


def cli_main():
    pl.seed_everything(1234)

    # ------------
    # args
    # ------------
    parser = ArgumentParser()
    parser.add_argument("--batch_size", default=32, type=int)
    parser.add_argument("--hidden_dim", type=int, default=128)
    parser = pl.Trainer.add_argparse_args(parser)
    args = parser.parse_args()

    # ------------
    # data
    # ------------
    dataset = MNIST("", train=True, download=True, transform=transforms.ToTensor())
    mnist_test = MNIST("", train=False, download=True, transform=transforms.ToTensor())
    mnist_train, mnist_val = random_split(dataset, [55000, 5000])

    train_loader = DataLoader(mnist_train, batch_size=args.batch_size)
    val_loader = DataLoader(mnist_val, batch_size=args.batch_size)
    test_loader = DataLoader(mnist_test, batch_size=args.batch_size)

    # ------------
    # model
    # ------------
    model = LitAutoEncoder()

    # ------------
    # logging
    # ------------
    # get azureml run object
    run = Run.get_context()
    # get the tracking uri for the azureml workspace
    mlflow_uri = run.experiment.workspace.get_mlflow_tracking_uri()
    # get the azureml experiment name
    exp_name = run.experiment.name

    mlf_logger = MLFlowLogger(experiment_name=exp_name, tracking_uri=mlflow_uri)
    # link the mlflowlogger run ID to the azureml run ID
    mlf_logger._run_id = run.id

    # ------------
    # training
    # ------------
    trainer = pl.Trainer.from_argparse_args(args, logger=mlf_logger)
    trainer.fit(model, train_loader, val_loader)

    # ------------
    # testing
    # ------------
    result = trainer.test(test_dataloaders=test_loader)
    print(result)


if __name__ == "__main__":
    cli_main()
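
The `self.log("loss", loss, on_epoch=True, on_step=False)` call in this script asks Lightning to record one aggregated value per epoch instead of one per step. The sketch below is not Lightning's implementation. It is a plain-Python illustration of the idea, assuming the epoch value is the unweighted mean of the step losses; Lightning's own reduction may weight or synchronise values differently. The class `EpochMeanMetric` and its method names are hypothetical.

```python
from typing import List


class EpochMeanMetric:
    """Hypothetical helper: buffers step values and emits one mean per epoch."""

    def __init__(self) -> None:
        self._steps: List[float] = []
        self.history: List[float] = []

    def update(self, value: float) -> None:
        # Called once per training step; nothing is emitted yet.
        self._steps.append(float(value))

    def end_epoch(self) -> float:
        # Called at the end of an epoch: reduce the buffered steps to one value.
        if not self._steps:
            raise ValueError("no step values were recorded for this epoch")
        mean = sum(self._steps) / len(self._steps)
        self.history.append(mean)
        self._steps.clear()
        return mean


if __name__ == "__main__":
    metric = EpochMeanMetric()
    for step_loss in (0.5, 0.25, 0.75):
        metric.update(step_loss)
    print(metric.end_epoch())
```

Logging per epoch keeps the number of points sent to the tracking server small, which is one plausible reason the MLflow variant uses it while the TensorBoard variant logs with the defaults.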


@ -0,0 +1,110 @@
# Copyright The PyTorch Lightning team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Script from: https://github.com/PyTorchLightning/pytorch-lightning/blob/1.0.0rc2/pl_examples/basic_examples/autoencoder.py

from argparse import ArgumentParser
import torch
from torch import nn
import torch.nn.functional as F
from torch.utils.data import DataLoader
import pytorch_lightning as pl
from torch.utils.data import random_split
from pytorch_lightning.loggers import TensorBoardLogger

try:
    from torchvision.datasets.mnist import MNIST
    from torchvision import transforms
except ModuleNotFoundError:
    from tests.base.datasets import MNIST


class LitAutoEncoder(pl.LightningModule):
    def __init__(self):
        super().__init__()
        self.encoder = nn.Sequential(
            nn.Linear(28 * 28, 64), nn.ReLU(), nn.Linear(64, 3)
        )
        self.decoder = nn.Sequential(
            nn.Linear(3, 64), nn.ReLU(), nn.Linear(64, 28 * 28)
        )

    def forward(self, x):
        # in lightning, forward defines the prediction/inference actions
        embedding = self.encoder(x)
        return embedding

    def training_step(self, batch, batch_idx):
        x, y = batch
        x = x.view(x.size(0), -1)
        z = self.encoder(x)
        x_hat = self.decoder(z)
        loss = F.mse_loss(x_hat, x)
        self.log("loss", loss)
        return loss

    def configure_optimizers(self):
        optimizer = torch.optim.Adam(self.parameters(), lr=1e-3)
        return optimizer


def cli_main():
    pl.seed_everything(1234)

    # ------------
    # args
    # ------------
    parser = ArgumentParser()
    parser.add_argument("--batch_size", default=32, type=int)
    parser.add_argument("--hidden_dim", type=int, default=128)
    parser.add_argument("--logdir", type=str, default="./logs")
    parser = pl.Trainer.add_argparse_args(parser)
    args = parser.parse_args()

    # ------------
    # data
    # ------------
    dataset = MNIST("", train=True, download=True, transform=transforms.ToTensor())
    mnist_test = MNIST("", train=False, download=True, transform=transforms.ToTensor())
    mnist_train, mnist_val = random_split(dataset, [55000, 5000])

    train_loader = DataLoader(mnist_train, batch_size=args.batch_size)
    val_loader = DataLoader(mnist_val, batch_size=args.batch_size)
    test_loader = DataLoader(mnist_test, batch_size=args.batch_size)

    # ------------
    # model
    # ------------
    model = LitAutoEncoder()

    # ------------
    # logging
    # ------------
    tb_logger = TensorBoardLogger(args.logdir)

    # ------------
    # training
    # ------------
    trainer = pl.Trainer.from_argparse_args(args, logger=tb_logger)
    trainer.fit(model, train_loader, val_loader)

    # ------------
    # testing
    # ------------
    result = trainer.test(test_dataloaders=test_loader)
    print(result)


if __name__ == "__main__":
    cli_main()
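
`random_split(dataset, [55000, 5000])` works because the two lengths add up to the 60,000 images in the MNIST training set; `random_split` rejects lengths that do not sum to the dataset size. A small sketch of deriving the lengths from a validation size instead of hard-coding both numbers follows. The helper `split_lengths` is hypothetical and not part of the tutorial.

```python
from typing import List


def split_lengths(total: int, val_size: int) -> List[int]:
    """Return [train_size, val_size] so that the two always sum to total."""
    if not 0 <= val_size <= total:
        raise ValueError(f"val_size must be between 0 and {total}, got {val_size}")
    return [total - val_size, val_size]


if __name__ == "__main__":
    print(split_lengths(60000, 5000))
```

In the script this could be used as `random_split(dataset, split_lengths(len(dataset), 5000))`, which keeps the split valid if a differently sized dataset is substituted.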


@ -0,0 +1,102 @@
# Copyright The PyTorch Lightning team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Script from: https://github.com/PyTorchLightning/pytorch-lightning/blob/1.0.0rc2/pl_examples/basic_examples/autoencoder.py

from argparse import ArgumentParser
import torch
from torch import nn
import torch.nn.functional as F
from torch.utils.data import DataLoader
import pytorch_lightning as pl
from torch.utils.data import random_split

try:
    from torchvision.datasets.mnist import MNIST
    from torchvision import transforms
except ModuleNotFoundError:
    from tests.base.datasets import MNIST


class LitAutoEncoder(pl.LightningModule):
    def __init__(self):
        super().__init__()
        self.encoder = nn.Sequential(
            nn.Linear(28 * 28, 64), nn.ReLU(), nn.Linear(64, 3)
        )
        self.decoder = nn.Sequential(
            nn.Linear(3, 64), nn.ReLU(), nn.Linear(64, 28 * 28)
        )

    def forward(self, x):
        # in lightning, forward defines the prediction/inference actions
        embedding = self.encoder(x)
        return embedding

    def training_step(self, batch, batch_idx):
        x, y = batch
        x = x.view(x.size(0), -1)
        z = self.encoder(x)
        x_hat = self.decoder(z)
        loss = F.mse_loss(x_hat, x)
        return loss

    def configure_optimizers(self):
        optimizer = torch.optim.Adam(self.parameters(), lr=1e-3)
        return optimizer


def cli_main():
    pl.seed_everything(1234)

    # ------------
    # args
    # ------------
    parser = ArgumentParser()
    parser.add_argument("--batch_size", default=32, type=int)
    parser.add_argument("--hidden_dim", type=int, default=128)
    parser = pl.Trainer.add_argparse_args(parser)
    args = parser.parse_args()

    # ------------
    # data
    # ------------
    dataset = MNIST("", train=True, download=True, transform=transforms.ToTensor())
    mnist_test = MNIST("", train=False, download=True, transform=transforms.ToTensor())
    mnist_train, mnist_val = random_split(dataset, [55000, 5000])

    train_loader = DataLoader(mnist_train, batch_size=args.batch_size)
    val_loader = DataLoader(mnist_val, batch_size=args.batch_size)
    test_loader = DataLoader(mnist_test, batch_size=args.batch_size)

    # ------------
    # model
    # ------------
    model = LitAutoEncoder()

    # ------------
    # training
    # ------------
    trainer = pl.Trainer.from_argparse_args(args)
    trainer.fit(model, train_loader, val_loader)

    # ------------
    # testing
    # ------------
    result = trainer.test(test_dataloaders=test_loader)
    print(result)


if __name__ == "__main__":
    cli_main()
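
For reference, the loss returned by `training_step` in this script can be written out explicitly. The expression below assumes `F.mse_loss` keeps its default `reduction="mean"`. The symbols are introduced here and do not appear in the source: $B$ is the batch size, $x_i \in \mathbb{R}^{784}$ is the flattened 28 x 28 image, and $\hat{x}_i$ is the decoder output for that image.

```latex
\mathcal{L}
  = \frac{1}{784\,B} \sum_{i=1}^{B} \sum_{j=1}^{784}
    \left( \hat{x}_{ij} - x_{ij} \right)^{2},
\qquad
\hat{x}_i = \mathrm{decoder}\bigl(\mathrm{encoder}(x_i)\bigr),
\quad
\mathrm{encoder}(x_i) \in \mathbb{R}^{3}
```

The three-dimensional bottleneck comes from the final `nn.Linear(64, 3)` layer of the encoder, so the reconstruction has to pass through three numbers per image.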


@ -1,775 +0,0 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%writefile rapids.dockerfile\n",
"FROM rapidsai/rapidsai:0.16-cuda10.2-runtime-ubuntu18.04-py3.7\n",
"RUN apt-get update && \\\n",
"apt-get install -y fuse && \\\n",
"source activate rapids && \\\n",
"pip install azureml-mlflow && \\\n",
"pip install azureml-dataprep"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%writefile train.py\n",
"#\n",
"# Copyright (c) 2019-2020, NVIDIA CORPORATION.\n",
"#\n",
"# Licensed under the Apache License, Version 2.0 (the \"License\");\n",
"# you may not use this file except in compliance with the License.\n",
"# You may obtain a copy of the License at\n",
"#\n",
"# http://www.apache.org/licenses/LICENSE-2.0\n",
"#\n",
"# Unless required by applicable law or agreed to in writing, software\n",
"# distributed under the License is distributed on an \"AS IS\" BASIS,\n",
"# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n",
"# See the License for the specific language governing permissions and\n",
"# limitations under the License.\n",
"#\n",
"\n",
"import argparse\n",
"import os\n",
"import time\n",
"\n",
"import numpy as np\n",
"import pandas as pd\n",
"import cudf\n",
"import cuml\n",
"import mlflow\n",
"\n",
"from cuml import RandomForestClassifier as cuRF\n",
"from cuml.preprocessing.model_selection import train_test_split\n",
"from cuml.metrics.accuracy import accuracy_score\n",
"\n",
"from rapids_csp_azure import RapidsCloudML, PerfTimer\n",
"\n",
"\n",
"def main():\n",
" parser = argparse.ArgumentParser()\n",
"\n",
" parser.add_argument(\"--data_dir\", type=str, help=\"location of data\")\n",
" parser.add_argument(\n",
" \"--n_estimators\", type=int, default=100, help=\"Number of trees in RF\"\n",
" )\n",
" parser.add_argument(\n",
" \"--max_depth\", type=int, default=16, help=\"Max depth of each tree\"\n",
" )\n",
" parser.add_argument(\n",
" \"--n_bins\",\n",
" type=int,\n",
" default=8,\n",
" help=\"Number of bins used in split point calculation\",\n",
" )\n",
" parser.add_argument(\n",
" \"--max_features\",\n",
" type=float,\n",
" default=1.0,\n",
" help=\"Number of features for best split\",\n",
" )\n",
" parser.add_argument(\n",
" \"--compute\",\n",
" type=str,\n",
" default=\"single-GPU\",\n",
" help=\"set to multi-GPU for algorithms via dask\",\n",
" )\n",
" parser.add_argument(\n",
" \"--cv-folds\", type=int, default=5, help=\"Number of CV fold splits\"\n",
" )\n",
"\n",
" args = parser.parse_args()\n",
" data_dir = args.data_dir\n",
"\n",
" n_estimators = args.n_estimators\n",
" mlflow.log_param(\"n_estimators\", np.int(args.n_estimators))\n",
" max_depth = args.max_depth\n",
" mlflow.log_param(\"max_depth\", np.int(args.max_depth))\n",
" n_bins = args.n_bins\n",
" mlflow.log_param(\"n_bins\", np.int(args.n_bins))\n",
" max_features = args.max_features\n",
" mlflow.log_param(\"max_features\", np.str(args.max_features))\n",
"\n",
" print(\"\\n---->>>> cuDF version <<<<----\\n\", cudf.__version__)\n",
" print(\"\\n---->>>> cuML version <<<<----\\n\", cuml.__version__)\n",
"\n",
" azure_ml = RapidsCloudML(\n",
" cloud_type=\"Azure\",\n",
" model_type=\"RandomForest\",\n",
" data_type=\"Parquet\",\n",
" compute_type=args.compute,\n",
" )\n",
" print(args.compute)\n",
"\n",
" if args.compute == \"single-GPU\":\n",
" dataset, _, y_label, _ = azure_ml.load_data(\n",
" filename=os.path.join(data_dir, \"airline_20m.parquet\")\n",
" )\n",
" else:\n",
" # use parquet files from 'https://airlinedataset.blob.core.windows.net/airline-10years' for multi-GPU training\n",
" dataset, _, y_label, _ = azure_ml.load_data(\n",
" filename=os.path.join(data_dir, \"part*.parquet\"),\n",
" col_labels=[\n",
" \"Flight_Number_Reporting_Airline\",\n",
" \"Year\",\n",
" \"Quarter\",\n",
" \"Month\",\n",
" \"DayOfWeek\",\n",
" \"DOT_ID_Reporting_Airline\",\n",
" \"OriginCityMarketID\",\n",
" \"DestCityMarketID\",\n",
" \"DepTime\",\n",
" \"DepDelay\",\n",
" \"DepDel15\",\n",
" \"ArrDel15\",\n",
" \"ArrDelay\",\n",
" \"AirTime\",\n",
" \"Distance\",\n",
" ],\n",
" y_label=\"ArrDel15\",\n",
" )\n",
"\n",
" X = dataset[dataset.columns.difference([\"ArrDelay\", y_label])]\n",
" y = dataset[y_label]\n",
" del dataset\n",
"\n",
" print(\"\\n---->>>> Training using GPUs <<<<----\\n\")\n",
"\n",
" # ----------------------------------------------------------------------------------------------------\n",
" # cross-validation folds\n",
" # ----------------------------------------------------------------------------------------------------\n",
" accuracy_per_fold = []\n",
" train_time_per_fold = []\n",
" infer_time_per_fold = []\n",
" trained_model = []\n",
" global_best_test_accuracy = 0\n",
"\n",
" model_params = {\n",
" \"n_estimators\": n_estimators,\n",
" \"max_depth\": max_depth,\n",
" \"max_features\": max_features,\n",
" \"n_bins\": n_bins,\n",
" }\n",
"\n",
" # optional cross-validation when args.cv_folds > 1\n",
" for i_train_fold in range(args.cv_folds):\n",
" print(f\"\\n CV fold {i_train_fold} of {args.cv_folds}\\n\")\n",
"\n",
" # split data\n",
" X_train, X_test, y_train, y_test, _ = azure_ml.split_data(\n",
" X, y, random_state=i_train_fold\n",
" )\n",
" # train model\n",
" trained_model, training_time = azure_ml.train_model(\n",
" X_train, y_train, model_params\n",
" )\n",
"\n",
" train_time_per_fold += [round(training_time, 4)]\n",
"\n",
" # evaluate perf\n",
" test_accuracy, infer_time = azure_ml.evaluate_test_perf(\n",
" trained_model, X_test, y_test\n",
" )\n",
" accuracy_per_fold += [round(test_accuracy, 4)]\n",
" infer_time_per_fold += [round(infer_time, 4)]\n",
"\n",
" # update best model [ assumes maximization of perf metric ]\n",
" if test_accuracy > global_best_test_accuracy:\n",
" global_best_test_accuracy = test_accuracy\n",
"\n",
" mlflow.log_metric(\n",
" \"Total training inference time\", np.float(training_time + infer_time)\n",
" )\n",
" mlflow.log_metric(\"Accuracy\", np.float(global_best_test_accuracy))\n",
" print(\"\\n Accuracy :\", global_best_test_accuracy)\n",
" print(\"\\n accuracy per fold :\", accuracy_per_fold)\n",
" print(\"\\n train-time per fold :\", train_time_per_fold)\n",
" print(\"\\n train-time all folds :\", sum(train_time_per_fold))\n",
" print(\"\\n infer-time per fold :\", infer_time_per_fold)\n",
" print(\"\\n infer-time all folds :\", sum(infer_time_per_fold))\n",
"\n",
"\n",
"if __name__ == \"__main__\":\n",
" with PerfTimer() as total_script_time:\n",
" main()\n",
" print(\"Total runtime: {:.2f}\".format(total_script_time.duration))\n",
" mlflow.log_metric(\"Total runtime\", np.float(total_script_time.duration))\n",
" print(\"\\n Exiting script\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%writefile rapids_csp_azure.py\n",
"#\n",
"# Copyright (c) 2019-2020, NVIDIA CORPORATION.\n",
"#\n",
"# Licensed under the Apache License, Version 2.0 (the \"License\");\n",
"# you may not use this file except in compliance with the License.\n",
"# You may obtain a copy of the License at\n",
"#\n",
"# http://www.apache.org/licenses/LICENSE-2.0\n",
"#\n",
"# Unless required by applicable law or agreed to in writing, software\n",
"# distributed under the License is distributed on an \"AS IS\" BASIS,\n",
"# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n",
"# See the License for the specific language governing permissions and\n",
"# limitations under the License.\n",
"#\n",
"\n",
"import json\n",
"import logging\n",
"import os\n",
"import pprint\n",
"import sys\n",
"import time\n",
"\n",
"import dask\n",
"import numpy as np\n",
"import pandas as pd\n",
"import psutil\n",
"import sklearn\n",
"from dask.distributed import Client, wait\n",
"from sklearn import ensemble\n",
"from sklearn.model_selection import (\n",
" train_test_split as sklearn_train_test_split,\n",
")\n",
"\n",
"import cudf\n",
"import cuml\n",
"import cupy\n",
"import dask_cudf\n",
"import pynvml\n",
"import xgboost\n",
"from cuml.dask.common import utils as dask_utils\n",
"\n",
"from cuml.dask.ensemble import RandomForestClassifier as cumlDaskRF\n",
"from cuml.metrics.accuracy import accuracy_score\n",
"from cuml.preprocessing.model_selection import (\n",
" train_test_split as cuml_train_test_split,\n",
")\n",
"from dask_cuda import LocalCUDACluster\n",
"from dask_ml.model_selection import train_test_split as dask_train_test_split\n",
"\n",
"default_azureml_paths = {\n",
" \"train_script\": \"./train_script\",\n",
" \"train_data\": \"./data_airline\",\n",
" \"output\": \"./output\",\n",
"}\n",
"\n",
"\n",
"class RapidsCloudML(object):\n",
" def __init__(\n",
" self,\n",
" cloud_type=\"Azure\",\n",
" model_type=\"RandomForest\",\n",
" data_type=\"Parquet\",\n",
" compute_type=\"single-GPU\",\n",
" verbose_estimator=False,\n",
" CSP_paths=default_azureml_paths,\n",
" ):\n",
"\n",
" self.CSP_paths = CSP_paths\n",
" self.cloud_type = cloud_type\n",
" self.model_type = model_type\n",
" self.data_type = data_type\n",
" self.compute_type = compute_type\n",
" self.verbose_estimator = verbose_estimator\n",
" self.log_to_file(\n",
" f\"\\n> RapidsCloudML\\n\\tCompute, Data , Model, Cloud types {self.compute_type, self.data_type, self.model_type, self.cloud_type}\"\n",
" )\n",
"\n",
" # Setting up client for multi-GPU option\n",
" if \"multi\" in self.compute_type:\n",
" self.log_to_file(\"\\n\\tMulti-GPU selected\")\n",
" # This will use all GPUs on the local host by default\n",
" cluster = LocalCUDACluster(threads_per_worker=1)\n",
" self.client = Client(cluster)\n",
"\n",
" # Query the client for all connected workers\n",
" self.workers = self.client.has_what().keys()\n",
" self.n_workers = len(self.workers)\n",
" self.log_to_file(f\"\\n\\tClient information {self.client}\")\n",
"\n",
" def load_hyperparams(self, model_name=\"XGBoost\"):\n",
" \"\"\"\n",
" Selecting model paramters based on the model we select for execution.\n",
" Checks if there is a config file present in the path self.CSP_paths['hyperparams'] with\n",
" the parameters for the experiment. If not present, it returns the default parameters.\n",
"\n",
" Parameters\n",
" ----------\n",
" model_name : string\n",
" Selects which model to set the parameters for. Takes either 'XGBoost' or 'RandomForest'.\n",
"\n",
" Returns\n",
" ----------\n",
" model_params : dict\n",
" Loaded model parameters (dict)\n",
" \"\"\"\n",
"\n",
" self.log_to_file(\"\\n> Loading Hyperparameters\")\n",
"\n",
" # Default parameters of the models\n",
" if self.model_type == \"XGBoost\":\n",
" # https://xgboost.readthedocs.io/en/latest/parameter.html\n",
" model_params = {\n",
" \"max_depth\": 6,\n",
" \"num_boost_round\": 100,\n",
" \"learning_rate\": 0.3,\n",
" \"gamma\": 0.0,\n",
" \"lambda\": 1.0,\n",
" \"alpha\": 0.0,\n",
" \"objective\": \"binary:logistic\",\n",
" \"random_state\": 0,\n",
" }\n",
"\n",
" elif self.model_type == \"RandomForest\":\n",
" # https://docs.rapids.ai/api/cuml/stable/ -> cuml.ensemble.RandomForestClassifier\n",
" model_params = {\n",
" \"n_estimators\": 10,\n",
" \"max_depth\": 10,\n",
" \"n_bins\": 16,\n",
" \"max_features\": 1.0,\n",
" \"seed\": 0,\n",
" }\n",
"\n",
" hyperparameters = {}\n",
" try:\n",
" with open(self.CSP_paths[\"hyperparams\"], \"r\") as file_handle:\n",
" hyperparameters = json.load(file_handle)\n",
" for key, value in hyperparameters.items():\n",
" model_params[key] = value\n",
" pprint.pprint(model_params)\n",
" return model_params\n",
"\n",
" except Exception as error:\n",
" self.log_to_file(str(error))\n",
" return\n",
"\n",
" def load_data(\n",
" self, filename=\"dataset.orc\", col_labels=None, y_label=\"ArrDelayBinary\"\n",
" ):\n",
" \"\"\"\n",
" Loading the data into the object from the filename and based on the columns that we are\n",
" interested in. Also, generates y_label from 'ArrDelay' column to convert this into a binary\n",
" classification problem.\n",
"\n",
" Parameters\n",
" ----------\n",
" filename : string\n",
" the path of the dataset to be loaded\n",
"\n",
" col_labels : list of strings\n",
" The input columns that we are interested in. None selects all the columns\n",
"\n",
" y_label : string\n",
" The column to perform the prediction task in.\n",
"\n",
" Returns\n",
" ----------\n",
" dataset : dataframe (Pandas, cudf or dask-cudf)\n",
" Ingested dataset in the format of a dataframe\n",
"\n",
" col_labels : list of strings\n",
" The input columns selected\n",
"\n",
" y_label : string\n",
" The generated y_label name for binary classification\n",
"\n",
" duration : float\n",
" The time it took to execute the function\n",
" \"\"\"\n",
" target_filename = filename\n",
" self.log_to_file(f\"\\n> Loading dataset from {target_filename}\")\n",
"\n",
" with PerfTimer() as ingestion_timer:\n",
" if \"CPU\" in self.compute_type:\n",
" # CPU Reading options\n",
" self.log_to_file(f\"\\n\\tCPU read\")\n",
"\n",
" if self.data_type == \"ORC\":\n",
" with open(target_filename, mode=\"rb\") as file:\n",
" dataset = pyarrow_orc.ORCFile(file).read().to_pandas()\n",
" elif self.data_type == \"CSV\":\n",
" dataset = pd.read_csv(target_filename, names=col_labels)\n",
"\n",
" elif self.data_type == \"Parquet\":\n",
"\n",
" if \"single\" in self.compute_type:\n",
" dataset = pd.read_parquet(target_filename)\n",
"\n",
" elif \"multi\" in self.compute_type:\n",
" self.log_to_file(f\"\\n\\tReading using dask dataframe\")\n",
" dataset = dask.dataframe.read_parquet(\n",
" target_filename, columns=columns\n",
" )\n",
"\n",
" elif \"GPU\" in self.compute_type:\n",
" # GPU Reading Option\n",
"\n",
" self.log_to_file(f\"\\n\\tGPU read\")\n",
" if self.data_type == \"ORC\":\n",
" dataset = cudf.read_orc(target_filename)\n",
"\n",
" elif self.data_type == \"CSV\":\n",
" dataset = cudf.read_csv(target_filename, names=col_labels)\n",
"\n",
" elif self.data_type == \"Parquet\":\n",
"\n",
" if \"single\" in self.compute_type:\n",
" dataset = cudf.read_parquet(target_filename)\n",
"\n",
" elif \"multi\" in self.compute_type:\n",
" self.log_to_file(f\"\\n\\tReading using dask_cudf\")\n",
" dataset = dask_cudf.read_parquet(\n",
" target_filename, columns=col_labels\n",
" )\n",
"\n",
" # cast all columns to float32\n",
" for col in dataset.columns:\n",
" dataset[col] = dataset[col].astype(\n",
" np.float32\n",
" ) # needed for random forest\n",
"\n",
" # Adding y_label column if it is not present\n",
" if y_label not in dataset.columns:\n",
" dataset[y_label] = 1.0 * (dataset[\"ArrDelay\"] > 10)\n",
"\n",
" dataset[y_label] = dataset[y_label].astype(\n",
" np.int32\n",
" ) # Needed for cuml RF\n",
"\n",
" dataset = dataset.fillna(\n",
" 0.0\n",
" ) # Filling the null values. Needed for dask-cudf\n",
"\n",
" self.log_to_file(\n",
" f\"\\n\\tIngestion completed in {ingestion_timer.duration}\"\n",
" )\n",
" self.log_to_file(\n",
" f\"\\n\\tDataset descriptors: {dataset.shape}\\n\\t{dataset.dtypes}\"\n",
" )\n",
" return dataset, col_labels, y_label, ingestion_timer.duration\n",
"\n",
" def split_data(\n",
" self, dataset, y_label, train_size=0.8, random_state=0, shuffle=True\n",
" ):\n",
" \"\"\"\n",
" Splitting data into train and test split, has appropriate imports for different compute modes.\n",
" CPU compute - Uses sklearn, we manually filter y_label column in the split call\n",
" GPU Compute - Single GPU uses cuml and multi GPU uses dask, both split y_label internally.\n",
"\n",
" Parameters\n",
" ----------\n",
" dataset : dataframe\n",
" The dataframe on which we wish to perform the split\n",
" y_label : string\n",
" The name of the column (not the series itself)\n",
" train_size : float\n",
" The size for the split. Takes values between 0 to 1.\n",
" random_state : int\n",
" Useful for running reproducible splits.\n",
" shuffle : binary\n",
" Specifies if the data must be shuffled before splitting.\n",
"\n",
" Returns\n",
" ----------\n",
" X_train : dataframe\n",
" The data to be used for training. Has same type as input dataset.\n",
" X_test : dataframe\n",
" The data to be used for testing. Has same type as input dataset.\n",
" y_train : dataframe\n",
" The label to be used for training. Has same type as input dataset.\n",
" y_test : dataframe\n",
" The label to be used for testing. Has same type as input dataset.\n",
" duration : float\n",
" The time it took to perform the split\n",
" \"\"\"\n",
" self.log_to_file(\"\\n> Splitting train and test data\")\n",
" start_time = time.perf_counter()\n",
"\n",
" with PerfTimer() as split_timer:\n",
" if \"CPU\" in self.compute_type:\n",
" X_train, X_test, y_train, y_test = sklearn_train_test_split(\n",
" dataset.loc[:, dataset.columns != y_label],\n",
" dataset[y_label],\n",
" train_size=train_size,\n",
" shuffle=shuffle,\n",
" random_state=random_state,\n",
" )\n",
"\n",
" elif \"GPU\" in self.compute_type:\n",
" if \"single\" in self.compute_type:\n",
" X_train, X_test, y_train, y_test = cuml_train_test_split(\n",
" X=dataset,\n",
" y=y_label,\n",
" train_size=train_size,\n",
" shuffle=shuffle,\n",
" random_state=random_state,\n",
" )\n",
" elif \"multi\" in self.compute_type:\n",
" X_train, X_test, y_train, y_test = dask_train_test_split(\n",
" dataset,\n",
" y_label,\n",
" train_size=train_size,\n",
" shuffle=False, # shuffle not available for dask_cudf yet\n",
" random_state=random_state,\n",
" )\n",
"\n",
" self.log_to_file(\n",
" f\"\\n\\tX_train shape and type{X_train.shape} {type(X_train)}\"\n",
" )\n",
" self.log_to_file(f\"\\n\\tSplit completed in {split_timer.duration}\")\n",
" return X_train, X_test, y_train, y_test, split_timer.duration\n",
"\n",
" def train_model(self, X_train, y_train, model_params):\n",
" \"\"\"\n",
" Trains a model with the model_params specified by calling fit_xgboost or\n",
" fit_random_forest depending on the model_type.\n",
"\n",
" Parameters\n",
" ----------\n",
" X_train : dataframe\n",
" The data for traning\n",
" y_train : dataframe\n",
" The label to be used for training.\n",
" model_params : dict\n",
" The model params to use for this training\n",
" Returns\n",
" ----------\n",
" trained_model : The object of the trained model either of XGBoost or RandomForest\n",
"\n",
" training_time : float\n",
" The time it took to train the model\n",
" \"\"\"\n",
" self.log_to_file(\n",
" f\"\\n> Training {self.model_type} estimator w/ hyper-params\"\n",
" )\n",
" training_time = 0\n",
"\n",
" try:\n",
" if self.model_type == \"XGBoost\":\n",
" trained_model, training_time = self.fit_xgboost(\n",
" X_train, y_train, model_params\n",
" )\n",
" elif self.model_type == \"RandomForest\":\n",
" trained_model, training_time = self.fit_random_forest(\n",
" X_train, y_train, model_params\n",
" )\n",
" except Exception as error:\n",
" self.log_to_file(\"\\n\\n!error during model training: \" + str(error))\n",
" self.log_to_file(f\"\\n\\tFinished training in {training_time:.4f} s\")\n",
" return trained_model, training_time\n",
"\n",
" def fit_xgboost(self, X_train, y_train, model_params):\n",
" \"\"\"\n",
" Trains a XGBoost model on X_train and y_train with model_params\n",
"\n",
" Parameters and Objects returned are same as trained_model\n",
" \"\"\"\n",
" if \"GPU\" in self.compute_type:\n",
" model_params.update({\"tree_method\": \"gpu_hist\"})\n",
" else:\n",
" model_params.update({\"tree_method\": \"hist\"})\n",
"\n",
" with PerfTimer() as train_timer:\n",
" if \"single\" in self.compute_type:\n",
" train_DMatrix = xgboost.DMatrix(data=X_train, label=y_train)\n",
" trained_model = xgboost.train(\n",
" dtrain=train_DMatrix,\n",
" params=model_params,\n",
" num_boost_round=model_params[\"num_boost_round\"],\n",
" )\n",
" elif \"multi\" in self.compute_type:\n",
" self.log_to_file(\"\\n\\tTraining multi-GPU XGBoost\")\n",
" train_DMatrix = xgboost.dask.DaskDMatrix(\n",
" self.client, data=X_train, label=y_train\n",
" )\n",
" trained_model = xgboost.dask.train(\n",
" self.client,\n",
" dtrain=train_DMatrix,\n",
" params=model_params,\n",
" num_boost_round=model_params[\"num_boost_round\"],\n",
" )\n",
" return trained_model, train_timer.duration\n",
"\n",
" def fit_random_forest(self, X_train, y_train, model_params):\n",
" \"\"\"\n",
" Trains a RandomForest model on X_train and y_train with model_params.\n",
" Depending on compute_type, estimators from appropriate packages are used.\n",
" CPU - sklearn\n",
" Single-GPU - cuml\n",
" multi_gpu - cuml.dask\n",
"\n",
" Parameters and Objects returned are same as trained_model\n",
" \"\"\"\n",
" if \"CPU\" in self.compute_type:\n",
" rf_model = sklearn.ensemble.RandomForestClassifier(\n",
" n_estimators=model_params[\"n_estimators\"],\n",
" max_depth=model_params[\"max_depth\"],\n",
" max_features=model_params[\"max_features\"],\n",
" n_jobs=int(self.n_workers),\n",
" verbose=self.verbose_estimator,\n",
" )\n",
" elif \"GPU\" in self.compute_type:\n",
" if \"single\" in self.compute_type:\n",
" rf_model = cuml.ensemble.RandomForestClassifier(\n",
" n_estimators=model_params[\"n_estimators\"],\n",
" max_depth=model_params[\"max_depth\"],\n",
" n_bins=model_params[\"n_bins\"],\n",
" max_features=model_params[\"max_features\"],\n",
" verbose=self.verbose_estimator,\n",
" )\n",
" elif \"multi\" in self.compute_type:\n",
" self.log_to_file(\"\\n\\tFitting multi-GPU daskRF\")\n",
" X_train, y_train = dask_utils.persist_across_workers(\n",
" self.client,\n",
" [X_train.fillna(0.0), y_train.fillna(0.0)],\n",
" workers=self.workers,\n",
" )\n",
" rf_model = cuml.dask.ensemble.RandomForestClassifier(\n",
" n_estimators=model_params[\"n_estimators\"],\n",
" max_depth=model_params[\"max_depth\"],\n",
" n_bins=model_params[\"n_bins\"],\n",
" max_features=model_params[\"max_features\"],\n",
" verbose=self.verbose_estimator,\n",
" )\n",
" with PerfTimer() as train_timer:\n",
" try:\n",
" trained_model = rf_model.fit(X_train, y_train)\n",
" except Exception as error:\n",
" self.log_to_file(\"\\n\\n! Error during fit \" + str(error))\n",
" return trained_model, train_timer.duration\n",
"\n",
" def evaluate_test_perf(self, trained_model, X_test, y_test, threshold=0.5):\n",
" \"\"\"\n",
" Evaluates the model performance on the inference set. For XGBoost we need\n",
" to generate a DMatrix and then we can evaluate the model.\n",
" For Random Forest, in single GPU case, we can just call .score function.\n",
" And multi-GPU Random Forest needs to predict on the model and then compute\n",
" the accuracy score.\n",
"\n",
" Parameters\n",
" ----------\n",
" trained_model : The object of the trained model either of XGBoost or RandomForest\n",
" X_test : dataframe\n",
" The data for testing\n",
" y_test : dataframe\n",
" The label to be used for testing.\n",
" Returns\n",
" ----------\n",
" test_accuracy : float\n",
" The accuracy achieved on test set\n",
" duration : float\n",
" The time it took to evaluate the model\n",
" \"\"\"\n",
" self.log_to_file(f\"\\n> Inferencing on test set\")\n",
" test_accuracy = None\n",
" with PerfTimer() as inference_timer:\n",
" try:\n",
" if self.model_type == \"XGBoost\":\n",
" if \"multi\" in self.compute_type:\n",
" test_DMatrix = xgboost.dask.DaskDMatrix(\n",
" self.client, data=X_test, label=y_test\n",
" )\n",
" xgb_pred = xgboost.dask.predict(\n",
" self.client, trained_model, test_DMatrix\n",
" ).compute()\n",
" xgb_pred = (xgb_pred > threshold) * 1.0\n",
" test_accuracy = accuracy_score(\n",
" y_test.compute(), xgb_pred\n",
" )\n",
" elif \"single\" in self.compute_type:\n",
" test_DMatrix = xgboost.DMatrix(\n",
" data=X_test, label=y_test\n",
" )\n",
" xgb_pred = trained_model.predict(test_DMatrix)\n",
" xgb_pred = (xgb_pred > threshold) * 1.0\n",
" test_accuracy = accuracy_score(y_test, xgb_pred)\n",
"\n",
" elif self.model_type == \"RandomForest\":\n",
" if \"multi\" in self.compute_type:\n",
" cuml_pred = trained_model.predict(X_test).compute()\n",
" self.log_to_file(\"\\n\\tPrediction complete\")\n",
" test_accuracy = accuracy_score(\n",
" y_test.compute(), cuml_pred, convert_dtype=True\n",
" )\n",
" elif \"single\" in self.compute_type:\n",
" test_accuracy = trained_model.score(\n",
" X_test, y_test.astype(\"int32\")\n",
" )\n",
"\n",
" except Exception as error:\n",
" self.log_to_file(\"\\n\\n!error during inference: \" + str(error))\n",
"\n",
" self.log_to_file(\n",
" f\"\\n\\tFinished inference in {inference_timer.duration:.4f} s\"\n",
" )\n",
" self.log_to_file(f\"\\n\\tTest-accuracy: {test_accuracy}\")\n",
" return test_accuracy, inference_timer.duration\n",
"\n",
" def set_up_logging(self):\n",
" \"\"\"\n",
" Function to set up logging for the object.\n",
" \"\"\"\n",
" logging_path = self.CSP_paths[\"output\"] + \"/log.txt\"\n",
" logging.basicConfig(filename=logging_path, level=logging.INFO)\n",
"\n",
" def log_to_file(self, text):\n",
" \"\"\"\n",
" Logs the text that comes in as input.\n",
" \"\"\"\n",
" logging.info(text)\n",
" print(text)\n",
"\n",
"\n",
"# perf_counter = highest available timer resolution\n",
"class PerfTimer:\n",
" def __init__(self):\n",
" self.start = None\n",
" self.duration = None\n",
"\n",
" def __enter__(self):\n",
" self.start = time.perf_counter()\n",
" return self\n",
"\n",
" def __exit__(self, *args):\n",
" self.duration = time.perf_counter() - self.start"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3.8",
"language": "python",
"name": "python3.8"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.3-final"
}
},
"nbformat": 4,
"nbformat_minor": 4
}
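
The `load_hyperparams` method in the notebook cell above layers values from a JSON config file over built-in defaults. The pattern can be sketched in isolation as follows. The helper name `merge_hyperparams`, the narrowed exception types, and the choice to fall back to the defaults when the file is missing are assumptions made for illustration; only the default values are taken from the RandomForest branch of the committed code.

```python
import json
import os
import tempfile

# Defaults copied from the RandomForest branch of load_hyperparams.
DEFAULT_RF_PARAMS = {
    "n_estimators": 10,
    "max_depth": 10,
    "n_bins": 16,
    "max_features": 1.0,
    "seed": 0,
}


def merge_hyperparams(config_path, defaults):
    """Hypothetical helper: overlay JSON overrides on a copy of the defaults."""
    params = dict(defaults)  # copy so the shared defaults are never mutated
    try:
        with open(config_path, "r") as file_handle:
            params.update(json.load(file_handle))
    except (OSError, json.JSONDecodeError) as error:
        # Assumption: a missing or unreadable config means "use the defaults".
        print(f"using default hyperparameters: {error}")
    return params


with tempfile.TemporaryDirectory() as tmp_dir:
    config_path = os.path.join(tmp_dir, "hyperparams.json")
    with open(config_path, "w") as file_handle:
        json.dump({"n_estimators": 50}, file_handle)

    # One key is overridden; the rest keep their default values.
    merged = merge_hyperparams(config_path, DEFAULT_RF_PARAMS)
    # No file at this path, so the defaults come back unchanged.
    fallback = merge_hyperparams(
        os.path.join(tmp_dir, "missing.json"), DEFAULT_RF_PARAMS
    )
```

Copying the defaults before updating keeps repeated calls independent of each other, which matters when the same defaults dict is reused across several runs.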

View file

@ -28,11 +28,11 @@
"outputs": [],
"source": [
"# training script\n",
"script_dir = \".\"\n",
"script_dir = \"src\"\n",
"script_name = \"train.py\"\n",
"\n",
"# environment file\n",
"environment_file = \"rapids.dockerfile\"\n",
"environment_file = \"DOCKERFILE\"\n",
"\n",
"# azure ml settings\n",
"environment_name = \"rapids-tutorial\"\n",

View file

@ -30,11 +30,11 @@
"outputs": [],
"source": [
"# training script\n",
"script_dir = \".\"\n",
"script_dir = \"src\"\n",
"script_name = \"train.py\"\n",
"\n",
"# environment file\n",
"environment_file = \"rapids.dockerfile\"\n",
"environment_file = \"DOCKERFILE\"\n",
"\n",
"# azure ml settings\n",
"environment_name = \"rapids-tutorial\"\n",

View file

@ -0,0 +1,6 @@
FROM rapidsai/rapidsai:0.16-cuda10.2-runtime-ubuntu18.04-py3.7
RUN apt-get update && \
    apt-get install -y fuse && \
    source activate rapids && \
    pip install azureml-mlflow && \
    pip install azureml-dataprep

View file

@ -0,0 +1,513 @@
#
# Copyright (c) 2019-2020, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import json
import logging
import os
import pprint
import sys
import time

import dask
import numpy as np
import pandas as pd
import psutil
import sklearn
from dask.distributed import Client, wait
from pyarrow import orc as pyarrow_orc  # used by the CPU ORC reader in load_data
from sklearn import ensemble
from sklearn.model_selection import train_test_split as sklearn_train_test_split

import cudf
import cuml
import cupy
import dask_cudf
import pynvml
import xgboost
from cuml.dask.common import utils as dask_utils
from cuml.dask.ensemble import RandomForestClassifier as cumlDaskRF
from cuml.metrics.accuracy import accuracy_score
from cuml.preprocessing.model_selection import train_test_split as cuml_train_test_split
from dask_cuda import LocalCUDACluster
from dask_ml.model_selection import train_test_split as dask_train_test_split

default_azureml_paths = {
    "train_script": "./train_script",
    "train_data": "./data_airline",
    "output": "./output",
}


class RapidsCloudML(object):
    def __init__(
        self,
        cloud_type="Azure",
        model_type="RandomForest",
        data_type="Parquet",
        compute_type="single-GPU",
        verbose_estimator=False,
        CSP_paths=default_azureml_paths,
    ):

        self.CSP_paths = CSP_paths
        self.cloud_type = cloud_type
        self.model_type = model_type
        self.data_type = data_type
        self.compute_type = compute_type
        self.verbose_estimator = verbose_estimator
        self.log_to_file(
            f"\n> RapidsCloudML\n\tCompute, Data , Model, Cloud types {self.compute_type, self.data_type, self.model_type, self.cloud_type}"
        )

        # Setting up client for multi-GPU option
        if "multi" in self.compute_type:
            self.log_to_file("\n\tMulti-GPU selected")
            # This will use all GPUs on the local host by default
            cluster = LocalCUDACluster(threads_per_worker=1)
            self.client = Client(cluster)

            # Query the client for all connected workers
            self.workers = self.client.has_what().keys()
            self.n_workers = len(self.workers)
            self.log_to_file(f"\n\tClient information {self.client}")

    def load_hyperparams(self, model_name="XGBoost"):
        """
        Selecting model parameters based on the model we select for execution.
        Checks if there is a config file present in the path self.CSP_paths['hyperparams'] with
        the parameters for the experiment. If not present, it returns the default parameters.

        Parameters
        ----------
        model_name : string
            Selects which model to set the parameters for. Takes either 'XGBoost' or 'RandomForest'.

        Returns
        ----------
        model_params : dict
            Loaded model parameters (dict)
        """

        self.log_to_file("\n> Loading Hyperparameters")

        # Default parameters of the models
        if self.model_type == "XGBoost":
            # https://xgboost.readthedocs.io/en/latest/parameter.html
            model_params = {
                "max_depth": 6,
                "num_boost_round": 100,
                "learning_rate": 0.3,
                "gamma": 0.0,
                "lambda": 1.0,
                "alpha": 0.0,
                "objective": "binary:logistic",
                "random_state": 0,
            }

        elif self.model_type == "RandomForest":
            # https://docs.rapids.ai/api/cuml/stable/ -> cuml.ensemble.RandomForestClassifier
            model_params = {
                "n_estimators": 10,
                "max_depth": 10,
                "n_bins": 16,
                "max_features": 1.0,
                "seed": 0,
            }

        hyperparameters = {}
        try:
            with open(self.CSP_paths["hyperparams"], "r") as file_handle:
                hyperparameters = json.load(file_handle)
                for key, value in hyperparameters.items():
                    model_params[key] = value
                pprint.pprint(model_params)
                return model_params

        except Exception as error:
            self.log_to_file(str(error))
            # Fall back to the defaults, as the docstring promises
            return model_params

    def load_data(
        self, filename="dataset.orc", col_labels=None, y_label="ArrDelayBinary"
    ):
        """
        Loading the data into the object from the filename and based on the columns that we are
        interested in. Also, generates y_label from 'ArrDelay' column to convert this into a binary
        classification problem.

        Parameters
        ----------
        filename : string
            the path of the dataset to be loaded

        col_labels : list of strings
            The input columns that we are interested in. None selects all the columns

        y_label : string
            The column to perform the prediction task in.

        Returns
        ----------
        dataset : dataframe (Pandas, cudf or dask-cudf)
            Ingested dataset in the format of a dataframe

        col_labels : list of strings
            The input columns selected

        y_label : string
            The generated y_label name for binary classification

        duration : float
            The time it took to execute the function
        """
        target_filename = filename
        self.log_to_file(f"\n> Loading dataset from {target_filename}")

        with PerfTimer() as ingestion_timer:
            if "CPU" in self.compute_type:
                # CPU Reading options
                self.log_to_file(f"\n\tCPU read")

                if self.data_type == "ORC":
                    with open(target_filename, mode="rb") as file:
                        dataset = pyarrow_orc.ORCFile(file).read().to_pandas()
                elif self.data_type == "CSV":
                    dataset = pd.read_csv(target_filename, names=col_labels)

                elif self.data_type == "Parquet":

                    if "single" in self.compute_type:
                        dataset = pd.read_parquet(target_filename)

                    elif "multi" in self.compute_type:
                        self.log_to_file(f"\n\tReading using dask dataframe")
                        dataset = dask.dataframe.read_parquet(
                            target_filename, columns=col_labels
                        )

            elif "GPU" in self.compute_type:
                # GPU Reading Option

                self.log_to_file(f"\n\tGPU read")
                if self.data_type == "ORC":
                    dataset = cudf.read_orc(target_filename)

                elif self.data_type == "CSV":
                    dataset = cudf.read_csv(target_filename, names=col_labels)

                elif self.data_type == "Parquet":

                    if "single" in self.compute_type:
                        dataset = cudf.read_parquet(target_filename)

                    elif "multi" in self.compute_type:
                        self.log_to_file(f"\n\tReading using dask_cudf")
                        dataset = dask_cudf.read_parquet(
                            target_filename, columns=col_labels
                        )

        # cast all columns to float32
        for col in dataset.columns:
            dataset[col] = dataset[col].astype(np.float32)  # needed for random forest

        # Adding y_label column if it is not present
        if y_label not in dataset.columns:
            dataset[y_label] = 1.0 * (dataset["ArrDelay"] > 10)

        dataset[y_label] = dataset[y_label].astype(np.int32)  # Needed for cuml RF

        dataset = dataset.fillna(0.0)  # Filling the null values. Needed for dask-cudf

        self.log_to_file(f"\n\tIngestion completed in {ingestion_timer.duration}")
        self.log_to_file(
            f"\n\tDataset descriptors: {dataset.shape}\n\t{dataset.dtypes}"
        )
        return dataset, col_labels, y_label, ingestion_timer.duration

    def split_data(
        self, dataset, y_label, train_size=0.8, random_state=0, shuffle=True
    ):
        """
        Splitting data into train and test split, has appropriate imports for different compute modes.
        CPU compute - Uses sklearn, we manually filter y_label column in the split call
        GPU Compute - Single GPU uses cuml and multi GPU uses dask, both split y_label internally.

        Parameters
        ----------
        dataset : dataframe
            The dataframe on which we wish to perform the split
        y_label : string
            The name of the column (not the series itself)
        train_size : float
            The size for the split. Takes values between 0 and 1.
        random_state : int
            Useful for running reproducible splits.
        shuffle : bool
            Specifies if the data must be shuffled before splitting.

        Returns
        ----------
        X_train : dataframe
            The data to be used for training. Has same type as input dataset.
        X_test : dataframe
            The data to be used for testing. Has same type as input dataset.
        y_train : dataframe
            The label to be used for training. Has same type as input dataset.
        y_test : dataframe
            The label to be used for testing. Has same type as input dataset.
        duration : float
            The time it took to perform the split
        """
        self.log_to_file("\n> Splitting train and test data")
        start_time = time.perf_counter()

        with PerfTimer() as split_timer:
            if "CPU" in self.compute_type:
                X_train, X_test, y_train, y_test = sklearn_train_test_split(
                    dataset.loc[:, dataset.columns != y_label],
                    dataset[y_label],
                    train_size=train_size,
                    shuffle=shuffle,
                    random_state=random_state,
                )

            elif "GPU" in self.compute_type:
                if "single" in self.compute_type:
                    X_train, X_test, y_train, y_test = cuml_train_test_split(
                        X=dataset,
                        y=y_label,
                        train_size=train_size,
                        shuffle=shuffle,
                        random_state=random_state,
                    )
                elif "multi" in self.compute_type:
                    X_train, X_test, y_train, y_test = dask_train_test_split(
                        dataset,
                        y_label,
                        train_size=train_size,
                        shuffle=False,  # shuffle not available for dask_cudf yet
                        random_state=random_state,
                    )

        self.log_to_file(f"\n\tX_train shape and type{X_train.shape} {type(X_train)}")
        self.log_to_file(f"\n\tSplit completed in {split_timer.duration}")
        return X_train, X_test, y_train, y_test, split_timer.duration

    def train_model(self, X_train, y_train, model_params):
        """
        Trains a model with the model_params specified by calling fit_xgboost or
        fit_random_forest depending on the model_type.

        Parameters
        ----------
        X_train : dataframe
            The data for training
        y_train : dataframe
            The label to be used for training.
        model_params : dict
            The model params to use for this training
        Returns
        ----------
        trained_model : The object of the trained model either of XGBoost or RandomForest

        training_time : float
            The time it took to train the model
        """
        self.log_to_file(f"\n> Training {self.model_type} estimator w/ hyper-params")
        trained_model = None  # stays None if fitting raises
        training_time = 0

        try:
            if self.model_type == "XGBoost":
                trained_model, training_time = self.fit_xgboost(
                    X_train, y_train, model_params
                )
            elif self.model_type == "RandomForest":
                trained_model, training_time = self.fit_random_forest(
                    X_train, y_train, model_params
                )
        except Exception as error:
            self.log_to_file("\n\n!error during model training: " + str(error))
        self.log_to_file(f"\n\tFinished training in {training_time:.4f} s")
        return trained_model, training_time

    def fit_xgboost(self, X_train, y_train, model_params):
        """
        Trains a XGBoost model on X_train and y_train with model_params

        Parameters and objects returned are the same as for train_model
        """
        if "GPU" in self.compute_type:
            model_params.update({"tree_method": "gpu_hist"})
        else:
            model_params.update({"tree_method": "hist"})

        with PerfTimer() as train_timer:
            if "single" in self.compute_type:
                train_DMatrix = xgboost.DMatrix(data=X_train, label=y_train)
                trained_model = xgboost.train(
                    dtrain=train_DMatrix,
                    params=model_params,
                    num_boost_round=model_params["num_boost_round"],
                )
            elif "multi" in self.compute_type:
                self.log_to_file("\n\tTraining multi-GPU XGBoost")
                train_DMatrix = xgboost.dask.DaskDMatrix(
                    self.client, data=X_train, label=y_train
                )
                trained_model = xgboost.dask.train(
                    self.client,
                    dtrain=train_DMatrix,
                    params=model_params,
                    num_boost_round=model_params["num_boost_round"],
                )
        return trained_model, train_timer.duration

    def fit_random_forest(self, X_train, y_train, model_params):
        """
        Trains a RandomForest model on X_train and y_train with model_params.
        Depending on compute_type, estimators from appropriate packages are used.
        CPU - sklearn
        Single-GPU - cuml
        multi_gpu - cuml.dask

        Parameters and objects returned are the same as for train_model
        """
        if "CPU" in self.compute_type:
            rf_model = sklearn.ensemble.RandomForestClassifier(
                n_estimators=model_params["n_estimators"],
                max_depth=model_params["max_depth"],
                max_features=model_params["max_features"],
                n_jobs=int(self.n_workers),
                verbose=self.verbose_estimator,
            )
        elif "GPU" in self.compute_type:
            if "single" in self.compute_type:
                rf_model = cuml.ensemble.RandomForestClassifier(
                    n_estimators=model_params["n_estimators"],
                    max_depth=model_params["max_depth"],
                    n_bins=model_params["n_bins"],
                    max_features=model_params["max_features"],
                    verbose=self.verbose_estimator,
                )
            elif "multi" in self.compute_type:
                self.log_to_file("\n\tFitting multi-GPU daskRF")
                X_train, y_train = dask_utils.persist_across_workers(
                    self.client,
                    [X_train.fillna(0.0), y_train.fillna(0.0)],
                    workers=self.workers,
                )
                rf_model = cuml.dask.ensemble.RandomForestClassifier(
                    n_estimators=model_params["n_estimators"],
                    max_depth=model_params["max_depth"],
                    n_bins=model_params["n_bins"],
                    max_features=model_params["max_features"],
                    verbose=self.verbose_estimator,
                )
        trained_model = None  # stays None if fit raises
        with PerfTimer() as train_timer:
            try:
                trained_model = rf_model.fit(X_train, y_train)
            except Exception as error:
                self.log_to_file("\n\n! Error during fit " + str(error))
        return trained_model, train_timer.duration

    def evaluate_test_perf(self, trained_model, X_test, y_test, threshold=0.5):
        """
        Evaluates the model performance on the inference set. For XGBoost we need
        to generate a DMatrix and then we can evaluate the model.
        For Random Forest, in single GPU case, we can just call .score function.
        And multi-GPU Random Forest needs to predict on the model and then compute
        the accuracy score.

        Parameters
        ----------
        trained_model : The object of the trained model either of XGBoost or RandomForest
        X_test : dataframe
            The data for testing
        y_test : dataframe
            The label to be used for testing.
        Returns
        ----------
        test_accuracy : float
            The accuracy achieved on test set
        duration : float
            The time it took to evaluate the model
        """
        self.log_to_file(f"\n> Inferencing on test set")
        test_accuracy = None
        with PerfTimer() as inference_timer:
            try:
                if self.model_type == "XGBoost":
                    if "multi" in self.compute_type:
                        test_DMatrix = xgboost.dask.DaskDMatrix(
                            self.client, data=X_test, label=y_test
                        )
                        xgb_pred = xgboost.dask.predict(
                            self.client, trained_model, test_DMatrix
                        ).compute()
                        xgb_pred = (xgb_pred > threshold) * 1.0
                        test_accuracy = accuracy_score(y_test.compute(), xgb_pred)
                    elif "single" in self.compute_type:
                        test_DMatrix = xgboost.DMatrix(data=X_test, label=y_test)
                        xgb_pred = trained_model.predict(test_DMatrix)
                        xgb_pred = (xgb_pred > threshold) * 1.0
                        test_accuracy = accuracy_score(y_test, xgb_pred)

                elif self.model_type == "RandomForest":
                    if "multi" in self.compute_type:
                        cuml_pred = trained_model.predict(X_test).compute()
                        self.log_to_file("\n\tPrediction complete")
                        test_accuracy = accuracy_score(
                            y_test.compute(), cuml_pred, convert_dtype=True
                        )
                    elif "single" in self.compute_type:
                        test_accuracy = trained_model.score(
                            X_test, y_test.astype("int32")
                        )

            except Exception as error:
                self.log_to_file("\n\n!error during inference: " + str(error))

        self.log_to_file(f"\n\tFinished inference in {inference_timer.duration:.4f} s")
        self.log_to_file(f"\n\tTest-accuracy: {test_accuracy}")
        return test_accuracy, inference_timer.duration
    def set_up_logging(self):
        """
        Function to set up logging for the object.
        """
        logging_path = self.CSP_paths["output"] + "/log.txt"
        logging.basicConfig(filename=logging_path, level=logging.INFO)

    def log_to_file(self, text):
        """
        Logs the text that comes in as input.
        """
        logging.info(text)
        print(text)


# perf_counter = highest available timer resolution
class PerfTimer:
    def __init__(self):
        self.start = None
        self.duration = None

    def __enter__(self):
        self.start = time.perf_counter()
        return self

    def __exit__(self, *args):
        self.duration = time.perf_counter() - self.start
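
Review note on `PerfTimer`: `duration` is only assigned in `__exit__`, so it must be read after the `with` block closes, which is what `evaluate_test_perf` does. The sketch below illustrates that behaviour. The class is repeated only so the snippet runs on its own, and `timed_call` is a hypothetical helper that is not part of this module.

```python
import time


class PerfTimer:
    """Same shape as the PerfTimer above, repeated so this sketch is standalone."""

    def __init__(self):
        self.start = None
        self.duration = None

    def __enter__(self):
        self.start = time.perf_counter()
        return self

    def __exit__(self, *args):
        self.duration = time.perf_counter() - self.start


def timed_call(func, *args, **kwargs):
    # Hypothetical helper for illustration only.
    with PerfTimer() as timer:
        inside = timer.duration  # still None here: __exit__ has not run yet
        result = func(*args, **kwargs)
    # After the block, duration holds the elapsed seconds as a float.
    return result, timer.duration, inside


result, elapsed, inside = timed_call(sum, range(1000))
print(result, inside, elapsed >= 0.0)
```

Because `__exit__` returns `None`, an exception raised inside the block still propagates, and `duration` is set before it does.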


@ -0,0 +1,182 @@
#
# Copyright (c) 2019-2020, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import argparse
import os
import time
import numpy as np
import pandas as pd
import cudf
import cuml
import mlflow
from cuml import RandomForestClassifier as cuRF
from cuml.preprocessing.model_selection import train_test_split
from cuml.metrics.accuracy import accuracy_score
from rapids_csp_azure import RapidsCloudML, PerfTimer


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--data_dir", type=str, help="location of data")
    parser.add_argument(
        "--n_estimators", type=int, default=100, help="Number of trees in RF"
    )
    parser.add_argument(
        "--max_depth", type=int, default=16, help="Max depth of each tree"
    )
    parser.add_argument(
        "--n_bins",
        type=int,
        default=8,
        help="Number of bins used in split point calculation",
    )
    parser.add_argument(
        "--max_features",
        type=float,
        default=1.0,
        help="Fraction of features considered for best split",
    )
    parser.add_argument(
        "--compute",
        type=str,
        default="single-GPU",
        help="set to multi-GPU for algorithms via dask",
    )
    # argparse exposes "--cv-folds" as args.cv_folds
    parser.add_argument(
        "--cv-folds", type=int, default=5, help="Number of CV fold splits"
    )
    args = parser.parse_args()
    data_dir = args.data_dir
    # The np.int / np.str / np.float aliases are deprecated in NumPy;
    # the Python builtins behave the same here.
    n_estimators = args.n_estimators
    mlflow.log_param("n_estimators", int(args.n_estimators))
    max_depth = args.max_depth
    mlflow.log_param("max_depth", int(args.max_depth))
    n_bins = args.n_bins
    mlflow.log_param("n_bins", int(args.n_bins))
    max_features = args.max_features
    mlflow.log_param("max_features", str(args.max_features))
    print("\n---->>>> cuDF version <<<<----\n", cudf.__version__)
    print("\n---->>>> cuML version <<<<----\n", cuml.__version__)
    azure_ml = RapidsCloudML(
        cloud_type="Azure",
        model_type="RandomForest",
        data_type="Parquet",
        compute_type=args.compute,
    )
    print(args.compute)
    if args.compute == "single-GPU":
        dataset, _, y_label, _ = azure_ml.load_data(
            filename=os.path.join(data_dir, "airline_20m.parquet")
        )
    else:
        # use parquet files from 'https://airlinedataset.blob.core.windows.net/airline-10years' for multi-GPU training
        dataset, _, y_label, _ = azure_ml.load_data(
            filename=os.path.join(data_dir, "part*.parquet"),
            col_labels=[
                "Flight_Number_Reporting_Airline",
                "Year",
                "Quarter",
                "Month",
                "DayOfWeek",
                "DOT_ID_Reporting_Airline",
                "OriginCityMarketID",
                "DestCityMarketID",
                "DepTime",
                "DepDelay",
                "DepDel15",
                "ArrDel15",
                "ArrDelay",
                "AirTime",
                "Distance",
            ],
            y_label="ArrDel15",
        )
    X = dataset[dataset.columns.difference(["ArrDelay", y_label])]
    y = dataset[y_label]
    del dataset
    print("\n---->>>> Training using GPUs <<<<----\n")
    # ----------------------------------------------------------------------------------------------------
    # cross-validation folds
    # ----------------------------------------------------------------------------------------------------
    accuracy_per_fold = []
    train_time_per_fold = []
    infer_time_per_fold = []
    trained_model = []
    global_best_test_accuracy = 0
    model_params = {
        "n_estimators": n_estimators,
        "max_depth": max_depth,
        "max_features": max_features,
        "n_bins": n_bins,
    }
    # cross-validation; the number of folds is set with --cv-folds
    for i_train_fold in range(args.cv_folds):
        print(f"\n CV fold {i_train_fold + 1} of {args.cv_folds}\n")
        # split data
        X_train, X_test, y_train, y_test, _ = azure_ml.split_data(
            X, y, random_state=i_train_fold
        )
        # train model
        trained_model, training_time = azure_ml.train_model(
            X_train, y_train, model_params
        )
        train_time_per_fold += [round(training_time, 4)]
        # evaluate perf
        test_accuracy, infer_time = azure_ml.evaluate_test_perf(
            trained_model, X_test, y_test
        )
        accuracy_per_fold += [round(test_accuracy, 4)]
        infer_time_per_fold += [round(infer_time, 4)]
        # update best model [ assumes maximization of perf metric ]
        if test_accuracy > global_best_test_accuracy:
            global_best_test_accuracy = test_accuracy
    mlflow.log_metric(
        "Total training inference time", float(training_time + infer_time)
    )
    mlflow.log_metric("Accuracy", float(global_best_test_accuracy))
    print("\n Accuracy :", global_best_test_accuracy)
    print("\n accuracy per fold :", accuracy_per_fold)
    print("\n train-time per fold :", train_time_per_fold)
    print("\n train-time all folds :", sum(train_time_per_fold))
    print("\n infer-time per fold :", infer_time_per_fold)
    print("\n infer-time all folds :", sum(infer_time_per_fold))


if __name__ == "__main__":
    with PerfTimer() as total_script_time:
        main()
    print("Total runtime: {:.2f}".format(total_script_time.duration))
    mlflow.log_metric("Total runtime", float(total_script_time.duration))
    print("\n Exiting script")
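
Review note on the cross-validation bookkeeping: the script keeps per-fold lists and a running best accuracy, while the value logged under "Total training inference time" is built from a single fold's `training_time + infer_time`. If an aggregate over all folds is what is wanted, a summary along the following lines could be computed from the per-fold lists. This is only a sketch; `summarize_folds` and its dictionary keys are hypothetical names, not part of the script.

```python
def summarize_folds(accuracies, train_times, infer_times):
    # Hypothetical helper: condenses per-fold results into one summary dict.
    if not accuracies:
        raise ValueError("at least one fold is required")
    return {
        "best_accuracy": max(accuracies),
        "mean_accuracy": sum(accuracies) / len(accuracies),
        "total_train_time": sum(train_times),
        "total_infer_time": sum(infer_times),
    }


# Made-up numbers, purely to show the call shape.
summary = summarize_folds(
    accuracies=[0.91, 0.93, 0.92],
    train_times=[10.0, 11.0, 9.0],
    infer_times=[1.0, 1.5, 0.5],
)
print(summary)
```

Each entry of the returned dictionary could then be passed to `mlflow.log_metric` as a float, in the same way the script already logs "Accuracy".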