Merged PR 566: Implement remote execution on Azure VM

Related work items: #440
Sergiy Matusevych 2022-08-10 19:36:24 +00:00
Parent b85ef791e7
Commit ba665e5b58
16 changed files with 304 additions and 137 deletions

.flake8 (new file, 2 lines added)
View file

@@ -0,0 +1,2 @@
[flake8]
max-line-length = 132

View file

@@ -9,7 +9,7 @@
"class": "mlos_bench.environment.azure.AzureVMService",
"config": {
"template_path": "./config/azure/azuredeploy.json",
"deploy_template_path": "./mlos_bench/config/azure/azuredeploy-ubuntu-vm.json",
"subscription": "...",
"resource_group": "sergiym-os-autotune",
@@ -78,6 +78,8 @@
"config": {
"pollDelay": 10,
"cost": 1,
"tunable_params": {
@@ -90,11 +92,19 @@
},
"const_args": {
"commandId": "RunBenchmark",
"commandId": "RunShellScript",
"script": [
"ls -l /",
"uname -a",
"sysctl kernel.sched_migration_cost_ns=${kernel.sched_migration_cost_ns}"
"ls -l /"
],
"parameters": [
{
"name": "param1",
"value": "111"
},
{
"name": "param2",
"value": "222"
}
]
}
}
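For reference, a minimal sketch of how a "const_args" fragment like the one above maps onto the Azure runCommand request body; the dict shapes follow the remote_exec() code later in this PR, and the values are just the placeholders from the config:

# Illustration only: build the runCommand payload from the "const_args"
# fragment shown above (placeholder values, not a real benchmark).
const_args = {
    "commandId": "RunShellScript",
    "script": ["uname -a", "ls -l /"],
    "parameters": [
        {"name": "param1", "value": "111"},
        {"name": "param2", "value": "222"},
    ],
}

json_req = {
    "commandId": const_args["commandId"],
    "script": const_args["script"],
    "parameters": const_args.get("parameters", []),
}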

View file

@@ -1,15 +0,0 @@
{
"$schema": "https://schema.management.azure.com/schemas/2019-04-01/deploymentParameters.json#",
"contentVersion": "1.0.0.0",
"parameters": {
"adminUsername": {
"value": "GEN-UNIQUE"
},
"adminPasswordOrKey": {
"value": "GEN-PASSWORD"
},
"dnsLabelPrefix": {
"value": "GEN-UNIQUE"
}
}
}

View file

@@ -3,12 +3,11 @@ Benchmarking environments for OS Autotune.
"""
from mlos_bench.environment.status import Status
from mlos_bench.environment.base_svc import Service
from mlos_bench.environment.base_env import Environment
from mlos_bench.environment.base_service import Service
from mlos_bench.environment.base_environment import Environment
from mlos_bench.environment.app import AppEnv
from mlos_bench.environment.composite import CompositeEnv
from mlos_bench.environment import azure
__all__ = [
@@ -17,5 +16,4 @@ __all__ = [
'Environment',
'AppEnv',
'CompositeEnv',
'azure',
]
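With the base_svc/base_env modules renamed, both import styles below should work (a small sketch assuming the package layout shown in this diff):

# Via the package re-exports:
from mlos_bench.environment import Environment, Service, Status

# Or directly from the renamed modules:
from mlos_bench.environment.base_environment import Environment
from mlos_bench.environment.base_service import Service
from mlos_bench.environment.status import Status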

View file

@@ -1,15 +1,43 @@
"Application-specific benchmark environment."
"""
Application-specific benchmark environment.
"""
import json
import time
import logging
from mlos_bench.environment import Environment, Status
from mlos_bench.environment.status import Status
from mlos_bench.environment.base_environment import Environment
_LOG = logging.getLogger(__name__)
class AppEnv(Environment):
"Application-level benchmark environment."
"""
Application-level benchmark environment.
"""
_POLL_DELAY = 5 # Default polling interval in seconds.
def __init__(self, name, config, service=None):
"""
Create a new application environment with a given config.
Parameters
----------
name: str
Human-readable name of the environment.
config : dict
Free-format dictionary that contains the benchmark environment
configuration. Each config must have at least the "tunable_params"
and the "const_args" sections; the "cost" field can be omitted
and is 0 by default.
service: Service
An optional service object (e.g., providing methods to
deploy or reboot a VM, etc.).
"""
super().__init__(name, config, service)
self._poll_delay = self.config.get("pollDelay", AppEnv._POLL_DELAY)
def setup(self):
"""
@@ -51,8 +79,41 @@ class AppEnv(Environment):
_LOG.debug("Benchmark:\n%s", json.dumps(params, indent=2))
# TODO: Configure the application and start the benchmark
(status, _output) = self._service.remote_exec(params)
return status in {Status.PENDING, Status.READY}
(status, output) = self._service.remote_exec(params)
self._result = (status, None)
if status not in {Status.PENDING, Status.READY}:
return False
self.config.update(output)
return True
def _check_results(self):
"""
Check if the results of the benchmark are available.
Returns
-------
(benchmark_status, benchmark_result) : (enum, float)
A pair of (benchmark status, benchmark result) values.
"""
_LOG.debug("Check results: %s", self)
(status, output) = self._service.get_remote_exec_results(self.config)
# TODO: extract the results from `output`.
_LOG.debug("Benchmark status: %s :: %s", status, output)
return (status, None)
def status(self):
"""
Get the status of the environment.
Returns
-------
status : mlos_bench.environment.Status
Current status of the benchmark environment.
"""
self._result = self._check_results()
return self._result[0]
def result(self):
"""
@@ -69,6 +130,14 @@ class AppEnv(Environment):
benchmark_result is a floating point time of the benchmark in
seconds or None if the status is not COMPLETED.
"""
self._result = (Status.COMPLETED, 123.456)
_LOG.info("Benchmark result: %s", self._result)
output = None
while True:
(status, output) = self._check_results()
if status != Status.RUNNING:
break
_LOG.debug("Sleep: %d", self._poll_delay)
time.sleep(self._poll_delay)
_LOG.info("Benchmark result: %s", output)
self._result = (status, 123.456) # FIXME: use the actual data
return self._result
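A minimal usage sketch of this polling API, assuming an already-configured AppEnv instance; the driver-side names (app_env, value) are hypothetical, and the method that actually submits the remote command is not shown by name in this excerpt:

from mlos_bench.environment import Status

# Hypothetical driver code: `app_env` is an AppEnv that has already
# submitted its remote command via the service's remote_exec().
status = app_env.status()           # one-shot, non-blocking check
(status, value) = app_env.result()  # blocks, polling every `pollDelay` seconds
if Status.is_good(status):
    print("benchmark result:", value)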

View file

@@ -1,4 +1,6 @@
"OS-level benchmark environment on Azure."
"""
OS-level benchmark environment on Azure.
"""
import json
import logging
@@ -9,18 +11,20 @@ _LOG = logging.getLogger(__name__)
class OSEnv(Environment):
"Boot-time environment for Azure VM."
"""
Boot-time environment for Azure VM.
"""
def setup(self):
"""
Check if the Azure VM is provisioned and can be booted.
Check if the Azure VM is up and running; boot it, if necessary.
Returns
-------
is_success : bool
True if operation is successful, false otherwise.
"""
_LOG.info("Set up")
_LOG.info("OS set up")
return True
def teardown(self):
@@ -32,7 +36,7 @@ class OSEnv(Environment):
is_success : bool
True if operation is successful, false otherwise.
"""
_LOG.info("Tear down")
_LOG.info("OS tear down")
return True
def run(self, tunables):

View file

@@ -1,42 +1,55 @@
"OS-level benchmark environment on Azure."
"""
A collection of Service functions for managing VMs on Azure.
"""
import json
import logging
import requests
from mlos_bench.environment.status import Status
from mlos_bench.environment.base_svc import Service
from mlos_bench.environment import Service, Status
_LOG = logging.getLogger(__name__)
class AzureVMService(Service):
"Helper methods to manage VMs on Azure."
"""
Helper methods to manage VMs on Azure.
"""
# Azure REST API calls as described in
# Azure Resources Deployment REST API as described in
# https://docs.microsoft.com/en-us/rest/api/resources/deployments
_URL_DEPLOY = (
"https://management.azure.com"
"/subscriptions/{subscription}"
"/resourceGroups/{resource_group}"
"/providers/Microsoft.Resources"
"/deployments/{deployment_name}"
"?api-version=2022-05-01"
)
# Azure Compute REST API calls as described in
# https://docs.microsoft.com/en-us/rest/api/compute/virtual-machines
_URL_DEPLOY = "https://management.azure.com" \
"/subscriptions/%s" \
"/resourceGroups/%s" \
"/providers/Microsoft.Resources" \
"/deployments/%s" \
"?api-version=2022-05-01"
_URL_START = (
"https://management.azure.com"
"/subscriptions/{subscription}"
"/resourceGroups/{resource_group}"
"/providers/Microsoft.Compute"
"/virtualMachines/{vm_name}"
"/start"
"?api-version=2022-03-01"
)
_URL_START = "https://management.azure.com" \
"/subscriptions/%s" \
"/resourceGroups/%s" \
"/providers/Microsoft.Compute" \
"/virtualMachines/%s" \
"/start?api-version=2022-03-01"
_URL_RUN = "https://management.azure.com/" \
"/subscriptions/%s" \
"/resourceGroups/%s" \
"/providers/Microsoft.Compute" \
"/virtualMachines/%s" \
"/runCommand?api-version=2022-03-01"
_URL_REXEC_RUN = (
"https://management.azure.com"
"/subscriptions/{subscription}"
"/resourceGroups/{resource_group}"
"/providers/Microsoft.Compute"
"/virtualMachines/{vm_name}"
"/runCommand"
"?api-version=2022-03-01"
)
def __init__(self, config):
"""
@@ -49,32 +62,34 @@ class AzureVMService(Service):
configuration.
"""
super().__init__(config)
self.register([self.vm_deploy, self.vm_start, self.remote_exec])
with open(config['template_path']) as fh_json:
self._template = json.load(fh_json)
# Register methods that we want to expose to the Environment objects.
self.register([
self.vm_provision,
self.vm_start,
self.remote_exec,
self.get_remote_exec_results
])
self._url_deploy = AzureVMService._URL_DEPLOY % (
config["subscription"],
config["resource_group"],
config["deployment_name"]
with open(config['deploy_template_path']) as fh_json:
self._deploy_template = json.load(fh_json)
self._url_deploy = AzureVMService._URL_DEPLOY.format(
subscription=config["subscription"],
resource_group=config["resource_group"],
deployment_name=config["deployment_name"]
)
self._headers = {
# Access token from `az account get-access-token`:
"Authorization": "Bearer " + config["accessToken"]
}
self._url_start = AzureVMService._URL_START % (
config["subscription"],
config["resource_group"],
config["vmName"]
self._url_start = AzureVMService._URL_START.format(
subscription=config["subscription"],
resource_group=config["resource_group"],
vm_name=config["vmName"]
)
self._url_run = AzureVMService._URL_RUN % (
config["subscription"],
config["resource_group"],
config["vmName"]
self._url_rexec_run = AzureVMService._URL_REXEC_RUN.format(
subscription=config["subscription"],
resource_group=config["resource_group"],
vm_name=config["vmName"]
)
self._headers = {
@@ -83,15 +98,15 @@ class AzureVMService(Service):
}
@staticmethod
def _build_parameters(tunables):
def _build_arm_parameters(params):
"""
Merge tunables with other parameters and convert into
ARM Template format.
Merge input with config parameters and convert the results
into the ARM Template format.
"""
return {key: {"value": val} for (key, val) in tunables.items()}
return {key: {"value": val} for (key, val) in params.items()}
@staticmethod
def _extract_parameters(json_data):
def _extract_arm_parameters(json_data):
"""
Extract parameters from the ARM Template REST response JSON.
@@ -102,17 +117,17 @@ class AzureVMService(Service):
"""
return {
key: val.get("value")
for (key, val) in json_data.get(
"properties", {}).get("parameters", {}).items()
for (key, val) in json_data.get("properties", {}).get("parameters", {}).items()
if val.get("value") is not None
}
def vm_deploy(self, tunables):
def vm_provision(self, params):
"""
Check if Azure VM is ready. (Re)provision it, if necessary.
Check if Azure VM is ready. Deploy a new VM, if necessary.
Parameters
----------
tunables : dict
params : dict
Flat dictionary of (key, value) pairs of tunable parameters.
VMEnv tunables are variable parameters that, together with the
VMEnv configuration, are sufficient to provision a VM.
@@ -123,13 +138,13 @@ class AzureVMService(Service):
A pair of Status and result. The result is always {}.
Status is one of {PENDING, READY, FAILED}
"""
_LOG.info("Deploy VM: %s :: %s", self.config["vmName"], tunables)
_LOG.info("Deploy VM: %s :: %s", self.config["vmName"], params)
json_req = {
"properties": {
"mode": "Incremental",
"template": self._template,
"parameters": AzureVMService._build_parameters(tunables)
"template": self._deploy_template,
"parameters": AzureVMService._build_arm_parameters(params)
}
}
@@ -142,14 +157,18 @@ class AzureVMService(Service):
if _LOG.isEnabledFor(logging.DEBUG):
_LOG.debug("Response: %s\n%s", response,
json.dumps(response.json(), indent=2))
json.dumps(response.json(), indent=2)
if response.content else "")
else:
_LOG.info("Response: %s", response)
if response.status_code == 200:
params = AzureVMService._extract_parameters(response.json())
_LOG.info("Extracted parameters: %s", params)
return (Status.READY, params)
output = AzureVMService._extract_arm_parameters(response.json())
if _LOG.isEnabledFor(logging.DEBUG):
_LOG.debug("Extracted parameters:\n%s",
json.dumps(output, indent=2))
# self.config.update(params)
return (Status.READY, output)
elif response.status_code == 201:
return (Status.PENDING, {})
else:
@@ -157,13 +176,13 @@ class AzureVMService(Service):
# _LOG.error("Bad Request:\n%s", response.request.body)
return (Status.FAILED, {})
def vm_start(self, tunables):
def vm_start(self, params):
"""
Start the VM on Azure.
Parameters
----------
tunables : dict
params : dict
Flat dictionary of (key, value) pairs of tunable parameters.
Returns
@@ -172,7 +191,7 @@ class AzureVMService(Service):
A pair of Status and result. The result is always {}.
Status is one of {PENDING, READY, FAILED}
"""
_LOG.info("Start VM: %s :: %s", self.config["vmName"], tunables)
_LOG.info("Start VM: %s :: %s", self.config["vmName"], params)
_LOG.debug("Request: POST %s", self._url_start)
response = requests.post(self._url_start, headers=self._headers)
@@ -187,7 +206,7 @@ class AzureVMService(Service):
# _LOG.error("Bad Request:\n%s", response.request.body)
return (Status.FAILED, {})
def remote_exec(self, tunables):
def remote_exec(self, params):
"""
Run a command on Azure VM.
@@ -195,7 +214,7 @@
----------
tunables : dict
Flat dictionary of (key, value) pairs of tunable parameters.
Must have "commandId", "parameters", or "script" keys.
Must have "commandId", "script", and "parameters" keys.
Returns
-------
@@ -203,23 +222,26 @@
A pair of Status and result.
Status is one of {PENDING, READY, FAILED}
"""
_LOG.info("Run a command on VM: %s :: %s",
self.config["vmName"], params["commandId"])
_LOG.info("Run a command on VM: %s :: %s %s %s",
self.config["vmName"], tunables["commandId"],
tunables.get("parameters", []),
tunables.get("script", []))
json_req = {
"commandId": params["commandId"],
"script": params["script"],
"parameters": params.get("parameters", [])
}
json_req = tunables # Pass to REST request as-is.
if _LOG.isEnabledFor(logging.DEBUG):
_LOG.debug("Request: POST %s\n%s",
self._url_run, json.dumps(json_req, indent=2))
self._url_rexec_run, json.dumps(json_req, indent=2))
response = requests.post(
self._url_run, headers=self._headers, json=json_req)
self._url_rexec_run, headers=self._headers, json=json_req)
if _LOG.isEnabledFor(logging.DEBUG):
_LOG.debug("Response: %s\n%s", response,
json.dumps(response.json(), indent=2))
json.dumps(response.json(), indent=2)
if response.content else "")
else:
_LOG.info("Response: %s", response)
@@ -227,8 +249,56 @@ class AzureVMService(Service):
# TODO: extract the results from JSON response
return (Status.READY, {})
elif response.status_code == 202:
return (Status.PENDING, {})
return (Status.PENDING, {
"asyncResultsUrl": response.headers.get("Azure-AsyncOperation")
})
else:
_LOG.error("Response: %s :: %s", response, response.text)
# _LOG.error("Bad Request:\n%s", response.request.body)
return (Status.FAILED, {})
def get_remote_exec_results(self, params):
"""
Get the results of the asynchronously running command.
Parameters
----------
params : dict
Flat dictionary of (key, value) pairs of tunable parameters.
Must have the "asyncResultsUrl" key to get the results.
If the key is not present, return Status.PENDING.
Returns
-------
result : (Status, dict)
A pair of Status and result.
Status is one of {PENDING, RUNNING, READY, FAILED}
"""
_LOG.info("Check the results on VM: %s :: %s",
self.config["vmName"], params.get("commandId", ""))
url = params.get("asyncResultsUrl")
if url is None:
return (Status.PENDING, {})
response = requests.get(url, headers=self._headers)
if _LOG.isEnabledFor(logging.DEBUG):
_LOG.debug("Response: %s\n%s", response,
json.dumps(response.json(), indent=2)
if response.content else "")
else:
_LOG.info("Response: %s", response)
if response.status_code == 200:
# TODO: extract the results from JSON response
output = response.json()
status = output.get("status")
if status == "InProgress":
return (Status.RUNNING, {})
elif status == "Succeeded":
return (Status.READY, output.get("properties", {}).get("output", {}))
_LOG.error("Response: %s :: %s", response, response.text)
# _LOG.error("Bad Request:\n%s", response.request.body)
return (Status.FAILED, {})
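A minimal sketch of the new remote-execution round trip against this service; all config values are placeholders, and the 10-second polling interval is arbitrary:

import time

from mlos_bench.environment import Status
from mlos_bench.environment.azure import AzureVMService

service = AzureVMService({
    "deploy_template_path": "./mlos_bench/config/azure/azuredeploy-ubuntu-vm.json",
    "subscription": "...",          # placeholders: use real Azure IDs here
    "resource_group": "...",
    "deployment_name": "...",
    "vmName": "...",
    "accessToken": "...",           # e.g. from `az account get-access-token`
})

# Submit the command; a 202 response yields PENDING plus "asyncResultsUrl".
(status, params) = service.remote_exec({
    "commandId": "RunShellScript",
    "script": ["uname -a"],
    "parameters": [],
})

# Poll the async operation until it is no longer pending/running.
while status in {Status.PENDING, Status.RUNNING}:
    time.sleep(10)
    (status, output) = service.get_remote_exec_results(params)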

View file

@@ -1,4 +1,6 @@
"VM-level benchmark environment on Azure."
"""
VM-level benchmark environment on Azure.
"""
import json
import logging
@@ -9,7 +11,9 @@ _LOG = logging.getLogger(__name__)
class VMEnv(Environment):
"Azure VM environment."
"""
Azure VM environment.
"""
def setup(self):
"""
@@ -20,7 +24,7 @@ class VMEnv(Environment):
is_success : bool
True if operation is successful, false otherwise.
"""
_LOG.info("Set up")
_LOG.info("VM set up")
return True
def teardown(self):
@@ -32,7 +36,7 @@ class VMEnv(Environment):
is_success : bool
True if operation is successful, false otherwise.
"""
_LOG.info("Tear down")
_LOG.info("VM tear down")
return True
def run(self, tunables):
@@ -57,5 +61,5 @@
if _LOG.isEnabledFor(logging.DEBUG):
_LOG.debug("Deploy VM:\n%s", json.dumps(params, indent=2))
(status, _output) = self._service.vm_deploy(params)
(status, _output) = self._service.vm_provision(params)
return status in {Status.PENDING, Status.READY}

View file

@@ -1,17 +1,21 @@
"A hierarchy of benchmark environments."
"""
A hierarchy of benchmark environments.
"""
import abc
import json
import logging
import importlib
from mlos_bench.environment import Status
from mlos_bench.environment.status import Status
_LOG = logging.getLogger(__name__)
class Environment(metaclass=abc.ABCMeta):
"An abstract base of all benchmark environments."
"""
An abstract base of all benchmark environments.
"""
@staticmethod
def from_config(config, service=None):
@@ -117,7 +121,10 @@ class Environment(metaclass=abc.ABCMeta):
return self.name
def __repr__(self):
return "Env: %s :: '%s'" % (self.__class__, self.name)
return "Env: {class_name} :: '{env_name}'".format(
class_name=self.__class__,
env_name=self.name
)
def _parse_tunables(self, tunables, cost=0):
"Augment tunables with the cost."

View file

@@ -1,4 +1,6 @@
"Base class for the service mix-ins."
"""
Base class for the service mix-ins.
"""
import json
import logging
@@ -8,7 +10,9 @@ _LOG = logging.getLogger(__name__)
class Service:
"An abstract base of all environment services."
"""
An abstract base of all environment services.
"""
@staticmethod
def from_config(config):
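For illustration, a hypothetical Service subclass that follows the same registration pattern AzureVMService uses earlier in this diff; the class and method names below are made up:

from mlos_bench.environment.base_service import Service
from mlos_bench.environment.status import Status

class MyService(Service):
    """Hypothetical service mix-in (not part of this PR)."""

    def __init__(self, config):
        super().__init__(config)
        # Expose selected methods to the Environment objects:
        self.register([self.do_something])

    def do_something(self, params):
        """Toy method; an Environment would reach it via self._service.do_something()."""
        return (Status.READY, {})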

View file

@@ -1,14 +1,19 @@
"Composite benchmark environment."
"""
Composite benchmark environment.
"""
import logging
from mlos_bench.environment import Environment, Service
from mlos_bench.environment.base_service import Service
from mlos_bench.environment.base_environment import Environment
_LOG = logging.getLogger(__name__)
class CompositeEnv(Environment):
"Composite benchmark environment."
"""
Composite benchmark environment.
"""
def __init__(self, name, config, service=None):
"""

View file

@@ -6,15 +6,21 @@ import enum
class Status(enum.Enum):
"Enum for the status of the benchmark."
"""
Enum for the status of the benchmark.
"""
UNKNOWN = 0
PENDING = 1
READY = 2
RUNNING = 3
COMPLETED = 4
SUCCEEDED = 4
CANCELED = 5
FAILED = 6
TIMED_OUT = 7
@staticmethod
def is_good(status):
"Check if the status is not failed or canceled."
return status not in {Status.CANCELED, Status.FAILED}
"""
Check if the status is not failed or canceled.
"""
return status not in {Status.CANCELED, Status.FAILED, Status.TIMED_OUT}
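A quick sketch of the updated semantics: with the new TIMED_OUT value, is_good() now treats timeouts as failures too.

from mlos_bench.environment.status import Status

assert Status.is_good(Status.READY)
assert Status.is_good(Status.RUNNING)
assert not Status.is_good(Status.FAILED)
assert not Status.is_good(Status.TIMED_OUT)   # newly excluded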

View file

@@ -11,8 +11,9 @@ from mlos_bench.environment import Environment
def optimize(config):
"Main optimization loop."
"""
Main optimization loop.
"""
env = Environment.from_config(config)
opt = Optimizer(env.tunable_params())

View file

@@ -1,5 +1,5 @@
"""
OS Autotune main optimization loop.
Mock optimizer for OS Autotune.
"""
import logging
@@ -8,7 +8,9 @@ _LOG = logging.getLogger(__name__)
class Optimizer:
"Toy random optimizer to test out the Environment API."
"""
Mock optimizer to test the Environment API.
"""
_MAX_ITER = 1

View file

@@ -4,14 +4,14 @@ Setup instructions for the mlos_bench package.
from setuptools import setup, find_packages
version='0.0.4'
_VERSION = '0.0.4'
setup(
name='mlos-bench',
version=version,
version=_VERSION,
packages=find_packages(),
install_requires=[
'mlos-core=='+version,
'mlos-core==' + _VERSION,
],
# Transitive extra_requires from mlos-core.
extras_require={