* start refactor

* continue refactor for cluster and job functions

* fix imports

* fixes

* fixes

* refactor integration test secrets management

* fix cluster create, add new test

* add tests for new sdk api and fix bugs

* fix naming and bugs

* update job operations naming, bug fixes

* fix cluster tests

* fix joboperations and tests

* update cli and fix some bugs

* start fixes

* fix pylint errors, bugs

* add deprecated warning checks, rename tests

* add docstrings for baseoperations

* add docstrings

* docstrings, add back compat for coreclient, fix init for spark client

* whitespace

* docstrings, whitespace

* docstrings, fixes

* docstrings, fixes

* fix the sdk documentation, bugs

* fix method call

* pool_id->id

* rename ids

* cluster_id->id

* cluster_id->id

* add todo

* fixes

* add some todos

* rename pool to cluster, add todo for nodes params

* add todos for nodes param removal

* update functions names

* remove deprecated function calls

* update docs and docstrings

* update docstrings

* get rid of TODOs, fix docstrings

* remove unused setting

* inheritance -> composition

* fix models bugs

* fix create_user bug

* update sdk_example.py

* fix create user argument issue

* update sdk_example.py

* update doc

* use Software model instead of string

* add job wait flag, add cluster application wait functions

* add docs for wait, update tests

* fix bug

* add clientrequesterror catch to fix tests
Jacob Freck 2018-08-03 15:20:05 -07:00 committed by GitHub
Parent c9fd8bbfeb
Commit b18eb695a1
No key found matching this signature
GPG key ID: 4AEE18F83AFDEB23
111 changed files: 3707 additions and 838 deletions


@@ -3,5 +3,5 @@ based_on_style=pep8
 spaces_before_comment=4
 split_before_logical_operator=True
 indent_width=4
-column_limit=140
+column_limit=120
 split_arguments_when_comma_terminated=True

.vscode/settings.json vendored

@@ -14,5 +14,5 @@
 "python.formatting.provider": "yapf",
 "python.venvPath": "${workspaceFolder}/.venv/",
 "python.pythonPath": "${workspaceFolder}/.venv/Scripts/python.exe",
-"python.unitTest.pyTestEnabled": true
+"python.unitTest.pyTestEnabled": true,
 }

aztk/client/__init__.py Normal file

@@ -0,0 +1 @@
from .client import CoreClient


@@ -0,0 +1 @@
from .base_operations import BaseOperations


@@ -0,0 +1,223 @@
from aztk import models
from aztk.internal import cluster_data
from aztk.utils import ssh as ssh_lib
from .helpers import (create_user_on_cluster, create_user_on_node, delete_user_on_cluster, delete_user_on_node,
generate_user_on_cluster, generate_user_on_node, get_application_log, get_remote_login_settings,
node_run, run, ssh_into_node)
class BaseOperations:
"""Base operations that all other operations have as an attribute
Attributes:
batch_client (:obj:`azure.batch.batch_service_client.BatchServiceClient`): Client used to interact with the
Azure Batch service.
blob_client (:obj:`azure.storage.blob.BlockBlobService`): Client used to interact with the Azure Storage
Blob service.
secrets_configuration (:obj:`aztk.models.SecretsConfiguration`): Model that holds AZTK secrets used to authenticate
with Azure and the clusters.
"""
def __init__(self, context):
self.batch_client = context['batch_client']
self.blob_client = context['blob_client']
self.secrets_configuration = context['secrets_configuration']
def get_cluster_config(self, id: str) -> models.ClusterConfiguration:
"""Open an ssh tunnel to a node
Args:
id (:obj:`str`): the id of the cluster the node is in
node_id (:obj:`str`): the id of the node to open the ssh tunnel to
username (:obj:`str`): the username to authenticate the ssh session
ssh_key (:obj:`str`, optional): ssh public key to create the user with, must use ssh_key
or password. Defaults to None.
password (:obj:`str`, optional): password for the user, must use ssh_key or password. Defaults to None.
port_forward_list (:obj:`List[PortForwardingSpecification`, optional): list of PortForwardingSpecifications.
The defined ports will be forwarded to the client.
internal (:obj:`bool`, optional): if True, this will connect to the node using its internal IP.
Only use this if running within the same VNET as the cluster. Defaults to False.
Returns:
:obj:`aztk.models.ClusterConfiguration`: Object representing the cluster's configuration
"""
return self.get_cluster_data(id).read_cluster_config()
def get_cluster_data(self, id: str) -> cluster_data.ClusterData:
"""Gets the ClusterData object to manage data related to the given cluster
Args:
id (:obj:`str`): the id of the cluster to get
Returns:
:obj:`aztk.models.ClusterData`: Object used to manage the data and storage functions for a cluster
"""
return cluster_data.ClusterData(self.blob_client, id)
def ssh_into_node(self, id, node_id, username, ssh_key=None, password=None, port_forward_list=None, internal=False):
"""Open an ssh tunnel to a node
Args:
id (:obj:`str`): the id of the cluster the node is in
node_id (:obj:`str`): the id of the node to open the ssh tunnel to
username (:obj:`str`): the username to authenticate the ssh session
ssh_key (:obj:`str`, optional): ssh public key to create the user with, must use ssh_key or password. Defaults to None.
password (:obj:`str`, optional): password for the user, must use ssh_key or password. Defaults to None.
port_forward_list (:obj:`List[PortForwardingSpecification]`, optional): list of PortForwardingSpecifications.
The defined ports will be forwarded to the client.
internal (:obj:`bool`, optional): if True, this will connect to the node using its internal IP.
Only use this if running within the same VNET as the cluster. Defaults to False.
Returns:
:obj:`None`
"""
ssh_into_node.ssh_into_node(self, id, node_id, username, ssh_key, password, port_forward_list, internal)
def create_user_on_node(self, id, node_id, username, ssh_key=None, password=None):
"""Create a user on a node
Args:
id (:obj:`str`): id of the cluster to create the user on.
node_id (:obj:`str`): id of the node in the cluster to create the user on.
username (:obj:`str`): name of the user to create.
ssh_key (:obj:`str`, optional): ssh public key to create the user with, must use ssh_key or password.
password (:obj:`str`, optional): password for the user, must use ssh_key or password.
Returns:
:obj:`None`
"""
return create_user_on_node.create_user_on_node(self, id, node_id, username, ssh_key, password)
#TODO: remove nodes as param
def create_user_on_cluster(self, id, nodes, username, ssh_pub_key=None, password=None):
"""Create a user on every node in the cluster
Args:
id (:obj:`str`): id of the cluster to create the user on.
nodes (:obj:`List[ComputeNode]`): list of nodes to create the user on
username (:obj:`str`): name of the user to create.
ssh_pub_key (:obj:`str`, optional): ssh public key to create the user with, must use ssh_pub_key or password. Defaults to None.
password (:obj:`str`, optional): password for the user, must use ssh_pub_key or password. Defaults to None.
Returns:
:obj:`None`
"""
return create_user_on_cluster.create_user_on_cluster(self, id, nodes, username, ssh_pub_key, password)
def generate_user_on_node(self, id, node_id):
"""Create a user with an autogenerated username and ssh_key on the given node.
Args:
id (:obj:`str`): the id of the cluster to generate the user on.
node_id (:obj:`str`): the id of the node in the cluster to generate the user on.
Returns:
:obj:`tuple`: A tuple of the form (username: :obj:`str`, ssh_key: :obj:`Cryptodome.PublicKey.RSA`)
"""
return generate_user_on_node.generate_user_on_node(self, id, node_id)
#TODO: remove nodes as param
def generate_user_on_cluster(self, id, nodes):
"""Create a user with an autogenerated username and ssh_key on the cluster
Args:
id (:obj:`str`): the id of the cluster to generate the user on.
nodes (:obj:`List[ComputeNode]`): list of nodes to generate the user on
Returns:
:obj:`tuple`: A tuple of the form (username: :obj:`str`, ssh_key: :obj:`Cryptodome.PublicKey.RSA`)
"""
return generate_user_on_cluster.generate_user_on_cluster(self, id, nodes)
def delete_user_on_node(self, id: str, node_id: str, username: str) -> str:
"""Delete a user on a node
Args:
id (:obj:`str`): the id of the cluster to delete the user on.
node_id (:obj:`str`): the id of the node in the cluster to delete the user on.
username (:obj:`str`): the name of the user to delete.
Returns:
:obj:`None`
"""
return delete_user_on_node.delete_user(self, id, node_id, username)
#TODO: remove nodes as param
def delete_user_on_cluster(self, username, id, nodes):
"""Delete a user on every node in the cluster
Args:
username (:obj:`str`): the name of the user to delete.
id (:obj:`str`): the id of the cluster to delete the user on.
nodes (:obj:`List[ComputeNode]`): list of nodes to delete the user on
Returns:
:obj:`None`
"""
return delete_user_on_cluster.delete_user_on_cluster(self, id, nodes, username)
def node_run(self, id, node_id, command, internal, container_name=None, timeout=None):
"""Run a bash command on the given node
Args:
id (:obj:`str`): the id of the cluster to run the command on.
node_id (:obj:`str`): the id of the node in the cluster to run the command on.
command (:obj:`str`): the bash command to execute on the node.
internal (:obj:`bool`): if True, this will connect to the node using its internal IP.
Only use this if running within the same VNET as the cluster. Defaults to False.
container_name (:obj:`str`, optional): the name of the container to run the command in.
If None, the command will run on the host VM. Defaults to None.
timeout (:obj:`int`, optional): The timeout in seconds for establishing a connection to the node.
Defaults to None.
Returns:
:obj:`aztk.models.NodeOutput`: object containing the output of the run command
"""
return node_run.node_run(self, id, node_id, command, internal, container_name, timeout)
def get_remote_login_settings(self, id: str, node_id: str):
"""Get the remote login information for a node in a cluster
Args:
id (:obj:`str`): the id of the cluster the node is in
node_id (:obj:`str`): the id of the node in the cluster
Returns:
:obj:`aztk.models.RemoteLogin`: Object that contains the ip address and port combination to login to a node
"""
return get_remote_login_settings.get_remote_login_settings(self, id, node_id)
def run(self, id, command, internal, container_name=None, timeout=None):
"""Run a bash command on every node in the cluster
Args:
id (:obj:`str`): the id of the cluster to run the command on.
command (:obj:`str`): the bash command to execute on the node.
internal (:obj:`bool`): if True, this will connect to the node using its internal IP.
Only use this if running within the same VNET as the cluster. Defaults to False.
container_name (:obj:`str`, optional): the name of the container to run the command in.
If None, the command will run on the host VM. Defaults to None.
timeout (:obj:`int`, optional): The timeout in seconds for establishing a connection to the node.
Defaults to None.
Returns:
:obj:`List[aztk.models.NodeOutput]`: list of NodeOutput objects containing the output of the run command
"""
return run.cluster_run(self, id, command, internal, container_name, timeout)
def get_application_log(self, id: str, application_name: str, tail=False, current_bytes: int = 0):
"""Get the log for a running or completed application
Args:
id (:obj:`str`): the id of the cluster the application was submitted to.
application_name (:obj:`str`): the name of the application to get the log of.
tail (:obj:`bool`, optional): If True, get the remaining bytes after current_bytes. Otherwise, the whole log will be retrieved.
Only use this if streaming the log as it is being written. Defaults to False.
current_bytes (:obj:`int`): Specifies the last seen byte, so only the bytes after current_bytes are retrieved.
Only useful if streaming the log as it is being written. Only used if tail is True.
Returns:
:obj:`aztk.models.ApplicationLog`: a model representing the output of the application.
"""
return get_application_log.get_application_log(self, id, application_name, tail, current_bytes)
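A minimal usage sketch of the operations above, assuming the Batch/Blob clients and the secrets model were built elsewhere (as `CoreClient._get_context` does); the ids are placeholders:

```python
from aztk.client.base import BaseOperations

# The context mirrors what CoreClient._get_context returns; the three
# objects below are assumed to already exist.
base = BaseOperations({
    'batch_client': batch_client,
    'blob_client': blob_client,
    'secrets_configuration': secrets_configuration,
})

# Read back the saved cluster configuration and a node's login endpoint.
cluster_config = base.get_cluster_config('my-cluster')
remote_login = base.get_remote_login_settings('my-cluster', 'tvm-123')
print(remote_login.ip_address, remote_login.port)
```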


@@ -0,0 +1,11 @@
import concurrent.futures
#TODO: remove nodes param
def create_user_on_cluster(base_operations, id, nodes, username, ssh_pub_key=None, password=None):
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = {
executor.submit(base_operations.create_user_on_node, id, node.id, username, ssh_pub_key, password): node
for node in nodes
}
concurrent.futures.wait(futures)


@@ -0,0 +1,42 @@
from datetime import datetime, timedelta, timezone
import azure.batch.models as batch_models
import azure.batch.models.batch_error as batch_error
from aztk import models
from aztk.utils import get_ssh_key
def __create_user(self, id: str, node_id: str, username: str, password: str = None, ssh_key: str = None) -> str:
"""
Create a user on a node
:param id: the id of the pool/cluster the node is in
:param node_id: the id of the node to add the user to
:param username: username of the user to add
:param password: password of the user to add
:param ssh_key: ssh_key of the user to add
"""
# Create new ssh user for the given node
self.batch_client.compute_node.add_user(
id,
node_id,
batch_models.ComputeNodeUser(
name=username,
is_admin=True,
password=password,
ssh_public_key=get_ssh_key.get_user_public_key(ssh_key, self.secrets_configuration),
expiry_time=datetime.now(timezone.utc) + timedelta(days=365),
),
)
def create_user_on_node(base_client, id, node_id, username, ssh_key=None, password=None):
try:
__create_user(
base_client, id=id, node_id=node_id, username=username, ssh_key=ssh_key, password=password)
except batch_error.BatchErrorException as error:
try:
base_client.delete_user_on_node(id, node_id, username)
base_client.create_user_on_node(id=id, node_id=node_id, username=username, ssh_key=ssh_key)
except batch_error.BatchErrorException as error:
raise error


@@ -0,0 +1,7 @@
import concurrent.futures
#TODO: remove nodes param
def delete_user_on_cluster(base_client, id, nodes, username):
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = [executor.submit(base_client.delete_user_on_node, id, node.id, username) for node in nodes]
concurrent.futures.wait(futures)


@@ -0,0 +1,9 @@
def delete_user(self, pool_id: str, node_id: str, username: str) -> str:
"""
Delete a user on a node
:param pool_id: the id of the pool the node is in
:param node_id: the id of the node to delete the user from
:param username: username of the user to delete
"""
# Delete a user on the given node
self.batch_client.compute_node.delete_user(pool_id, node_id, username)


@@ -0,0 +1,20 @@
import concurrent.futures
from Cryptodome.PublicKey import RSA
from aztk.utils import secure_utils
#TODO: remove nodes param
def generate_user_on_cluster(base_operations, id, nodes):
generated_username = secure_utils.generate_random_string()
ssh_key = RSA.generate(2048)
ssh_pub_key = ssh_key.publickey().exportKey('OpenSSH').decode('utf-8')
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = {
executor.submit(base_operations.create_user_on_node, id, node.id, generated_username, ssh_pub_key): node
for node in nodes
}
concurrent.futures.wait(futures)
return generated_username, ssh_key


@@ -0,0 +1,11 @@
from Cryptodome.PublicKey import RSA
from aztk.utils import secure_utils
def generate_user_on_node(base_client, pool_id, node_id):
generated_username = secure_utils.generate_random_string()
ssh_key = RSA.generate(2048)
ssh_pub_key = ssh_key.publickey().exportKey('OpenSSH').decode('utf-8')
base_client.create_user_on_node(pool_id, node_id, generated_username, ssh_pub_key)
return generated_username, ssh_key


@@ -0,0 +1,114 @@
import time
import azure
import azure.batch.models as batch_models
import azure.batch.models.batch_error as batch_error
from aztk import error
from aztk import models
from aztk.utils import constants, helpers
output_file = constants.TASK_WORKING_DIR + \
"/" + constants.SPARK_SUBMIT_LOGS_FILE
def __check_task_node_exist(batch_client, cluster_id: str, task: batch_models.CloudTask) -> bool:
try:
batch_client.compute_node.get(cluster_id, task.node_info.node_id)
return True
except batch_error.BatchErrorException:
return False
def __wait_for_app_to_be_running(batch_client, cluster_id: str, application_name: str) -> batch_models.CloudTask:
"""
Wait for the Batch task to leave the waiting (active/preparing) state and enter the running state (or completed, if it finished quickly)
"""
while True:
task = batch_client.task.get(cluster_id, application_name)
if task.state is batch_models.TaskState.active or task.state is batch_models.TaskState.preparing:
# TODO: log
time.sleep(5)
else:
return task
def __get_output_file_properties(batch_client, cluster_id: str, application_name: str):
while True:
try:
file = helpers.get_file_properties(cluster_id, application_name, output_file, batch_client)
return file
except batch_error.BatchErrorException as e:
if e.response.status_code == 404:
# TODO: log
time.sleep(5)
continue
else:
raise e
def get_log_from_storage(blob_client, container_name, application_name, task):
try:
blob = blob_client.get_blob_to_text(container_name, application_name + '/' + constants.SPARK_SUBMIT_LOGS_FILE)
except azure.common.AzureMissingResourceHttpError:
raise error.AztkError("Logs not found in your storage account. They were either deleted or never existed.")
return models.ApplicationLog(
name=application_name,
cluster_id=container_name,
application_state=task.state._value_,
log=blob.content,
total_bytes=blob.properties.content_length,
exit_code=task.execution_info.exit_code)
def get_log(batch_client, blob_client, cluster_id: str, application_name: str, tail=False, current_bytes: int = 0):
job_id = cluster_id
task_id = application_name
task = __wait_for_app_to_be_running(batch_client, cluster_id, application_name)
if not __check_task_node_exist(batch_client, cluster_id, task):
return get_log_from_storage(blob_client, cluster_id, application_name, task)
file = __get_output_file_properties(batch_client, cluster_id, application_name)
target_bytes = file.content_length
if target_bytes != current_bytes:
ocp_range = None
if tail:
ocp_range = "bytes={0}-{1}".format(current_bytes, target_bytes - 1)
stream = batch_client.file.get_from_task(
job_id, task_id, output_file, batch_models.FileGetFromTaskOptions(ocp_range=ocp_range))
content = helpers.read_stream_as_string(stream)
return models.ApplicationLog(
name=application_name,
cluster_id=cluster_id,
application_state=task.state._value_,
log=content,
total_bytes=target_bytes,
exit_code=task.execution_info.exit_code)
else:
return models.ApplicationLog(
name=application_name,
cluster_id=cluster_id,
application_state=task.state._value_,
log='',
total_bytes=target_bytes,
exit_code=task.execution_info.exit_code)
def get_application_log(base_operations,
cluster_id: str,
application_name: str,
tail=False,
current_bytes: int = 0):
try:
return get_log(base_operations.batch_client, base_operations.blob_client, cluster_id,
application_name, tail, current_bytes)
except batch_error.BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))
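A sketch of how the tail/current_bytes contract above is meant to be consumed when streaming a log as it is written (`operations` stands in for any object exposing get_application_log; the ids are placeholders):

```python
import time

current_bytes = 0
while True:
    app_log = operations.get_application_log(
        'my-cluster', 'my-app', tail=True, current_bytes=current_bytes)
    print(app_log.log, end='')           # with tail=True, only the newly written bytes
    current_bytes = app_log.total_bytes  # remember the last byte already seen
    if app_log.application_state == 'completed':
        break
    time.sleep(5)
```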


@@ -0,0 +1,22 @@
import azure.batch.models.batch_error as batch_error
from aztk import error, models
from aztk.utils import helpers
def _get_remote_login_settings(base_client, pool_id: str, node_id: str):
"""
Get the remote_login_settings for node
:param pool_id: the id of the pool the node is in
:param node_id: the id of the node
:returns: aztk.models.RemoteLogin
"""
result = base_client.batch_client.compute_node.get_remote_login_settings(pool_id, node_id)
return models.RemoteLogin(ip_address=result.remote_login_ip_address, port=str(result.remote_login_port))
def get_remote_login_settings(base_client, cluster_id: str, node_id: str):
try:
return _get_remote_login_settings(base_client, cluster_id, node_id)
except batch_error.BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))


@@ -0,0 +1,30 @@
import aztk.error as error
import aztk.models as models
from aztk.utils import ssh as ssh_lib
def node_run(base_client, cluster_id, node_id, command, internal, container_name=None, timeout=None):
cluster = base_client.get(cluster_id)
pool, nodes = cluster.pool, list(cluster.nodes)
try:
node = next(node for node in nodes if node.id == node_id)
except StopIteration:
raise error.AztkError("Node with id {} not found".format(node_id))
if internal:
node_rls = models.RemoteLogin(ip_address=node.ip_address, port="22")
else:
node_rls = base_client.get_remote_login_settings(pool.id, node.id)
try:
generated_username, ssh_key = base_client.generate_user_on_node(pool.id, node.id)
output = ssh_lib.node_exec_command(
node.id,
command,
generated_username,
node_rls.ip_address,
node_rls.port,
ssh_key=ssh_key.exportKey().decode('utf-8'),
container_name=container_name,
timeout=timeout)
return output
finally:
base_client.delete_user_on_node(cluster_id, node.id, generated_username)


@@ -0,0 +1,36 @@
import asyncio
from azure.batch.models import batch_error
import aztk.models as models
from aztk import error
from aztk.utils import ssh as ssh_lib
from aztk.utils import helpers
def cluster_run(base_operations, cluster_id, command, internal, container_name=None, timeout=None):
cluster = base_operations.get(cluster_id)
pool, nodes = cluster.pool, list(cluster.nodes)
if internal:
cluster_nodes = [(node, models.RemoteLogin(ip_address=node.ip_address, port="22")) for node in nodes]
else:
cluster_nodes = [(node, base_operations.get_remote_login_settings(pool.id, node.id)) for node in nodes]
try:
generated_username, ssh_key = base_operations.generate_user_on_cluster(pool.id, nodes)
except batch_error.BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))
try:
output = asyncio.get_event_loop().run_until_complete(
ssh_lib.clus_exec_command(
command,
generated_username,
cluster_nodes,
ssh_key=ssh_key.exportKey().decode('utf-8'),
container_name=container_name,
timeout=timeout))
return output
except OSError as exc:
raise exc
finally:
base_operations.delete_user_on_cluster(generated_username, pool.id, nodes)


@@ -0,0 +1,20 @@
import aztk.models as models
from aztk.utils import ssh as ssh_lib
def ssh_into_node(base_client, pool_id, node_id, username, ssh_key=None, password=None, port_forward_list=None, internal=False):
if internal:
result = base_client.batch_client.compute_node.get(pool_id=pool_id, node_id=node_id)
rls = models.RemoteLogin(ip_address=result.ip_address, port="22")
else:
result = base_client.batch_client.compute_node.get_remote_login_settings(pool_id, node_id)
rls = models.RemoteLogin(ip_address=result.remote_login_ip_address, port=str(result.remote_login_port))
ssh_lib.node_ssh(
username=username,
hostname=rls.ip_address,
port=rls.port,
ssh_key=ssh_key,
password=password,
port_forward_list=port_forward_list,
)


@@ -13,21 +13,38 @@ import aztk.utils.constants as constants
 import aztk.utils.get_ssh_key as get_ssh_key
 import aztk.utils.helpers as helpers
 import aztk.utils.ssh as ssh_lib
+from aztk.client.cluster import CoreClusterOperations
+from aztk.client.job import CoreJobOperations
 from aztk.internal import cluster_data
-from aztk.utils import secure_utils
+from aztk.utils import deprecated, secure_utils
-class Client:
+class CoreClient:
-    def __init__(self, secrets_config: models.SecretsConfiguration):
-        self.secrets_config = secrets_config
-        azure_api.validate_secrets(secrets_config)
-        self.batch_client = azure_api.make_batch_client(secrets_config)
-        self.blob_client = azure_api.make_blob_client(secrets_config)
+    """The base AZTK client that all other clients inherit from.
+    **This client should not be used directly. Only software specific clients
+    should be used.**
+    """
+    def _get_context(self, secrets_configuration: models.SecretsConfiguration):
+        self.secrets_configuration = secrets_configuration
+        azure_api.validate_secrets(secrets_configuration)
+        self.batch_client = azure_api.make_batch_client(secrets_configuration)
+        self.blob_client = azure_api.make_blob_client(secrets_configuration)
+        context = {
+            'batch_client': self.batch_client,
+            'blob_client': self.blob_client,
+            'secrets_configuration': self.secrets_configuration,
+        }
+        return context
+    # ALL THE FOLLOWING METHODS ARE DEPRECATED AND WILL BE REMOVED IN 0.10.0
+    @deprecated("0.10.0")
     def get_cluster_config(self, cluster_id: str) -> models.ClusterConfiguration:
         return self._get_cluster_data(cluster_id).read_cluster_config()
+    @deprecated("0.10.0")
     def _get_cluster_data(self, cluster_id: str) -> cluster_data.ClusterData:
         """
         Returns ClusterData object to manage data related to the given cluster id
@@ -38,6 +55,7 @@ class Client:
     General Batch Operations
     '''
+    @deprecated("0.10.0")
     def __delete_pool_and_job(self, pool_id: str, keep_logs: bool = False):
         """
         Delete a pool and it's associated job
@@ -67,6 +85,7 @@ class Client:
         return job_exists or pool_exists
+    @deprecated("0.10.0")
     def __create_pool_and_job(self, cluster_conf: models.ClusterConfiguration, software_metadata_key: str, start_task, VmImageModel):
         """
         Create a pool and job
@@ -128,6 +147,7 @@ class Client:
         return helpers.get_cluster(cluster_conf.cluster_id, self.batch_client)
+    @deprecated("0.10.0")
     def __get_pool_details(self, cluster_id: str):
         """
         Print the information for the given cluster
@@ -138,6 +158,7 @@ class Client:
         nodes = self.batch_client.compute_node.list(pool_id=cluster_id)
         return pool, nodes
+    @deprecated("0.10.0")
     def __list_clusters(self, software_metadata_key):
         """
         List all the cluster on your account.
@@ -155,6 +176,7 @@ class Client:
             aztk_pools.append(pool)
         return aztk_pools
+    @deprecated("0.10.0")
     def __create_user(self, pool_id: str, node_id: str, username: str, password: str = None, ssh_key: str = None) -> str:
         """
         Create a pool user
@@ -173,9 +195,10 @@ class Client:
                 is_admin=True,
                 password=password,
                 ssh_public_key=get_ssh_key.get_user_public_key(
-                    ssh_key, self.secrets_config),
+                    ssh_key, self.secrets_configuration),
                 expiry_time=datetime.now(timezone.utc) + timedelta(days=365)))
+    @deprecated("0.10.0")
     def __delete_user(self, pool_id: str, node_id: str, username: str) -> str:
         """
         Create a pool user
@@ -186,6 +209,7 @@ class Client:
         # Delete a user on the given node
         self.batch_client.compute_node.delete_user(pool_id, node_id, username)
+    @deprecated("0.10.0")
     def __get_remote_login_settings(self, pool_id: str, node_id: str):
         """
         Get the remote_login_settings for node
@@ -197,6 +221,7 @@ class Client:
             pool_id, node_id)
         return models.RemoteLogin(ip_address=result.remote_login_ip_address, port=str(result.remote_login_port))
+    @deprecated("0.10.0")
     def __create_user_on_node(self, username, pool_id, node_id, ssh_key=None, password=None):
         try:
             self.__create_user(pool_id=pool_id, node_id=node_id, username=username, ssh_key=ssh_key, password=password)
@@ -207,6 +232,7 @@ class Client:
         except batch_error.BatchErrorException as error:
             raise error
+    @deprecated("0.10.0")
     def __generate_user_on_node(self, pool_id, node_id):
         generated_username = secure_utils.generate_random_string()
         ssh_key = RSA.generate(2048)
@@ -214,6 +240,7 @@ class Client:
         self.__create_user_on_node(generated_username, pool_id, node_id, ssh_pub_key)
         return generated_username, ssh_key
+    @deprecated("0.10.0")
     def __generate_user_on_pool(self, pool_id, nodes):
         generated_username = secure_utils.generate_random_string()
         ssh_key = RSA.generate(2048)
@@ -228,6 +255,7 @@ class Client:
         return generated_username, ssh_key
+    @deprecated("0.10.0")
     def __create_user_on_pool(self, username, pool_id, nodes, ssh_pub_key=None, password=None):
         with concurrent.futures.ThreadPoolExecutor() as executor:
             futures = {executor.submit(self.__create_user_on_node,
@@ -238,11 +266,13 @@ class Client:
                 password): node for node in nodes}
             concurrent.futures.wait(futures)
+    @deprecated("0.10.0")
     def __delete_user_on_pool(self, username, pool_id, nodes):
         with concurrent.futures.ThreadPoolExecutor() as executor:
             futures = [executor.submit(self.__delete_user, pool_id, node.id, username) for node in nodes]
             concurrent.futures.wait(futures)
+    @deprecated("0.10.0")
     def __node_run(self, cluster_id, node_id, command, internal, container_name=None, timeout=None):
         pool, nodes = self.__get_pool_details(cluster_id)
         try:
@@ -271,6 +301,7 @@ class Client:
         finally:
             self.__delete_user(cluster_id, node.id, generated_username)
+    @deprecated("0.10.0")
     def __cluster_run(self, cluster_id, command, internal, container_name=None, timeout=None):
         pool, nodes = self.__get_pool_details(cluster_id)
         nodes = list(nodes)
@@ -297,6 +328,7 @@ class Client:
         finally:
             self.__delete_user_on_pool(generated_username, pool.id, nodes)
+    @deprecated("0.10.0")
     def __cluster_copy(self, cluster_id, source_path, destination_path=None, container_name=None, internal=False, get=False, timeout=None):
         pool, nodes = self.__get_pool_details(cluster_id)
         nodes = list(nodes)
@@ -325,6 +357,7 @@ class Client:
         finally:
             self.__delete_user_on_pool(generated_username, pool.id, nodes)
+    @deprecated("0.10.0")
     def __ssh_into_node(self, pool_id, node_id, username, ssh_key=None, password=None, port_forward_list=None, internal=False):
         if internal:
             result = self.batch_client.compute_node.get(pool_id=pool_id, node_id=node_id)
@@ -342,6 +375,7 @@ class Client:
             port_forward_list=port_forward_list,
         )
+    @deprecated("0.10.0")
     def __submit_job(self,
                      job_configuration,
                      start_task,
@@ -429,44 +463,3 @@ class Client:
         self.batch_client.job_schedule.add(setup)
         return self.batch_client.job_schedule.get(job_schedule_id=job_configuration.id)
-    '''
-    Define Public Interface
-    '''
-    def create_cluster(self, cluster_conf, wait: bool = False):
-        raise NotImplementedError()
-    def create_clusters_in_parallel(self, cluster_confs):
-        raise NotImplementedError()
-    def delete_cluster(self, cluster_id: str):
-        raise NotImplementedError()
-    def get_cluster(self, cluster_id: str):
-        raise NotImplementedError()
-    def list_clusters(self):
-        raise NotImplementedError()
-    def wait_until_cluster_is_ready(self, cluster_id):
-        raise NotImplementedError()
-    def create_user(self, cluster_id: str, username: str, password: str = None, ssh_key: str = None) -> str:
-        raise NotImplementedError()
-    def get_remote_login_settings(self, cluster_id, node_id):
-        raise NotImplementedError()
-    def cluster_run(self, cluster_id, command):
-        raise NotImplementedError()
-    def cluster_copy(self, cluster_id, source_path, destination_path):
-        raise NotImplementedError()
-    def cluster_download(self, cluster_id, source_path, destination_path):
-        raise NotImplementedError()
-    def submit_job(self, job):
-        raise NotImplementedError()
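With the move from inheritance to composition, a software-specific client is expected to build the context once via `_get_context` and hand it to its operations objects; a sketch of that wiring (the actual Spark client layout is not shown in this diff):

```python
from aztk.client import CoreClient
from aztk.client.cluster import CoreClusterOperations
from aztk.client.job import CoreJobOperations

class ExampleClient(CoreClient):
    # Hypothetical software-specific client built on the new composition model.
    def __init__(self, secrets_configuration):
        context = self._get_context(secrets_configuration)
        self.cluster = CoreClusterOperations(context)
        self.job = CoreJobOperations(context)
```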


@@ -0,0 +1 @@
from .operations import CoreClusterOperations


@@ -0,0 +1,41 @@
import asyncio
import azure.batch.models.batch_error as batch_error
import aztk.models as models
from aztk import error
from aztk.utils import ssh as ssh_lib
from aztk.utils import helpers
def cluster_copy(cluster_operations, cluster_id, source_path, destination_path=None, container_name=None, internal=False, get=False, timeout=None):
cluster = cluster_operations.get(cluster_id)
pool, nodes = cluster.pool, list(cluster.nodes)
if internal:
cluster_nodes = [(node, models.RemoteLogin(ip_address=node.ip_address, port="22")) for node in nodes]
else:
cluster_nodes = [(node, cluster_operations.get_remote_login_settings(pool.id, node.id)) for node in nodes]
try:
generated_username, ssh_key = cluster_operations.generate_user_on_cluster(pool.id, nodes)
except batch_error.BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))
try:
output = asyncio.get_event_loop().run_until_complete(
ssh_lib.clus_copy(
container_name=container_name,
username=generated_username,
nodes=cluster_nodes,
source_path=source_path,
destination_path=destination_path,
ssh_key=ssh_key.exportKey().decode('utf-8'),
get=get,
timeout=timeout
)
)
return output
except (OSError, batch_error.BatchErrorException) as exc:
raise exc
finally:
cluster_operations.delete_user_on_cluster(generated_username, pool.id, nodes)


@@ -0,0 +1,67 @@
from datetime import timedelta
import azure.batch.models as batch_models
from aztk import models
from aztk.utils import helpers, constants
def create_pool_and_job(core_cluster_operations, cluster_conf: models.ClusterConfiguration, software_metadata_key: str, start_task, VmImageModel):
"""
Create a pool and job
:param cluster_conf: the configuration object used to create the cluster
:type cluster_conf: aztk.models.ClusterConfiguration
:param software_metadata_key: the id of the software being used on the cluster
:param start_task: the start task for the cluster
:param VmImageModel: the type of image to provision for the cluster
"""
core_cluster_operations.get_cluster_data(cluster_conf.cluster_id).save_cluster_config(cluster_conf)
# reuse pool_id as job_id
pool_id = cluster_conf.cluster_id
job_id = cluster_conf.cluster_id
# Get a verified node agent sku
sku_to_use, image_ref_to_use = \
helpers.select_latest_verified_vm_image_with_node_agent_sku(
VmImageModel.publisher, VmImageModel.offer, VmImageModel.sku, core_cluster_operations.batch_client)
network_conf = None
if cluster_conf.subnet_id is not None:
network_conf = batch_models.NetworkConfiguration(
subnet_id=cluster_conf.subnet_id)
auto_scale_formula = "$TargetDedicatedNodes={0}; $TargetLowPriorityNodes={1}".format(
cluster_conf.size, cluster_conf.size_low_priority)
# Configure the pool
pool = batch_models.PoolAddParameter(
id=pool_id,
virtual_machine_configuration=batch_models.VirtualMachineConfiguration(
image_reference=image_ref_to_use,
node_agent_sku_id=sku_to_use),
vm_size=cluster_conf.vm_size,
enable_auto_scale=True,
auto_scale_formula=auto_scale_formula,
auto_scale_evaluation_interval=timedelta(minutes=5),
start_task=start_task,
enable_inter_node_communication=True if not cluster_conf.subnet_id else False,
max_tasks_per_node=4,
network_configuration=network_conf,
metadata=[
batch_models.MetadataItem(
name=constants.AZTK_SOFTWARE_METADATA_KEY, value=software_metadata_key),
batch_models.MetadataItem(
name=constants.AZTK_MODE_METADATA_KEY, value=constants.AZTK_CLUSTER_MODE_METADATA)
])
# Create the pool + create user for the pool
helpers.create_pool_if_not_exist(pool, core_cluster_operations.batch_client)
# Create job
job = batch_models.JobAddParameter(
id=job_id,
pool_info=batch_models.PoolInformation(pool_id=pool_id))
# Add job to batch
core_cluster_operations.batch_client.job.add(job)
return helpers.get_cluster(cluster_conf.cluster_id, core_cluster_operations.batch_client)
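For reference, a hypothetical ClusterConfiguration that create_pool_and_job consumes; the field names follow the attributes referenced above, though the constructor call itself is an assumption:

```python
from aztk.models import ClusterConfiguration

cluster_conf = ClusterConfiguration(
    cluster_id='example-cluster',
    vm_size='Standard_D2_v2',
    size=2,               # becomes $TargetDedicatedNodes in the autoscale formula
    size_low_priority=1,  # becomes $TargetLowPriorityNodes
    subnet_id=None,       # no custom VNET, so inter-node communication stays enabled
)
```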


@@ -0,0 +1,31 @@
import azure.batch.models as batch_models
def delete_pool_and_job(core_cluster_operations, pool_id: str, keep_logs: bool = False):
"""
Delete a pool and its associated job
:param pool_id: the id of the pool (and associated job) to delete
:return bool: True if the pool or the job existed and was deleted
"""
# job id is equal to pool id
job_id = pool_id
job_exists = True
try:
core_cluster_operations.batch_client.job.get(job_id)
except batch_models.batch_error.BatchErrorException:
job_exists = False
pool_exists = core_cluster_operations.batch_client.pool.exists(pool_id)
if job_exists:
core_cluster_operations.batch_client.job.delete(job_id)
if pool_exists:
core_cluster_operations.batch_client.pool.delete(pool_id)
if not keep_logs:
cluster_data = core_cluster_operations.get_cluster_data(pool_id)
cluster_data.delete_container(pool_id)
return job_exists or pool_exists


@@ -0,0 +1,15 @@
#TODO: return Cluster instead of (pool, nodes)
from aztk import models
def get_pool_details(core_cluster_operations, cluster_id: str):
"""
Get the details of the given cluster
:param cluster_id: Id of the cluster
:return: aztk.models.Cluster built from the pool and its compute nodes
"""
pool = core_cluster_operations.batch_client.pool.get(cluster_id)
nodes = core_cluster_operations.batch_client.compute_node.list(pool_id=cluster_id)
return models.Cluster(pool, nodes)


@@ -0,0 +1,20 @@
from aztk import models
from aztk.utils import constants
def list_clusters(cluster_client, software_metadata_key):
"""
List all the clusters on your account.
"""
pools = cluster_client.batch_client.pool.list()
software_metadata = (
constants.AZTK_SOFTWARE_METADATA_KEY, software_metadata_key)
cluster_metadata = (
constants.AZTK_MODE_METADATA_KEY, constants.AZTK_CLUSTER_MODE_METADATA)
aztk_clusters = []
for pool in [pool for pool in pools if pool.metadata]:
pool_metadata = [(metadata.name, metadata.value) for metadata in pool.metadata]
if all([metadata in pool_metadata for metadata in [software_metadata, cluster_metadata]]):
aztk_clusters.append(models.Cluster(pool))
return aztk_clusters


@@ -0,0 +1,12 @@
import time
import azure.batch.models as batch_models
def wait_for_task_to_complete(core_cluster_operations, job_id: str, task_id: str):
while True:
task = core_cluster_operations.batch_client.task.get(job_id=job_id, task_id=task_id)
if task.state != batch_models.TaskState.completed:
time.sleep(2)
else:
return


@@ -0,0 +1,94 @@
from aztk.client.base import BaseOperations
from aztk.models import ClusterConfiguration
from .helpers import copy, create, delete, get, list, wait_for_task_to_complete
class CoreClusterOperations(BaseOperations):
def create(self, cluster_configuration: ClusterConfiguration, software_metadata_key: str, start_task,
vm_image_model):
"""Create a cluster.
Args:
cluster_configuration (:obj:`aztk.models.ClusterConfiguration`): Configuration for the cluster to be created
software_metadata_key (:obj:`str`): the key for the primary software that will be run on the cluster
start_task (:obj:`azure.batch.models.StartTask`): Batch StartTask definition to configure the Batch Pool
vm_image_model (:obj:`azure.batch.models.VirtualMachineConfiguration`): Configuration of the virtual machine image and settings
Returns:
:obj:`aztk.models.Cluster`: A Cluster object representing the state and configuration of the cluster.
"""
return create.create_pool_and_job(self, cluster_configuration, software_metadata_key, start_task,
vm_image_model)
def get(self, id: str):
"""Get the state and configuration of a cluster
Args:
id (:obj:`str`): the id of the cluster to get.
Returns:
:obj:`aztk.models.Cluster`: A Cluster object representing the state and configuration of the cluster.
"""
return get.get_pool_details(self, id)
def copy(self, id, source_path, destination_path=None, container_name=None, internal=False, get=False,
timeout=None):
"""Copy files to or from every node in a cluster.
Args:
id (:obj:`str`): the id of the cluster to copy files with.
source_path (:obj:`str`): the path of the file to copy from.
destination_path (:obj:`str`, optional): the local directory path where the output should be written.
If None, a SpooledTemporaryFile will be returned in the NodeOutput object, else the file will be
written to this path. Defaults to None.
container_name (:obj:`str`, optional): the name of the container to copy to or from.
If None, the copy operation will occur on the host VM, Defaults to None.
internal (:obj:`bool`, optional): if True, this will connect to the node using its internal IP.
Only use this if running within the same VNET as the cluster. Defaults to False.
get (:obj:`bool`, optional): If True, the file is downloaded from every node in the cluster.
Else, the file is copied from the client to the node. Defaults to False.
timeout (:obj:`int`, optional): The timeout in seconds for establishing a connection to the node.
Defaults to None.
Returns:
:obj:`List[aztk.models.NodeOutput]`: A list of NodeOutput objects representing the output of the copy command.
"""
return copy.cluster_copy(self, id, source_path, destination_path, container_name, internal, get, timeout)
def delete(self, id: str, keep_logs: bool = False):
"""Copy files to or from every node in a cluster.
Args:
id (:obj:`str`): the id of the cluster to delete
keep_logs (:obj:`bool`): If True, the logs related to this cluster in Azure Storage are not deleted.
Defaults to False.
Returns:
:obj:`bool`: True if the cluster or its associated job existed and was deleted.
"""
return delete.delete_pool_and_job(self, id, keep_logs)
def list(self, software_metadata_key):
"""List clusters running the specified software.
Args:
software_metadata_key (:obj:`str`): the key of the primary software running on the cluster.
This filters out non-aztk clusters and aztk clusters running other software.
Returns:
:obj:`List[aztk.models.Cluster]`: list of clusters running the software defined by software_metadata_key
"""
return list.list_clusters(self, software_metadata_key)
def wait(self, id, task_name):
"""Wait until the task has completed
Args:
id (:obj:`str`): the id of the job the task was submitted to
task_name (:obj:`str`): the name of the task to wait for
Returns:
:obj:`None`
"""
return wait_for_task_to_complete.wait_for_task_to_complete(self, id, task_name)
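A sketch of the lifecycle these operations expose, assuming `operations` is a CoreClusterOperations instance and that cluster_config, start_task and vm_image are prepared by the software-specific layer ('spark' is the software metadata key used elsewhere in this change):

```python
cluster = operations.create(cluster_config, 'spark', start_task, vm_image)
cluster = operations.get(cluster_config.cluster_id)   # refresh state and node list
for c in operations.list('spark'):                    # only aztk spark clusters
    print(c.id)
operations.delete(cluster_config.cluster_id, keep_logs=True)
```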


@@ -0,0 +1 @@
from .operations import CoreJobOperations


@@ -0,0 +1,76 @@
from datetime import timedelta
import azure.batch.models as batch_models
from aztk.utils import helpers, constants
def submit_job(
job_client,
job_configuration,
start_task,
job_manager_task,
autoscale_formula,
software_metadata_key: str,
vm_image_model,
application_metadata):
"""
Job Submission
:param job_configuration -> aztk_sdk.spark.models.JobConfiguration
:param start_task -> batch_models.StartTask
:param job_manager_task -> batch_models.TaskAddParameter
:param autoscale_formula -> str
:param software_metadata_key -> str
:param vm_image_model -> aztk_sdk.models.VmImage
:returns: azure.batch.models.CloudJobSchedule
"""
job_client.get_cluster_data(job_configuration.id).save_cluster_config(job_configuration.to_cluster_config())
# get a verified node agent sku
sku_to_use, image_ref_to_use = \
helpers.select_latest_verified_vm_image_with_node_agent_sku(
vm_image_model.publisher, vm_image_model.offer, vm_image_model.sku, job_client.batch_client)
# set up subnet if necessary
network_conf = None
if job_configuration.subnet_id:
network_conf = batch_models.NetworkConfiguration(subnet_id=job_configuration.subnet_id)
# set up a schedule for a recurring job
auto_pool_specification = batch_models.AutoPoolSpecification(
pool_lifetime_option=batch_models.PoolLifetimeOption.job_schedule,
auto_pool_id_prefix=job_configuration.id,
keep_alive=False,
pool=batch_models.PoolSpecification(
display_name=job_configuration.id,
virtual_machine_configuration=batch_models.VirtualMachineConfiguration(
image_reference=image_ref_to_use, node_agent_sku_id=sku_to_use),
vm_size=job_configuration.vm_size,
enable_auto_scale=True,
auto_scale_formula=autoscale_formula,
auto_scale_evaluation_interval=timedelta(minutes=5),
start_task=start_task,
enable_inter_node_communication=not job_configuration.mixed_mode(),
network_configuration=network_conf,
max_tasks_per_node=4,
metadata=[
batch_models.MetadataItem(name=constants.AZTK_SOFTWARE_METADATA_KEY, value=software_metadata_key),
batch_models.MetadataItem(name=constants.AZTK_MODE_METADATA_KEY, value=constants.AZTK_JOB_MODE_METADATA)
]))
# define job specification
job_spec = batch_models.JobSpecification(
pool_info=batch_models.PoolInformation(auto_pool_specification=auto_pool_specification),
display_name=job_configuration.id,
on_all_tasks_complete=batch_models.OnAllTasksComplete.terminate_job,
job_manager_task=job_manager_task,
metadata=[batch_models.MetadataItem(name='applications', value=application_metadata)])
# define schedule
schedule = batch_models.Schedule(do_not_run_until=None, do_not_run_after=None, start_window=None, recurrence_interval=None)
# create job schedule and add task
setup = batch_models.JobScheduleAddParameter(id=job_configuration.id, schedule=schedule, job_specification=job_spec)
job_client.batch_client.job_schedule.add(setup)
return job_client.batch_client.job_schedule.get(job_schedule_id=job_configuration.id)


@@ -0,0 +1,30 @@
from aztk.client.base import BaseOperations
from .helpers import submit
class CoreJobOperations(BaseOperations):
def submit(self, job_configuration, start_task, job_manager_task, autoscale_formula, software_metadata_key: str,
vm_image_model, application_metadata):
"""Submit a job
Jobs are a cluster definition and one or many application definitions which run on the cluster. The job's
cluster will be allocated and configured, then the applications will be executed with their output stored
in Azure Storage. When all applications have completed, the cluster will be automatically deleted.
Args:
job_configuration (:obj:`aztk.models.JobConfiguration`): Model defining the job's configuration.
start_task (:obj:`azure.batch.models.StartTask`): Batch StartTask definition to configure the Batch Pool
job_manager_task (:obj:`azure.batch.models.JobManagerTask`): Batch JobManagerTask definition to schedule
the defined applications on the cluster.
autoscale_formula (:obj:`str`): formula that defines the numbers of nodes allocated to the cluster.
software_metadata_key (:obj:`str`): the key of the primary software running on the cluster.
vm_image_model (:obj:`aztk.models.VmImage`): the VM image to provision for the cluster's nodes.
application_metadata (:obj:`List[str]`): list of the names of all applications that will be run as a
part of the job
Returns:
:obj:`azure.batch.models.CloudJobSchedule`: Model representing the Azure Batch JobSchedule state.
"""
return submit.submit_job(self, job_configuration, start_task, job_manager_task, autoscale_formula,
software_metadata_key, vm_image_model, application_metadata)
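A sketch of a submit call, assuming the surrounding pieces (configuration, tasks, image and formula) were built by the software-specific layer; names here are placeholders:

```python
job_schedule = job_operations.submit(
    job_configuration=job_conf,
    start_task=start_task,
    job_manager_task=job_manager_task,
    autoscale_formula='$TargetDedicatedNodes=2',
    software_metadata_key='spark',
    vm_image_model=vm_image,
    application_metadata=application_names,  # names of the job's applications
)
print(job_schedule.id)
```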


@@ -18,4 +18,5 @@ from .software import Software
 from .cluster import Cluster
 from .scheduling_target import SchedulingTarget
 from .port_forward_specification import PortForwardingSpecification
+from .application_log import ApplicationLog
 from .plugins import *


@@ -0,0 +1,12 @@
import azure.batch.models as batch_models
class ApplicationLog():
def __init__(self, name: str, cluster_id: str, log: str, total_bytes: int,
application_state: batch_models.TaskState, exit_code: int):
self.name = name
self.cluster_id = cluster_id # TODO: change to something cluster/job agnostic
self.log = log
self.total_bytes = total_bytes
self.application_state = application_state
self.exit_code = exit_code


@@ -1,10 +1,11 @@
 """
 This is the code that all nodes will run in their start task to try to allocate the master
 """
 import azure.batch.batch_service_client as batch
 import azure.batch.models as batchmodels
 import azure.batch.models.batch_error as batcherror
+from msrest.exceptions import ClientRequestError
 from core import config
 MASTER_NODE_METADATA_KEY = "_spark_master_node"
@@ -36,7 +37,7 @@ def try_assign_self_as_master(client: batch.BatchServiceClient, pool: batchmodel
             if_match=pool.e_tag,
         ))
         return True
-    except batcherror.BatchErrorException:
+    except (batcherror.BatchErrorException, ClientRequestError):
         print("Couldn't assign itself as master the pool because the pool was modified since last get.")
         return False
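The widened except clause above guards Batch's e_tag-based optimistic concurrency; for context, a sketch of that pattern (helper name and wiring are illustrative, the API calls follow the diff):

```python
import azure.batch.models as batchmodels

def try_claim_master(client, pool, node_id):
    # Tag the pool with this node's id, but only if the pool is unchanged
    # since we read it (if_match=pool.e_tag).
    metadata = (pool.metadata or []) + [
        batchmodels.MetadataItem(name=MASTER_NODE_METADATA_KEY, value=node_id)]
    try:
        client.pool.patch(pool.id, batchmodels.PoolPatchParameter(metadata=metadata),
                          batchmodels.PoolPatchOptions(if_match=pool.e_tag))
        return True
    except (batcherror.BatchErrorException, ClientRequestError):
        # the pool changed under us (or the request transiently failed): not master
        return False
```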


@@ -1,361 +0,0 @@
from typing import List
import azure.batch.models.batch_error as batch_error
import aztk
from aztk import error
from aztk.client import Client as BaseClient
from aztk.internal.cluster_data import NodeData
from aztk.spark import models
from aztk.spark.helpers import create_cluster as create_cluster_helper
from aztk.spark.helpers import get_log as get_log_helper
from aztk.spark.helpers import job_submission as job_submit_helper
from aztk.spark.helpers import submit as cluster_submit_helper
from aztk.spark.helpers import cluster_diagnostic_helper
from aztk.spark.utils import util
from aztk.utils import helpers
class Client(BaseClient):
"""
Aztk Spark Client
This is the main entry point for using aztk for spark
Args:
secrets_config(aztk.spark.models.models.SecretsConfiguration): Configuration with all the needed credentials
"""
def create_cluster(self, cluster_conf: models.ClusterConfiguration, wait: bool = False):
"""
Create a new aztk spark cluster
Args:
cluster_conf(aztk.spark.models.models.ClusterConfiguration): Configuration for the cluster to be created
wait(bool): Whether to wait for the cluster to be ready before returning
Returns:
aztk.spark.models.Cluster
"""
cluster_conf = _apply_default_for_cluster_config(cluster_conf)
cluster_conf.validate()
cluster_data = self._get_cluster_data(cluster_conf.cluster_id)
try:
zip_resource_files = None
node_data = NodeData(cluster_conf).add_core().done()
zip_resource_files = cluster_data.upload_node_data(node_data).to_resource_file()
start_task = create_cluster_helper.generate_cluster_start_task(self,
zip_resource_files,
cluster_conf.cluster_id,
cluster_conf.gpu_enabled(),
cluster_conf.get_docker_repo(),
cluster_conf.file_shares,
cluster_conf.plugins,
cluster_conf.mixed_mode(),
cluster_conf.worker_on_master)
software_metadata_key = "spark"
vm_image = models.VmImage(
publisher='Canonical',
offer='UbuntuServer',
sku='16.04')
cluster = self.__create_pool_and_job(
cluster_conf, software_metadata_key, start_task, vm_image)
# Wait for the master to be ready
if wait:
util.wait_for_master_to_be_ready(self, cluster.id)
cluster = self.get_cluster(cluster.id)
return cluster
except batch_error.BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))
def create_clusters_in_parallel(self, cluster_confs):
for cluster_conf in cluster_confs:
self.create_cluster(cluster_conf)
def delete_cluster(self, cluster_id: str, keep_logs: bool = False):
try:
return self.__delete_pool_and_job(cluster_id, keep_logs)
except batch_error.BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))
def get_cluster(self, cluster_id: str):
try:
pool, nodes = self.__get_pool_details(cluster_id)
return models.Cluster(pool, nodes)
except batch_error.BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))
def list_clusters(self):
try:
return [models.Cluster(pool) for pool in self.__list_clusters(aztk.models.Software.spark)]
except batch_error.BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))
def get_remote_login_settings(self, cluster_id: str, node_id: str):
try:
return self.__get_remote_login_settings(cluster_id, node_id)
except batch_error.BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))
def submit(self, cluster_id: str, application: models.ApplicationConfiguration, remote: bool = False, wait: bool = False):
try:
cluster_submit_helper.submit_application(self, cluster_id, application, remote, wait)
except batch_error.BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))
def submit_all_applications(self, cluster_id: str, applications):
for application in applications:
self.submit(cluster_id, application)
def wait_until_application_done(self, cluster_id: str, task_id: str):
try:
helpers.wait_for_task_to_complete(job_id=cluster_id, task_id=task_id, batch_client=self.batch_client)
except batch_error.BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))
def wait_until_applications_done(self, cluster_id: str):
try:
helpers.wait_for_tasks_to_complete(job_id=cluster_id, batch_client=self.batch_client)
except batch_error.BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))
def wait_until_cluster_is_ready(self, cluster_id: str):
try:
util.wait_for_master_to_be_ready(self, cluster_id)
pool = self.batch_client.pool.get(cluster_id)
nodes = self.batch_client.compute_node.list(pool_id=cluster_id)
return models.Cluster(pool, nodes)
except batch_error.BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))
def wait_until_all_clusters_are_ready(self, clusters: List[str]):
for cluster_id in clusters:
self.wait_until_cluster_is_ready(cluster_id)
def create_user(self, cluster_id: str, username: str, password: str = None, ssh_key: str = None) -> str:
try:
cluster = self.get_cluster(cluster_id)
master_node_id = cluster.master_node_id
if not master_node_id:
raise error.ClusterNotReadyError("The master has not yet been selected; a user cannot be added.")
self.__create_user_on_pool(username, cluster.id, cluster.nodes, ssh_key, password)
except batch_error.BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))
def get_application_log(self, cluster_id: str, application_name: str, tail=False, current_bytes: int = 0):
try:
return get_log_helper.get_log(self.batch_client, self.blob_client,
cluster_id, application_name, tail, current_bytes)
except batch_error.BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))
def get_application_status(self, cluster_id: str, app_name: str):
try:
task = self.batch_client.task.get(cluster_id, app_name)
return task.state.value
except batch_error.BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))
def cluster_run(self, cluster_id: str, command: str, host=False, internal: bool = False, timeout=None):
try:
return self.__cluster_run(cluster_id,
command,
internal,
container_name='spark' if not host else None,
timeout=timeout)
except batch_error.BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))
def node_run(self, cluster_id: str, node_id: str, command: str, host=False, internal: bool = False, timeout=None):
try:
return self.__node_run(cluster_id,
node_id,
command,
internal,
container_name='spark' if not host else None,
timeout=timeout)
except batch_error.BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))
def cluster_copy(self, cluster_id: str, source_path: str, destination_path: str, host: bool = False, internal: bool = False, timeout: int = None):
try:
container_name = None if host else 'spark'
return self.__cluster_copy(cluster_id,
source_path,
destination_path=destination_path,
container_name=container_name,
get=False,
internal=internal,
timeout=timeout)
except batch_error.BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))
def cluster_download(self, cluster_id: str, source_path: str, destination_path: str = None, host: bool = False, internal: bool = False, timeout: int = None):
try:
container_name = None if host else 'spark'
return self.__cluster_copy(cluster_id,
source_path,
destination_path=destination_path,
container_name=container_name,
get=True,
internal=internal,
timeout=timeout)
except batch_error.BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))
def cluster_ssh_into_master(self, cluster_id, node_id, username, ssh_key=None, password=None, port_forward_list=None, internal=False):
try:
self.__ssh_into_node(cluster_id, node_id, username, ssh_key, password, port_forward_list, internal)
except batch_error.BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))
'''
job submission
'''
def submit_job(self, job_configuration: models.JobConfiguration):
try:
job_configuration = _apply_default_for_job_config(job_configuration)
job_configuration.validate()
cluster_data = self._get_cluster_data(job_configuration.id)
node_data = NodeData(job_configuration.to_cluster_config()).add_core().done()
zip_resource_files = cluster_data.upload_node_data(node_data).to_resource_file()
start_task = create_cluster_helper.generate_cluster_start_task(self,
zip_resource_files,
job_configuration.id,
job_configuration.gpu_enabled,
job_configuration.get_docker_repo(),
mixed_mode=job_configuration.mixed_mode(),
worker_on_master=job_configuration.worker_on_master)
application_tasks = []
for application in job_configuration.applications:
application_tasks.append(
(application, cluster_submit_helper.generate_task(self, job_configuration.id, application))
)
job_manager_task = job_submit_helper.generate_task(self, job_configuration, application_tasks)
software_metadata_key = "spark"
vm_image = models.VmImage(
publisher='Canonical',
offer='UbuntuServer',
sku='16.04')
autoscale_formula = "$TargetDedicatedNodes = {0}; " \
"$TargetLowPriorityNodes = {1}".format(
job_configuration.max_dedicated_nodes,
job_configuration.max_low_pri_nodes)
job = self.__submit_job(
job_configuration=job_configuration,
start_task=start_task,
job_manager_task=job_manager_task,
autoscale_formula=autoscale_formula,
software_metadata_key=software_metadata_key,
vm_image_model=vm_image,
application_metadata='\n'.join(application.name for application in (job_configuration.applications or [])))
return models.Job(job)
except batch_error.BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))
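# With max_dedicated_nodes=3 and max_low_pri_nodes=5, the autoscale formula
# above renders to: "$TargetDedicatedNodes = 3; $TargetLowPriorityNodes = 5"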
def list_jobs(self):
try:
return [models.Job(cloud_job_schedule) for cloud_job_schedule in job_submit_helper.list_jobs(self)]
except batch_error.BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))
def list_applications(self, job_id):
try:
applications = job_submit_helper.list_applications(self, job_id)
for item in applications:
if applications[item]:
applications[item] = models.Application(applications[item])
return applications
except batch_error.BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))
def get_job(self, job_id):
try:
job, apps, pool, nodes = job_submit_helper.get_job(self, job_id)
return models.Job(job, apps, pool, nodes)
except batch_error.BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))
def stop_job(self, job_id):
try:
return job_submit_helper.stop(self, job_id)
except batch_error.BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))
def delete_job(self, job_id: str, keep_logs: bool = False):
try:
return job_submit_helper.delete(self, job_id, keep_logs)
except batch_error.BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))
def get_application(self, job_id, application_name):
try:
return models.Application(job_submit_helper.get_application(self, job_id, application_name))
except batch_error.BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))
def get_job_application_log(self, job_id, application_name):
try:
return job_submit_helper.get_application_log(self, job_id, application_name)
except batch_error.BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))
def stop_job_app(self, job_id, application_name):
try:
return job_submit_helper.stop_app(self, job_id, application_name)
except batch_error.BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))
def wait_until_job_finished(self, job_id):
try:
job_submit_helper.wait_until_job_finished(self, job_id)
except batch_error.BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))
def wait_until_all_jobs_finished(self, jobs):
for job in jobs:
self.wait_until_job_finished(job)
def run_cluster_diagnostics(self, cluster_id, output_directory=None):
try:
output = cluster_diagnostic_helper.run(self, cluster_id, output_directory)
return output
except batch_error.BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))
def _default_scheduling_target(vm_count: int):
if vm_count == 0:
return models.SchedulingTarget.Any
else:
return models.SchedulingTarget.Dedicated
def _apply_default_for_cluster_config(configuration: models.ClusterConfiguration):
cluster_conf = models.ClusterConfiguration()
cluster_conf.merge(configuration)
if cluster_conf.scheduling_target is None:
cluster_conf.scheduling_target = _default_scheduling_target(cluster_conf.size)
return cluster_conf
def _apply_default_for_job_config(job_conf: models.JobConfiguration):
if job_conf.scheduling_target is None:
job_conf.scheduling_target = _default_scheduling_target(job_conf.max_dedicated_nodes)
return job_conf
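# Illustration of the defaulting logic above (derived directly from
# _default_scheduling_target):
#
#   _default_scheduling_target(0)   # -> models.SchedulingTarget.Any
#   _default_scheduling_target(4)   # -> models.SchedulingTarget.Dedicated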


@ -0,0 +1 @@
from .client import Client


@ -0,0 +1 @@
from .operations import SparkBaseOperations


@ -0,0 +1,96 @@
import os
import azure.batch.models as batch_models
import yaml
from aztk.utils import helpers
from aztk.utils.command_builder import CommandBuilder
def generate_application_task(core_base_operations, container_id, application, remote=False):
resource_files = []
# The application provided is not hosted remotely and therefore must be uploaded
if not remote:
app_resource_file = helpers.upload_file_to_container(
container_name=container_id,
application_name=application.name,
file_path=application.application,
blob_client=core_base_operations.blob_client,
use_full_path=False)
# Upload application file
resource_files.append(app_resource_file)
application.application = '$AZ_BATCH_TASK_WORKING_DIR/' + os.path.basename(application.application)
# Upload dependent JARS
jar_resource_file_paths = []
for jar in application.jars:
current_jar_resource_file_path = helpers.upload_file_to_container(
container_name=container_id,
application_name=application.name,
file_path=jar,
blob_client=core_base_operations.blob_client,
use_full_path=False)
jar_resource_file_paths.append(current_jar_resource_file_path)
resource_files.append(current_jar_resource_file_path)
# Upload dependent python files
py_files_resource_file_paths = []
for py_file in application.py_files:
current_py_files_resource_file_path = helpers.upload_file_to_container(
container_name=container_id,
application_name=application.name,
file_path=py_file,
blob_client=core_base_operations.blob_client,
use_full_path=False)
py_files_resource_file_paths.append(current_py_files_resource_file_path)
resource_files.append(current_py_files_resource_file_path)
# Upload other dependent files
files_resource_file_paths = []
for file in application.files:
files_resource_file_path = helpers.upload_file_to_container(
container_name=container_id,
application_name=application.name,
file_path=file,
blob_client=core_base_operations.blob_client,
use_full_path=False)
files_resource_file_paths.append(files_resource_file_path)
resource_files.append(files_resource_file_path)
# Upload application definition
application.jars = [os.path.basename(jar) for jar in application.jars]
application.py_files = [os.path.basename(py_files) for py_files in application.py_files]
application.files = [os.path.basename(files) for files in application.files]
application_definition_file = helpers.upload_text_to_container(
container_name=container_id,
application_name=application.name,
file_path='application.yaml',
content=yaml.dump(vars(application)),
blob_client=core_base_operations.blob_client)
resource_files.append(application_definition_file)
# create command to submit task
task_cmd = CommandBuilder('sudo docker exec')
task_cmd.add_argument('-i')
task_cmd.add_option('-e', 'AZ_BATCH_TASK_WORKING_DIR=$AZ_BATCH_TASK_WORKING_DIR')
task_cmd.add_option('-e', 'STORAGE_LOGS_CONTAINER={0}'.format(container_id))
task_cmd.add_argument('spark /bin/bash >> output.log 2>&1')
task_cmd.add_argument('-c "source ~/.bashrc; ' \
'export PYTHONPATH=$PYTHONPATH:\$AZTK_WORKING_DIR; ' \
'cd \$AZ_BATCH_TASK_WORKING_DIR; ' \
'\$AZTK_WORKING_DIR/.aztk-env/.venv/bin/python \$AZTK_WORKING_DIR/aztk/node_scripts/submit.py"')
# Create task
task = batch_models.TaskAddParameter(
id=application.name,
command_line=helpers.wrap_commands_in_shell([task_cmd.to_str()]),
resource_files=resource_files,
constraints=batch_models.TaskConstraints(max_task_retry_count=application.max_retry_count),
user_identity=batch_models.UserIdentity(
auto_user=batch_models.AutoUserSpecification(
scope=batch_models.AutoUserScope.task, elevation_level=batch_models.ElevationLevel.admin)))
return task
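# Rough shape of the command line this builds (illustrative only; the exact
# string depends on CommandBuilder's option ordering):
#
#   sudo docker exec -i \
#       -e AZ_BATCH_TASK_WORKING_DIR=$AZ_BATCH_TASK_WORKING_DIR \
#       -e STORAGE_LOGS_CONTAINER=<cluster_id> \
#       spark /bin/bash >> output.log 2>&1 \
#       -c "source ~/.bashrc; ... submit.py"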


@ -0,0 +1,148 @@
from typing import List
import azure.batch.models as batch_models
import azure.batch.models.batch_error as batch_error
from aztk import error
from aztk.internal.cluster_data import NodeData
from aztk.spark import models
from aztk.spark.utils import util
from aztk.utils import constants, helpers
POOL_ADMIN_USER_IDENTITY = batch_models.UserIdentity(
auto_user=batch_models.AutoUserSpecification(
scope=batch_models.AutoUserScope.pool, elevation_level=batch_models.ElevationLevel.admin))
def _get_aztk_environment(cluster_id, worker_on_master, mixed_mode):
envs = []
envs.append(batch_models.EnvironmentSetting(name="AZTK_MIXED_MODE", value=helpers.bool_env(mixed_mode)))
envs.append(batch_models.EnvironmentSetting(name="AZTK_WORKER_ON_MASTER", value=helpers.bool_env(worker_on_master)))
envs.append(batch_models.EnvironmentSetting(name="AZTK_CLUSTER_ID", value=cluster_id))
return envs
def __get_docker_credentials(core_base_operations):
creds = []
docker = core_base_operations.secrets_configuration.docker
if docker:
if docker.endpoint:
creds.append(batch_models.EnvironmentSetting(name="DOCKER_ENDPOINT", value=docker.endpoint))
if docker.username:
creds.append(batch_models.EnvironmentSetting(name="DOCKER_USERNAME", value=docker.username))
if docker.password:
creds.append(batch_models.EnvironmentSetting(name="DOCKER_PASSWORD", value=docker.password))
return creds
def __get_secrets_env(core_base_operations):
shared_key = core_base_operations.secrets_configuration.shared_key
service_principal = core_base_operations.secrets_configuration.service_principal
if shared_key:
return [
batch_models.EnvironmentSetting(name="BATCH_SERVICE_URL", value=shared_key.batch_service_url),
batch_models.EnvironmentSetting(name="BATCH_ACCOUNT_KEY", value=shared_key.batch_account_key),
batch_models.EnvironmentSetting(name="STORAGE_ACCOUNT_NAME", value=shared_key.storage_account_name),
batch_models.EnvironmentSetting(name="STORAGE_ACCOUNT_KEY", value=shared_key.storage_account_key),
batch_models.EnvironmentSetting(name="STORAGE_ACCOUNT_SUFFIX", value=shared_key.storage_account_suffix),
]
else:
return [
batch_models.EnvironmentSetting(name="SP_TENANT_ID", value=service_principal.tenant_id),
batch_models.EnvironmentSetting(name="SP_CLIENT_ID", value=service_principal.client_id),
batch_models.EnvironmentSetting(name="SP_CREDENTIAL", value=service_principal.credential),
batch_models.EnvironmentSetting(
name="SP_BATCH_RESOURCE_ID", value=service_principal.batch_account_resource_id),
batch_models.EnvironmentSetting(
name="SP_STORAGE_RESOURCE_ID", value=service_principal.storage_account_resource_id),
]
def __cluster_install_cmd(zip_resource_file: batch_models.ResourceFile,
gpu_enabled: bool,
docker_repo: str = None,
plugins=None,
worker_on_master: bool = True,
file_mounts=None,
mixed_mode: bool = False):
"""
For Docker on ubuntu 16.04 - return the command line
to be run on the start task of the pool to setup spark.
"""
default_docker_repo = constants.DEFAULT_DOCKER_REPO if not gpu_enabled else constants.DEFAULT_DOCKER_REPO_GPU
docker_repo = docker_repo or default_docker_repo
shares = []
if file_mounts:
for mount in file_mounts:
# Create the directory on the node
shares.append('mkdir -p {0}'.format(mount.mount_path))
# Mount the file share
shares.append(
'mount -t cifs //{0}.file.core.windows.net/{2} {3} -o vers=3.0,username={0},password={1},dir_mode=0777,file_mode=0777,sec=ntlmssp'.
format(mount.storage_account_name, mount.storage_account_key, mount.file_share_path, mount.mount_path))
setup = [
'time('\
'apt-get -y update;'\
'apt-get -y --no-install-recommends install unzip;'\
'unzip -o $AZ_BATCH_TASK_WORKING_DIR/{0};'\
'chmod 777 $AZ_BATCH_TASK_WORKING_DIR/aztk/node_scripts/setup_host.sh;'\
') 2>&1'.format(zip_resource_file.file_path),
'/bin/bash $AZ_BATCH_TASK_WORKING_DIR/aztk/node_scripts/setup_host.sh {0} {1}'.format(
constants.DOCKER_SPARK_CONTAINER_NAME,
docker_repo,
)
]
commands = shares + setup
return commands
def generate_cluster_start_task(core_base_operations,
zip_resource_file: batch_models.ResourceFile,
cluster_id: str,
gpu_enabled: bool,
docker_repo: str = None,
file_shares: List[models.FileShare] = None,
plugins: List[models.PluginConfiguration] = None,
mixed_mode: bool = False,
worker_on_master: bool = True):
"""
This will return the start task object for the pool to be created.
:param cluster_id str: Id of the cluster(Used for uploading the resource files)
:param zip_resource_file: Resource file object pointing to the zip file containing scripts to run on the node
"""
resource_files = [zip_resource_file]
spark_web_ui_port = constants.DOCKER_SPARK_WEB_UI_PORT
spark_worker_ui_port = constants.DOCKER_SPARK_WORKER_UI_PORT
spark_job_ui_port = constants.DOCKER_SPARK_JOB_UI_PORT
spark_container_name = constants.DOCKER_SPARK_CONTAINER_NAME
spark_submit_logs_file = constants.SPARK_SUBMIT_LOGS_FILE
# TODO use certificate
environment_settings = __get_secrets_env(core_base_operations) + [
batch_models.EnvironmentSetting(name="SPARK_WEB_UI_PORT", value=spark_web_ui_port),
batch_models.EnvironmentSetting(name="SPARK_WORKER_UI_PORT", value=spark_worker_ui_port),
batch_models.EnvironmentSetting(name="SPARK_JOB_UI_PORT", value=spark_job_ui_port),
batch_models.EnvironmentSetting(name="SPARK_CONTAINER_NAME", value=spark_container_name),
batch_models.EnvironmentSetting(name="SPARK_SUBMIT_LOGS_FILE", value=spark_submit_logs_file),
batch_models.EnvironmentSetting(name="AZTK_GPU_ENABLED", value=helpers.bool_env(gpu_enabled)),
] + __get_docker_credentials(core_base_operations) + _get_aztk_environment(cluster_id, worker_on_master, mixed_mode)
# start task command
command = __cluster_install_cmd(zip_resource_file, gpu_enabled, docker_repo, plugins, worker_on_master, file_shares,
mixed_mode)
return batch_models.StartTask(
command_line=helpers.wrap_commands_in_shell(command),
resource_files=resource_files,
environment_settings=environment_settings,
user_identity=POOL_ADMIN_USER_IDENTITY,
wait_for_success=True)
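# A minimal sketch of how this start task is consumed by cluster creation
# (`core_ops` and `zip_file` are assumed to be a BaseOperations instance and an
# uploaded ResourceFile; see the create helpers for the real call site):
#
#   start_task = generate_cluster_start_task(
#       core_ops, zip_file, "my-cluster", gpu_enabled=False)
#   # start_task.command_line mounts any file shares, unzips the node scripts,
#   # and runs setup_host.sh, all wrapped by helpers.wrap_commands_in_shell.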


@ -0,0 +1,64 @@
from typing import List
import azure.batch.models as batch_models
from aztk.client.base import BaseOperations as CoreBaseOperations
from aztk.spark import models
from .helpers import generate_cluster_start_task, generate_application_task
class SparkBaseOperations:
"""Spark Base operations object that all other Spark operations objects inherit from
"""
#TODO: make this private or otherwise not public
def _generate_cluster_start_task(self,
core_base_operations,
zip_resource_file: batch_models.ResourceFile,
id: str,
gpu_enabled: bool,
docker_repo: str = None,
file_shares: List[models.FileShare] = None,
plugins: List[models.PluginConfiguration] = None,
mixed_mode: bool = False,
worker_on_master: bool = True):
"""Generate the Azure Batch Start Task to provision a Spark cluster.
Args:
zip_resource_file (:obj:`azure.batch.models.ResourceFile`): a single zip file of all necessary data
to upload to the cluster.
id (:obj:`str`): the id of the cluster.
gpu_enabled (:obj:`bool`): if True, the cluster is GPU enabled.
docker_repo (:obj:`str`, optional): the docker repository and tag that identifies the docker image to use.
If None, the default Docker image will be used. Defaults to None.
file_shares (:obj:`aztk.spark.models.FileShare`, optional): a list of FileShares to mount on the cluster.
Defaults to None.
plugins (:obj:`aztk.spark.models.PluginConfiguration`, optional): a list of plugins to set up on the cluster.
Defaults to None.
mixed_mode (:obj:`bool`, optional): If True, the cluster is configured to use both dedicated and low priority VMs.
Defaults to False.
worker_on_master (:obj:`bool`, optional): If True, the cluster is configured to provision a Spark worker
on the VM that runs the Spark master. Defaults to True.
Returns:
:obj:`azure.batch.models.StartTask`: the StartTask definition to provision the cluster.
"""
return generate_cluster_start_task.generate_cluster_start_task(
core_base_operations, zip_resource_file, id, gpu_enabled, docker_repo, file_shares, plugins, mixed_mode, worker_on_master)
#TODO: make this private or otherwise not public
def _generate_application_task(self, core_base_operations, container_id, application, remote=False):
"""Generate the Azure Batch Start Task to provision a Spark cluster.
Args:
container_id (:obj:`str`): the id of the container to run the application in
application (:obj:`aztk.spark.models.ApplicationConfiguration): the Application Definition
remote (:obj:`bool`): If True, the application file will not be uploaded, it is assumed to be reachable
by the cluster already. This is useful when your application is stored in a mounted Azure File Share
and not the client. Defaults to False.
Returns:
:obj:`azure.batch.models.TaskAddParameter`: the Task definition for the Application.
"""
return generate_application_task.generate_application_task(core_base_operations, container_id, application, remote)

aztk/spark/client/client.py

@ -0,0 +1,233 @@
from typing import List
import azure.batch.models.batch_error as batch_error
import aztk
from aztk import error
from aztk import models as base_models
from aztk.client import CoreClient
from aztk.internal.cluster_data import NodeData
from aztk.spark import models
from aztk.spark.client.cluster import ClusterOperations
from aztk.spark.client.job import JobOperations
from aztk.spark.helpers import cluster_diagnostic_helper
from aztk.spark.helpers import create_cluster as create_cluster_helper
from aztk.spark.helpers import get_log as get_log_helper
from aztk.spark.helpers import job_submission as job_submit_helper
from aztk.spark.helpers import submit as cluster_submit_helper
from aztk.spark.utils import util
from aztk.utils import azure_api, deprecated, deprecate, helpers
class Client(CoreClient):
"""The client used to create and manage Spark clusters
Attributes:
cluster (:obj:`aztk.spark.client.cluster.ClusterOperations`): Cluster
job (:obj:`aztk.spark.client.job.JobOperations`): Job
"""
def __init__(self, secrets_configuration: models.SecretsConfiguration = None, **kwargs):
self.secrets_configuration = None
context = None
if kwargs.get("secrets_config"):
deprecate(version="0.10.0", message="secrets_config key is deprecated in secrets.yaml",
advice="Please use secrets_configuration key instead.")
context = self._get_context(kwargs.get("secrets_config"))
else:
context = self._get_context(secrets_configuration)
self.cluster = ClusterOperations(context)
self.job = JobOperations(context)
# ALL THE FOLLOWING METHODS ARE DEPRECATED AND WILL BE REMOVED IN 0.10.0
@deprecated("0.10.0")
def create_cluster(self, cluster_conf: models.ClusterConfiguration, wait: bool = False):
return self.cluster.create(cluster_configuration=cluster_conf, wait=wait)
@deprecated("0.10.0")
def create_clusters_in_parallel(self, cluster_confs): # NOT IMPLEMENTED
for cluster_conf in cluster_confs:
self.cluster.create(cluster_conf)
@deprecated("0.10.0")
def delete_cluster(self, cluster_id: str, keep_logs: bool = False):
return self.cluster.delete(id=cluster_id, keep_logs=keep_logs)
@deprecated("0.10.0")
def get_cluster(self, cluster_id: str):
return self.cluster.get(id=cluster_id)
@deprecated("0.10.0")
def list_clusters(self):
return self.cluster.list()
@deprecated("0.10.0")
def get_remote_login_settings(self, cluster_id: str, node_id: str):
return self.cluster.get_remote_login_settings(cluster_id, node_id)
@deprecated("0.10.0")
def submit(self,
cluster_id: str,
application: models.ApplicationConfiguration,
remote: bool = False,
wait: bool = False):
return self.cluster.submit(id=cluster_id, application=application, remote=remote, wait=wait)
@deprecated("0.10.0")
def submit_all_applications(self, cluster_id: str, applications): # NOT IMPLEMENTED
for application in applications:
self.cluster.submit(cluster_id, application)
@deprecated("0.10.0")
def wait_until_application_done(self, cluster_id: str, task_id: str): # NOT IMPLEMENTED
try:
helpers.wait_for_task_to_complete(job_id=cluster_id, task_id=task_id, batch_client=self.batch_client)
except batch_error.BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))
@deprecated("0.10.0")
def wait_until_applications_done(self, cluster_id: str): # NOT IMPLEMENTED
try:
helpers.wait_for_tasks_to_complete(job_id=cluster_id, batch_client=self.batch_client)
except batch_error.BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))
@deprecated("0.10.0")
def wait_until_cluster_is_ready(self, cluster_id: str): # NOT IMPLEMENTED
try:
util.wait_for_master_to_be_ready(self.cluster._core_cluster_operations, self.cluster, cluster_id)
pool = self.batch_client.pool.get(cluster_id)
nodes = self.batch_client.compute_node.list(pool_id=cluster_id)
return models.Cluster(base_models.Cluster(pool, nodes))
except batch_error.BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))
@deprecated("0.10.0")
def wait_until_all_clusters_are_ready(self, clusters: List[str]): # NOT IMPLEMENTED
for cluster_id in clusters:
self.wait_until_cluster_is_ready(cluster_id)
@deprecated("0.10.0")
def create_user(self, cluster_id: str, username: str, password: str = None, ssh_key: str = None) -> str:
return self.cluster.create_user(id=cluster_id, username=username, password=password, ssh_key=ssh_key)
@deprecated("0.10.0")
def get_application_log(self, cluster_id: str, application_name: str, tail=False, current_bytes: int = 0):
return self.cluster.get_application_log(
id=cluster_id, application_name=application_name, tail=tail, current_bytes=current_bytes)
@deprecated("0.10.0")
def get_application_status(self, cluster_id: str, app_name: str):
return self.cluster.get_application_status(id=cluster_id, application_name=app_name)
@deprecated("0.10.0")
def cluster_run(self, cluster_id: str, command: str, host=False, internal: bool = False, timeout=None):
return self.cluster.run(id=cluster_id, command=command, host=host, internal=internal, timeout=timeout)
@deprecated("0.10.0")
def node_run(self, cluster_id: str, node_id: str, command: str, host=False, internal: bool = False, timeout=None):
return self.cluster.node_run(
id=cluster_id, node_id=node_id, command=command, host=host, internal=internal, timeout=timeout)
@deprecated("0.10.0")
def cluster_copy(self,
cluster_id: str,
source_path: str,
destination_path: str,
host: bool = False,
internal: bool = False,
timeout: int = None):
return self.cluster.copy(
id=cluster_id,
source_path=source_path,
destination_path=destination_path,
host=host,
internal=internal,
timeout=timeout)
@deprecated("0.10.0")
def cluster_download(self,
cluster_id: str,
source_path: str,
destination_path: str = None,
host: bool = False,
internal: bool = False,
timeout: int = None):
return self.cluster.download(
id=cluster_id,
source_path=source_path,
destination_path=destination_path,
host=host,
internal=internal,
timeout=timeout)
@deprecated("0.10.0")
def cluster_ssh_into_master(self,
cluster_id,
node_id,
username,
ssh_key=None,
password=None,
port_forward_list=None,
internal=False):
return self.cluster._core_cluster_operations.ssh_into_node(cluster_id, node_id, username, ssh_key, password, port_forward_list, internal)
'''
job submission
'''
@deprecated("0.10.0")
def submit_job(self, job_configuration: models.JobConfiguration):
return self.job.submit(job_configuration)
@deprecated("0.10.0")
def list_jobs(self):
return self.job.list()
@deprecated("0.10.0")
def list_applications(self, job_id):
return self.job.list_applications(job_id)
@deprecated("0.10.0")
def get_job(self, job_id):
return self.job.get(job_id)
@deprecated("0.10.0")
def stop_job(self, job_id):
return self.job.stop(job_id)
@deprecated("0.10.0")
def delete_job(self, job_id: str, keep_logs: bool = False):
return self.job.delete(job_id, keep_logs)
@deprecated("0.10.0")
def get_application(self, job_id, application_name):
return self.job.get_application(job_id, application_name)
@deprecated("0.10.0")
def get_job_application_log(self, job_id, application_name):
return self.job.get_application_log(job_id, application_name)
@deprecated("0.10.0")
def stop_job_app(self, job_id, application_name): # NOT IMPLEMENTED
try:
return job_submit_helper.stop_app(self, job_id, application_name)
except batch_error.BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))
@deprecated("0.10.0")
def wait_until_job_finished(self, job_id):
try:
self.job.wait(job_id)
except batch_error.BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))
@deprecated("0.10.0")
def wait_until_all_jobs_finished(self, jobs): # NOT IMPLEMENTED
for job in jobs:
self.wait_until_job_finished(job)
@deprecated("0.10.0")
def run_cluster_diagnostics(self, cluster_id, output_directory=None):
return self.cluster.diagnostics(cluster_id, output_directory)
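# A short migration sketch from the deprecated flat API to the composed
# operations (`my_secrets` is assumed to be a models.SecretsConfiguration):
#
#   client = Client(secrets_configuration=my_secrets)
#
#   client.create_cluster(conf)        # deprecated; becomes client.cluster.create(conf)
#   client.submit(cluster_id, app)     # deprecated; becomes client.cluster.submit(cluster_id, app)
#   client.submit_job(job_conf)        # deprecated; becomes client.job.submit(job_conf)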


@ -0,0 +1 @@
from .operations import ClusterOperations


@ -0,0 +1,19 @@
import azure.batch.models.batch_error as batch_error
from aztk import error
from aztk.utils import helpers
def cluster_copy(core_cluster_operations, cluster_id: str, source_path: str, destination_path: str, host: bool = False, internal: bool = False, timeout: int = None):
try:
container_name = None if host else 'spark'
return core_cluster_operations.copy(
cluster_id,
source_path,
destination_path=destination_path,
container_name=container_name,
get=False,
internal=internal,
timeout=timeout)
except batch_error.BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))


@ -0,0 +1,67 @@
import azure.batch.models as batch_models
import azure.batch.models.batch_error as batch_error
from aztk import error
from aztk import models as base_models
from aztk.internal.cluster_data import NodeData
from aztk.spark import models
from aztk.spark.utils import constants, util
from aztk.utils import helpers
POOL_ADMIN_USER_IDENTITY = batch_models.UserIdentity(
auto_user=batch_models.AutoUserSpecification(
scope=batch_models.AutoUserScope.pool, elevation_level=batch_models.ElevationLevel.admin))
def _default_scheduling_target(vm_count: int):
if vm_count == 0:
return models.SchedulingTarget.Any
else:
return models.SchedulingTarget.Dedicated
def _apply_default_for_cluster_config(configuration: models.ClusterConfiguration):
cluster_conf = models.ClusterConfiguration()
cluster_conf.merge(configuration)
if cluster_conf.scheduling_target is None:
cluster_conf.scheduling_target = _default_scheduling_target(cluster_conf.size)
return cluster_conf
def create_cluster(core_cluster_operations, spark_cluster_operations, cluster_conf: models.ClusterConfiguration, wait: bool = False):
"""
Create a new aztk spark cluster
Args:
cluster_conf(aztk.spark.models.ClusterConfiguration): Configuration for the cluster to be created
wait(bool): If True, block until the cluster is ready before returning
Returns:
:obj:`aztk.spark.models.Cluster`
"""
cluster_conf = _apply_default_for_cluster_config(cluster_conf)
cluster_conf.validate()
cluster_data = core_cluster_operations.get_cluster_data(cluster_conf.cluster_id)
try:
zip_resource_files = None
node_data = NodeData(cluster_conf).add_core().done()
zip_resource_files = cluster_data.upload_node_data(node_data).to_resource_file()
start_task = spark_cluster_operations._generate_cluster_start_task(core_cluster_operations, zip_resource_files, cluster_conf.cluster_id,
cluster_conf.gpu_enabled(), cluster_conf.get_docker_repo(),
cluster_conf.file_shares, cluster_conf.plugins,
cluster_conf.mixed_mode(), cluster_conf.worker_on_master)
software_metadata_key = base_models.Software.spark
cluster = core_cluster_operations.create(cluster_conf, software_metadata_key, start_task, constants.SPARK_VM_IMAGE)
# Wait for the master to be ready
if wait:
util.wait_for_master_to_be_ready(core_cluster_operations, spark_cluster_operations, cluster.id)
cluster = spark_cluster_operations.get(cluster.id)
return cluster
except batch_error.BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))
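# A minimal usage sketch (`spark_client` is an aztk.spark.Client; the field
# values are illustrative only):
#
#   conf = models.ClusterConfiguration(cluster_id="my-cluster", size=2,
#                                      vm_size="standard_f2")
#   cluster = spark_client.cluster.create(conf, wait=True)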


@ -0,0 +1,15 @@
import azure.batch.models.batch_error as batch_error
from aztk import error
from aztk.utils import helpers
def create_user(core_cluster_operations, spark_cluster_operations, cluster_id: str, username: str, password: str = None, ssh_key: str = None) -> str:
try:
cluster = spark_cluster_operations.get(cluster_id)
master_node_id = cluster.master_node_id
if not master_node_id:
raise error.ClusterNotReadyError("The master has not yet been selected; a user cannot be added.")
core_cluster_operations.create_user_on_cluster(cluster.id, cluster.nodes, username, ssh_key, password)
except batch_error.BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))


@ -0,0 +1,11 @@
import azure.batch.models.batch_error as batch_error
from aztk import error
from aztk.utils import helpers
def delete_cluster(core_cluster_operations, cluster_id: str, keep_logs: bool = False):
try:
return core_cluster_operations.delete(cluster_id, keep_logs)
except batch_error.BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))


@ -0,0 +1,44 @@
import os
import azure.batch.models.batch_error as batch_error
from aztk import error
from aztk.utils import helpers
def _run(spark_cluster_operations, cluster_id, output_directory=None):
# copy debug program to each node
output = spark_cluster_operations.copy(cluster_id, os.path.abspath("./aztk/spark/utils/debug.py"), "/tmp/debug.py", host=True)
ssh_cmd = _build_diagnostic_ssh_command()
run_output = spark_cluster_operations.run(cluster_id, ssh_cmd, host=True)
remote_path = "/tmp/debug.zip"
if output_directory:
local_path = os.path.join(os.path.abspath(output_directory), "debug.zip")
output = spark_cluster_operations.download(cluster_id, remote_path, local_path, host=True)
# write run output to debug/ directory
with open(os.path.join(os.path.dirname(local_path), "debug-output.txt"), 'w', encoding="UTF-8") as f:
    for node_output in run_output:
        for line in node_output.output:
            f.write(line + '\n')
else:
output = spark_cluster_operations.download(cluster_id, remote_path, host=True)
return output
def _build_diagnostic_ssh_command():
return "sudo rm -rf /tmp/debug.zip; "\
"sudo apt-get install -y python3-pip; "\
"sudo -H pip3 install --upgrade pip; "\
"sudo -H pip3 install docker; "\
"sudo python3 /tmp/debug.py"
def run_cluster_diagnostics(spark_cluster_operations, cluster_id, output_directory=None):
try:
output = _run(spark_cluster_operations, cluster_id, output_directory)
return output
except batch_error.BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))
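# Example invocation through the Spark client (sketch; `spark_client` is
# assumed to be an aztk.spark.Client):
#
#   output = spark_client.cluster.diagnostics("my-cluster", output_directory="./debug")
#   # writes ./debug/debug.zip and ./debug/debug-output.txt with per-node output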


@ -0,0 +1,19 @@
import azure.batch.models.batch_error as batch_error
from aztk import error
from aztk.utils import helpers
def cluster_download(core_cluster_operations, cluster_id: str, source_path: str, destination_path: str = None, host: bool = False, internal: bool = False, timeout: int = None):
try:
container_name = None if host else 'spark'
return core_cluster_operations.copy(cluster_id,
source_path,
destination_path=destination_path,
container_name=container_name,
get=True,
internal=internal,
timeout=timeout)
except batch_error.BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))


@ -0,0 +1,13 @@
import azure.batch.models.batch_error as batch_error
from aztk import error
from aztk.spark import models
from aztk.utils import helpers
def get_cluster(core_cluster_operations, cluster_id: str):
try:
cluster = core_cluster_operations.get(cluster_id)
return models.Cluster(cluster)
except batch_error.BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))


@ -0,0 +1,7 @@
from aztk.spark import models
def get_application_log(core_base_operations, cluster_id: str, application_name: str, tail=False, current_bytes: int = 0):
base_application_log = core_base_operations.get_application_log(
cluster_id, application_name, tail, current_bytes)
return models.ApplicationLog(base_application_log)


@ -0,0 +1,12 @@
import azure.batch.models.batch_error as batch_error
from aztk import error
from aztk.utils import helpers
def get_application_status(core_cluster_operations, cluster_id: str, app_name: str):
try:
task = core_cluster_operations.batch_client.task.get(cluster_id, app_name)
return task.state.value
except batch_error.BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))


@ -0,0 +1,12 @@
import azure.batch.models.batch_error as batch_error
from aztk import error
from aztk.spark import models
from aztk.utils import helpers
def get_remote_login_settings(core_cluster_operations, id: str, node_id: str):
try:
return models.RemoteLogin(core_cluster_operations.get_remote_login_settings(id, node_id))
except batch_error.BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))


@ -0,0 +1,14 @@
import azure.batch.models.batch_error as batch_error
from aztk import error
from aztk import models as base_models
from aztk.spark import models
from aztk.utils import helpers
def list_clusters(core_cluster_operations):
try:
software_metadata_key = base_models.Software.spark
return [models.Cluster(cluster) for cluster in core_cluster_operations.list(software_metadata_key)]
except batch_error.BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))


@ -0,0 +1,18 @@
import azure.batch.models.batch_error as batch_error
from aztk import error
from aztk.utils import helpers
def node_run(core_cluster_operations,
cluster_id: str,
node_id: str,
command: str,
host=False,
internal: bool = False,
timeout=None):
try:
return core_cluster_operations.node_run(
cluster_id, node_id, command, internal, container_name='spark' if not host else None, timeout=timeout)
except batch_error.BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))


@ -0,0 +1,12 @@
import azure.batch.models.batch_error as batch_error
from aztk import error
from aztk.utils import helpers
def cluster_run(core_cluster_operations, cluster_id: str, command: str, host=False, internal: bool = False, timeout=None):
try:
return core_cluster_operations.run(
cluster_id, command, internal, container_name='spark' if not host else None, timeout=timeout)
except batch_error.BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))


@ -0,0 +1,12 @@
import azure.batch.models.batch_error as batch_error
from aztk import error
from aztk.utils import helpers
def cluster_ssh_into_master(spark_cluster_operations, cluster_id, node_id, username, ssh_key=None, password=None, port_forward_list=None, internal=False):
try:
spark_cluster_operations.ssh_into_node(cluster_id, node_id, username, ssh_key, password, port_forward_list, internal)
except batch_error.BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))


@ -0,0 +1,47 @@
import azure.batch.models as batch_models
import azure.batch.models.batch_error as batch_error
from aztk import error
from aztk.error import AztkError
from aztk.spark import models
from aztk.utils import helpers
def __get_node(core_cluster_operations, node_id: str, cluster_id: str) -> batch_models.ComputeNode:
return core_cluster_operations.batch_client.compute_node.get(cluster_id, node_id)
def affinitize_task_to_master(core_cluster_operations, spark_cluster_operations, cluster_id, task):
cluster = spark_cluster_operations.get(cluster_id)
if cluster.master_node_id is None:
raise AztkError("Master has not yet been selected. Please wait until the cluster is finished provisioning.")
master_node = core_cluster_operations.batch_client.compute_node.get(pool_id=cluster_id, node_id=cluster.master_node_id)
task.affinity_info = batch_models.AffinityInformation(affinity_id=master_node.affinity_id)
return task
def submit_application(core_cluster_operations, spark_cluster_operations, cluster_id, application, remote: bool = False, wait: bool = False):
"""
Submit a spark app
"""
task = spark_cluster_operations._generate_application_task(core_cluster_operations, cluster_id, application, remote)
task = affinitize_task_to_master(core_cluster_operations, spark_cluster_operations, cluster_id, task)
# Add task to batch job (which has the same name as cluster_id)
job_id = cluster_id
core_cluster_operations.batch_client.task.add(job_id=job_id, task=task)
if wait:
helpers.wait_for_task_to_complete(job_id=job_id, task_id=task.id, batch_client=core_cluster_operations.batch_client)
def submit(core_cluster_operations,
spark_cluster_operations,
cluster_id: str,
application: models.ApplicationConfiguration,
remote: bool = False,
wait: bool = False):
try:
submit_application(core_cluster_operations, spark_cluster_operations, cluster_id, application, remote, wait)
except batch_error.BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))
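# Sketch of submitting an application and blocking until it completes
# (`spark_client` and the application field values are illustrative):
#
#   app = models.ApplicationConfiguration(name="pi", application="/path/to/pi.py")
#   spark_client.cluster.submit("my-cluster", app, wait=True)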


@ -0,0 +1,10 @@
import azure.batch.models.batch_error as batch_error
from aztk import error
from aztk.utils import helpers
def wait_for_application_to_complete(core_cluster_operations, id, application_name):
try:
return core_cluster_operations.wait(id, application_name)
except batch_error.BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))


@ -0,0 +1,248 @@
from aztk.client.cluster import CoreClusterOperations
from aztk.spark import models
from aztk.spark.client.base import SparkBaseOperations
from .helpers import (copy, create, create_user, delete, diagnostics, download, get, get_application_log,
get_application_status, get_remote_login_settings, list, node_run, run, submit, wait)
class ClusterOperations(SparkBaseOperations):
"""Spark ClusterOperations object
Attributes:
_core_cluster_operations (:obj:`aztk.client.cluster.CoreClusterOperations`): the low-level
cluster operations that interact directly with the Azure Batch service.
"""
def __init__(self, context):
self._core_cluster_operations = CoreClusterOperations(context)
def create(self, cluster_configuration: models.ClusterConfiguration, wait: bool = False):
"""Create a cluster.
Args:
cluster_configuration (:obj:`ClusterConfiguration`): Configuration for the cluster to be created.
wait (:obj:`bool`): if True, this function will block until the cluster creation is finished.
Returns:
:obj:`aztk.spark.models.Cluster`: A Cluster object representing the state and configuration of the cluster.
"""
return create.create_cluster(self._core_cluster_operations, self, cluster_configuration, wait)
def delete(self, id: str, keep_logs: bool = False):
"""Delete a cluster.
Args:
id (:obj:`str`): the id of the cluster to delete.
keep_logs (:obj:`bool`): If True, the logs related to this cluster in Azure Storage are not deleted.
Defaults to False.
Returns:
:obj:`bool`: True if the deletion process was successful.
"""
return delete.delete_cluster(self._core_cluster_operations, id, keep_logs)
def get(self, id: str):
"""Get details about the state of a cluster.
Args:
id (:obj:`str`): the id of the cluster to get.
Returns:
:obj:`aztk.spark.models.Cluster`: A Cluster object representing the state and configuration of the cluster.
"""
return get.get_cluster(self._core_cluster_operations, id)
def list(self):
"""List all clusters.
Returns:
:obj:`List[aztk.spark.models.Cluster]`: List of Cluster objects each representing the state and configuration of the cluster.
"""
return list.list_clusters(self._core_cluster_operations)
def submit(self, id: str, application: models.ApplicationConfiguration, remote: bool = False, wait: bool = False):
"""Submit an application to a cluster.
Args:
id (:obj:`str`): the id of the cluster to submit the application to.
application (:obj:`aztk.spark.models.ApplicationConfiguration`): Application definition
remote (:obj:`bool`): If True, the application file will not be uploaded, it is assumed to be reachable
by the cluster already. This is useful when your application is stored in a mounted Azure File Share
and not the client. Defaults to False.
wait (:obj:`bool`, optional): If True, this function blocks until the application has completed. Defaults to False.
Returns:
:obj:`None`
"""
return submit.submit(self._core_cluster_operations, self, id, application, remote, wait)
def create_user(self, id: str, username: str, password: str = None, ssh_key: str = None):
"""Create a user on every node in the cluster
Args:
id (:obj:`str`): the id of the cluster to create the user on.
username (:obj:`str`): the name of the user to create.
password (:obj:`str`, optional): password for the user; either ssh_key or password must be provided. Defaults to None.
ssh_key (:obj:`str`, optional): ssh public key to create the user with; either ssh_key or password must be provided. Defaults to None.
Returns:
:obj:`None`
"""
return create_user.create_user(self._core_cluster_operations, self, id, username, password=password, ssh_key=ssh_key)
def get_application_status(self, id: str, application_name: str):
"""Get the status of a submitted application
Args:
id (:obj:`str`): the name of the cluster the application was submitted to
application_name (:obj:`str`): the name of the application to get
Returns:
:obj:`str`: the status state of the application
"""
return get_application_status.get_application_status(self._core_cluster_operations, id, application_name)
def run(self, id: str, command: str, host=False, internal: bool = False, timeout=None):
"""Run a bash command on every node in the cluster
Args:
id (:obj:`str`): the id of the cluster to run the command on.
command (:obj:`str`): the bash command to execute on the node.
host (:obj:`bool`, optional): if True, the command runs on the host VM instead of in the Spark container.
Defaults to False.
internal (:obj:`bool`, optional): if True, this will connect to the node using its internal IP.
Only use this if running within the same VNET as the cluster. Defaults to False.
timeout (:obj:`int`, optional): The timeout in seconds for establishing a connection to the node.
Defaults to None.
Returns:
:obj:`List[aztk.spark.models.NodeOutput]`: list of NodeOutput objects containing the output of the run command
"""
return run.cluster_run(self._core_cluster_operations, id, command, host, internal, timeout)
def node_run(self, id: str, node_id: str, command: str, host=False, internal: bool = False, timeout=None):
"""Run a bash command on the given node
Args:
id (:obj:`str`): the id of the cluster to run the command on.
node_id (:obj:`str`): the id of the node in the cluster to run the command on.
command (:obj:`str`): the bash command to execute on the node.
host (:obj:`bool`, optional): if True, the command runs on the host VM instead of in the Spark container.
Defaults to False.
internal (:obj:`bool`, optional): if True, this will connect to the node using its internal IP.
Only use this if running within the same VNET as the cluster. Defaults to False.
timeout (:obj:`int`, optional): The timeout in seconds for establishing a connection to the node.
Defaults to None.
Returns:
:obj:`aztk.spark.models.NodeOutput`: object containing the output of the run command
"""
return node_run.node_run(self._core_cluster_operations, id, node_id, command, host, internal, timeout)
def copy(self,
id: str,
source_path: str,
destination_path: str,
host: bool = False,
internal: bool = False,
timeout: int = None):
"""Copy a file to every node in a cluster.
Args:
id (:obj:`str`): the id of the cluster to copy files with.
source_path (:obj:`str`): the local path of the file to copy.
destination_path (:obj:`str`, optional): the path on each node the file is copied to.
host (:obj:`bool`, optional): if True, the file is copied onto the host VM instead of into the Spark container.
Defaults to False.
internal (:obj:`bool`, optional): if True, this will connect to the node using its internal IP.
Only use this if running within the same VNET as the cluster. Defaults to False.
timeout (:obj:`int`, optional): The timeout in seconds for establishing a connection to the node.
Defaults to None.
Returns:
:obj:`List[aztk.spark.models.NodeOutput]`: A list of NodeOutput objects representing the output of the copy command.
"""
return copy.cluster_copy(self._core_cluster_operations, id, source_path, destination_path, host, internal, timeout)
def download(self,
id: str,
source_path: str,
destination_path: str = None,
host: bool = False,
internal: bool = False,
timeout: int = None):
"""Download a file from every node in a cluster.
Args:
id (:obj:`str`): the id of the cluster to copy files with.
source_path (:obj:`str`): the path of the file to copy from.
destination_path (:obj:`str`, optional): the local directory path where the output should be written.
If None, a SpooledTemporaryFile will be returned in the NodeOutput object, else the file will be
written to this path. Defaults to None.
host (:obj:`bool`, optional): if True, the file is copied from the host VM instead of from the Spark container.
Defaults to False.
internal (:obj:`bool`, optional): if True, this will connect to the node using its internal IP.
Only use this if running within the same VNET as the cluster. Defaults to False.
timeout (:obj:`int`, optional): The timeout in seconds for establishing a connection to the node.
Defaults to None.
Returns:
:obj:`List[aztk.spark.models.NodeOutput]`: A list of NodeOutput objects representing the output of the copy command.
"""
return download.cluster_download(self._core_cluster_operations, id, source_path, destination_path, host, internal,
timeout)
def diagnostics(self, id, output_directory=None):
"""Download a file from every node in a cluster.
Args:
id (:obj:`str`): the id of the cluster to copy files with.
output_directory (:obj:`str`, optional): the local directory path where the output should be written.
If None, a SpooledTemporaryFile will be returned in the NodeOutput object, else the file will be
written to this path. Defaults to None.
Returns:
:obj:`List[aztk.spark.models.NodeOutput]`: A list of NodeOutput objects representing the output of the copy command.
"""
return diagnostics.run_cluster_diagnostics(self, id, output_directory)
def get_application_log(self, id: str, application_name: str, tail=False, current_bytes: int = 0):
"""Get the log for a running or completed application
Args:
id (:obj:`str`): the id of the cluster to run the command on.
application_name (:obj:`str`): the name of the application to get the log of.
tail (:obj:`bool`, optional): If True, get the remaining bytes after current_bytes. Otherwise, the whole log will be retrieved.
Only use this if streaming the log as it is being written. Defaults to False.
current_bytes (:obj:`int`): Specifies the last seen byte, so only the bytes after current_bytes are retrieved.
Only useful if streaming the log as it is being written. Only used if tail is True.
Returns:
:obj:`aztk.spark.models.ApplicationLog`: a model representing the output of the application.
"""
return get_application_log.get_application_log(self._core_cluster_operations, id, application_name, tail, current_bytes)
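# A tailing sketch built on the tail/current_bytes contract documented above
# (`ops` is a ClusterOperations instance; the ApplicationLog field names below
# mirror aztk.spark.models.ApplicationLog and are otherwise illustrative):
#
#   current_bytes = 0
#   while True:
#       log = ops.get_application_log("my-cluster", "pi", tail=True, current_bytes=current_bytes)
#       print(log.log, end="")
#       if log.application_state == 'completed':
#           break
#       current_bytes = log.total_bytes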
def get_remote_login_settings(self, id: str, node_id: str):
"""Get the remote login information for a node in a cluster
Args:
id (:obj:`str`): the id of the cluster the node is in
node_id (:obj:`str`): the id of the node in the cluster
Returns:
:obj:`aztk.spark.models.RemoteLogin`: Object that contains the ip address and port combination to login to a node
"""
return get_remote_login_settings.get_remote_login_settings(self._core_cluster_operations, id, node_id)
def wait(self, id: str, application_name: str):
"""Wait until the application has completed
Args:
id (:obj:`str`): the id of the cluster the application was submitted to
application_name (:obj:`str`): the name of the application to wait for
Returns:
:obj:`None`
"""
return wait.wait_for_application_to_complete(self._core_cluster_operations, id, application_name)


@ -0,0 +1 @@
from .operations import JobOperations


@ -0,0 +1,39 @@
import azure.batch.models as batch_models
import azure.batch.models.batch_error as batch_error
from aztk import error
from aztk.spark import models
from aztk.utils import helpers
from .get_recent_job import get_recent_job
def _delete(core_job_operations, spark_job_operations, job_id, keep_logs: bool = False):
recent_run_job = get_recent_job(core_job_operations, job_id)
deleted_job_or_job_schedule = False
# delete job
try:
core_job_operations.batch_client.job.delete(recent_run_job.id)
deleted_job_or_job_schedule = True
except batch_models.batch_error.BatchErrorException:
pass
# delete job_schedule
try:
core_job_operations.batch_client.job_schedule.delete(job_id)
deleted_job_or_job_schedule = True
except batch_models.batch_error.BatchErrorException:
pass
# delete storage container
if not keep_logs:
cluster_data = core_job_operations.get_cluster_data(job_id)
cluster_data.delete_container(job_id)
return deleted_job_or_job_schedule
def delete(core_job_operations, spark_job_operations, job_id: str, keep_logs: bool = False):
try:
return _delete(core_job_operations, spark_job_operations, job_id, keep_logs)
except batch_error.BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))


@ -0,0 +1,32 @@
import azure.batch.models.batch_error as batch_error
from aztk import error
from aztk.spark import models
from aztk.utils import helpers
from .get_recent_job import get_recent_job
def _get_job(core_job_operations, job_id):
job = core_job_operations.batch_client.job_schedule.get(job_id)
job_apps = [
app for app in core_job_operations.batch_client.task.list(job_id=job.execution_info.recent_job.id) if app.id != job_id
]
recent_run_job = get_recent_job(core_job_operations, job_id)
pool_prefix = recent_run_job.pool_info.auto_pool_specification.auto_pool_id_prefix
pool = nodes = None
for cloud_pool in core_job_operations.batch_client.pool.list():
if pool_prefix in cloud_pool.id:
pool = cloud_pool
break
if pool:
nodes = core_job_operations.batch_client.compute_node.list(pool_id=pool.id)
return job, job_apps, pool, nodes
def get_job(core_job_operations, job_id):
try:
job, apps, pool, nodes = _get_job(core_job_operations, job_id)
return models.Job(job, apps, pool, nodes)
except batch_error.BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))


@ -0,0 +1,25 @@
import azure.batch.models as batch_models
import azure.batch.models.batch_error as batch_error
from aztk import error
from aztk.spark import models
from aztk.utils import helpers
from .get_recent_job import get_recent_job
def _get_application(spark_job_operations, job_id, application_name):
# info about the app
recent_run_job = get_recent_job(spark_job_operations._core_job_operations, job_id)
try:
return spark_job_operations._core_job_operations.batch_client.task.get(job_id=recent_run_job.id, task_id=application_name)
except batch_models.batch_error.BatchErrorException:
raise error.AztkError(
"The Spark application {0} is still being provisioned or does not exist.".format(application_name))
def get_application(spark_job_operations, job_id, application_name):
try:
return models.Application(_get_application(spark_job_operations, job_id, application_name))
except batch_error.BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))


@ -0,0 +1,40 @@
import azure.batch.models as batch_models
import azure.batch.models.batch_error as batch_error
from aztk import error
from aztk.spark import models
from aztk.utils import helpers
from .list_applications import list_applications
from .get_recent_job import get_recent_job
def _get_application_log(core_job_operations, spark_job_operations, job_id, application_name):
# TODO: change where the logs are uploaded so they aren't overwritten on scheduled runs
# current: job_id, application_name/output.log
# new: job_id, recent_run_job.id/application_name/output.log
recent_run_job = get_recent_job(core_job_operations, job_id)
try:
task = core_job_operations.batch_client.task.get(job_id=recent_run_job.id, task_id=application_name)
except batch_models.batch_error.BatchErrorException as e:
# see if the application is written to metadata of pool
applications = spark_job_operations.list_applications(job_id)
for application in applications:
if applications[application] is None and application == application_name:
raise error.AztkError("The application {0} has not yet been created.".format(application))
raise error.AztkError("The application {0} does not exist".format(application_name))
else:
if task.state in (batch_models.TaskState.active, batch_models.TaskState.running,
batch_models.TaskState.preparing):
raise error.AztkError("The application {0} has not yet finished executing.".format(application_name))
return core_job_operations.get_application_log(job_id, application_name)
def get_job_application_log(core_job_operations, spark_job_operations, job_id, application_name):
try:
return models.ApplicationLog(
_get_application_log(core_job_operations, spark_job_operations, job_id, application_name))
except batch_error.BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))

@ -0,0 +1,3 @@
def get_recent_job(core_job_operations, job_id):
job_schedule = core_job_operations.batch_client.job_schedule.get(job_id)
return core_job_operations.batch_client.job.get(job_schedule.execution_info.recent_job.id)
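Every AZTK job is backed by a Batch job schedule rather than a single Batch job: each scheduled run creates a fresh Batch job, and the schedule's execution_info.recent_job points at the latest one. That indirection is why nearly every helper in this package resolves the schedule first. A minimal sketch of the pattern (list_recent_run_tasks is a hypothetical helper, not part of this PR):

def list_recent_run_tasks(batch_client, job_id):
    # The AZTK job id names a Batch *job schedule*, not a Batch job.
    schedule = batch_client.job_schedule.get(job_id)
    # Each scheduled run creates a new Batch job; recent_job is the latest run.
    recent_job_id = schedule.execution_info.recent_job.id
    # Its tasks are the submitted Spark applications; the job manager task
    # shares the schedule's id, so it is filtered out.
    return [task for task in batch_client.task.list(job_id=recent_job_id) if task.id != job_id]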

@ -0,0 +1,16 @@
import azure.batch.models.batch_error as batch_error
from aztk import error
from aztk.spark import models
from aztk.utils import helpers
def _list_jobs(core_job_operations):
return [cloud_job_schedule for cloud_job_schedule in core_job_operations.batch_client.job_schedule.list()]
def list_jobs(core_job_operations):
try:
return [models.Job(cloud_job_schedule) for cloud_job_schedule in _list_jobs(core_job_operations)]
except batch_error.BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))

@ -0,0 +1,35 @@
import azure.batch.models.batch_error as batch_error
from aztk import error
from aztk.spark import models
from aztk.utils import helpers
from .get_recent_job import get_recent_job
def _list_applications(core_job_operations, job_id):
recent_run_job = get_recent_job(core_job_operations, job_id)
# get application names from Batch job metadata
applications = {}
for metadata_item in recent_run_job.metadata:
if metadata_item.name == "applications":
for app_name in metadata_item.value.split('\n'):
applications[app_name] = None
# get tasks from Batch job
for task in core_job_operations.batch_client.task.list(recent_run_job.id):
if task.id != job_id:
applications[task.id] = task
return applications
def list_applications(core_job_operations, job_id):
try:
applications = _list_applications(core_job_operations, job_id)
for item in applications:
if applications[item]:
applications[item] = models.Application(applications[item])
return applications
except batch_error.BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))
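The dictionary built above relies on a small metadata contract: at submit time the application names are newline-joined into a single Batch metadata item named "applications" (see application_metadata in submit_job below). An application can therefore be listed before its task exists, mapping to None until the task is created. An illustration with hypothetical application names:

# Hypothetical metadata value as written at submit time ('\n'.join of names).
metadata_value = "wordcount\npi-estimate"
applications = {name: None for name in metadata_value.split('\n')}
assert applications == {"wordcount": None, "pi-estimate": None}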

@ -0,0 +1,22 @@
import azure.batch.models.batch_error as batch_error
from aztk import error
from aztk.utils import helpers
from .get_recent_job import get_recent_job
def _stop(core_job_operations, job_id):
# terminate currently running job and tasks
recent_run_job = get_recent_job(core_job_operations, job_id)
core_job_operations.batch_client.job.terminate(recent_run_job.id)
# terminate job_schedule
core_job_operations.batch_client.job_schedule.terminate(job_id)
def stop(core_job_operations, job_id):
    try:
        return _stop(core_job_operations, job_id)
except batch_error.BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))

@ -0,0 +1,16 @@
import azure.batch.models.batch_error as batch_error
from aztk import error
from aztk.utils import helpers
from .get_recent_job import get_recent_job
def stop_app(core_job_operations, job_id, application_name):
recent_run_job = get_recent_job(core_job_operations, job_id)
# stop batch task
try:
core_job_operations.batch_client.task.terminate(job_id=recent_run_job.id, task_id=application_name)
return True
except batch_error.BatchErrorException:
return False

@ -0,0 +1,116 @@
import azure.batch.models as batch_models
import azure.batch.models.batch_error as batch_error
import yaml
from aztk import error
from aztk import models as base_models
from aztk.internal.cluster_data import NodeData
from aztk.spark import models
from aztk.utils import helpers
from aztk.utils.command_builder import CommandBuilder
def __app_cmd():
docker_exec = CommandBuilder("sudo docker exec")
docker_exec.add_argument("-i")
docker_exec.add_option("-e", "AZ_BATCH_TASK_WORKING_DIR=$AZ_BATCH_TASK_WORKING_DIR")
docker_exec.add_option("-e", "AZ_BATCH_JOB_ID=$AZ_BATCH_JOB_ID")
docker_exec.add_argument("spark /bin/bash >> output.log 2>&1 -c \"" \
"source ~/.bashrc; " \
"export PYTHONPATH=$PYTHONPATH:\$AZTK_WORKING_DIR; " \
"cd \$AZ_BATCH_TASK_WORKING_DIR; " \
"\$AZTK_WORKING_DIR/.aztk-env/.venv/bin/python \$AZTK_WORKING_DIR/aztk/node_scripts/job_submission.py\"")
return docker_exec.to_str()
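For reference, CommandBuilder accumulates the pieces above and renders them in order, so the assembled command comes out roughly as below. The exact spacing is an assumption about CommandBuilder.to_str(), and the long bash command is elided:

# Illustrative only: approximate output of __app_cmd() above.
expected_prefix = (
    'sudo docker exec -i'
    ' -e AZ_BATCH_TASK_WORKING_DIR=$AZ_BATCH_TASK_WORKING_DIR'
    ' -e AZ_BATCH_JOB_ID=$AZ_BATCH_JOB_ID'
    ' spark /bin/bash >> output.log 2>&1 -c "..."'
)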
def generate_job_manager_task(core_job_operations, job, application_tasks):
resource_files = []
for application, task in application_tasks:
task_definition_resource_file = helpers.upload_text_to_container(
container_name=job.id,
application_name=application.name + '.yaml',
file_path=application.name + '.yaml',
content=yaml.dump(task),
blob_client=core_job_operations.blob_client)
resource_files.append(task_definition_resource_file)
task_cmd = __app_cmd()
# Create task
task = batch_models.JobManagerTask(
id=job.id,
command_line=helpers.wrap_commands_in_shell([task_cmd]),
resource_files=resource_files,
kill_job_on_completion=False,
allow_low_priority_node=True,
user_identity=batch_models.UserIdentity(
auto_user=batch_models.AutoUserSpecification(
scope=batch_models.AutoUserScope.task, elevation_level=batch_models.ElevationLevel.admin)))
return task
def _default_scheduling_target(vm_count: int):
if vm_count == 0:
return models.SchedulingTarget.Any
else:
return models.SchedulingTarget.Dedicated
def _apply_default_for_job_config(job_conf: models.JobConfiguration):
if job_conf.scheduling_target is None:
job_conf.scheduling_target = _default_scheduling_target(job_conf.max_dedicated_nodes)
return job_conf
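The defaulting rule looks only at whether any dedicated nodes were requested: a job with zero dedicated nodes schedules work on any node (including low-priority ones), otherwise scheduling is pinned to dedicated nodes. A quick sanity check of the helpers above (values are hypothetical):

assert _default_scheduling_target(0) == models.SchedulingTarget.Any
assert _default_scheduling_target(3) == models.SchedulingTarget.Dedicated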
def submit_job(core_job_operations, spark_job_operations, job_configuration: models.JobConfiguration, wait: bool = False):
try:
job_configuration = _apply_default_for_job_config(job_configuration)
job_configuration.validate()
cluster_data = core_job_operations.get_cluster_data(job_configuration.id)
node_data = NodeData(job_configuration.to_cluster_config()).add_core().done()
zip_resource_files = cluster_data.upload_node_data(node_data).to_resource_file()
start_task = spark_job_operations._generate_cluster_start_task(
core_job_operations,
zip_resource_files,
job_configuration.id,
job_configuration.gpu_enabled,
job_configuration.get_docker_repo(),
mixed_mode=job_configuration.mixed_mode(),
worker_on_master=job_configuration.worker_on_master)
application_tasks = []
for application in job_configuration.applications:
application_tasks.append((application,
spark_job_operations._generate_application_task(core_job_operations, job_configuration.id,
application)))
job_manager_task = generate_job_manager_task(core_job_operations, job_configuration, application_tasks)
software_metadata_key = base_models.Software.spark
vm_image = models.VmImage(publisher='Canonical', offer='UbuntuServer', sku='16.04')
autoscale_formula = "$TargetDedicatedNodes = {0}; " \
"$TargetLowPriorityNodes = {1}".format(
job_configuration.max_dedicated_nodes,
job_configuration.max_low_pri_nodes)
job = core_job_operations.submit(
job_configuration=job_configuration,
start_task=start_task,
job_manager_task=job_manager_task,
autoscale_formula=autoscale_formula,
software_metadata_key=software_metadata_key,
vm_image_model=vm_image,
application_metadata='\n'.join(application.name for application in (job_configuration.applications or [])))
if wait:
spark_job_operations.wait(id=job_configuration.id)
return models.Job(job)
except batch_error.BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))
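Note that the job's cluster size is enforced with a Batch autoscale formula rather than an explicit resize, capped at the configured maximums. For example, with hypothetical caps of 3 dedicated and 2 low-priority nodes, the formula rendered above is:

max_dedicated_nodes, max_low_pri_nodes = 3, 2
formula = "$TargetDedicatedNodes = {0}; " \
          "$TargetLowPriorityNodes = {1}".format(max_dedicated_nodes, max_low_pri_nodes)
assert formula == "$TargetDedicatedNodes = 3; $TargetLowPriorityNodes = 2"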

@ -0,0 +1,22 @@
import time
import azure.batch.models as batch_models
import azure.batch.models.batch_error as batch_error
from aztk import error
from aztk.utils import helpers
def _wait_until_job_finished(core_job_operations, job_id):
job_state = core_job_operations.batch_client.job_schedule.get(job_id).state
while job_state != batch_models.JobScheduleState.completed:
time.sleep(3)
job_state = core_job_operations.batch_client.job_schedule.get(job_id).state
def wait_until_job_finished(core_job_operations, job_id):
try:
_wait_until_job_finished(core_job_operations, job_id)
except batch_error.BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))

@ -0,0 +1,134 @@
from aztk.client.job import CoreJobOperations
from aztk.spark import models
from aztk.spark.client.base import SparkBaseOperations
from .helpers import (delete, get, get_application, get_application_log, list, list_applications, stop,
stop_application, submit, wait_until_complete)
class JobOperations(SparkBaseOperations):
"""Spark ClusterOperations object
Attributes:
_core_job_operations (:obj:`aztk.client.cluster.CoreJobOperations`):
"""
def __init__(self, context):
self._core_job_operations = CoreJobOperations(context)
def list(self):
"""List all jobs.
Returns:
:obj:`List[Job]`: List of aztk.spark.models.Job objects, each representing the state and configuration of the job.
"""
return list.list_jobs(self._core_job_operations)
def delete(self, id, keep_logs: bool = False):
"""Delete a job.
Args:
id (:obj:`str`): the id of the job to delete.
keep_logs (:obj:`bool`): If True, the logs related to this job in Azure Storage are not deleted.
Defaults to False.
Returns:
:obj:`bool`: True if the deletion process was successful.
"""
return delete.delete(self._core_job_operations, self, id, keep_logs)
def get(self, id):
"""Get details about the state of a job.
Args:
id (:obj:`str`): the id of the job to get.
Returns:
:obj:`aztk.spark.models.Job`: A job object representing the state and configuration of the job.
"""
return get.get_job(self._core_job_operations, id)
def get_application(self, id, application_name):
"""Get information on a submitted application
Args:
id (:obj:`str`): the id of the job the application was submitted to
application_name (:obj:`str`): the name of the application to get
Returns:
:obj:`aztk.spark.models.Application`: object representing the state and output of an application
"""
return get_application.get_application(self, id, application_name)
def get_application_log(self, id, application_name):
"""Get the log for a running or completed application
Args:
id (:obj:`str`): the id of the job the application was submitted to.
application_name (:obj:`str`): the name of the application to get the log of
Returns:
:obj:`aztk.spark.models.ApplicationLog`: a model representing the output of the application.
"""
return get_application_log.get_job_application_log(self._core_job_operations, self, id, application_name)
def list_applications(self, id):
"""List all application defined as a part of a job
Args:
id (:obj:`str`): the id of the job to list the applications of
Returns:
:obj:`Dict[str, aztk.spark.models.Application]`: a dictionary mapping each application name defined as part of the job to its aztk.spark.models.Application object, or None if that application's task has not yet been created
"""
return list_applications.list_applications(self._core_job_operations, id)
def stop(self, id):
"""Stop a submitted job
Args:
id (:obj:`str`): the id of the job to stop
Returns:
:obj:`None`
"""
return stop.stop(self._core_job_operations, id)
def stop_application(self, id, application_name):
"""Stops a submitted application
Args:
id (:obj:`str`): the id of the job the application belongs to
application_name (:obj:`str`): the name of the application to stop
Returns:
:obj:`bool`: True if the stop was successful, else False
"""
return stop_application.stop_app(self._core_job_operations, id, application_name)
def submit(self, job_configuration: models.JobConfiguration, wait: bool = False):
"""Submit a job
Jobs are a cluster definition and one or more application definitions that run on the cluster. The job's
cluster will be allocated and configured, then the applications will be executed with their output stored
in Azure Storage. When all applications have completed, the cluster will be automatically deleted.
Args:
job_configuration (:obj:`aztk.spark.models.JobConfiguration`): Model defining the job's configuration.
wait (:obj:`bool`): If True, blocks until job is completed. Defaults to False.
Returns:
:obj:`aztk.spark.models.Job`: Model representing the state of the job.
"""
return submit.submit_job(self._core_job_operations, self, job_configuration, wait)
def wait(self, id):
"""Wait until the job has completed.
Args:
id (:obj:`str`): the id of the job the application belongs to
Returns:
:obj:`None`
"""
wait_until_complete.wait_until_job_finished(self._core_job_operations, id)
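Taken together, the new surface hangs off Client.job. A usage sketch under stated assumptions: secrets is a populated aztk.spark.models.SecretsConfiguration and job_configuration a valid JobConfiguration (both elided here), and the application name is hypothetical:

import aztk.spark

client = aztk.spark.Client(secrets)                # secrets: models.SecretsConfiguration (elided)
client.job.submit(job_configuration, wait=True)    # blocks until the job schedule completes
for name, application in client.job.list_applications(id=job_configuration.id).items():
    print(name, "created" if application else "pending")    # None until the app's task exists
log = client.job.get_application_log(job_configuration.id, "wordcount")
print(log.log)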

@ -0,0 +1,2 @@
# ALL FILES IN THIS DIRECTORY ARE DEPRECATED, WILL BE REMOVED IN v0.9.0

@ -1,13 +1,13 @@
 import time
+import azure.batch.models as batch_models
 import azure
-import azure.batch.models as batch_models
 import azure.batch.models.batch_error as batch_error
 from aztk import error
-from aztk.utils import helpers
-from aztk.utils import constants
+from aztk import models as base_models
 from aztk.spark import models
+from aztk.utils import constants, helpers

 output_file = constants.TASK_WORKING_DIR + \
     "/" + constants.SPARK_SUBMIT_LOGS_FILE
@ -53,14 +53,14 @@ def get_log_from_storage(blob_client, container_name, application_name, task):
         blob = blob_client.get_blob_to_text(container_name, application_name + '/' + constants.SPARK_SUBMIT_LOGS_FILE)
     except azure.common.AzureMissingResourceHttpError:
         raise error.AztkError("Logs not found in your storage account. They were either deleted or never existed.")

-    return models.ApplicationLog(
+    base_model = base_models.ApplicationLog(
         name=application_name,
         cluster_id=container_name,
         application_state=task.state._value_,
         log=blob.content,
         total_bytes=blob.properties.content_length,
-        exit_code = task.execution_info.exit_code)
+        exit_code=task.execution_info.exit_code)
+    return models.ApplicationLog(base_model)


 def get_log(batch_client, blob_client, cluster_id: str, application_name: str, tail=False, current_bytes: int = 0):
@ -85,19 +85,20 @@ def get_log(batch_client, blob_client, cluster_id: str, application_name: str, t
         stream = batch_client.file.get_from_task(
             job_id, task_id, output_file, batch_models.FileGetFromTaskOptions(ocp_range=ocp_range))
         content = helpers.read_stream_as_string(stream)

-        return models.ApplicationLog(
+        base_model = base_models.ApplicationLog(
             name=application_name,
             cluster_id=cluster_id,
             application_state=task.state._value_,
             log=content,
             total_bytes=target_bytes,
             exit_code=task.execution_info.exit_code)
+        return models.ApplicationLog(base_model)
     else:
-        return models.ApplicationLog(
+        base_model = base_models.ApplicationLog(
             name=application_name,
             cluster_id=cluster_id,
             application_state=task.state._value_,
             log='',
             total_bytes=target_bytes,
             exit_code=task.execution_info.exit_code)
+        return models.ApplicationLog(base_model)

@ -17,10 +17,10 @@ class SparkToolkit(aztk.models.Toolkit):
 class Cluster(aztk.models.Cluster):
-    def __init__(self, pool: batch_models.CloudPool = None, nodes: batch_models.ComputeNodePaged = None):
-        super().__init__(pool, nodes)
+    def __init__(self, cluster: aztk.models.Cluster):
+        super().__init__(cluster.pool, cluster.nodes)
         self.master_node_id = self.__get_master_node_id()
-        self.gpu_enabled = helpers.is_gpu_enabled(pool.vm_size)
+        self.gpu_enabled = helpers.is_gpu_enabled(cluster.pool.vm_size)

     def is_pool_running_spark(self, pool: batch_models.CloudPool):
         if pool.metadata is None:
@ -47,7 +47,9 @@ class Cluster(aztk.models.Cluster):
 class RemoteLogin(aztk.models.RemoteLogin):
-    pass
+    def __init__(self, remote_login: aztk.models.RemoteLogin):
+        super().__init__(remote_login.ip_address, remote_login.port)


 class PortForwardingSpecification(aztk.models.PortForwardingSpecification):
     pass
@ -286,16 +288,16 @@ class Job():
         self.creation_time = cloud_job_schedule.creation_time
         self.applications = [Application(task) for task in (cloud_tasks or [])]
         if pool:
-            self.cluster = Cluster(pool, nodes)
+            self.cluster = Cluster(aztk.models.Cluster(pool, nodes))
         else:
             self.cluster = None


-class ApplicationLog():
-    def __init__(self, name: str, cluster_id: str, log: str, total_bytes: int, application_state: batch_models.TaskState, exit_code: int):
-        self.name = name
-        self.cluster_id = cluster_id    # TODO: change to something cluster/job agnostic
-        self.log = log
-        self.total_bytes = total_bytes
-        self.application_state = application_state
-        self.exit_code = exit_code
+class ApplicationLog(aztk.models.ApplicationLog):
+    def __init__(self, application_log: aztk.models.ApplicationLog):
+        self.name = application_log.name
+        self.cluster_id = application_log.cluster_id    # TODO: change to something cluster/job agnostic
+        self.log = application_log.log
+        self.total_bytes = application_log.total_bytes
+        self.application_state = application_log.application_state
+        self.exit_code = application_log.exit_code
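This hunk is the "inheritance -> composition" change from the changelog: instead of rebuilding state from raw Batch objects, each spark model now receives the fully-constructed base model and copies its fields. Reduced to its essentials (Base and Wrapper are hypothetical stand-ins for the aztk.models / aztk.spark.models pairs):

class Base:
    def __init__(self, ip_address, port):
        self.ip_address = ip_address
        self.port = port

class Wrapper(Base):
    def __init__(self, base: Base):
        # Re-expose the already-built base model's fields instead of
        # recomputing them from Batch objects.
        super().__init__(base.ip_address, base.port)

remote_login = Wrapper(Base("10.0.0.4", "22"))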

@ -0,0 +1,3 @@
from aztk.spark import models
SPARK_VM_IMAGE = models.VmImage(publisher='Canonical', offer='UbuntuServer', sku='16.04')

@ -17,18 +17,18 @@ class MasterInvalidStateError(Exception):
     pass


-def wait_for_master_to_be_ready(client, cluster_id: str):
+def wait_for_master_to_be_ready(core_operations, spark_operations, cluster_id: str):
     master_node_id = None
     start_time = datetime.datetime.now()
     while True:
         if not master_node_id:
-            master_node_id = client.get_cluster(cluster_id).master_node_id
+            master_node_id = spark_operations.get(cluster_id).master_node_id
             if not master_node_id:
                 time.sleep(5)
                 continue

-        master_node = client.batch_client.compute_node.get(cluster_id, master_node_id)
+        master_node = core_operations.batch_client.compute_node.get(cluster_id, master_node_id)

         if master_node.state in [batch_models.ComputeNodeState.idle, batch_models.ComputeNodeState.running]:
             break

@ -30,12 +30,12 @@ def execute(args: typing.NamedTuple):
     if args.ssh_key:
         ssh_key = args.ssh_key
     else:
-        ssh_key = spark_client.secrets_config.ssh_pub_key
+        ssh_key = spark_client.secrets_configuration.ssh_pub_key

-    ssh_key, password = utils.get_ssh_key_or_prompt(ssh_key, args.username, args.password, spark_client.secrets_config)
+    ssh_key, password = utils.get_ssh_key_or_prompt(ssh_key, args.username, args.password, spark_client.secrets_configuration)

-    spark_client.create_user(
-        cluster_id=args.cluster_id,
+    spark_client.cluster.create_user(
+        id=args.cluster_id,
         username=args.username,
         password=password,
         ssh_key=ssh_key

@ -30,7 +30,7 @@ def execute(args: typing.NamedTuple):
     if args.tail:
         utils.stream_logs(client=spark_client, cluster_id=args.cluster_id, application_name=args.app_name)
     else:
-        app_log = spark_client.get_application_log(cluster_id=args.cluster_id, application_name=args.app_name)
+        app_log = spark_client.cluster.get_application_log(id=args.cluster_id, application_name=args.app_name)
     if args.output:
         with utils.Spinner():
             with open(os.path.abspath(os.path.expanduser(args.output)), "w", encoding="UTF-8") as f:

@ -24,8 +24,8 @@ def setup_parser(parser: argparse.ArgumentParser):
 def execute(args: typing.NamedTuple):
     spark_client = aztk.spark.Client(config.load_aztk_secrets())
     with utils.Spinner():
-        copy_output = spark_client.cluster_copy(
-            cluster_id=args.cluster_id,
+        copy_output = spark_client.cluster.copy(
+            id=args.cluster_id,
             source_path=args.source_path,
             destination_path=args.dest_path,
             internal=args.internal

@ -66,10 +66,10 @@ def execute(args: typing.NamedTuple):
     user_configuration = cluster_conf.user_configuration

     if user_configuration and user_configuration.username:
-        ssh_key, password = utils.get_ssh_key_or_prompt(spark_client.secrets_config.ssh_pub_key,
+        ssh_key, password = utils.get_ssh_key_or_prompt(spark_client.secrets_configuration.ssh_pub_key,
                                                         user_configuration.username,
                                                         user_configuration.password,
-                                                        spark_client.secrets_config)
+                                                        spark_client.secrets_configuration)
         cluster_conf.user_configuration = aztk.spark.models.UserConfiguration(
             username=user_configuration.username,
             password=password,
@ -82,8 +82,8 @@ def execute(args: typing.NamedTuple):
     utils.print_cluster_conf(cluster_conf, wait)

     with utils.Spinner():
         # create spark cluster
-        cluster = spark_client.create_cluster(
-            cluster_conf,
+        cluster = spark_client.cluster.create(
+            cluster_configuration=cluster_conf,
             wait=wait
         )

Просмотреть файл

@ -22,5 +22,5 @@ def execute(args: typing.NamedTuple):
     if not args.output:
         args.output = os.path.join(os.getcwd(), "debug-{0}-{1}".format(args.cluster_id, timestr))
     with utils.Spinner():
-        spark_client.run_cluster_diagnostics(cluster_id=args.cluster_id, output_directory=args.output)
+        spark_client.cluster.diagnostics(id=args.cluster_id, output_directory=args.output)
     # TODO: analyze results, display some info about status

@ -40,7 +40,7 @@ def execute(args: typing.NamedTuple):
log.error("Confirmation cluster id does not match. Please try again.") log.error("Confirmation cluster id does not match. Please try again.")
return return
if spark_client.delete_cluster(cluster_id, args.keep_logs): if spark_client.cluster.delete(id=cluster_id, keep_logs=args.keep_logs):
log.info("Deleting cluster %s", cluster_id) log.info("Deleting cluster %s", cluster_id)
else: else:
log.error("Cluster with id '%s' doesn't exist or was already deleted.", cluster_id) log.error("Cluster with id '%s' doesn't exist or was already deleted.", cluster_id)

@ -23,10 +23,10 @@ def setup_parser(parser: argparse.ArgumentParser):
 def execute(args: typing.NamedTuple):
     spark_client = aztk.spark.Client(config.load_aztk_secrets())
     cluster_id = args.cluster_id
-    cluster = spark_client.get_cluster(cluster_id)
+    cluster = spark_client.cluster.get(cluster_id)
     utils.print_cluster(spark_client, cluster, args.internal)

-    configuration = spark_client.get_cluster_config(cluster_id)
+    configuration = spark_client.cluster.get_cluster_config(cluster_id)
     if configuration and args.show_config:
         log.info("-------------------------------------------")
         log.info("Cluster configuration:")

@ -16,7 +16,7 @@ def setup_parser(parser: argparse.ArgumentParser):
 def execute(args: typing.NamedTuple):
     spark_client = aztk.spark.Client(config.load_aztk_secrets())

-    clusters = spark_client.list_clusters()
+    clusters = spark_client.cluster.list()
     if args.quiet:
         utils.print_clusters_quiet(clusters)
     else:

@ -27,8 +27,8 @@ def execute(args: typing.NamedTuple):
     spark_client = aztk.spark.Client(config.load_aztk_secrets())

     with utils.Spinner():
         if args.node_id:
-            results = [spark_client.node_run(args.cluster_id, args.node_id, args.command, args.host, args.internal)]
+            results = [spark_client.cluster.node_run(args.cluster_id, args.node_id, args.command, args.host, args.internal)]
         else:
-            results = spark_client.cluster_run(args.cluster_id, args.command, args.host, args.internal)
+            results = spark_client.cluster.run(args.cluster_id, args.command, args.host, args.internal)

     [utils.log_node_run_output(node_output) for node_output in results]

@ -31,8 +31,8 @@ http_prefix = 'http://localhost:'
 def execute(args: typing.NamedTuple):
     spark_client = aztk.spark.Client(config.load_aztk_secrets())
-    cluster = spark_client.get_cluster(args.cluster_id)
-    cluster_config = spark_client.get_cluster_config(args.cluster_id)
+    cluster = spark_client.cluster.get(args.cluster_id)
+    cluster_config = spark_client.cluster.get_cluster_config(args.cluster_id)
     ssh_conf = SshConfig()

     ssh_conf.merge(
@ -93,7 +93,7 @@ def native_python_ssh_into_master(spark_client, cluster, ssh_conf, password):
log.warning("No ssh client found, using pure python connection.") log.warning("No ssh client found, using pure python connection.")
return return
configuration = spark_client.get_cluster_config(cluster.id) configuration = spark_client.cluster.get_cluster_config(cluster.id)
plugin_ports = [] plugin_ports = []
if configuration and configuration.plugins: if configuration and configuration.plugins:
ports = [ ports = [
@ -104,7 +104,7 @@ def native_python_ssh_into_master(spark_client, cluster, ssh_conf, password):
             plugin_ports.extend(ports)

     print("Press ctrl+c to exit...")
-    spark_client.cluster_ssh_into_master(
+    spark_client.cluster.ssh_into_master(
         cluster.id,
         cluster.master_node_id,
         ssh_conf.username,

@ -134,8 +134,8 @@ def execute(args: typing.NamedTuple):
log.info("-------------------------------------------") log.info("-------------------------------------------")
spark_client.submit( spark_client.cluster.submit(
cluster_id=args.cluster_id, id=args.cluster_id,
application = aztk.spark.models.ApplicationConfiguration( application = aztk.spark.models.ApplicationConfiguration(
name=args.name, name=args.name,
application=args.app, application=args.app,
@ -162,8 +162,8 @@ def execute(args: typing.NamedTuple):
         exit_code = utils.stream_logs(client=spark_client, cluster_id=args.cluster_id, application_name=args.name)
     else:
         with utils.Spinner():
-            spark_client.wait_until_application_done(cluster_id=args.cluster_id, task_id=args.name)
-            application_log = spark_client.get_application_log(cluster_id=args.cluster_id, application_name=args.name)
+            spark_client.cluster.wait(id=args.cluster_id, application_name=args.name)    # TODO: replace wait_until_application_done
+            application_log = spark_client.cluster.get_application_log(id=args.cluster_id, application_name=args.name)
             with open(os.path.abspath(os.path.expanduser(args.output)), "w", encoding="UTF-8") as f:
                 f.write(application_log.log)
             exit_code = application_log.exit_code

@ -29,7 +29,7 @@ def execute(args: typing.NamedTuple):
     if not args.force:
         # check if job exists before prompting for confirmation
-        spark_client.get_job(job_id)
+        spark_client.job.get(id=job_id)
         if not args.keep_logs:
             log.warning("All logs persisted for this job will be deleted.")
@ -40,7 +40,7 @@ def execute(args: typing.NamedTuple):
log.error("Confirmation cluster id does not match. Please try again.") log.error("Confirmation cluster id does not match. Please try again.")
return return
if spark_client.delete_job(job_id, args.keep_logs): if spark_client.job.delete(id=job_id, keep_logs=args.keep_logs):
log.info("Deleting Job %s", job_id) log.info("Deleting Job %s", job_id)
else: else:
log.error("Job with id '%s' doesn't exist or was already deleted.", job_id) log.error("Job with id '%s' doesn't exist or was already deleted.", job_id)

@ -16,4 +16,4 @@ def setup_parser(parser: argparse.ArgumentParser):
 def execute(args: typing.NamedTuple):
     spark_client = aztk.spark.Client(config.load_aztk_secrets())
-    utils.print_job(spark_client, spark_client.get_job(args.job_id))
+    utils.print_job(spark_client, spark_client.job.get(id=args.job_id))

@ -20,4 +20,4 @@ def setup_parser(parser: argparse.ArgumentParser):
 def execute(args: typing.NamedTuple):
     spark_client = aztk.spark.Client(config.load_aztk_secrets())
-    utils.print_application(spark_client.get_application(args.job_id, args.app_name))
+    utils.print_application(spark_client.job.get_application(args.job_id, args.app_name))

@ -22,7 +22,7 @@ def setup_parser(parser: argparse.ArgumentParser):
 def execute(args: typing.NamedTuple):
     spark_client = aztk.spark.Client(config.load_aztk_secrets())

-    app_log = spark_client.get_job_application_log(args.job_id, args.app_name)
+    app_log = spark_client.job.get_application_log(args.job_id, args.app_name)

     if args.output:
         with utils.Spinner():
             with open(os.path.abspath(os.path.expanduser(args.output)), "w", encoding="UTF-8") as f:

@ -13,4 +13,4 @@ def setup_parser(_: argparse.ArgumentParser):
 def execute(args: typing.NamedTuple):
     spark_client = aztk.spark.Client(config.load_aztk_secrets())
-    utils.print_jobs(spark_client.list_jobs())
+    utils.print_jobs(spark_client.job.list())

@ -14,4 +14,4 @@ def setup_parser(parser: argparse.ArgumentParser):
 def execute(args: typing.NamedTuple):
     spark_client = aztk.spark.Client(config.load_aztk_secrets())
-    utils.print_applications(spark_client.list_applications(args.job_id))
+    utils.print_applications(spark_client.job.list_applications(args.job_id))

@ -15,5 +15,5 @@ def setup_parser(parser: argparse.ArgumentParser):
 def execute(args: typing.NamedTuple):
     spark_client = aztk.spark.Client(config.load_aztk_secrets())
-    spark_client.stop_job(args.job_id)
+    spark_client.job.stop(args.job_id)
     log.print("Stopped Job {0}".format(args.job_id))

@ -20,7 +20,7 @@ def setup_parser(parser: argparse.ArgumentParser):
 def execute(args: typing.NamedTuple):
     spark_client = aztk.spark.Client(config.load_aztk_secrets())
-    if spark_client.stop_job_app(args.job_id, args.app_name):
+    if spark_client.job.stop_application(args.job_id, args.app_name):
         log.info("Stopped app {0}".format(args.app_name))
     else:
         log.error("App with name {0} does not exist or was already deleted")

@ -48,4 +48,4 @@ def execute(args: typing.NamedTuple):
     )

     #TODO: utils.print_job_conf(job_configuration)
-    spark_client.submit_job(job_configuration)
+    spark_client.job.submit(job_configuration)
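The CLI hunks above are all the same mechanical migration from the flat Client methods to the namespaced operations. Collected from the diffs in this commit:

spark_client.create_cluster(...)        -> spark_client.cluster.create(...)
spark_client.get_cluster(...)           -> spark_client.cluster.get(...)
spark_client.list_clusters()            -> spark_client.cluster.list()
spark_client.delete_cluster(...)        -> spark_client.cluster.delete(...)
spark_client.cluster_run(...)           -> spark_client.cluster.run(...)
spark_client.get_application_log(...)   -> spark_client.cluster.get_application_log(...)
spark_client.get_job(...)               -> spark_client.job.get(...)
spark_client.list_jobs()                -> spark_client.job.list()
spark_client.submit_job(...)            -> spark_client.job.submit(...)
spark_client.stop_job_app(...)          -> spark_client.job.stop_application(...)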

Some files were not shown because too many files changed in this diff.