* Add a --test option that runs only the data prep step to verify the environment is working.

* Force train.py to grab the lock on the row (removing a rare failure case).

* Fix SNPE Kubernetes scaling using the pod anti-affinity pattern.
Publish new Docker image.
Add MLflow integration to train.py.

* Add a script that runs the full training pipeline for the final pareto models.

* Add iteration 7.

* Add iteration 9.

* Switch to Bokeh so I can get nice tooltips on each dot in the scatter plot.

* Add axis titles.

* Add device F1 scoring to train_pareto.
Add more to the readmes.

* Add image.

* Add helper script to do final F1 scoring on Qualcomm devices.

* Fix lint errors.

* Fix bugs.
Chris Lovett 2023-04-27 13:03:08 -07:00, committed by GitHub
Parent: 4a3cf62a77
Commit: 93b8ab75d7
No key found matching this signature
GPG key ID: 4AEE18F83AFDEB23
16 changed files: 463 additions and 110 deletions


@@ -33,7 +33,8 @@ class RemoteAzureBenchmarkEvaluator(AsyncModelEvaluator):
         max_retries: Optional[int] = 5,
         retry_interval: Optional[int] = 120,
         onnx_export_kwargs: Optional[Dict[str, Any]] = None,
-        verbose: bool = False
+        verbose: bool = False,
+        benchmark_only: bool = True
     ) -> None:
         """Initialize the evaluator.
@@ -63,6 +64,7 @@ class RemoteAzureBenchmarkEvaluator(AsyncModelEvaluator):
         self.onnx_export_kwargs = onnx_export_kwargs or dict()
         self.verbose = verbose
         self.results = {}
+        self.benchmark_only = benchmark_only

         # Architecture list
         self.archids = []
@@ -107,34 +109,41 @@ class RemoteAzureBenchmarkEvaluator(AsyncModelEvaluator):
            return

        entity = self.store.get_status(archid)  # this is a get or create operation.
-       entity["benchmark_only"] = 1
+       if self.benchmark_only:
+           entity["benchmark_only"] = 1
        entity["model_date"] = self.store.get_utc_date()
        entity["model_name"] = "model.onnx"
        self.store.update_status_entity(entity)  # must be an update, not a merge.
        self.store.lock_entity(entity, "uploading")
-       try:
-           with TemporaryDirectory() as tmp_dir:
-               tmp_dir = Path(tmp_dir)
-               # Uploads ONNX file to blob storage and updates the table entry
-               arch.arch.to("cpu")
-               file_name = str(tmp_dir / "model.onnx")
-               # Exports model to ONNX
-               torch.onnx.export(
-                   arch.arch,
-                   self.sample_input,
-                   file_name,
-                   input_names=[f"input_{i}" for i in range(len(self.sample_input))],
-                   **self.onnx_export_kwargs,
-               )
-               self.store.upload_blob(f'{self.experiment_name}/{archid}', file_name, "model.onnx")
-               entity["status"] = "new"
-       except Exception as e:
-           entity["error"] = str(e)
-       finally:
-           self.store.unlock_entity(entity)
+       if arch.arch is not None:
+           try:
+               with TemporaryDirectory() as tmp_dir:
+                   tmp_dir = Path(tmp_dir)
+                   # Uploads ONNX file to blob storage and updates the table entry
+                   arch.arch.to("cpu")
+                   file_name = str(tmp_dir / "model.onnx")
+                   # Exports model to ONNX
+                   torch.onnx.export(
+                       arch.arch,
+                       self.sample_input,
+                       file_name,
+                       input_names=[f"input_{i}" for i in range(len(self.sample_input))],
+                       **self.onnx_export_kwargs,
+                   )
+                   self.store.upload_blob(f'{self.experiment_name}/{archid}', file_name, "model.onnx")
+                   entity["status"] = "new"
+           except Exception as e:
+               entity["error"] = str(e)
+       else:
+           # then the blob store must already have a model.onnx file!
+           blobs = self.store.list_blobs(f'{self.experiment_name}/{archid}')
+           if 'model.onnx' not in blobs:
+               entity["error"] = "model.onnx is missing"
+       self.store.unlock_entity(entity)
        self.archids.append(archid)

        if self.verbose:
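To illustrate the new flag: a minimal sketch of constructing the evaluator with full scoring requested, assuming an already-configured `store`; the input shape and experiment name are illustrative, not values from this commit.

    evaluator = RemoteAzureBenchmarkEvaluator(
        input_shape=(1, 3, 256, 256),        # illustrative shape
        store=store,                         # an ArchaiStore configured elsewhere
        experiment_name="facesynthetics",    # illustrative name
        onnx_export_kwargs={"opset_version": 11},
        benchmark_only=False,  # default True benchmarks only; False also requests device F1 scoring
    )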


@@ -76,7 +76,7 @@ def main():
     mlflow.pytorch.autolog(log_models=save_models, registered_model_name=name)
     with mlflow.start_run() as run:
         trainer.fit(model, data)
-    print_auto_logged_info(mlflow.get_run(run_id=run.info.run_id))
+        print_auto_logged_info(mlflow.get_run(run_id=run.info.run_id))

     result = trainer.validate(model, data)
     val_acc = result[0]['accuracy']
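For reference, the autolog pattern this hunk adjusts, as a self-contained sketch (the registered model name is illustrative):

    import mlflow

    mlflow.pytorch.autolog(log_models=True, registered_model_name="face_synthetics")
    with mlflow.start_run() as run:
        trainer.fit(model, data)
        # re-read the run record so the printed summary includes the
        # params and metrics autologged during trainer.fit
        print_auto_logged_info(mlflow.get_run(run_id=run.info.run_id))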


@@ -38,7 +38,6 @@ configurations based on the desired target (CPU or Snapdragon processor), `searc
 * [cpu_search.yaml](confs/cpu_search.yaml)
 * [snp_search.yaml](confs/snp_search.yaml)

-Note: to use `snp_search.yaml` you will need to follow the [Azure ML setup instructions](aml/readme.md).

 By default, `search.py` will run multiple partial training jobs using Ray (2 jobs per GPU). To change the number of GPUs
 per job, set `--gpus_per_job`, or use the `--serial_training` flag to disable parallel training jobs altogether.
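As an aside, the "2 jobs per GPU" default mentioned just above is Ray's standard fractional-GPU pattern; a minimal sketch under that assumption (not the actual search.py code):

    import ray

    ray.init()

    @ray.remote(num_gpus=0.5)  # each task claims half a GPU, so two run per GPU
    def partial_train(archid: str) -> float:
        # ... run partial training and return the validation score ...
        return 0.0

    scores = ray.get([partial_train.remote(a) for a in ['id_1', 'id_2', 'id_3']])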
@@ -47,6 +46,15 @@ The pareto architecture files selected by the search algorithm can be found unde
 A table with the partial training performance and other objectives can be found in the
 `[output_dir]/search_state_XX.csv` file.

+## Running the Search on Azure ML
+
+You can run the `aml.py` script to start the search on Azure ML and perform the
+partial training in parallel on Azure ML as well.
+See the [Azure ML setup instructions](aml/readme.md).
+Note: to use `snp_search.yaml` you will also need to follow these instructions.
+
 ## Final Training

 To fully train one of the selected architectures by the NAS algorithm use the following command


@@ -85,7 +85,7 @@ def search_component(config, environment_name, seed, modelstore_path, output_pat
     )


-def main(output_dir: Path, experiment_name: str, seed: int):
+def main(output_dir: Path, experiment_name: str, seed: int, data_prep_only: bool):
     if output_dir.exists():
         rmtree(str(output_dir))
     output_dir.mkdir(parents=True)
@@ -170,13 +170,18 @@ def main(output_dir: Path, experiment_name: str, seed: int):
             name=experiment_name
         )

-        search_job = search_component(config, environment_name, seed, results_path, output_dir)(
-            data=data_prep_job.outputs.data
-        )
-
-        return {
-            "results": search_job.outputs.results
-        }
+        if data_prep_only:
+            return {
+                "results": data_prep_job.outputs.data
+            }
+        else:
+            search_job = search_component(config, environment_name, seed, results_path, output_dir)(
+                data=data_prep_job.outputs.data
+            )
+            return {
+                "results": search_job.outputs.results
+            }

     pipeline_job = ml_client.jobs.create_or_update(
         archai_search_pipeline(),
@@ -198,7 +203,8 @@ if __name__ == '__main__':
     parser.add_argument('--output_dir', type=Path, help='Output directory for downloading results.', default='output')
     parser.add_argument('--experiment_name', default='facesynthetics')
     parser.add_argument('--seed', type=int, help='Random seed', default=42)
+    parser.add_argument('--test', help='Run only the data_prep step to test that the environment is working', action="store_true")
     args = parser.parse_args()

-    rc = main(args.output_dir, args.experiment_name, args.seed)
+    rc = main(args.output_dir, args.experiment_name, args.seed, args.test)
     sys.exit(rc)


@@ -0,0 +1,33 @@
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT license.
+
+import os
+import json
+import sys
+
+from archai.common.store import ArchaiStore
+
+CONNECTION_NAME = 'MODEL_STORAGE_CONNECTION_STRING'
+
+
+def change_schema(store: ArchaiStore):
+    """ Handy script for making batch changes to the azure table """
+    for e in store.get_all_status_entities():
+        status = e['status']
+        if status == 'preparing':
+            e['status'] = 'complete'
+            e['epochs'] = 1
+            name = e['name']
+            print(f'fixing {name}')
+            store.merge_status_entity(e)
+
+
+if __name__ == '__main__':
+    experiment_name = os.getenv("EXPERIMENT_NAME", "facesynthetics")
+    con_str = os.getenv(CONNECTION_NAME)
+    if not con_str:
+        print(f"Please specify your {CONNECTION_NAME} environment variable.")
+        sys.exit(1)
+
+    storage_account_name, storage_account_key = ArchaiStore.parse_connection_string(con_str)
+    store = ArchaiStore(storage_account_name, storage_account_key, table_name=experiment_name)
+    change_schema(store)
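The same loop shape can drive other one-off table fixes; a hypothetical variant (the error string and statuses are illustrative) that requeues rows whose model upload failed:

    def requeue_missing_models(store: ArchaiStore):
        for e in store.get_all_status_entities():
            if 'error' in e and e['error'] == 'model.onnx is missing':
                del e['error']
                e['status'] = 'new'
                # use update (not merge) so the deleted 'error' key is really removed
                store.update_status_entity(e)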


@@ -62,7 +62,7 @@ RUN wget -O azcopy_v10.tar.gz https://aka.ms/downloadazcopy-v10-linux && tar -xf
 # this echo is a trick to bypass docker build cache.
 # simply change the echo string every time you want docker build to pull down new bits.
-RUN echo '04/18/2023 12:22 PM' >/dev/null && git clone https://github.com/microsoft/archai.git
+RUN echo '04/25/2023 12:22 PM' >/dev/null && git clone https://github.com/microsoft/archai.git

 RUN cd archai && pip install -e .[dev]
 RUN echo "using this pip version: " && which pip


@@ -13,6 +13,18 @@ spec:
       labels:
         app: snpe-quantizer
     spec:
+      affinity:
+        # See https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node/#more-practical-use-cases
+        # The quantizer is processor intensive, so we do not want more than one per node.
+        podAntiAffinity:
+          requiredDuringSchedulingIgnoredDuringExecution:
+          - labelSelector:
+              matchExpressions:
+              - key: app
+                operator: In
+                values:
+                - snpe-quantizer
+            topologyKey: "kubernetes.io/hostname"
       containers:
       - name: snpe-quantizer
         image: snpecontainerregistry001.azurecr.io/quantizer:1.0

Binary file added: tasks/face_segmentation/aml/images/iteration10.png (new file, 39 KiB; binary contents not shown).

Some file diffs are hidden because one or more lines are too long.


@@ -25,7 +25,15 @@ the Qualcomm Neural Processing SDK. Quantization is time consuming so having an
 things up a lot.

 1. [Notebook](notebooks/results.ipynb) a simple Jupyter notebook for visualizing the
-results in your Azure table.
+results found in your Azure table.
+
+The Jupyter notebook can be used to visualize the results of the search iterations as they are
+happening. The following is a snapshot after 10 iterations are completed, where the darker colors
+are the early iterations and the brighter colors are the most recent iterations. The pareto frontier
+models are in yellow. This clearly shows the general trend of model improvement over time on each new
+iteration.
+
+![snapshot](images/iteration10.png)

 ## Workflow
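The tooltips called out in the commit message come from Bokeh's hover tool; a hedged sketch of that kind of scatter plot (the column names are illustrative, not the notebook's actual schema):

    from bokeh.plotting import ColumnDataSource, figure, show

    source = ColumnDataSource(data=dict(
        x=[2.1, 3.5, 4.0],             # e.g. inference time (ms)
        y=[0.81, 0.84, 0.86],          # e.g. validation IOU
        name=['id_aaa', 'id_bbb', 'id_ccc'],
        iteration=[1, 5, 10],
    ))
    p = figure(tooltips=[('model', '@name'), ('iteration', '@iteration')])
    p.scatter('x', 'y', source=source, size=8)
    p.xaxis.axis_label = 'inference time (ms)'   # the commit also adds axis titles
    p.yaxis.axis_label = 'validation IOU'
    show(p)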


@@ -50,4 +50,4 @@ quantized version of the model and a `.csv` file containing information about in
 layer by layer information which is handy).

 The json file contains the `ArchConfig` for the model which can be used to recreate the model
-and fully train it using the [train.py](../../train.py) script.
+and fully train it using the [train.py](../../train.py) script.


@@ -1,6 +1,5 @@
 # Copyright (c) Microsoft Corporation.
 # Licensed under the MIT license.
-import json
 import os
 from pathlib import Path


@@ -0,0 +1,17 @@
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT license.
+
+
+def calc_pareto_frontier(points):
+    """ Given an array of points, where the first 2 coordinates of each point define a 2D point,
+    return a list of array indexes that define the pareto frontier for these points.
+    Assumes the points are already sorted (ascending) by the first coordinate. """
+    pareto = []
+    pareto += [0]
+    p1 = points[0]
+    for i in range(1, len(points)):
+        p2 = points[i]
+        if p2[1] > p1[1]:
+            pareto += [i]
+            p1 = p2
+    return pareto
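A quick usage check: with points sorted ascending by the first coordinate, the helper keeps each point whose second coordinate beats the best seen so far.

    points = [[1.0, 0.70], [2.0, 0.65], [3.0, 0.80]]
    print(calc_pareto_frontier(points))  # -> [0, 2]; the middle point is dominated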


@@ -0,0 +1,78 @@
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT license.
+import argparse
+import sys
+
+from archai.discrete_search.api import ArchaiModel
+from archai.common.config import Config
+from aml.training.aml_training_evaluator import AmlPartialTrainingEvaluator
+from search_space.hgnet import HgnetSegmentationSearchSpace
+from archai.discrete_search.evaluators.remote_azure_benchmark import RemoteAzureBenchmarkEvaluator
+from aml.util.setup import configure_store
+
+
+def main():
+    # input and output arguments
+    parser = argparse.ArgumentParser(description="Runs Snapdragon F1 scoring on the final fully trained models produced by train_pareto.py.")
+    parser.add_argument("--config", type=str, help="location of the aml_search.yaml file", required=True)
+    args = parser.parse_args()
+
+    config = Config(args.config, resolve_env_vars=True)
+    aml_config = config['aml']
+    experiment_name = aml_config['experiment_name']
+    metric_key = 'final_val_iou'
+    search_config = config['search']
+    ss_config = search_config['search_space']
+    target_config = search_config.get('target', {})
+    target_name = target_config.pop('name', 'cpu')
+    device_evaluator = None
+
+    if target_name != 'snp':
+        print(f"Snapdragon target is not configured in {args.config}")
+        sys.exit(1)
+
+    store = configure_store(aml_config)
+    fully_trained = []
+    for e in store.get_all_status_entities(status='complete'):
+        if metric_key in e:
+            fully_trained += [e]
+
+    if len(fully_trained) == 0:
+        print(f"No fully trained models found with required metric '{metric_key}'")
+        sys.exit(1)
+
+    # the RemoteAzureBenchmarkEvaluator only needs the archid actually, doesn't need the nn.Module.
+    models = []
+    for e in fully_trained:
+        id = e['name']
+        e['status'] = 'preparing'
+        if 'benchmark_only' in e:
+            del e['benchmark_only']
+        store.update_status_entity(e)
+        models += [ArchaiModel(None, archid=id[3:])]
+
+    # kick off remote device scoring without the benchmark_only flag so we get the
+    # F1 scores for these fully trained models. Note the above results_path ensures the trained
+    # models are uploaded back to our models blob store.
+    search_space = HgnetSegmentationSearchSpace(
+        seed=42,  # not important in this case.
+        **ss_config.get('params', {}),
+    )
+    input_shape = (1, search_space.in_channels, *search_space.img_size[::-1])
+
+    device_evaluator = RemoteAzureBenchmarkEvaluator(
+        input_shape=input_shape,
+        store=store,
+        experiment_name=experiment_name,
+        onnx_export_kwargs={'opset_version': 11},
+        benchmark_only=0,  # do full F1 scoring this time.
+        **target_config
+    )
+
+    for model in models:
+        device_evaluator.send(model)
+    device_evaluator.fetch_all()
+
+
+if __name__ == "__main__":
+    main()


@@ -6,6 +6,7 @@ from argparse import ArgumentParser
 import os
 import time
 import torch
+import mlflow

 from pytorch_lightning import Trainer
 from pytorch_lightning.callbacks import ModelCheckpoint
@@ -17,6 +18,16 @@ from archai.common.store import ArchaiStore
 from archai.common.config import Config


+def print_auto_logged_info(r):
+    tags = {k: v for k, v in r.data.tags.items() if not k.startswith("mlflow.")}
+    artifacts = [f.path for f in mlflow.MlflowClient().list_artifacts(r.info.run_id, "model")]
+    print("run_id: {}".format(r.info.run_id))
+    print("artifacts: {}".format(artifacts))
+    print("params: {}".format(r.data.params))
+    print("metrics: {}".format(r.data.metrics))
+    print("tags: {}".format(tags))
+
+
 def main():
     parser = ArgumentParser()
     parser.add_argument('arch', type=Path)
@@ -28,6 +39,7 @@ def main():
     parser.add_argument('--val_check_interval', type=float, default=1.0)
     parser.add_argument('--model_id', type=str, default=None)
     parser.add_argument('--config', type=Path, default=None)
+    parser.add_argument('--register', help="Specify whether to register the trained model with your mlflow workspace", action="store_true")
     args = parser.parse_args()

     model_id = args.model_id
@@ -55,9 +67,10 @@ def main():
     print(f'Locking entity {model_id}')
     e = store.lock(model_id, 'training')
     if e is None:
-        e = store.get_status(model_id)
-        node = e['node']
-        raise Exception(f'Entity should not be locked by: "{node}"')
+        # force the reset of this lock so the training job can take it!
+        # might be a leftover from a previous failed job.
+        store.unlock_entity(store.get_status(model_id))
+        e = store.lock(model_id, 'training')

     pipeline_id = os.getenv('AZUREML_ROOT_RUN_ID')
     if pipeline_id is not None:
@@ -95,7 +108,10 @@ def main():
         callbacks=callbacks
     )

-    trainer.fit(pl_model, tr_dl, val_dl)
+    mlflow.pytorch.autolog(log_models=args.register, registered_model_name=model_id)
+    with mlflow.start_run() as run:
+        trainer.fit(pl_model, tr_dl, val_dl)
+        print_auto_logged_info(mlflow.get_run(run_id=run.info.run_id))

     val_result = trainer.validate(trainer.model, val_dl)
     print(val_result)
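One note on the lock-reset change above: the sketch below restates its shape; if another node re-locks the row between the unlock and the retry, `store.lock` can still return None, so a guard on the second attempt may be worth adding (hypothetical, not part of this commit):

    e = store.lock(model_id, 'training')
    if e is None:
        # a previous failed job may have left the row locked; reset and retry once
        store.unlock_entity(store.get_status(model_id))
        e = store.lock(model_id, 'training')
    if e is None:
        raise Exception(f'Unable to lock {model_id} even after forcing an unlock')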


@@ -0,0 +1,97 @@
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT license.
+import argparse
+import os
+import numpy as np
+import sys
+import tempfile
+from pathlib import Path
+
+from archai.discrete_search.api import ArchaiModel
+from archai.discrete_search.search_spaces.config import ArchConfig
+from archai.common.config import Config
+from aml.util.pareto import calc_pareto_frontier
+from search_space.hgnet import StackedHourglass
+from aml.training.aml_training_evaluator import AmlPartialTrainingEvaluator
+from aml.util.setup import configure_store
+
+
+def main():
+    # input and output arguments
+    parser = argparse.ArgumentParser(description="Fully trains the final pareto curve models in parallel on Azure ML.")
+    parser.add_argument("--config", type=str, help="location of the aml_search.yaml file", required=True)
+    parser.add_argument("--output", type=str, help="location of local output files", default='output')
+    parser.add_argument('--epochs', type=float, help='number of epochs to train (default 30)', default=30)
+    parser.add_argument('--timeout', type=int, help='Timeout for training (in seconds)(default 28800 seconds = 8 hours)', default=28800)
+    args = parser.parse_args()
+
+    config = Config(args.config, resolve_env_vars=True)
+    aml_config = config['aml']
+    store = configure_store(aml_config)
+    evaluator = AmlPartialTrainingEvaluator(config, args.output, args.epochs, args.timeout)
+    store = evaluator.store
+
+    experiment_name = aml_config['experiment_name']
+    training = config['training']
+    metric_key = training['metric_key']
+
+    search_config = config['search']
+    target_metric_key = search_config['target']['metric_key']
+
+    ss_config = search_config['search_space']
+    ss_config_params = ss_config.get('params', {})
+    num_classes = ss_config_params.get('num_classes', 18)
+
+    points = []
+    for e in store.get_all_status_entities(status='complete'):
+        if metric_key in e and target_metric_key in e:
+            y = float(e[metric_key])
+            x = float(e[target_metric_key])
+            points += [[x, y, e]]
+
+    if len(points) == 0:
+        print(f"No models found with required metrics '{metric_key}' and '{target_metric_key}'")
+        sys.exit(1)
+
+    points = np.array(points)
+    sorted = points[points[:, 0].argsort()]
+    pareto = calc_pareto_frontier(sorted)
+    print(f'Found {len(pareto)} models on pareto frontier')
+
+    # change the key so the evaluator updates a different field this time and
+    # does not think training is already complete.
+    evaluator.metric_key = 'final_val_iou'
+    training['metric_key'] = 'final_val_iou'
+
+    models = []
+    with tempfile.TemporaryDirectory() as tempdir:
+        for i in pareto:
+            x, y, e = sorted[i]
+            id = e['name']
+            iteration = int(e['iteration']) if 'iteration' in e else 0
+            training_metric = y
+            target_metric = x
+            file_name = f'{id}.json'
+            print(f'downloading {file_name} with {metric_key}={training_metric} and {target_metric_key}={target_metric} from iteration {iteration} ...')
+            found = store.download(f'{experiment_name}/{id}', tempdir, specific_file=file_name)
+            if len(found) == 1:
+                arch_config = ArchConfig.from_file(os.path.join(tempdir, file_name))
+                model = StackedHourglass(arch_config, num_classes=num_classes)
+                models += [ArchaiModel(model, archid=id[3:], metadata={'config': arch_config, 'entity': e})]
+            else:
+                print(f"Skipping model {id} because the .json arch config file was not found in the store.")
+
+    # Ok, now fully train these models!
+    print(f'Kicking off full training on {len(models)} models...')
+    for model in models:
+        e = model.metadata['entity']
+        e = store.get_status(e['name'])  # refresh the entity from the store
+        e['status'] = 'preparing'
+        store.merge_status_entity(e)
+        evaluator.send(model)
+
+    evaluator.fetch_all()
+
+
+if __name__ == "__main__":
+    main()