Merge pull request #116 from microsoft/abhiram-mlflow

Integrating MLflow.
This commit is contained in:
Said Bleik 2019-06-25 08:55:44 -07:00 committed by GitHub
Parents 7a8e4cf4d4 5d86d03d10
Commit 4dac5f1cff
No key found matching this signature
GPG key ID: 4AEE18F83AFDEB23
5 changed files: 415 additions and 406 deletions

View file

@ -132,6 +132,13 @@
"scrolled": true "scrolled": true
}, },
"outputs": [ "outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"Failure while loading azureml_run_type_providers. Failed to load entrypoint hyperdrive = azureml.train.hyperdrive:HyperDriveRun._from_run_dto with exception module 'azureml.train.hyperdrive' has no attribute 'HyperDriveRun'.\n"
]
},
{ {
"name": "stdout", "name": "stdout",
"output_type": "stream", "output_type": "stream",
@ -188,7 +195,7 @@
}, },
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": 12, "execution_count": 2,
"metadata": { "metadata": {
"scrolled": true "scrolled": true
}, },
@ -242,7 +249,7 @@
}, },
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": 13, "execution_count": 3,
"metadata": {}, "metadata": {},
"outputs": [ "outputs": [
{ {
@ -285,7 +292,7 @@
}, },
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": 19, "execution_count": 4,
"metadata": {}, "metadata": {},
"outputs": [], "outputs": [],
"source": [ "source": [
@ -301,9 +308,9 @@
}, },
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": 22, "execution_count": 5,
"metadata": { "metadata": {
"scrolled": true "scrolled": false
}, },
"outputs": [ "outputs": [
{ {
@ -484,7 +491,7 @@
"4 2267923837.jpg#2r1e entailment NaN NaN NaN NaN " "4 2267923837.jpg#2r1e entailment NaN NaN NaN NaN "
] ]
}, },
"execution_count": 22, "execution_count": 5,
"metadata": {}, "metadata": {},
"output_type": "execute_result" "output_type": "execute_result"
} }
@ -511,7 +518,7 @@
}, },
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": 23, "execution_count": 6,
"metadata": {}, "metadata": {},
"outputs": [ "outputs": [
{ {
@ -591,7 +598,7 @@
"4 There are children present " "4 There are children present "
] ]
}, },
"execution_count": 23, "execution_count": 6,
"metadata": {}, "metadata": {},
"output_type": "execute_result" "output_type": "execute_result"
} }
@ -622,7 +629,7 @@
}, },
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": 24, "execution_count": 7,
"metadata": {}, "metadata": {},
"outputs": [], "outputs": [],
"source": [ "source": [
@ -643,7 +650,7 @@
}, },
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": 25, "execution_count": 8,
"metadata": { "metadata": {
"scrolled": true "scrolled": true
}, },
@ -654,7 +661,7 @@
"'../../data\\\\clean/snli_1.0'" "'../../data\\\\clean/snli_1.0'"
] ]
}, },
"execution_count": 25, "execution_count": 8,
"metadata": {}, "metadata": {},
"output_type": "execute_result" "output_type": "execute_result"
} }
@ -682,7 +689,7 @@
}, },
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": 26, "execution_count": 9,
"metadata": {}, "metadata": {},
"outputs": [], "outputs": [],
"source": [ "source": [
@ -691,7 +698,7 @@
}, },
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": 27, "execution_count": 10,
"metadata": {}, "metadata": {},
"outputs": [ "outputs": [
{ {
@ -713,29 +720,29 @@
"source": [ "source": [
"**Prerequisites:**\n", "**Prerequisites:**\n",
"\n", "\n",
"Upload the all the local files under `data_folder` to the path `./data/processed/` on the default datastore.\n", "Upload the all the local files under `data_folder` to the default datastore.\n",
"\n", "\n",
"**Note: To download data required to train a GenSen model in the original paper, run code [here](https://github.com/Maluuba/gensen/blob/master/get_data.sh). By training on the original datasets (training time around 20 hours), it will reproduce the results in the [paper](https://arxiv.org/abs/1804.00079). For simplicity, we will train on a smaller dataset, which is SNLI preprocessed in [1 Data Loading and Preprocessing](#1-Data-Loading-and-Preprocessing) for showcasing the example.**" "**Note: To download data required to train a GenSen model in the original paper, run code [here](https://github.com/Maluuba/gensen/blob/master/get_data.sh). By training on the original datasets (training time around 20 hours), it will reproduce the results in the [paper](https://arxiv.org/abs/1804.00079). For simplicity, we will train on a smaller dataset, which is SNLI preprocessed in [1 Data Loading and Preprocessing](#1-Data-Loading-and-Preprocessing) for showcasing the example.**"
] ]
}, },
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": 29, "execution_count": 11,
"metadata": {}, "metadata": {},
"outputs": [ "outputs": [
{ {
"data": { "data": {
"text/plain": [ "text/plain": [
"$AZUREML_DATAREFERENCE_6faee69b569b4268b8bf027b0bb4fd73" "$AZUREML_DATAREFERENCE_liqungensen"
] ]
}, },
"execution_count": 29, "execution_count": 11,
"metadata": {}, "metadata": {},
"output_type": "execute_result" "output_type": "execute_result"
} }
], ],
"source": [ "source": [
"ds.upload(src_dir=data_folder, target_path=\"data/processed\", overwrite=True, show_progress=False)" "ds.upload(src_dir=data_folder, overwrite=True, show_progress=False)"
] ]
}, },
{ {
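The datastore handle `ds` used in the upload cell above is created outside the hunks shown here; a minimal sketch of how it is typically obtained, assuming the workspace is loaded from a local `config.json` and `data_folder` is the preprocessing output folder from section 1:

```python
from azureml.core import Workspace

# Load the Azure ML workspace (assumes a config.json downloaded from the portal)
ws = Workspace.from_config()

# The default datastore is the blob store provisioned with the workspace
ds = ws.get_default_datastore()

# Upload the preprocessed SNLI files so remote runs can mount them
ds.upload(src_dir=data_folder, overwrite=True, show_progress=False)
```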
@ -769,7 +776,7 @@
}, },
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": 30, "execution_count": 4,
"metadata": {}, "metadata": {},
"outputs": [ "outputs": [
{ {
@ -777,13 +784,13 @@
"output_type": "stream", "output_type": "stream",
"text": [ "text": [
"Found existing compute target.\n", "Found existing compute target.\n",
"{'currentNodeCount': 0, 'targetNodeCount': 0, 'nodeStateCounts': {'preparingNodeCount': 0, 'runningNodeCount': 0, 'idleNodeCount': 0, 'unusableNodeCount': 0, 'leavingNodeCount': 0, 'preemptedNodeCount': 0}, 'allocationState': 'Steady', 'allocationStateTransitionTime': '2019-06-14T20:39:31.676000+00:00', 'errors': None, 'creationTime': '2019-06-03T21:18:34.507970+00:00', 'modifiedTime': '2019-06-03T21:18:50.790782+00:00', 'provisioningState': 'Succeeded', 'provisioningStateTransitionTime': None, 'scaleSettings': {'minNodeCount': 0, 'maxNodeCount': 8, 'nodeIdleTimeBeforeScaleDown': 'PT120S'}, 'vmPriority': 'Dedicated', 'vmSize': 'STANDARD_NC6'}\n" "{'currentNodeCount': 2, 'targetNodeCount': 2, 'nodeStateCounts': {'preparingNodeCount': 0, 'runningNodeCount': 2, 'idleNodeCount': 0, 'unusableNodeCount': 0, 'leavingNodeCount': 0, 'preemptedNodeCount': 0}, 'allocationState': 'Steady', 'allocationStateTransitionTime': '2019-06-21T20:14:04.778000+00:00', 'errors': None, 'creationTime': '2019-06-19T02:57:39.833104+00:00', 'modifiedTime': '2019-06-19T02:58:11.339451+00:00', 'provisioningState': 'Succeeded', 'provisioningStateTransitionTime': None, 'scaleSettings': {'minNodeCount': 0, 'maxNodeCount': 4, 'nodeIdleTimeBeforeScaleDown': 'PT120S'}, 'vmPriority': 'Dedicated', 'vmSize': 'STANDARD_NC6'}\n"
] ]
} }
], ],
"source": [ "source": [
"# choose a name for your cluster\n", "# choose a name for your cluster\n",
"cluster_name = \"gpugensen\"\n", "cluster_name = \"gensen-mlflow\"\n",
"\n", "\n",
"try:\n", "try:\n",
" compute_target = ComputeTarget(workspace=ws, name=cluster_name)\n", " compute_target = ComputeTarget(workspace=ws, name=cluster_name)\n",
@ -791,7 +798,7 @@
"except ComputeTargetException:\n", "except ComputeTargetException:\n",
" print('Creating a new compute target...')\n", " print('Creating a new compute target...')\n",
" compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_NC6',\n", " compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_NC6',\n",
" max_nodes=8)\n", " max_nodes=2)\n",
"\n", "\n",
" # create the cluster\n", " # create the cluster\n",
" compute_target = ComputeTarget.create(ws, cluster_name, compute_config)\n", " compute_target = ComputeTarget.create(ws, cluster_name, compute_config)\n",
@ -814,7 +821,7 @@
}, },
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": 31, "execution_count": 5,
"metadata": {}, "metadata": {},
"outputs": [], "outputs": [],
"source": [ "source": [
@ -836,19 +843,11 @@
"metadata": {}, "metadata": {},
"source": [ "source": [
"### 2.3.1 Prepare Training Script\n", "### 2.3.1 Prepare Training Script\n",
"Now you will need to create your training script. In this tutorial, the script for distributed training of GENSEN is already provided for you at `train.py`. In practice, you should be able to take any custom PyTorch training script as is and run it with Azure ML without having to modify your code.\n", "Now you will need to create your training script. In this tutorial, the script for distributed training of GENSEN is already provided for you at `gensen_train.py`. \n",
"\n", "\n",
"However, if you would like to use Azure ML's [metric logging](https://docs.microsoft.com/azure/machine-learning/service/concept-azure-machine-learning-architecture#logging) capabilities, you will have to add a small amount of Azure ML logic inside your training script. In this example, at each logging interval, we will log the loss for that minibatch to our Azure ML run.\n", "In this example, we use MLflow to log your metrics. We also use the [Azure ML-Mlflow](https://pypi.org/project/azureml-mlflow/) package to log these metrics to the Azure Portal. This is done with no change to the provided training script!\n",
"\n", "\n",
"To do so, in `train.py`, we will first access the Azure ML `Run` object within the script:\n", "In this example the script provided logs the loss for that minibatch to our Azure ML portal."
"```Python\n",
"from azureml.core.run import Run\n",
"run = Run.get_context()\n",
"```\n",
"Later within the script, we log the loss metric to our run:\n",
"```Python\n",
"run.log('loss', loss.item())\n",
"```"
] ]
}, },
{ {
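For reference, a minimal runnable sketch of the MLflow logging pattern that `gensen_train.py` relies on (the loop and loss values below are stand-ins, not code from the repo); when the script runs on Azure ML with the `azureml-mlflow` package installed, the same calls are expected to surface the metrics in the Azure Portal:

```python
import mlflow

learning_rate = 0.0001  # same default as the training script

with mlflow.start_run():
    # Hyperparameters are logged once per run
    mlflow.log_param("learning_rate", learning_rate)
    for step in range(3):
        loss = 1.0 / (step + 1)  # stand-in for the real minibatch loss
        # Scalar metrics appear as charts in the MLflow / Azure ML run UI
        mlflow.log_metric("loss", loss, step=step)
```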
@ -872,7 +871,7 @@
}, },
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": 36, "execution_count": 6,
"metadata": {}, "metadata": {},
"outputs": [ "outputs": [
{ {
@ -881,7 +880,7 @@
"'../../utils_nlp/gensen/gensen_config.json'" "'../../utils_nlp/gensen/gensen_config.json'"
] ]
}, },
"execution_count": 36, "execution_count": 6,
"metadata": {}, "metadata": {},
"output_type": "execute_result" "output_type": "execute_result"
} }
@ -902,7 +901,7 @@
}, },
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": 37, "execution_count": 7,
"metadata": {}, "metadata": {},
"outputs": [], "outputs": [],
"source": [ "source": [
@ -917,12 +916,12 @@
"### 2.3.3 Create a PyTorch Estimator\n", "### 2.3.3 Create a PyTorch Estimator\n",
"The Azure ML SDK's PyTorch estimator enables you to easily submit PyTorch training jobs for both single-node and distributed runs. For more information on the PyTorch estimator, refer [here](https://docs.microsoft.com/azure/machine-learning/service/how-to-train-pytorch).\n", "The Azure ML SDK's PyTorch estimator enables you to easily submit PyTorch training jobs for both single-node and distributed runs. For more information on the PyTorch estimator, refer [here](https://docs.microsoft.com/azure/machine-learning/service/how-to-train-pytorch).\n",
"\n", "\n",
"`sample_config.json` defines all the hyperparameters and paths when training GenSen model. The trained model will be saved in `data/models/example` to Azure Blob Storage. **Remember to clean `data/models/example` folder in order to save new models.**" "`gensen_config.json` defines all the hyperparameters and paths when training GenSen model. The trained model will be saved in `models` to Azure Blob Storage. **Remember to clean `models` folder in order to save new models.**"
] ]
}, },
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": 34, "execution_count": 8,
"metadata": {}, "metadata": {},
"outputs": [ "outputs": [
{ {
@ -942,11 +941,12 @@
" script_params=script_params,\n", " script_params=script_params,\n",
" compute_target=compute_target,\n", " compute_target=compute_target,\n",
" entry_script='utils_nlp/gensen/gensen_train.py',\n", " entry_script='utils_nlp/gensen/gensen_train.py',\n",
" node_count=4,\n", " node_count=2,\n",
" process_count_per_node=1,\n", " process_count_per_node=1,\n",
" distributed_training=MpiConfiguration(),\n", " distributed_training=MpiConfiguration(),\n",
" use_gpu=True,\n", " use_gpu=True,\n",
" conda_packages=['scikit-learn=0.20.3', 'h5py', 'nltk']\n", " conda_packages=['scikit-learn=0.20.3', 'h5py', 'nltk'],\n",
" pip_packages=['azureml-mlflow>=1.0.43.1','numpy>=1.16.0']\n",
" )" " )"
] ]
}, },
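The hunk above shows only the tail of the estimator cell; for orientation, a sketch of what the full call plausibly looks like, assuming `project_folder`, `script_params`, and `compute_target` are defined in earlier cells:

```python
from azureml.core.runconfig import MpiConfiguration
from azureml.train.dnn import PyTorch

estimator = PyTorch(source_directory=project_folder,
                    script_params=script_params,
                    compute_target=compute_target,
                    entry_script='utils_nlp/gensen/gensen_train.py',
                    node_count=2,
                    process_count_per_node=1,
                    distributed_training=MpiConfiguration(),
                    use_gpu=True,
                    conda_packages=['scikit-learn=0.20.3', 'h5py', 'nltk'],
                    pip_packages=['azureml-mlflow>=1.0.43.1', 'numpy>=1.16.0'])
```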
@ -979,7 +979,7 @@
}, },
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": 38, "execution_count": 9,
"metadata": { "metadata": {
"scrolled": true "scrolled": true
}, },
@ -988,7 +988,7 @@
"name": "stderr", "name": "stderr",
"output_type": "stream", "output_type": "stream",
"text": [ "text": [
"Submitting C:\\Users\\lishao\\Project\\Rotation2\\NLP directory for run. The size of the directory >= 25 MB, so it can take a few minutes.\n" "Submitting E:\\Projects\\NLP-BP\\temp\\nlp directory for run. The size of the directory >= 25 MB, so it can take a few minutes.\n"
] ]
}, },
{ {
@ -996,9 +996,9 @@
"output_type": "stream", "output_type": "stream",
"text": [ "text": [
"Run(Experiment: pytorch-gensen,\n", "Run(Experiment: pytorch-gensen,\n",
"Id: pytorch-gensen_1560797674_e36e44f4,\n", "Id: pytorch-gensen_1561150688_f84eab04,\n",
"Type: azureml.scriptrun,\n", "Type: azureml.scriptrun,\n",
"Status: Preparing)\n" "Status: Queued)\n"
] ]
} }
], ],
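The cell that produces the output above is not visible in this hunk; a minimal sketch of the submission, assuming the experiment name shown in the log and the `estimator` built earlier:

```python
from azureml.core import Experiment

# Create (or reuse) the experiment in the workspace
experiment = Experiment(ws, name='pytorch-gensen')

# Submit the estimator; this snapshots the project directory and queues the run
run = experiment.submit(estimator)
print(run)
```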
@ -1019,7 +1019,7 @@
"cell_type": "markdown", "cell_type": "markdown",
"metadata": {}, "metadata": {},
"source": [ "source": [
"**Horovod on AzureML**\n", "#### Horovod on AzureML\n",
"\n", "\n",
"[Horovod](https://github.com/horovod/horovod) is a distributed training framework for TensorFlow, PyTorch etc. to make distributed Deep Learning fast and easy to use. We have created 2 nodes in the GPU cluster on AzureML. By using Horovod, we can use those two machines to train the model in parallel. In theory, the model trains faster on AzureML than on VM which uses single machine because it converges faster which we will get lower loss. However, by using more nodes, the model may take more time in communicating with each node. The communication time could be ignored when the model is trained on the large datasets.\n", "[Horovod](https://github.com/horovod/horovod) is a distributed training framework for TensorFlow, PyTorch etc. to make distributed Deep Learning fast and easy to use. We have created 2 nodes in the GPU cluster on AzureML. By using Horovod, we can use those two machines to train the model in parallel. In theory, the model trains faster on AzureML than on VM which uses single machine because it converges faster which we will get lower loss. However, by using more nodes, the model may take more time in communicating with each node. The communication time could be ignored when the model is trained on the large datasets.\n",
"\n", "\n",
@ -1031,7 +1031,7 @@
"cell_type": "markdown", "cell_type": "markdown",
"metadata": {}, "metadata": {},
"source": [ "source": [
"**Interpret the Training Results**\n", "#### Interpret the Training Results\n",
"\n", "\n",
"The following chart shows the model validation loss (the less loss, the better performance) with different nodes with AmlCompute:\n", "The following chart shows the model validation loss (the less loss, the better performance) with different nodes with AmlCompute:\n",
"\n", "\n",
@ -1042,28 +1042,29 @@
"From the chart, we can tell training with more nodes, the performance is getting better with lower loss." "From the chart, we can tell training with more nodes, the performance is getting better with lower loss."
] ]
}, },
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The Azureml Widget allows an easy way to stream updates of the logged metrics right into your notebook. To use this feature install the widget by running the commands below. \n",
"\n",
"```\n",
"conda install ipywidgets\n",
"\n",
"jupyter nbextension install --py --user azureml.widgets\n",
"\n",
"jupyter nbextension enable azureml.widgets --user --py\n",
"\n",
"```"
]
},
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": 39, "execution_count": null,
"metadata": { "metadata": {
"scrolled": true "scrolled": false
}, },
"outputs": [ "outputs": [],
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "19d55fcc0871444da604b1d828d9eac4",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"_UserRunWidget(widget_settings={'childWidgetDisplay': 'popup', 'send_telemetry': True, 'log_level': 'INFO', 's…"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [ "source": [
"RunDetails(run).show()" "RunDetails(run).show()"
] ]
@ -1085,7 +1086,9 @@
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": null, "execution_count": null,
"metadata": {}, "metadata": {
"scrolled": true
},
"outputs": [], "outputs": [],
"source": [ "source": [
"run.wait_for_completion(show_output=True) # this provides a verbose log" "run.wait_for_completion(show_output=True) # this provides a verbose log"
@ -1114,6 +1117,35 @@
" ```" " ```"
] ]
}, },
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### 2.3.6 Clean up after training\n",
"\n",
"We finally delete the training script `gensen_train.py` and config file `gensen_config.json` from the project directory."
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [],
"source": [
"gensen_train = os.path.join(project_folder,'utils_nlp/gensen/gensen_train.py')\n",
"gensen_config = os.path.join(project_folder,'utils_nlp/gensen/gensen_config.json')\n",
"\n",
"if os.path.isfile(gensen_train):\n",
" os.remove(gensen_train)\n",
"else:\n",
" print(\"Error: %s file not found\" % gensen_train)\n",
" \n",
"if os.path.isfile(gensen_config):\n",
" os.remove(gensen_config)\n",
"else:\n",
" print(\"Error: %s file not found\" % gensen_config)"
]
},
{ {
"cell_type": "markdown", "cell_type": "markdown",
"metadata": {}, "metadata": {},
@ -1135,7 +1167,7 @@
}, },
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": 130, "execution_count": null,
"metadata": {}, "metadata": {},
"outputs": [], "outputs": [],
"source": [ "source": [
@ -1164,7 +1196,7 @@
}, },
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": 131, "execution_count": null,
"metadata": {}, "metadata": {},
"outputs": [], "outputs": [],
"source": [ "source": [
@ -1194,54 +1226,11 @@
}, },
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": 132, "execution_count": null,
"metadata": { "metadata": {
"scrolled": false "scrolled": false
}, },
"outputs": [ "outputs": [],
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "c61e610d4601486e9f41fd852320b47b",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"_HyperDriveWidget(widget_settings={'childWidgetDisplay': 'popup', 'send_telemetry': True, 'log_level': 'INFO',…"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"_UserRunWidget(widget_settings={'childWidgetDisplay': 'popup', 'send_telemetry': True, 'log_level': 'INFO', 's…"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "5c47f13e11c646cd865d4f286b70ab0c",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"_UserRunWidget(widget_settings={'childWidgetDisplay': 'popup', 'send_telemetry': True, 'log_level': 'INFO', 's…"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [ "source": [
"RunDetails(hyperdrive_run).show()" "RunDetails(hyperdrive_run).show()"
] ]
@ -1331,9 +1320,9 @@
} }
], ],
"kernelspec": { "kernelspec": {
"display_name": "Python 3", "display_name": "Python NLP CPU",
"language": "python", "language": "python",
"name": "python3" "name": "nlp_cpu"
}, },
"language_info": { "language_info": {
"codemirror_mode": { "codemirror_mode": {

View file

@ -15,20 +15,20 @@
}, },
"data": {"paths": [ "data": {"paths": [
{ {
"train_src": "data/processed/snli_1.0_train.txt.s1.tok", "train_src": "snli_1.0_train.txt.s1.tok",
"train_trg": "data/processed/snli_1.0_train.txt.s2.tok", "train_trg": "snli_1.0_train.txt.s2.tok",
"val_src": "data/processed/snli_1.0_dev.txt.s1.tok", "val_src": "snli_1.0_dev.txt.s1.tok",
"val_trg": "data/processed/snli_1.0_dev.txt.s1.tok", "val_trg": "snli_1.0_dev.txt.s1.tok",
"taskname": "snli" "taskname": "snli"
} }
], ],
"max_src_length": 90, "max_src_length": 90,
"max_trg_length": 90, "max_trg_length": 90,
"task": "multi-seq2seq-nli", "task": "multi-seq2seq-nli",
"save_dir": "data/models/example", "save_dir": "models/",
"nli_train": "data/processed/snli_1.0_train.txt.clean.noblank", "nli_train": "snli_1.0_train.txt.clean.noblank",
"nli_dev": "data/processed/snli_1.0_dev.txt.clean.noblank", "nli_dev": "snli_1.0_dev.txt.clean.noblank",
"nli_test": "data/processed/snli_1.0_test.txt.clean.noblank" "nli_test": "snli_1.0_test.txt.clean.noblank"
}, },
"model": { "model": {
"dim_src": 2048, "dim_src": 2048,

View file

@ -15,20 +15,20 @@ AzureML provides AI Compute to train the model and track the performance.
This training process is based on GPU only. This training process is based on GPU only.
""" """
import logging
import argparse import argparse
import os
import json import json
import logging
import os
import time import time
import horovod.torch as hvd
import mlflow
import numpy as np import numpy as np
import torch import torch
import torch.backends.cudnn as cudnn import torch.backends.cudnn as cudnn
import torch.nn as nn import torch.nn as nn
import torch.nn.functional as f import torch.nn.functional as f
import torch.optim as optim import torch.optim as optim
from azureml.core.run import Run
import horovod.torch as hvd
from utils_nlp.gensen.multi_task_model import MultitaskModel from utils_nlp.gensen.multi_task_model import MultitaskModel
from utils_nlp.gensen.utils import ( from utils_nlp.gensen.utils import (
@ -37,10 +37,8 @@ from utils_nlp.gensen.utils import (
compute_validation_loss, compute_validation_loss,
) )
# get the Azure ML run object
run = Run.get_context()
cudnn.benchmark = True cudnn.benchmark = True
logger = logging.getLogger(__name__)
hvd.init() hvd.init()
if torch.cuda.is_available(): if torch.cuda.is_available():
@ -172,7 +170,9 @@ def evaluate(
# Horovod: print output only on first rank. # Horovod: print output only on first rank.
if hvd.rank() == 0: if hvd.rank() == 0:
# log the best val accuracy to AML run # log the best val accuracy to AML run
run.log("Best Validation Loss", np.float(validation_loss)) logging.info(
"Best Validation Loss: {}".format(np.float(validation_loss))
)
# If the validation loss is small enough, and it starts to go up. # If the validation loss is small enough, and it starts to go up.
# Should stop training. # Should stop training.
@ -182,8 +182,6 @@ def evaluate(
min_val_loss_epoch = monitor_epoch min_val_loss_epoch = monitor_epoch
model_state = model.state_dict() model_state = model.state_dict()
run.log("Validation Loss", validation_loss)
print(monitor_epoch, min_val_loss_epoch, min_val_loss)
logging.info( logging.info(
"Monitor epoch: %d Validation Loss: %.3f Min Validation Epoch: " "Monitor epoch: %d Validation Loss: %.3f Min Validation Epoch: "
"%d Loss : %.3f " "%d Loss : %.3f "
@ -275,312 +273,333 @@ def train(config, data_folder, learning_rate=0.0001):
config(dict): Loaded json file as a python object. config(dict): Loaded json file as a python object.
data_folder(str): Path to the folder containing the data. data_folder(str): Path to the folder containing the data.
learning_rate(float): Learning rate for the model. learning_rate(float): Learning rate for the model.
""" """
owd = os.getcwd() owd = os.getcwd()
os.chdir(data_folder)
try: try:
save_dir = config["data"]["save_dir"] with mlflow.start_run():
save_dir = config["data"]["save_dir"]
if not os.path.exists("./log"):
os.makedirs("./log")
os.chdir(data_folder) os.makedirs(save_dir, exist_ok=True)
if not os.path.exists("./log"): setup_logging(config)
os.makedirs("./log")
os.makedirs(save_dir, exist_ok=True) batch_size = config["training"]["batch_size"]
src_vocab_size = config["model"]["n_words_src"]
trg_vocab_size = config["model"]["n_words_trg"]
max_len_src = config["data"]["max_src_length"]
max_len_trg = config["data"]["max_trg_length"]
model_state = {}
setup_logging(config) train_src = [item["train_src"] for item in config["data"]["paths"]]
train_trg = [item["train_trg"] for item in config["data"]["paths"]]
tasknames = [item["taskname"] for item in config["data"]["paths"]]
batch_size = config["training"]["batch_size"] # Keep track of indicies to train forward and backward jointly
src_vocab_size = config["model"]["n_words_src"] if (
trg_vocab_size = config["model"]["n_words_trg"] "skipthought_next" in tasknames
max_len_src = config["data"]["max_src_length"] and "skipthought_previous" in tasknames
max_len_trg = config["data"]["max_trg_length"] ):
model_state = {} skipthought_idx = tasknames.index("skipthought_next")
skipthought_backward_idx = tasknames.index(
train_src = [item["train_src"] for item in config["data"]["paths"]] "skipthought_previous"
train_trg = [item["train_trg"] for item in config["data"]["paths"]]
tasknames = [item["taskname"] for item in config["data"]["paths"]]
# Keep track of indicies to train forward and backward jointly
if (
"skipthought_next" in tasknames
and "skipthought_previous" in tasknames
):
skipthought_idx = tasknames.index("skipthought_next")
skipthought_backward_idx = tasknames.index("skipthought_previous")
paired_tasks = {
skipthought_idx: skipthought_backward_idx,
skipthought_backward_idx: skipthought_idx,
}
else:
paired_tasks = None
skipthought_idx = None
skipthought_backward_idx = None
train_iterator = BufferedDataIterator(
train_src,
train_trg,
src_vocab_size,
trg_vocab_size,
tasknames,
save_dir,
buffer_size=1e6,
lowercase=True,
seed=(hvd.rank() + 1) * 12345,
)
nli_iterator = NLIIterator(
train=config["data"]["nli_train"],
dev=config["data"]["nli_dev"],
test=config["data"]["nli_test"],
vocab_size=-1,
vocab=os.path.join(save_dir, "src_vocab.pkl"),
seed=(hvd.rank() + 1) * 12345,
)
src_vocab_size = len(train_iterator.src[0]["word2id"])
trg_vocab_size = len(train_iterator.trg[0]["word2id"])
# Logging set up.
logging.info("Finished creating iterator ...")
log_config(config)
logging.info(
"Found %d words in source : "
% (len(train_iterator.src[0]["id2word"]))
)
for idx, taskname in enumerate(tasknames):
logging.info(
"Found %d target words in task %s "
% (len(train_iterator.trg[idx]["id2word"]), taskname)
)
logging.info("Found %d words in src " % src_vocab_size)
logging.info("Found %d words in trg " % trg_vocab_size)
weight_mask = torch.ones(trg_vocab_size).cuda()
weight_mask[train_iterator.trg[0]["word2id"]["<pad>"]] = 0
loss_criterion = nn.CrossEntropyLoss(weight=weight_mask).cuda()
nli_criterion = nn.CrossEntropyLoss().cuda()
model = MultitaskModel(
src_emb_dim=config["model"]["dim_word_src"],
trg_emb_dim=config["model"]["dim_word_trg"],
src_vocab_size=src_vocab_size,
trg_vocab_size=trg_vocab_size,
src_hidden_dim=config["model"]["dim_src"],
trg_hidden_dim=config["model"]["dim_trg"],
bidirectional=config["model"]["bidirectional"],
pad_token_src=train_iterator.src[0]["word2id"]["<pad>"],
pad_token_trg=train_iterator.trg[0]["word2id"]["<pad>"],
nlayers_src=config["model"]["n_layers_src"],
dropout=config["model"]["dropout"],
num_tasks=len(train_iterator.src),
paired_tasks=paired_tasks,
).cuda()
optimizer = setup_horovod(model, learning_rate=learning_rate)
logging.info(model)
n_gpus = config["training"]["n_gpus"]
model = torch.nn.DataParallel(model, device_ids=range(n_gpus))
task_losses = [[] for _ in tasknames]
task_idxs = [0 for _ in tasknames]
nli_losses = []
updates = 0
nli_ctr = 0
nli_epoch = 0
monitor_epoch = 0
nli_mbatch_ctr = 0
mbatch_times = []
min_val_loss = 10000000
min_val_loss_epoch = -1
rng_num_tasks = len(tasknames) - 1 if paired_tasks else len(tasknames)
logging.info("Commencing Training ...")
start = time.time()
while True:
# Train NLI once every 10 minibatches of other tasks
if nli_ctr % 10 == 0:
minibatch = nli_iterator.get_parallel_minibatch(
nli_mbatch_ctr, batch_size * n_gpus
) )
optimizer.zero_grad() paired_tasks = {
class_logits = model( skipthought_idx: skipthought_backward_idx,
minibatch, -1, return_hidden=False, paired_trg=None skipthought_backward_idx: skipthought_idx,
) }
loss = nli_criterion(
class_logits.contiguous().view(-1, class_logits.size(1)),
minibatch["labels"].contiguous().view(-1),
)
# nli_losses.append(loss.data[0])
nli_losses.append(loss.item())
loss.backward()
torch.nn.utils.clip_grad_norm(model.parameters(), 1.0)
optimizer.step()
# For AML.
run.log("loss", loss.item())
nli_mbatch_ctr += batch_size * n_gpus
if nli_mbatch_ctr >= len(nli_iterator.train_lines):
nli_mbatch_ctr = 0
nli_epoch += 1
else: else:
# Sample a random task paired_tasks = None
task_idx = np.random.randint(low=0, high=rng_num_tasks) skipthought_idx = None
skipthought_backward_idx = None
# Get a minibatch corresponding to the sampled task train_iterator = BufferedDataIterator(
minibatch = train_iterator.get_parallel_minibatch( train_src,
task_idx, train_trg,
task_idxs[task_idx], src_vocab_size,
batch_size * n_gpus, trg_vocab_size,
max_len_src, tasknames,
max_len_trg, save_dir,
buffer_size=1e6,
lowercase=True,
seed=(hvd.rank() + 1) * 12345,
)
nli_iterator = NLIIterator(
train=config["data"]["nli_train"],
dev=config["data"]["nli_dev"],
test=config["data"]["nli_test"],
vocab_size=-1,
vocab=os.path.join(save_dir, "src_vocab.pkl"),
seed=(hvd.rank() + 1) * 12345,
)
src_vocab_size = len(train_iterator.src[0]["word2id"])
trg_vocab_size = len(train_iterator.trg[0]["word2id"])
# Logging set up.
logging.info("Finished creating iterator ...")
log_config(config)
logging.info(
"Found %d words in source : "
% (len(train_iterator.src[0]["id2word"]))
)
for idx, taskname in enumerate(tasknames):
logging.info(
"Found %d target words in task %s "
% (len(train_iterator.trg[idx]["id2word"]), taskname)
) )
logging.info("Found %d words in src " % src_vocab_size)
logging.info("Found %d words in trg " % trg_vocab_size)
"""Increment pointer into task and if current buffer is weight_mask = torch.ones(trg_vocab_size).cuda()
exhausted, fetch new buffer. """ weight_mask[train_iterator.trg[0]["word2id"]["<pad>"]] = 0
task_idxs[task_idx] += batch_size * n_gpus loss_criterion = nn.CrossEntropyLoss(weight=weight_mask).cuda()
if task_idxs[task_idx] >= train_iterator.buffer_size: nli_criterion = nn.CrossEntropyLoss().cuda()
train_iterator.fetch_buffer(task_idx)
task_idxs[task_idx] = 0
if task_idx == skipthought_idx: model = MultitaskModel(
minibatch_back = train_iterator.get_parallel_minibatch( src_emb_dim=config["model"]["dim_word_src"],
skipthought_backward_idx, trg_emb_dim=config["model"]["dim_word_trg"],
task_idxs[skipthought_backward_idx], src_vocab_size=src_vocab_size,
trg_vocab_size=trg_vocab_size,
src_hidden_dim=config["model"]["dim_src"],
trg_hidden_dim=config["model"]["dim_trg"],
bidirectional=config["model"]["bidirectional"],
pad_token_src=train_iterator.src[0]["word2id"]["<pad>"],
pad_token_trg=train_iterator.trg[0]["word2id"]["<pad>"],
nlayers_src=config["model"]["n_layers_src"],
dropout=config["model"]["dropout"],
num_tasks=len(train_iterator.src),
paired_tasks=paired_tasks,
).cuda()
optimizer = setup_horovod(model, learning_rate=learning_rate)
logging.info(model)
n_gpus = config["training"]["n_gpus"]
model = torch.nn.DataParallel(model, device_ids=range(n_gpus))
task_losses = [[] for _ in tasknames]
task_idxs = [0 for _ in tasknames]
nli_losses = []
updates = 0
nli_ctr = 0
nli_epoch = 0
monitor_epoch = 0
nli_mbatch_ctr = 0
mbatch_times = []
min_val_loss = 10000000
min_val_loss_epoch = -1
rng_num_tasks = (
len(tasknames) - 1 if paired_tasks else len(tasknames)
)
logging.info("OS Environ: \n {} \n\n".format(os.environ))
mlflow.log_param("learning_rate", learning_rate)
logging.info("Commencing Training ...")
start = time.time()
while True:
batch_start_time = time.time()
# Train NLI once every 10 minibatches of other tasks
if nli_ctr % 10 == 0:
minibatch = nli_iterator.get_parallel_minibatch(
nli_mbatch_ctr, batch_size * n_gpus
)
optimizer.zero_grad()
class_logits = model(
minibatch, -1, return_hidden=False, paired_trg=None
)
loss = nli_criterion(
class_logits.contiguous().view(
-1, class_logits.size(1)
),
minibatch["labels"].contiguous().view(-1),
)
# nli_losses.append(loss.data[0])
nli_losses.append(loss.item())
loss.backward()
torch.nn.utils.clip_grad_norm(model.parameters(), 1.0)
optimizer.step()
nli_mbatch_ctr += batch_size * n_gpus
if nli_mbatch_ctr >= len(nli_iterator.train_lines):
nli_mbatch_ctr = 0
nli_epoch += 1
else:
# Sample a random task
task_idx = np.random.randint(low=0, high=rng_num_tasks)
# Get a minibatch corresponding to the sampled task
minibatch = train_iterator.get_parallel_minibatch(
task_idx,
task_idxs[task_idx],
batch_size * n_gpus, batch_size * n_gpus,
max_len_src, max_len_src,
max_len_trg, max_len_trg,
) )
task_idxs[skipthought_backward_idx] += batch_size * n_gpus
if (
task_idxs[skipthought_backward_idx]
>= train_iterator.buffer_size
):
train_iterator.fetch_buffer(skipthought_backward_idx)
task_idxs[skipthought_backward_idx] = 0
optimizer.zero_grad() """Increment pointer into task and if current buffer is
decoder_logit, decoder_logit_2 = model( exhausted, fetch new buffer. """
minibatch, task_idxs[task_idx] += batch_size * n_gpus
task_idx, if task_idxs[task_idx] >= train_iterator.buffer_size:
paired_trg=minibatch_back["input_trg"], train_iterator.fetch_buffer(task_idx)
) task_idxs[task_idx] = 0
loss_f = loss_criterion( if task_idx == skipthought_idx:
decoder_logit.contiguous().view( minibatch_back = train_iterator.get_parallel_minibatch(
-1, decoder_logit.size(2) skipthought_backward_idx,
), task_idxs[skipthought_backward_idx],
minibatch["output_trg"].contiguous().view(-1), batch_size * n_gpus,
) max_len_src,
max_len_trg,
)
task_idxs[skipthought_backward_idx] += (
batch_size * n_gpus
)
if (
task_idxs[skipthought_backward_idx]
>= train_iterator.buffer_size
):
train_iterator.fetch_buffer(
skipthought_backward_idx
)
task_idxs[skipthought_backward_idx] = 0
loss_b = loss_criterion( optimizer.zero_grad()
decoder_logit_2.contiguous().view( decoder_logit, decoder_logit_2 = model(
-1, decoder_logit_2.size(2) minibatch,
), task_idx,
minibatch_back["output_trg"].contiguous().view(-1), paired_trg=minibatch_back["input_trg"],
) )
task_losses[task_idx].append(loss_f.data[0]) loss_f = loss_criterion(
task_losses[skipthought_backward_idx].append( decoder_logit.contiguous().view(
loss_b.data[0] -1, decoder_logit.size(2)
) ),
loss = loss_f + loss_b minibatch["output_trg"].contiguous().view(-1),
)
else: loss_b = loss_criterion(
optimizer.zero_grad() decoder_logit_2.contiguous().view(
decoder_logit = model(minibatch, task_idx) -1, decoder_logit_2.size(2)
),
minibatch_back["output_trg"].contiguous().view(-1),
)
loss = loss_criterion( task_losses[task_idx].append(loss_f.data[0])
decoder_logit.contiguous().view( task_losses[skipthought_backward_idx].append(
-1, decoder_logit.size(2) loss_b.data[0]
), )
minibatch["output_trg"].contiguous().view(-1), loss = loss_f + loss_b
)
task_losses[task_idx].append(loss.item()) else:
optimizer.zero_grad()
decoder_logit = model(minibatch, task_idx)
loss.backward() loss = loss_criterion(
# For distributed optimizer need to sync before gradient decoder_logit.contiguous().view(
# clipping. -1, decoder_logit.size(2)
optimizer.synchronize() ),
minibatch["output_trg"].contiguous().view(-1),
)
torch.nn.utils.clip_grad_norm(model.parameters(), 1.0) task_losses[task_idx].append(loss.item())
optimizer.step()
end = time.time() loss.backward()
mbatch_times.append(end - start) # For distributed optimizer need to sync before gradient
# clipping.
optimizer.synchronize()
# Validations torch.nn.utils.clip_grad_norm(model.parameters(), 1.0)
if ( optimizer.step()
updates % config["management"]["monitor_loss"] == 0
and updates != 0 end = time.time()
): mbatch_times.append(end - batch_start_time)
monitor_epoch += 1
for idx, task in enumerate(tasknames): # Validations
logging.info( if (
"Seq2Seq Examples Processed : %d %s Loss : %.5f Num %s " updates % config["management"]["monitor_loss"] == 0
"minibatches : %d" and updates != 0
% ( ):
updates, monitor_epoch += 1
task, for idx, task in enumerate(tasknames):
logging.info(
"Seq2Seq Examples Processed : %d %s Loss : %.5f Num %s "
"minibatches : %d"
% (
updates,
task,
np.mean(task_losses[idx]),
task,
len(task_losses[idx]),
)
)
mlflow.log_metric(
"validation_loss",
np.mean(task_losses[idx]), np.mean(task_losses[idx]),
task, step=monitor_epoch,
len(task_losses[idx]), )
logging.info(
"Round: %d NLI Epoch : %d NLI Examples Processed : %d NLI "
"Loss : %.5f "
% (
nli_ctr,
nli_epoch,
nli_mbatch_ctr,
np.mean(nli_losses),
) )
) )
run.log("Task Loss", np.mean(task_losses[idx])) mlflow.log_metric(
"nli_loss", np.mean(nli_losses), step=nli_epoch
)
logging.info( logging.info(
"Round: %d NLI Epoch : %d NLI Examples Processed : %d NLI " "Average time per mininbatch : %.5f"
"Loss : %.5f " % (np.mean(mbatch_times))
% (nli_ctr, nli_epoch, nli_mbatch_ctr, np.mean(nli_losses)) )
) mlflow.log_metric(
run.log("NLI Loss", np.mean(nli_losses)) "minibatch_avg_duration", np.mean(mbatch_times)
logging.info( )
"Average time per mininbatch : %.5f"
% (np.mean(mbatch_times))
)
run.log(
"Average time per mininbatch : ", np.mean(mbatch_times)
)
task_losses = [[] for _ in tasknames]
mbatch_times = []
nli_losses = []
# For validate and break if done. task_losses = [[] for _ in tasknames]
logging.info("############################") mbatch_times = []
logging.info("##### Evaluating model #####") nli_losses = []
logging.info("############################")
training_complete, min_val_loss_epoch, min_val_loss, model_state = evaluate(
config=config,
train_iterator=train_iterator,
model=model,
loss_criterion=loss_criterion,
monitor_epoch=monitor_epoch,
min_val_loss=min_val_loss,
min_val_loss_epoch=min_val_loss_epoch,
save_dir=save_dir,
starting_time=start,
model_state=model_state,
)
if training_complete:
break
logging.info("Evaluating on NLI") # For validate and break if done.
evaluate_nli( logging.info("############################")
nli_iterator=nli_iterator, logging.info("##### Evaluating model #####")
model=model, logging.info("############################")
n_gpus=n_gpus, training_complete, min_val_loss_epoch, min_val_loss, model_state = evaluate(
batch_size=batch_size, config=config,
) train_iterator=train_iterator,
model=model,
loss_criterion=loss_criterion,
monitor_epoch=monitor_epoch,
min_val_loss=min_val_loss,
min_val_loss_epoch=min_val_loss_epoch,
save_dir=save_dir,
starting_time=start,
model_state=model_state,
)
if training_complete:
break
updates += batch_size * n_gpus logging.info("Evaluating on NLI")
nli_ctr += 1 evaluate_nli(
logging.info("Updates: %d" % updates) nli_iterator=nli_iterator,
model=model,
n_gpus=n_gpus,
batch_size=batch_size,
)
updates += batch_size * n_gpus
nli_ctr += 1
logging.info("Updates: %d" % updates)
finally: finally:
os.chdir(owd) os.chdir(owd)

View file

@ -38,6 +38,7 @@ def test_load_pretrained_vectors_word2vec():
assert isinstance(load_word2vec(dir_path), Word2VecKeyedVectors) assert isinstance(load_word2vec(dir_path), Word2VecKeyedVectors)
def test_load_pretrained_vectors_glove(): def test_load_pretrained_vectors_glove():
dir_path = "temp_data/" dir_path = "temp_data/"
file_path = os.path.join( file_path = os.path.join(
@ -58,7 +59,8 @@ def test_load_pretrained_vectors_glove():
def test_load_pretrained_vectors_fasttext(): def test_load_pretrained_vectors_fasttext():
dir_path = "temp_data/" dir_path = "temp_data/"
file_path = os.path.join(os.path.join(dir_path, "fastText"), "wiki.simple.bin") file_path = os.path.join(os.path.join(dir_path, "fastText"),
"wiki.simple.bin")
assert isinstance(load_fasttext(dir_path), FastText) assert isinstance(load_fasttext(dir_path), FastText)

View file

@ -54,9 +54,7 @@ CONDA_GPU = {
} }
PIP_BASE = { PIP_BASE = {
"azureml-sdk[notebooks,tensorboard]": ( "azureml-sdk[notebooks,tensorboard]": "azureml-sdk[notebooks,tensorboard]==1.0.43",
"azureml-sdk[notebooks,tensorboard]==1.0.33"
),
"azureml-dataprep": "azureml-dataprep==1.1.4", "azureml-dataprep": "azureml-dataprep==1.1.4",
"black": "black>=18.6b4", "black": "black>=18.6b4",
"dask": "dask[dataframe]==1.2.2", "dask": "dask[dataframe]==1.2.2",
@ -75,6 +73,7 @@ PIP_BASE = {
"nltk": "nltk>=3.4", "nltk": "nltk>=3.4",
"pytorch-pretrained-bert": "pytorch-pretrained-bert>=0.6", "pytorch-pretrained-bert": "pytorch-pretrained-bert>=0.6",
"seqeval": "seqeval>=0.0.12", "seqeval": "seqeval>=0.0.12",
"azureml-mlflow": "azureml-mlflow>=1.0.43.1",
} }
PIP_GPU = {"horovod": "horovod>=0.16.1"} PIP_GPU = {"horovod": "horovod>=0.16.1"}
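For context, the entries in `PIP_BASE` end up in the pip section of the generated conda environment file; the helper below is an illustrative sketch, not the repo's actual generator:

```python
PIP_BASE = {
    "azureml-sdk[notebooks,tensorboard]": "azureml-sdk[notebooks,tensorboard]==1.0.43",
    "azureml-mlflow": "azureml-mlflow>=1.0.43.1",
}

def render_pip_section(pip_packages):
    """Render the pip block of a conda environment YAML."""
    lines = ["  - pip:"]
    lines += ["    - {}".format(spec) for spec in pip_packages.values()]
    return "\n".join(lines)

print(render_pip_section(PIP_BASE))
```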