Feature: 0.10.0 remove deprecated code (#671)

* remove deprecated code

* remove unused imports

* fix field name

* remove unused imports

* clean up comments, linting warnings
Jacob Freck 2018-10-24 10:43:47 -07:00 committed by GitHub
Parent 4408c4fc41
Commit fc5053654c
No known key found for this signature
GPG key ID: 4AEE18F83AFDEB23
18 changed files with 10 additions and 1941 deletions

View file

@@ -7,11 +7,8 @@ from aztk.utils import helpers
def get_task_state(core_cluster_operations, cluster_id: str, task_id: str):
try:
# TODO: return TaskState object instead of str
scheduling_target = core_cluster_operations.get_cluster_configuration(cluster_id).scheduling_target
if scheduling_target is not SchedulingTarget.Any:
# get storage table entry for this application
# convert entry to application_status
task = core_cluster_operations.get_task_from_table(cluster_id, task_id)
return task.state
else:
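(The hunk above is truncated before the else branch.) Conceptually the lookup has two paths: clusters with a dedicated scheduling target persist task state in a storage table, while everything else falls back to the Batch service. A minimal sketch of that shape, for illustration only; get_task_state_from_batch is a hypothetical stand-in for the elided branch, not an aztk function:

from aztk.models import SchedulingTarget

def get_task_state_from_batch(core_ops, cluster_id, task_id):
    # hypothetical stand-in for the elided else branch, which reads from Batch
    raise NotImplementedError

def get_task_state_sketch(core_ops, cluster_id: str, task_id: str):
    scheduling_target = core_ops.get_cluster_configuration(cluster_id).scheduling_target
    if scheduling_target is not SchedulingTarget.Any:
        # scheduling-target clusters keep a storage table entry per task
        return core_ops.get_task_from_table(cluster_id, task_id).state
    return get_task_state_from_batch(core_ops, cluster_id, task_id)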

View file

@@ -1,4 +1,4 @@
from aztk.models import SchedulingTarget, Task
from aztk.models import SchedulingTarget
from .get_recent_job import get_recent_job
from .task_table import list_task_table_entries
@@ -12,7 +12,7 @@ def list_tasks(core_base_operations, id):
Args:
id: cluster or job id
Returns:
List[aztk.models.Task]
List[aztk.models.Task]
"""
scheduling_target = core_base_operations.get_cluster_configuration(id).scheduling_target

View file

@@ -1,5 +1,4 @@
import aztk.error as error
import aztk.models as models
from aztk import error, models
from aztk.utils import ssh as ssh_lib

View file

@@ -1,20 +1,5 @@
import asyncio
import concurrent.futures
from datetime import datetime, timedelta, timezone
import azure.batch.models as batch_models
from azure.batch.models import BatchErrorException
from Cryptodome.PublicKey import RSA
import aztk.error as error
import aztk.models as models
import aztk.utils.azure_api as azure_api
import aztk.utils.constants as constants
import aztk.utils.get_ssh_key as get_ssh_key
import aztk.utils.helpers as helpers
import aztk.utils.ssh as ssh_lib
from aztk.internal import cluster_data
from aztk.utils import deprecated, secure_utils
from aztk import models
from aztk.utils import azure_api
class CoreClient:
@@ -29,6 +14,7 @@ class CoreClient:
self.secrets_configuration = None
self.batch_client = None
self.blob_client = None
self.table_service = None
def _get_context(self, secrets_configuration: models.SecretsConfiguration):
self.secrets_configuration = secrets_configuration
@@ -44,423 +30,3 @@ class CoreClient:
"secrets_configuration": self.secrets_configuration,
}
return context
# ALL THE FOLLOWING METHODS ARE DEPRECATED AND WILL BE REMOVED IN 0.10.0
@deprecated("0.10.0")
def get_cluster_config(self, cluster_id: str) -> models.ClusterConfiguration:
return self._get_cluster_data(cluster_id).read_cluster_config()
@deprecated("0.10.0")
def _get_cluster_data(self, cluster_id: str) -> cluster_data.ClusterData:
"""
Returns ClusterData object to manage data related to the given cluster id
"""
return cluster_data.ClusterData(self.blob_client, cluster_id)
"""
General Batch Operations
"""
@deprecated("0.10.0")
def __delete_pool_and_job(self, pool_id: str, keep_logs: bool = False):
"""
Delete a pool and its associated job
:param pool_id: the id of the pool (and its associated job) to delete
:return bool: True if the pool or the job existed and was deleted
"""
# job id is equal to pool id
job_id = pool_id
job_exists = True
try:
self.batch_client.job.get(job_id)
except batch_models.BatchErrorException:
job_exists = False
pool_exists = self.batch_client.pool.exists(pool_id)
if job_exists:
self.batch_client.job.delete(job_id)
if pool_exists:
self.batch_client.pool.delete(pool_id)
if not keep_logs:
cluster_data = self._get_cluster_data(pool_id)
cluster_data.delete_container(pool_id)
return job_exists or pool_exists
@deprecated("0.10.0")
def __create_pool_and_job(self, cluster_conf: models.ClusterConfiguration, software_metadata_key: str, start_task,
VmImageModel):
"""
Create a pool and job
:param cluster_conf: the configuration object used to create the cluster
:type cluster_conf: aztk.models.ClusterConfiguration
:param software_metadata_key: the id of the software being used on the cluster
:param start_task: the start task for the cluster
:param VmImageModel: the type of image to provision for the cluster
:param wait: wait until the cluster is ready
"""
self._get_cluster_data(cluster_conf.cluster_id).save_cluster_config(cluster_conf)
# reuse pool_id as job_id
pool_id = cluster_conf.cluster_id
job_id = cluster_conf.cluster_id
# Get a verified node agent sku
sku_to_use, image_ref_to_use = helpers.select_latest_verified_vm_image_with_node_agent_sku(
VmImageModel.publisher, VmImageModel.offer, VmImageModel.sku, self.batch_client)
network_conf = None
if cluster_conf.subnet_id is not None:
network_conf = batch_models.NetworkConfiguration(subnet_id=cluster_conf.subnet_id)
auto_scale_formula = "$TargetDedicatedNodes={0}; $TargetLowPriorityNodes={1}".format(
cluster_conf.size, cluster_conf.size_low_priority)
# Configure the pool
pool = batch_models.PoolAddParameter(
id=pool_id,
virtual_machine_configuration=batch_models.VirtualMachineConfiguration(
image_reference=image_ref_to_use, node_agent_sku_id=sku_to_use),
vm_size=cluster_conf.vm_size,
enable_auto_scale=True,
auto_scale_formula=auto_scale_formula,
auto_scale_evaluation_interval=timedelta(minutes=5),
start_task=start_task,
enable_inter_node_communication=True if not cluster_conf.subnet_id else False,
max_tasks_per_node=4,
network_configuration=network_conf,
metadata=[
batch_models.MetadataItem(name=constants.AZTK_SOFTWARE_METADATA_KEY, value=software_metadata_key),
batch_models.MetadataItem(
name=constants.AZTK_MODE_METADATA_KEY, value=constants.AZTK_CLUSTER_MODE_METADATA),
],
)
# Create the pool + create user for the pool
helpers.create_pool_if_not_exist(pool, self.batch_client)
# Create job
job = batch_models.JobAddParameter(id=job_id, pool_info=batch_models.PoolInformation(pool_id=pool_id))
# Add job to batch
self.batch_client.job.add(job)
return helpers.get_cluster(cluster_conf.cluster_id, self.batch_client)
@deprecated("0.10.0")
def __get_pool_details(self, cluster_id: str):
"""
Get the pool and its compute nodes for the given cluster
:param cluster_id: Id of the cluster
:return pool: CloudPool, nodes: ComputeNodePaged
"""
pool = self.batch_client.pool.get(cluster_id)
nodes = self.batch_client.compute_node.list(pool_id=cluster_id)
return pool, nodes
@deprecated("0.10.0")
def __list_clusters(self, software_metadata_key):
"""
List all the clusters in your account.
"""
pools = self.batch_client.pool.list()
software_metadata = (constants.AZTK_SOFTWARE_METADATA_KEY, software_metadata_key)
cluster_metadata = (constants.AZTK_MODE_METADATA_KEY, constants.AZTK_CLUSTER_MODE_METADATA)
aztk_pools = []
for pool in [pool for pool in pools if pool.metadata]:
pool_metadata = [(metadata.name, metadata.value) for metadata in pool.metadata]
if all([metadata in pool_metadata for metadata in [software_metadata, cluster_metadata]]):
aztk_pools.append(pool)
return aztk_pools
@deprecated("0.10.0")
def __create_user(self, pool_id: str, node_id: str, username: str, password: str = None,
ssh_key: str = None) -> str:
"""
Create a pool user
:param pool: the pool to add the user to
:param node: the node to add the user to
:param username: username of the user to add
:param password: password of the user to add
:param ssh_key: ssh_key of the user to add
"""
# Create new ssh user for the given node
self.batch_client.compute_node.add_user(
pool_id,
node_id,
batch_models.ComputeNodeUser(
name=username,
is_admin=True,
password=password,
ssh_public_key=get_ssh_key.get_user_public_key(ssh_key, self.secrets_configuration),
expiry_time=datetime.now(timezone.utc) + timedelta(days=365),
),
)
@deprecated("0.10.0")
def __delete_user(self, pool_id: str, node_id: str, username: str) -> str:
"""
Delete a pool user
:param pool: the pool to delete the user from
:param node: the node to delete the user from
:param username: username of the user to delete
"""
# Delete a user on the given node
self.batch_client.compute_node.delete_user(pool_id, node_id, username)
@deprecated("0.10.0")
def __get_remote_login_settings(self, pool_id: str, node_id: str):
"""
Get the remote_login_settings for node
:param pool_id
:param node_id
:returns aztk.models.RemoteLogin
"""
result = self.batch_client.compute_node.get_remote_login_settings(pool_id, node_id)
return models.RemoteLogin(ip_address=result.remote_login_ip_address, port=str(result.remote_login_port))
@deprecated("0.10.0")
def __create_user_on_node(self, username, pool_id, node_id, ssh_key=None, password=None):
try:
self.__create_user(pool_id=pool_id, node_id=node_id, username=username, ssh_key=ssh_key, password=password)
except BatchErrorException:
# the user most likely already exists; delete and recreate it
# (note: the original bound the exception as `error`, shadowing the aztk.error import)
self.__delete_user(pool_id, node_id, username)
self.__create_user(pool_id=pool_id, node_id=node_id, username=username, ssh_key=ssh_key)
@deprecated("0.10.0")
def __generate_user_on_node(self, pool_id, node_id):
generated_username = secure_utils.generate_random_string()
ssh_key = RSA.generate(2048)
ssh_pub_key = ssh_key.publickey().exportKey("OpenSSH").decode("utf-8")
self.__create_user_on_node(generated_username, pool_id, node_id, ssh_pub_key)
return generated_username, ssh_key
@deprecated("0.10.0")
def __generate_user_on_pool(self, pool_id, nodes):
generated_username = secure_utils.generate_random_string()
ssh_key = RSA.generate(2048)
ssh_pub_key = ssh_key.publickey().exportKey("OpenSSH").decode("utf-8")
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = {
executor.submit(self.__create_user_on_node, generated_username, pool_id, node.id, ssh_pub_key): node
for node in nodes
}
concurrent.futures.wait(futures)
return generated_username, ssh_key
@deprecated("0.10.0")
def __create_user_on_pool(self, username, pool_id, nodes, ssh_pub_key=None, password=None):
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = {
executor.submit(self.__create_user_on_node, username, pool_id, node.id, ssh_pub_key, password): node
for node in nodes
}
concurrent.futures.wait(futures)
@deprecated("0.10.0")
def __delete_user_on_pool(self, username, pool_id, nodes):
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = [executor.submit(self.__delete_user, pool_id, node.id, username) for node in nodes]
concurrent.futures.wait(futures)
@deprecated("0.10.0")
def __node_run(self, cluster_id, node_id, command, internal, container_name=None, timeout=None):
pool, nodes = self.__get_pool_details(cluster_id)
try:
node = next(node for node in nodes if node.id == node_id)
except StopIteration:
raise error.AztkError("Node with id {} not found".format(node_id))
if internal:
node_rls = models.RemoteLogin(ip_address=node.ip_address, port="22")
else:
node_rls = self.__get_remote_login_settings(pool.id, node.id)
try:
generated_username, ssh_key = self.__generate_user_on_node(pool.id, node.id)
output = ssh_lib.node_exec_command(
node.id,
command,
generated_username,
node_rls.ip_address,
node_rls.port,
ssh_key=ssh_key.exportKey().decode("utf-8"),
container_name=container_name,
timeout=timeout,
)
return output
finally:
self.__delete_user(cluster_id, node.id, generated_username)
@deprecated("0.10.0")
def __cluster_run(self, cluster_id, command, internal, container_name=None, timeout=None):
pool, nodes = self.__get_pool_details(cluster_id)
nodes = list(nodes)
if internal:
cluster_nodes = [(node, models.RemoteLogin(ip_address=node.ip_address, port="22")) for node in nodes]
else:
cluster_nodes = [(node, self.__get_remote_login_settings(pool.id, node.id)) for node in nodes]
try:
generated_username, ssh_key = self.__generate_user_on_pool(pool.id, nodes)
output = asyncio.get_event_loop().run_until_complete(
ssh_lib.clus_exec_command(
command,
generated_username,
cluster_nodes,
ssh_key=ssh_key.exportKey().decode("utf-8"),
container_name=container_name,
timeout=timeout,
))
return output
except OSError as exc:
raise exc
finally:
self.__delete_user_on_pool(generated_username, pool.id, nodes)
@deprecated("0.10.0")
def __cluster_copy(
self,
cluster_id,
source_path,
destination_path=None,
container_name=None,
internal=False,
get=False,
timeout=None,
):
pool, nodes = self.__get_pool_details(cluster_id)
nodes = list(nodes)
if internal:
cluster_nodes = [(node, models.RemoteLogin(ip_address=node.ip_address, port="22")) for node in nodes]
else:
cluster_nodes = [(node, self.__get_remote_login_settings(pool.id, node.id)) for node in nodes]
try:
generated_username, ssh_key = self.__generate_user_on_pool(pool.id, nodes)
output = asyncio.get_event_loop().run_until_complete(
ssh_lib.clus_copy(
container_name=container_name,
username=generated_username,
nodes=cluster_nodes,
source_path=source_path,
destination_path=destination_path,
ssh_key=ssh_key.exportKey().decode("utf-8"),
get=get,
timeout=timeout,
))
return output
except (OSError, BatchErrorException) as exc:
raise exc
finally:
self.__delete_user_on_pool(generated_username, pool.id, nodes)
@deprecated("0.10.0")
def __ssh_into_node(self,
pool_id,
node_id,
username,
ssh_key=None,
password=None,
port_forward_list=None,
internal=False):
if internal:
result = self.batch_client.compute_node.get(pool_id=pool_id, node_id=node_id)
rls = models.RemoteLogin(ip_address=result.ip_address, port="22")
else:
result = self.batch_client.compute_node.get_remote_login_settings(pool_id, node_id)
rls = models.RemoteLogin(ip_address=result.remote_login_ip_address, port=str(result.remote_login_port))
ssh_lib.node_ssh(
username=username,
hostname=rls.ip_address,
port=rls.port,
ssh_key=ssh_key,
password=password,
port_forward_list=port_forward_list,
)
@deprecated("0.10.0")
def __submit_job(
self,
job_configuration,
start_task,
job_manager_task,
autoscale_formula,
software_metadata_key: str,
vm_image_model,
application_metadata,
):
"""
Job Submission
:param job_configuration -> aztk_sdk.spark.models.JobConfiguration
:param start_task -> batch_models.StartTask
:param job_manager_task -> batch_models.TaskAddParameter
:param autoscale_formula -> str
:param software_metadata_key -> str
:param vm_image_model -> aztk_sdk.models.VmImage
:returns None
"""
self._get_cluster_data(job_configuration.id).save_cluster_config(job_configuration.to_cluster_config())
# get a verified node agent sku
sku_to_use, image_ref_to_use = helpers.select_latest_verified_vm_image_with_node_agent_sku(
vm_image_model.publisher, vm_image_model.offer, vm_image_model.sku, self.batch_client)
# set up subnet if necessary
network_conf = None
if job_configuration.subnet_id:
network_conf = batch_models.NetworkConfiguration(subnet_id=job_configuration.subnet_id)
# set up a schedule for a recurring job
auto_pool_specification = batch_models.AutoPoolSpecification(
pool_lifetime_option=batch_models.PoolLifetimeOption.job_schedule,
auto_pool_id_prefix=job_configuration.id,
keep_alive=False,
pool=batch_models.PoolSpecification(
display_name=job_configuration.id,
virtual_machine_configuration=batch_models.VirtualMachineConfiguration(
image_reference=image_ref_to_use, node_agent_sku_id=sku_to_use),
vm_size=job_configuration.vm_size,
enable_auto_scale=True,
auto_scale_formula=autoscale_formula,
auto_scale_evaluation_interval=timedelta(minutes=5),
start_task=start_task,
enable_inter_node_communication=not job_configuration.mixed_mode(),
network_configuration=network_conf,
max_tasks_per_node=4,
metadata=[
batch_models.MetadataItem(name=constants.AZTK_SOFTWARE_METADATA_KEY, value=software_metadata_key),
batch_models.MetadataItem(
name=constants.AZTK_MODE_METADATA_KEY, value=constants.AZTK_JOB_MODE_METADATA),
],
),
)
# define job specification
job_spec = batch_models.JobSpecification(
pool_info=batch_models.PoolInformation(auto_pool_specification=auto_pool_specification),
display_name=job_configuration.id,
on_all_tasks_complete=batch_models.OnAllTasksComplete.terminate_job,
job_manager_task=job_manager_task,
metadata=[batch_models.MetadataItem(name="applications", value=application_metadata)],
)
# define schedule
schedule = batch_models.Schedule(
do_not_run_until=None, do_not_run_after=None, start_window=None, recurrence_interval=None)
# create job schedule and add task
setup = batch_models.JobScheduleAddParameter(
id=job_configuration.id, schedule=schedule, job_specification=job_spec)
self.batch_client.job_schedule.add(setup)
return self.batch_client.job_schedule.get(job_schedule_id=job_configuration.id)
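Every removed method above is wrapped in @deprecated("0.10.0") from aztk.utils, and the integration tests later in this diff assert the wrappers emit DeprecationWarning via pytest.warns. A minimal sketch of such a decorator, assuming it does nothing beyond warning and forwarding the call:

import functools
import warnings

def deprecated(version: str):
    """Warn that the wrapped callable will be removed in `version`, then call it."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            warnings.warn(
                "{} is deprecated and will be removed in version {}".format(func.__qualname__, version),
                category=DeprecationWarning,
                stacklevel=2,
            )
            return func(*args, **kwargs)
        return wrapper
    return decorator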

View file

@@ -1,4 +1,3 @@
import hashlib
from datetime import timedelta
import azure.batch.models as batch_models

View file

@@ -1,9 +1,7 @@
import hashlib
from azure.batch.models import BatchErrorException
from msrest.exceptions import ClientRequestError
from aztk.utils import BackOffPolicy, helpers, retry
from aztk.utils import BackOffPolicy, retry
def delete_pool_and_job_and_table(core_cluster_operations, pool_id: str, keep_logs: bool = False):

View file

@@ -1,6 +1,5 @@
# TODO: return Cluster instead of (pool, nodes)
from aztk import models
from aztk.utils import helpers
def get_pool_details(core_cluster_operations, cluster_id: str):

View file

@@ -1,16 +1,7 @@
from typing import List
from azure.batch.models import BatchErrorException
from aztk import error
from aztk import models as base_models
from aztk.client import CoreClient
from aztk.spark import models
from aztk.spark.client.cluster import ClusterOperations
from aztk.spark.client.job import JobOperations
from aztk.spark.helpers import job_submission as job_submit_helper
from aztk.spark.utils import util
from aztk.utils import deprecate, deprecated, helpers
class Client(CoreClient):
@@ -21,217 +12,8 @@ class Client(CoreClient):
job (:obj:`aztk.spark.client.job.JobOperations`): Job
"""
def __init__(self, secrets_configuration: models.SecretsConfiguration = None, **kwargs):
def __init__(self, secrets_configuration: models.SecretsConfiguration):
super().__init__()
context = None
if kwargs.get("secrets_config"):
deprecate(
version="0.10.0",
message="secrets_config key is deprecated in secrets.yaml",
advice="Please use secrets_configuration key instead.",
)
context = self._get_context(kwargs.get("secrets_config"))
else:
context = self._get_context(secrets_configuration)
context = self._get_context(secrets_configuration)
self.cluster = ClusterOperations(context)
self.job = JobOperations(context)
# ALL THE FOLLOWING METHODS ARE DEPRECATED AND WILL BE REMOVED IN 0.10.0
@deprecated("0.10.0")
def create_cluster(self, cluster_conf: models.ClusterConfiguration, wait: bool = False):
return self.cluster.create(cluster_configuration=cluster_conf, wait=wait)
@deprecated("0.10.0")
def create_clusters_in_parallel(self, cluster_confs): # NOT IMPLEMENTED
for cluster_conf in cluster_confs:
self.cluster.create(cluster_conf)
@deprecated("0.10.0")
def delete_cluster(self, cluster_id: str, keep_logs: bool = False):
return self.cluster.delete(id=cluster_id, keep_logs=keep_logs)
@deprecated("0.10.0")
def get_cluster(self, cluster_id: str):
return self.cluster.get(id=cluster_id)
@deprecated("0.10.0")
def list_clusters(self):
return self.cluster.list()
@deprecated("0.10.0")
def get_remote_login_settings(self, cluster_id: str, node_id: str):
return self.cluster.get_remote_login_settings(cluster_id, node_id)
@deprecated("0.10.0")
def submit(self,
cluster_id: str,
application: models.ApplicationConfiguration,
remote: bool = False,
wait: bool = False):
return self.cluster.submit(id=cluster_id, application=application, remote=remote, wait=wait)
@deprecated("0.10.0")
def submit_all_applications(self, cluster_id: str, applications): # NOT IMPLEMENTED
for application in applications:
self.cluster.submit(cluster_id, application)
@deprecated("0.10.0")
def wait_until_application_done(self, cluster_id: str, task_id: str): # NOT IMPLEMENTED
try:
helpers.wait_for_task_to_complete(job_id=cluster_id, task_id=task_id, batch_client=self.batch_client)
except BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))
@deprecated("0.10.0")
def wait_until_applications_done(self, cluster_id: str): # NOT IMPLEMENTED
try:
helpers.wait_for_tasks_to_complete(job_id=cluster_id, batch_client=self.batch_client)
except BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))
@deprecated("0.10.0")
def wait_until_cluster_is_ready(self, cluster_id: str): # NOT IMPLEMENTED
try:
util.wait_for_master_to_be_ready(self.cluster._core_cluster_operations, self.cluster, cluster_id)
pool = self.batch_client.pool.get(cluster_id)
nodes = self.batch_client.compute_node.list(pool_id=cluster_id)
return models.Cluster(base_models.Cluster(pool, nodes))
except BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))
@deprecated("0.10.0")
def wait_until_all_clusters_are_ready(self, clusters: List[str]): # NOT IMPLEMENTED
for cluster_id in clusters:
self.wait_until_cluster_is_ready(cluster_id)
@deprecated("0.10.0")
def create_user(self, cluster_id: str, username: str, password: str = None, ssh_key: str = None) -> str:
return self.cluster.create_user(id=cluster_id, username=username, password=password, ssh_key=ssh_key)
@deprecated("0.10.0")
def get_application_log(self, cluster_id: str, application_name: str, tail=False, current_bytes: int = 0):
return self.cluster.get_application_log(
id=cluster_id, application_name=application_name, tail=tail, current_bytes=current_bytes)
@deprecated("0.10.0")
def get_application_status(self, cluster_id: str, app_name: str):
return self.cluster.get_application_state(id=cluster_id, application_name=app_name)
@deprecated("0.10.0")
def cluster_run(self, cluster_id: str, command: str, host=False, internal: bool = False, timeout=None):
return self.cluster.run(id=cluster_id, command=command, host=host, internal=internal)
@deprecated("0.10.0")
def node_run(self, cluster_id: str, node_id: str, command: str, host=False, internal: bool = False, timeout=None):
return self.cluster.node_run(
id=cluster_id, node_id=node_id, command=command, host=host, internal=internal, timeout=timeout)
@deprecated("0.10.0")
def cluster_copy(
self,
cluster_id: str,
source_path: str,
destination_path: str,
host: bool = False,
internal: bool = False,
timeout: int = None,
):
return self.cluster.copy(
id=cluster_id,
source_path=source_path,
destination_path=destination_path,
host=host,
internal=internal,
timeout=timeout,
)
@deprecated("0.10.0")
def cluster_download(
self,
cluster_id: str,
source_path: str,
destination_path: str = None,
host: bool = False,
internal: bool = False,
timeout: int = None,
):
return self.cluster.download(
id=cluster_id,
source_path=source_path,
destination_path=destination_path,
host=host,
internal=internal,
timeout=timeout,
)
@deprecated("0.10.0")
def cluster_ssh_into_master(self,
cluster_id,
node_id,
username,
ssh_key=None,
password=None,
port_forward_list=None,
internal=False):
return self.cluster._core_cluster_operations.ssh_into_node(cluster_id, node_id, username, ssh_key, password,
port_forward_list, internal)
"""
job submission
"""
@deprecated("0.10.0")
def submit_job(self, job_configuration: models.JobConfiguration):
return self.job.submit(job_configuration)
@deprecated("0.10.0")
def list_jobs(self):
return self.job.list()
@deprecated("0.10.0")
def list_applications(self, job_id):
return self.job.list_applications(job_id)
@deprecated("0.10.0")
def get_job(self, job_id):
return self.job.get(job_id)
@deprecated("0.10.0")
def stop_job(self, job_id):
return self.job.stop(job_id)
@deprecated("0.10.0")
def delete_job(self, job_id: str, keep_logs: bool = False):
return self.job.delete(job_id, keep_logs)
@deprecated("0.10.0")
def get_application(self, job_id, application_name):
return self.job.get_application(job_id, application_name)
@deprecated("0.10.0")
def get_job_application_log(self, job_id, application_name):
return self.job.get_application_log(job_id, application_name)
@deprecated("0.10.0")
def stop_job_app(self, job_id, application_name): # NOT IMPLEMENTED
try:
return job_submit_helper.stop_app(self, job_id, application_name)
except BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))
@deprecated("0.10.0")
def wait_until_job_finished(self, job_id):
try:
self.job.wait(job_id)
except BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))
@deprecated("0.10.0")
def wait_until_all_jobs_finished(self, jobs): # NOT IMPLEMENTED
for job in jobs:
self.wait_until_job_finished(job)
@deprecated("0.10.0")
def run_cluster_diagnostics(self, cluster_id, output_directory=None):
return self.cluster.diagnostics(cluster_id, output_directory)
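Since the removed wrappers above were thin delegations, the migration is mechanical: each flat Client method maps onto the cluster or job operations object it already called. A sketch of the mapping, assuming cluster_conf, app, and job_conf are pre-built ClusterConfiguration, ApplicationConfiguration, and JobConfiguration objects:

from aztk.spark import Client, models

client = Client(secrets_configuration=models.SecretsConfiguration())

# was: client.create_cluster(cluster_conf, wait=True)
cluster = client.cluster.create(cluster_configuration=cluster_conf, wait=True)

# was: client.submit(cluster_id, app, wait=True)
client.cluster.submit(id=cluster.id, application=app, wait=True)

# was: client.submit_job(job_conf) followed by client.wait_until_job_finished(job_conf.id)
client.job.submit(job_conf)
client.job.wait(job_conf.id)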

View file

@@ -1 +0,0 @@
# ALL FILES IN THIS DIRECTORY ARE DEPRECATED, WILL BE REMOVED IN v0.10.0

View file

@@ -1,25 +0,0 @@
import os
def run(spark_client, cluster_id, output_directory=None):
# copy debug program to each node
output = spark_client.cluster_copy(
cluster_id, os.path.abspath("./aztk/spark/utils/debug.py"), "/tmp/debug.py", host=True)
ssh_cmd = _build_diagnostic_ssh_command()
run_output = spark_client.cluster_run(cluster_id, ssh_cmd, host=True)
remote_path = "/tmp/debug.zip"
if output_directory:
local_path = os.path.join(os.path.abspath(output_directory), "debug.zip")
output = spark_client.cluster_download(cluster_id, remote_path, local_path, host=True)
# write run output to debug/ directory
with open(os.path.join(os.path.dirname(local_path), "debug-output.txt"), "w", encoding="UTF-8") as f:
[f.write(line + "\n") for node_output in run_output for line in node_output.output]
else:
output = spark_client.cluster_download(cluster_id, remote_path, host=True)
return output
def _build_diagnostic_ssh_command():
return "sudo rm -rf /tmp/debug.zip; " "sudo apt-get install -y python3-pip; " "sudo -H pip3 install --upgrade pip; " "sudo -H pip3 install docker; " "sudo python3 /tmp/debug.py"

View file

@@ -1,159 +0,0 @@
from typing import List
import azure.batch.models as batch_models
from aztk import models as aztk_models
from aztk.utils import constants, helpers
POOL_ADMIN_USER_IDENTITY = batch_models.UserIdentity(
auto_user=batch_models.AutoUserSpecification(
scope=batch_models.AutoUserScope.pool, elevation_level=batch_models.ElevationLevel.admin))
def _get_aztk_environment(cluster_id, worker_on_master, mixed_mode):
envs = []
envs.append(batch_models.EnvironmentSetting(name="AZTK_MIXED_MODE", value=helpers.bool_env(mixed_mode)))
envs.append(batch_models.EnvironmentSetting(name="AZTK_WORKER_ON_MASTER", value=helpers.bool_env(worker_on_master)))
envs.append(batch_models.EnvironmentSetting(name="AZTK_CLUSTER_ID", value=cluster_id))
return envs
def __get_docker_credentials(spark_client):
creds = []
docker = spark_client.secrets_config.docker
if docker:
if docker.endpoint:
creds.append(batch_models.EnvironmentSetting(name="DOCKER_ENDPOINT", value=docker.endpoint))
if docker.username:
creds.append(batch_models.EnvironmentSetting(name="DOCKER_USERNAME", value=docker.username))
if docker.password:
creds.append(batch_models.EnvironmentSetting(name="DOCKER_PASSWORD", value=docker.password))
return creds
def __get_secrets_env(spark_client):
shared_key = spark_client.secrets_config.shared_key
service_principal = spark_client.secrets_config.service_principal
if shared_key:
return [
batch_models.EnvironmentSetting(name="BATCH_SERVICE_URL", value=shared_key.batch_service_url),
batch_models.EnvironmentSetting(name="BATCH_ACCOUNT_KEY", value=shared_key.batch_account_key),
batch_models.EnvironmentSetting(name="STORAGE_ACCOUNT_NAME", value=shared_key.storage_account_name),
batch_models.EnvironmentSetting(name="STORAGE_ACCOUNT_KEY", value=shared_key.storage_account_key),
batch_models.EnvironmentSetting(name="STORAGE_ACCOUNT_SUFFIX", value=shared_key.storage_account_suffix),
]
else:
return [
batch_models.EnvironmentSetting(name="SP_TENANT_ID", value=service_principal.tenant_id),
batch_models.EnvironmentSetting(name="SP_CLIENT_ID", value=service_principal.client_id),
batch_models.EnvironmentSetting(name="SP_CREDENTIAL", value=service_principal.credential),
batch_models.EnvironmentSetting(
name="SP_BATCH_RESOURCE_ID", value=service_principal.batch_account_resource_id),
batch_models.EnvironmentSetting(
name="SP_STORAGE_RESOURCE_ID", value=service_principal.storage_account_resource_id),
]
def __cluster_install_cmd(
zip_resource_file: batch_models.ResourceFile,
gpu_enabled: bool,
docker_repo: str = None,
docker_run_options: str = None,
plugins=None,
worker_on_master: bool = True,
file_mounts=None,
mixed_mode: bool = False,
):
"""
For Docker on ubuntu 16.04 - return the command line
to be run on the start task of the pool to setup spark.
"""
default_docker_repo = constants.DEFAULT_DOCKER_REPO if not gpu_enabled else constants.DEFAULT_DOCKER_REPO_GPU
docker_repo = docker_repo or default_docker_repo
docker_run_options = docker_run_options or ""
shares = []
if file_mounts:
for mount in file_mounts:
# Create the directory on the node
shares.append("mkdir -p {0}".format(mount.mount_path))
# Mount the file share
shares.append("mount -t cifs //{0}.file.core.windows.net/{2} {3} "
"-o vers=3.0,username={0},password={1},dir_mode=0777,file_mode=0777,sec=ntlmssp".format(
mount.storage_account_name, mount.storage_account_key, mount.file_share_path,
mount.mount_path))
setup = [
"time("
"apt-get -y update;"
"apt-get -y --no-install-recommends install unzip;"
"unzip -o $AZ_BATCH_TASK_WORKING_DIR/{0};"
"chmod 777 $AZ_BATCH_TASK_WORKING_DIR/aztk/node_scripts/setup_host.sh;"
") 2>&1".format(zip_resource_file.file_path),
'/bin/bash $AZ_BATCH_TASK_WORKING_DIR/aztk/node_scripts/setup_host.sh {0} {1} "{2}"'.format(
constants.DOCKER_SPARK_CONTAINER_NAME, docker_repo, docker_run_options.replace('"', '\\"')),
]
commands = shares + setup
return commands
def generate_cluster_start_task(
spark_client,
zip_resource_file: batch_models.ResourceFile,
cluster_id: str,
gpu_enabled: bool,
docker_repo: str = None,
docker_run_options: str = None,
file_shares: List[aztk_models.FileShare] = None,
plugins: List[aztk_models.PluginConfiguration] = None,
mixed_mode: bool = False,
worker_on_master: bool = True,
):
"""
This will return the start task object for the pool to be created.
:param cluster_id str: Id of the cluster(Used for uploading the resource files)
:param zip_resource_file: Resource file object pointing to the zip file containing scripts to run on the node
"""
resource_files = [zip_resource_file]
spark_web_ui_port = constants.DOCKER_SPARK_WEB_UI_PORT
spark_worker_ui_port = constants.DOCKER_SPARK_WORKER_UI_PORT
spark_job_ui_port = constants.DOCKER_SPARK_JOB_UI_PORT
spark_container_name = constants.DOCKER_SPARK_CONTAINER_NAME
spark_submit_logs_file = constants.SPARK_SUBMIT_LOGS_FILE
# TODO use certificate
environment_settings = (__get_secrets_env(spark_client) + [
batch_models.EnvironmentSetting(name="SPARK_WEB_UI_PORT", value=spark_web_ui_port),
batch_models.EnvironmentSetting(name="SPARK_WORKER_UI_PORT", value=spark_worker_ui_port),
batch_models.EnvironmentSetting(name="SPARK_JOB_UI_PORT", value=spark_job_ui_port),
batch_models.EnvironmentSetting(name="SPARK_CONTAINER_NAME", value=spark_container_name),
batch_models.EnvironmentSetting(name="SPARK_SUBMIT_LOGS_FILE", value=spark_submit_logs_file),
batch_models.EnvironmentSetting(name="AZTK_GPU_ENABLED", value=helpers.bool_env(gpu_enabled)),
] + __get_docker_credentials(spark_client) + _get_aztk_environment(cluster_id, worker_on_master, mixed_mode))
# start task command
command = __cluster_install_cmd(
zip_resource_file,
gpu_enabled,
docker_repo,
docker_run_options,
plugins,
worker_on_master,
file_shares,
mixed_mode,
)
return batch_models.StartTask(
command_line=helpers.wrap_commands_in_shell(command),
resource_files=resource_files,
environment_settings=environment_settings,
user_identity=POOL_ADMIN_USER_IDENTITY,
wait_for_success=True,
max_task_retry_count=2,
)

View file

@@ -1,106 +0,0 @@
import time
import azure
import azure.batch.models as batch_models
from azure.batch.models import BatchErrorException
from aztk import error
from aztk import models as base_models
from aztk.spark import models
from aztk.utils import constants, helpers
output_file = constants.TASK_WORKING_DIR + "/" + constants.SPARK_SUBMIT_LOGS_FILE
def __check_task_node_exist(batch_client, cluster_id: str, task: batch_models.CloudTask) -> bool:
try:
batch_client.compute_node.get(cluster_id, task.node_info.node_id)
return True
except BatchErrorException:
return False
def __wait_for_app_to_be_running(batch_client, cluster_id: str, application_name: str) -> batch_models.CloudTask:
"""
Wait for the batch task to leave the waiting state and enter the running state (or completed, if it finished quickly enough)
"""
while True:
task = batch_client.task.get(cluster_id, application_name)
if task.state is batch_models.TaskState.active or task.state is batch_models.TaskState.preparing:
# TODO: log
time.sleep(5)
else:
return task
def __get_output_file_properties(batch_client, cluster_id: str, application_name: str):
while True:
try:
file = helpers.get_file_properties(cluster_id, application_name, output_file, batch_client)
return file
except BatchErrorException as e:
if e.response.status_code == 404:
# TODO: log
time.sleep(5)
continue
else:
raise e
def get_log_from_storage(blob_client, container_name, application_name, task):
try:
blob = blob_client.get_blob_to_text(container_name, application_name + "/" + constants.SPARK_SUBMIT_LOGS_FILE)
except azure.common.AzureMissingResourceHttpError:
raise error.AztkError("Logs not found in your storage account. They were either deleted or never existed.")
base_model = base_models.ApplicationLog(
name=application_name,
cluster_id=container_name,
application_state=task.state,
log=blob.content,
total_bytes=blob.properties.content_length,
exit_code=task.execution_info.exit_code,
)
return models.ApplicationLog(base_model)
def get_log(batch_client, blob_client, cluster_id: str, application_name: str, tail=False, current_bytes: int = 0):
job_id = cluster_id
task_id = application_name
task = __wait_for_app_to_be_running(batch_client, cluster_id, application_name)
if not __check_task_node_exist(batch_client, cluster_id, task):
return get_log_from_storage(blob_client, cluster_id, application_name, task)
file = __get_output_file_properties(batch_client, cluster_id, application_name)
target_bytes = file.content_length
if target_bytes != current_bytes:
ocp_range = None
if tail:
ocp_range = "bytes={0}-{1}".format(current_bytes, target_bytes - 1)
stream = batch_client.file.get_from_task(
job_id, task_id, output_file, batch_models.FileGetFromTaskOptions(ocp_range=ocp_range))
content = helpers.read_stream_as_string(stream)
base_model = base_models.ApplicationLog(
name=application_name,
cluster_id=cluster_id,
application_state=task.state,
log=content,
total_bytes=target_bytes,
exit_code=task.execution_info.exit_code,
)
return models.ApplicationLog(base_model)
else:
base_model = base_models.ApplicationLog(
name=application_name,
cluster_id=cluster_id,
application_state=task.state,
log="",
total_bytes=target_bytes,
exit_code=task.execution_info.exit_code,
)
return models.ApplicationLog(base_model)
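The current_bytes/ocp_range handling above is what makes tail mode incremental: callers keep the byte offset from the previous response, and only newly appended bytes come back. A polling sketch against the non-deprecated API; treating a populated exit_code as the terminal condition is an assumption:

import time

def tail_log(client, cluster_id, application_name, poll_seconds=5):
    """Print only the bytes appended since the previous poll."""
    current_bytes = 0
    while True:
        log = client.cluster.get_application_log(
            id=cluster_id,
            application_name=application_name,
            tail=True,
            current_bytes=current_bytes,
        )
        if log.log:
            print(log.log, end="")
            current_bytes = log.total_bytes
        # assumption: exit_code stays None until the task reaches a terminal state
        if log.exit_code is not None:
            return log.exit_code
        time.sleep(poll_seconds)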

View file

@@ -1,202 +0,0 @@
import time
import azure.batch.models as batch_models
import yaml
import aztk.error as error
from aztk.utils import helpers
from aztk.utils.command_builder import CommandBuilder
def __app_cmd():
docker_exec = CommandBuilder("sudo docker exec")
docker_exec.add_argument("-i")
docker_exec.add_option("-e", "AZ_BATCH_TASK_WORKING_DIR=$AZ_BATCH_TASK_WORKING_DIR")
docker_exec.add_option("-e", "AZ_BATCH_JOB_ID=$AZ_BATCH_JOB_ID")
docker_exec.add_argument(
r'spark /bin/bash >> output.log 2>&1 -c "'
r"source ~/.bashrc; "
r"export PYTHONPATH=$PYTHONPATH:\$AZTK_WORKING_DIR; "
r"cd \$AZ_BATCH_TASK_WORKING_DIR; "
r'\$AZTK_WORKING_DIR/.aztk-env/.venv/bin/python \$AZTK_WORKING_DIR/aztk/node_scripts/scheduling/job_submission.py"'
)
return docker_exec.to_str()
def generate_task(spark_client, job, application_tasks):
resource_files = []
for application, task in application_tasks:
task_definition_resource_file = helpers.upload_text_to_container(
container_name=job.id,
application_name=application.name + ".yaml",
file_path=application.name + ".yaml",
content=yaml.dump(task),
blob_client=spark_client.blob_client,
)
resource_files.append(task_definition_resource_file)
task_cmd = __app_cmd()
# Create task
task = batch_models.JobManagerTask(
id=job.id,
command_line=helpers.wrap_commands_in_shell([task_cmd]),
resource_files=resource_files,
kill_job_on_completion=False,
allow_low_priority_node=True,
user_identity=batch_models.UserIdentity(
auto_user=batch_models.AutoUserSpecification(
scope=batch_models.AutoUserScope.task, elevation_level=batch_models.ElevationLevel.admin)),
)
return task
def __get_recent_job(spark_client, job_id):
job_schedule = spark_client.batch_client.job_schedule.get(job_id)
return spark_client.batch_client.job.get(job_schedule.execution_info.recent_job.id)
def list_jobs(spark_client):
return [cloud_job_schedule for cloud_job_schedule in spark_client.batch_client.job_schedule.list()]
def list_applications(spark_client, job_id):
recent_run_job = __get_recent_job(spark_client, job_id)
# get application names from Batch job metadata
applications = {}
for metadata_item in recent_run_job.metadata:
if metadata_item.name == "applications":
for app_name in metadata_item.value.split("\n"):
applications[app_name] = None
# get tasks from Batch job
for task in spark_client.batch_client.task.list(recent_run_job.id):
if task.id != job_id:
applications[task.id] = task
return applications
def get_job(spark_client, job_id):
job = spark_client.batch_client.job_schedule.get(job_id)
job_apps = [
app for app in spark_client.batch_client.task.list(job_id=job.execution_info.recent_job.id) if app.id != job_id
]
recent_run_job = __get_recent_job(spark_client, job_id)
pool_prefix = recent_run_job.pool_info.auto_pool_specification.auto_pool_id_prefix
pool = nodes = None
for cloud_pool in spark_client.batch_client.pool.list():
if pool_prefix in cloud_pool.id:
pool = cloud_pool
break
if pool:
nodes = spark_client.batch_client.compute_node.list(pool_id=pool.id)
return job, job_apps, pool, nodes
def disable(spark_client, job_id):
# disable the currently running job from the job schedule, if it exists
recent_run_job = __get_recent_job(spark_client, job_id)
if recent_run_job.id and recent_run_job.state == batch_models.JobState.active:
spark_client.batch_client.job.disable(
job_id=recent_run_job.id, disable_tasks=batch_models.DisableJobOption.requeue)
# disable the job_schedule
spark_client.batch_client.job_schedule.disable(job_id)
def enable(spark_client, job_id):
# enable the currently running job from the job schedule, if it exists
recent_run_job = __get_recent_job(spark_client, job_id)
if recent_run_job.id and recent_run_job.state == batch_models.JobState.active:
spark_client.batch_client.job.enable(job_id=recent_run_job.id)
# enable the job_schedule
spark_client.batch_client.job_schedule.enable(job_id)
def stop(spark_client, job_id):
# terminate currently running job and tasks
recent_run_job = __get_recent_job(spark_client, job_id)
spark_client.batch_client.job.terminate(recent_run_job.id)
# terminate job_schedule
spark_client.batch_client.job_schedule.terminate(job_id)
def delete(spark_client, job_id, keep_logs: bool = False):
recent_run_job = __get_recent_job(spark_client, job_id)
deleted_job_or_job_schedule = False
# delete job
try:
spark_client.batch_client.job.delete(recent_run_job.id)
deleted_job_or_job_schedule = True
except batch_models.BatchErrorException:
pass
# delete job_schedule
try:
spark_client.batch_client.job_schedule.delete(job_id)
deleted_job_or_job_schedule = True
except batch_models.BatchErrorException:
pass
# delete storage container
if not keep_logs:
cluster_data = spark_client._get_cluster_data(job_id)
cluster_data.delete_container(job_id)
return deleted_job_or_job_schedule
def get_application(spark_client, job_id, application_name):
# info about the app
recent_run_job = __get_recent_job(spark_client, job_id)
try:
return spark_client.batch_client.task.get(job_id=recent_run_job.id, task_id=application_name)
except batch_models.BatchErrorException:
raise error.AztkError(
"The Spark application {0} is still being provisioned or does not exist.".format(application_name))
def get_application_log(spark_client, job_id, application_name):
# TODO: change where the logs are uploaded so they aren't overwritten on scheduled runs
# current: job_id, application_name/output.log
# new: job_id, recent_run_job.id/application_name/output.log
recent_run_job = __get_recent_job(spark_client, job_id)
try:
task = spark_client.batch_client.task.get(job_id=recent_run_job.id, task_id=application_name)
except batch_models.BatchErrorException as e:
# see if the application is written to metadata of pool
applications = list_applications(spark_client, job_id)
for application in applications:
if applications[application] is None and application == application_name:
raise error.AztkError("The application {0} has not yet been created.".format(application))
raise error.AztkError("The application {0} does not exist".format(application_name))
else:
if task.state in (
batch_models.TaskState.active,
batch_models.TaskState.running,
batch_models.TaskState.preparing,
):
raise error.AztkError("The application {0} has not yet finished executing.".format(application_name))
return spark_client.get_application_log(job_id, application_name)
def stop_app(spark_client, job_id, application_name):
recent_run_job = __get_recent_job(spark_client, job_id)
# stop batch task
try:
spark_client.batch_client.task.terminate(job_id=recent_run_job.id, task_id=application_name)
return True
except batch_models.BatchErrorException:
return False
def wait_until_job_finished(spark_client, job_id):
job_state = spark_client.batch_client.job_schedule.get(job_id).state
while job_state != batch_models.JobScheduleState.completed:
time.sleep(3)
job_state = spark_client.batch_client.job_schedule.get(job_id).state

View file

@@ -1,132 +0,0 @@
import os
import azure.batch.models as batch_models
import yaml
from aztk.error import AztkError
from aztk.utils import helpers
from aztk.utils.command_builder import CommandBuilder
def __get_node(spark_client, node_id: str, cluster_id: str) -> batch_models.ComputeNode:
return spark_client.batch_client.compute_node.get(cluster_id, node_id)
def generate_task(spark_client, container_id, application, remote=False):
resource_files = []
# The application provided is not hosted remotely and therefore must be uploaded
if not remote:
app_resource_file = helpers.upload_file_to_container(
container_name=container_id,
application_name=application.name,
file_path=application.application,
blob_client=spark_client.blob_client,
use_full_path=False,
)
# Upload application file
resource_files.append(app_resource_file)
application.application = "$AZ_BATCH_TASK_WORKING_DIR/" + os.path.basename(application.application)
# Upload dependent JARS
jar_resource_file_paths = []
for jar in application.jars:
current_jar_resource_file_path = helpers.upload_file_to_container(
container_name=container_id,
application_name=application.name,
file_path=jar,
blob_client=spark_client.blob_client,
use_full_path=False,
)
jar_resource_file_paths.append(current_jar_resource_file_path)
resource_files.append(current_jar_resource_file_path)
# Upload dependent python files
py_files_resource_file_paths = []
for py_file in application.py_files:
current_py_files_resource_file_path = helpers.upload_file_to_container(
container_name=container_id,
application_name=application.name,
file_path=py_file,
blob_client=spark_client.blob_client,
use_full_path=False,
)
py_files_resource_file_paths.append(current_py_files_resource_file_path)
resource_files.append(current_py_files_resource_file_path)
# Upload other dependent files
files_resource_file_paths = []
for file in application.files:
files_resource_file_path = helpers.upload_file_to_container(
container_name=container_id,
application_name=application.name,
file_path=file,
blob_client=spark_client.blob_client,
use_full_path=False,
)
files_resource_file_paths.append(files_resource_file_path)
resource_files.append(files_resource_file_path)
# Upload application definition
application.jars = [os.path.basename(jar) for jar in application.jars]
application.py_files = [os.path.basename(py_files) for py_files in application.py_files]
application.files = [os.path.basename(files) for files in application.files]
application_definition_file = helpers.upload_text_to_container(
container_name=container_id,
application_name=application.name,
file_path="application.yaml",
content=yaml.dump(vars(application)),
blob_client=spark_client.blob_client,
)
resource_files.append(application_definition_file)
# create command to submit task
task_cmd = CommandBuilder("sudo docker exec")
task_cmd.add_argument("-i")
task_cmd.add_option("-e", "AZ_BATCH_TASK_WORKING_DIR=$AZ_BATCH_TASK_WORKING_DIR")
task_cmd.add_option("-e", "STORAGE_LOGS_CONTAINER={0}".format(container_id))
task_cmd.add_argument("spark /bin/bash >> output.log 2>&1")
task_cmd.add_argument(
r'-c "source ~/.bashrc; '
r"export PYTHONPATH=$PYTHONPATH:\$AZTK_WORKING_DIR; "
r"cd \$AZ_BATCH_TASK_WORKING_DIR; "
r'\$AZTK_WORKING_DIR/.aztk-env/.venv/bin/python \$AZTK_WORKING_DIR/aztk/node_scripts/scheduling/submit.py"')
# Create task
task = batch_models.TaskAddParameter(
id=application.name,
command_line=helpers.wrap_commands_in_shell([task_cmd.to_str()]),
resource_files=resource_files,
constraints=batch_models.TaskConstraints(max_task_retry_count=application.max_retry_count),
user_identity=batch_models.UserIdentity(
auto_user=batch_models.AutoUserSpecification(
scope=batch_models.AutoUserScope.task, elevation_level=batch_models.ElevationLevel.admin)),
)
return task
def affinitize_task_to_master(spark_client, cluster_id, task):
cluster = spark_client.get_cluster(cluster_id)
if cluster.master_node_id is None:
raise AztkError("Master has not yet been selected. Please wait until the cluster is finished provisioning.")
master_node = spark_client.batch_client.compute_node.get(pool_id=cluster_id, node_id=cluster.master_node_id)
task.affinity_info = batch_models.AffinityInformation(affinity_id=master_node.affinity_id)
return task
def submit_application(spark_client, cluster_id, application, remote: bool = False, wait: bool = False):
"""
Submit a spark app
"""
task = generate_task(spark_client, cluster_id, application, remote)
task = affinitize_task_to_master(spark_client, cluster_id, task)
# Add task to batch job (which has the same name as cluster_id)
job_id = cluster_id
spark_client.batch_client.task.add(job_id=job_id, task=task)
if wait:
helpers.wait_for_task_to_complete(job_id=job_id, task_id=task.id, batch_client=spark_client.batch_client)

View file

@@ -10,11 +10,6 @@ from aztk_cli.config import load_aztk_spark_config
def setup_parser(parser: argparse.ArgumentParser):
parser.add_argument("--id", dest="cluster_id", help="The unique id of your spark cluster")
parser.add_argument("--size", type=int, help="Number of vms in your cluster")
parser.add_argument(
"--size-low-pri",
type=int,
help="Number of low priority vms in your cluster (Deprecated, use --size-low-priority)",
)
parser.add_argument("--size-low-priority", type=int, help="Number of low priority vms in your cluster")
parser.add_argument("--vm-size", help="VM size for nodes in your cluster")
parser.add_argument("--username", help="Username to access your cluster (required: --wait flag)")

View file

@@ -338,7 +338,7 @@ def test_scheduling_target():
assert application_log.exit_code == 0
application_state = spark_client.cluster.get_application_state(
id=cluster_configuration.id, application_name="pipy100")
id=cluster_configuration.cluster_id, application_name="pipy100")
assert application_state == aztk.spark.models.ApplicationState.Completed
finally:

View file

@@ -1,354 +0,0 @@
import os
import subprocess
import time
from datetime import datetime
from zipfile import ZipFile
import azure.batch.models as batch_models
import pytest
from azure.batch.models import BatchErrorException
from tests.integration_tests.spark.sdk.clean_up_cluster import clean_up_cluster
from tests.integration_tests.spark.sdk.ensure_spark_processes import \
ensure_spark_processes
from tests.integration_tests.spark.sdk.get_client import (get_spark_client,
get_test_suffix)
from tests.integration_tests.spark.sdk.wait_for_all_nodes import \
wait_for_all_nodes
import aztk.spark
from aztk.error import AztkError
from aztk.utils import constants
from aztk_cli import config
base_cluster_id = get_test_suffix("cluster")
spark_client = get_spark_client()
def test_create_cluster():
test_id = "test-create-deprecated-"
# TODO: make Cluster Configuration more robust, test each value
cluster_configuration = aztk.spark.models.ClusterConfiguration(
cluster_id=test_id + base_cluster_id,
size=2,
size_low_priority=0,
vm_size="standard_f2",
subnet_id=None,
file_shares=None,
toolkit=aztk.spark.models.SparkToolkit(version="2.3.0"),
spark_configuration=None)
try:
with pytest.warns(DeprecationWarning):
cluster = spark_client.create_cluster(cluster_configuration, wait=True)
assert cluster.pool is not None
assert cluster.nodes is not None
assert cluster.id == cluster_configuration.cluster_id
assert cluster.vm_size == "standard_f2"
assert cluster.current_dedicated_nodes == 2
assert cluster.gpu_enabled is False
assert cluster.master_node_id is not None
assert cluster.current_low_pri_nodes == 0
finally:
clean_up_cluster(spark_client, cluster_configuration.cluster_id)
def test_get_cluster():
test_id = "test-get-deprecated-"
cluster_configuration = aztk.spark.models.ClusterConfiguration(
cluster_id=test_id + base_cluster_id,
size=2,
size_low_priority=0,
vm_size="standard_f2",
subnet_id=None,
file_shares=None,
toolkit=aztk.spark.models.SparkToolkit(version="2.3.0"),
spark_configuration=None)
try:
with pytest.warns(DeprecationWarning):
spark_client.create_cluster(cluster_configuration, wait=True)
with pytest.warns(DeprecationWarning):
cluster = spark_client.get_cluster(cluster_id=cluster_configuration.cluster_id)
assert cluster.pool is not None
assert cluster.nodes is not None
assert cluster.id == cluster_configuration.cluster_id
assert cluster.vm_size == "standard_f2"
assert cluster.current_dedicated_nodes == 2
assert cluster.gpu_enabled is False
assert cluster.master_node_id is not None
assert cluster.current_low_pri_nodes == 0
finally:
clean_up_cluster(spark_client, cluster_configuration.cluster_id)
def test_list_clusters():
test_id = "test-list-deprecated-"
cluster_configuration = aztk.spark.models.ClusterConfiguration(
cluster_id=test_id + base_cluster_id,
size=2,
size_low_priority=0,
vm_size="standard_f2",
subnet_id=None,
file_shares=None,
toolkit=aztk.spark.models.SparkToolkit(version="2.3.0"),
spark_configuration=None)
try:
with pytest.warns(DeprecationWarning):
spark_client.create_cluster(cluster_configuration, wait=True)
with pytest.warns(DeprecationWarning):
clusters = spark_client.list_clusters()
assert cluster_configuration.cluster_id in [cluster.id for cluster in clusters]
finally:
clean_up_cluster(spark_client, cluster_configuration.cluster_id)
def test_get_remote_login_settings():
test_id = "test-get-remote-login-deprecated-"
cluster_configuration = aztk.spark.models.ClusterConfiguration(
cluster_id=test_id + base_cluster_id,
size=2,
size_low_priority=0,
vm_size="standard_f2",
subnet_id=None,
file_shares=None,
toolkit=aztk.spark.models.SparkToolkit(version="2.3.0"),
spark_configuration=None)
try:
with pytest.warns(DeprecationWarning):
spark_client.create_cluster(cluster_configuration, wait=True)
with pytest.warns(DeprecationWarning):
cluster = spark_client.get_cluster(cluster_id=cluster_configuration.cluster_id)
with pytest.warns(DeprecationWarning):
rls = spark_client.get_remote_login_settings(cluster_id=cluster.id, node_id=cluster.master_node_id)
assert rls.ip_address is not None
assert rls.port is not None
finally:
clean_up_cluster(spark_client, cluster_configuration.cluster_id)
def test_submit():
test_id = "test-submit-deprecated-"
cluster_configuration = aztk.spark.models.ClusterConfiguration(
cluster_id=test_id + base_cluster_id,
size=2,
size_low_priority=0,
vm_size="standard_f2",
subnet_id=None,
file_shares=None,
toolkit=aztk.spark.models.SparkToolkit(version="2.3.0"),
spark_configuration=None)
application_configuration = aztk.spark.models.ApplicationConfiguration(
name="pipy100",
application="./examples/src/main/python/pi.py",
application_args=[100],
main_class=None,
jars=[],
py_files=[],
files=[],
driver_java_options=None,
driver_class_path=None,
driver_memory=None,
driver_cores=None,
executor_memory=None,
executor_cores=None,
max_retry_count=None)
try:
with pytest.warns(DeprecationWarning):
spark_client.create_cluster(cluster_configuration, wait=True)
with pytest.warns(DeprecationWarning):
spark_client.submit(
cluster_id=cluster_configuration.cluster_id, application=application_configuration, wait=True)
finally:
clean_up_cluster(spark_client, cluster_configuration.cluster_id)
def test_get_application_log():
test_id = "test-get-app-log-deprecated-"
cluster_configuration = aztk.spark.models.ClusterConfiguration(
cluster_id=test_id + base_cluster_id,
size=2,
size_low_priority=0,
vm_size="standard_f2",
subnet_id=None,
file_shares=None,
toolkit=aztk.spark.models.SparkToolkit(version="2.3.0"),
spark_configuration=None)
application_configuration = aztk.spark.models.ApplicationConfiguration(
name="pipy100",
application="./examples/src/main/python/pi.py",
application_args=[100],
main_class=None,
jars=[],
py_files=[],
files=[],
driver_java_options=None,
driver_class_path=None,
driver_memory=None,
driver_cores=None,
executor_memory=None,
executor_cores=None,
max_retry_count=None)
try:
with pytest.warns(DeprecationWarning):
spark_client.create_cluster(cluster_configuration, wait=True)
with pytest.warns(DeprecationWarning):
spark_client.submit(
cluster_id=cluster_configuration.cluster_id, application=application_configuration, wait=True)
with pytest.warns(DeprecationWarning):
application_log = spark_client.get_application_log(
cluster_id=cluster_configuration.cluster_id,
application_name=application_configuration.name,
tail=False,
current_bytes=0)
assert application_log.exit_code == 0
assert application_log.name == application_configuration.name == "pipy100"
assert application_log.application_state == aztk.spark.models.ApplicationState.Completed
assert application_log.log is not None
assert application_log.total_bytes is not None
finally:
clean_up_cluster(spark_client, cluster_configuration.cluster_id)
def test_create_user_password():
# TODO: test with paramiko
pass
def test_create_user_ssh_key():
# TODO: test with paramiko
pass
def test_get_application_state_complete():
test_id = "test-app-status-complete-deprecated-"
cluster_configuration = aztk.spark.models.ClusterConfiguration(
cluster_id=test_id + base_cluster_id,
size=2,
size_low_priority=0,
vm_size="standard_f2",
subnet_id=None,
file_shares=None,
toolkit=aztk.spark.models.SparkToolkit(version="2.3.0"),
spark_configuration=None)
application_configuration = aztk.spark.models.ApplicationConfiguration(
name="pipy100",
application="./examples/src/main/python/pi.py",
application_args=[100],
main_class=None,
jars=[],
py_files=[],
files=[],
driver_java_options=None,
driver_class_path=None,
driver_memory=None,
driver_cores=None,
executor_memory=None,
executor_cores=None,
max_retry_count=None)
try:
with pytest.warns(DeprecationWarning):
spark_client.create_cluster(cluster_configuration, wait=True)
with pytest.warns(DeprecationWarning):
spark_client.submit(
cluster_id=cluster_configuration.cluster_id, application=application_configuration, wait=True)
with pytest.warns(DeprecationWarning):
status = spark_client.get_application_status(
cluster_id=cluster_configuration.cluster_id, app_name=application_configuration.name)
assert status == aztk.spark.models.ApplicationState.Completed
finally:
clean_up_cluster(spark_client, cluster_configuration.cluster_id)
def test_delete_cluster():
test_id = "test-delete-deprecated-"
cluster_configuration = aztk.spark.models.ClusterConfiguration(
cluster_id=test_id + base_cluster_id,
size=2,
size_low_priority=0,
vm_size="standard_f2",
subnet_id=None,
file_shares=None,
toolkit=aztk.spark.models.SparkToolkit(version="2.3.0"),
spark_configuration=None)
try:
with pytest.warns(DeprecationWarning):
spark_client.create_cluster(cluster_configuration, wait=True)
success = spark_client.delete_cluster(cluster_id=cluster_configuration.cluster_id)
assert success is True
finally:
clean_up_cluster(spark_client, cluster_configuration.cluster_id)
def test_spark_processes_up():
test_id = "test-spark-processes-up-deprecated-"
cluster_configuration = aztk.spark.models.ClusterConfiguration(
cluster_id=test_id + base_cluster_id,
size=2,
size_low_priority=0,
vm_size="standard_f2",
subnet_id=None,
file_shares=None,
toolkit=aztk.spark.models.SparkToolkit(version="2.3.0"),
spark_configuration=None)
try:
with pytest.warns(DeprecationWarning):
cluster = spark_client.create_cluster(cluster_configuration, wait=True)
wait_for_all_nodes(spark_client, cluster.id, cluster.nodes)
ensure_spark_processes(spark_client=spark_client, id=cluster_configuration.cluster_id)
finally:
clean_up_cluster(spark_client, cluster_configuration.cluster_id)
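

# ensure_spark_processes also lives in the shared helpers; conceptually it can
# be approximated with cluster_run, grepping for Spark JVMs on every node. The
# per-node result shape with .error/.output is an assumption carried over from
# run_cluster_diagnostics below:
def spark_processes_up_sketch(client, cluster_id):
    results = client.cluster_run(cluster_id, "ps aux | grep [o]rg.apache.spark.deploy")
    return all(result.error is None and result.output for result in results)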
def test_debug_tool():
test_id = "test-debug-tool-"
cluster_configuration = aztk.spark.models.ClusterConfiguration(
cluster_id=test_id + base_cluster_id,
size=2,
size_low_priority=0,
vm_size="standard_f2",
subnet_id=None,
file_shares=None,
toolkit=aztk.spark.models.SparkToolkit(version="2.3.0"),
spark_configuration=None)
expected_members = [
"df.txt", "hostname.txt", "docker-images.txt", "docker-containers.txt", "spark/docker.log", "spark/ps_aux.txt",
"spark/logs", "spark/wd"
]
try:
with pytest.warns(DeprecationWarning):
cluster = spark_client.create_cluster(cluster_configuration, wait=True)
        nodes = list(cluster.nodes)
wait_for_all_nodes(spark_client, cluster.id, nodes)
with pytest.warns(DeprecationWarning):
cluster_output = spark_client.run_cluster_diagnostics(cluster_id=cluster.id)
for node_output in cluster_output:
node_output.output.seek(0) # tempfile requires seek 0 before reading
debug_zip = ZipFile(node_output.output)
assert node_output.id in [node.id for node in nodes]
assert node_output.error is None
            assert all(
                any(member in name for name in debug_zip.namelist()) for member in expected_members)
finally:
clean_up_cluster(spark_client, cluster_configuration.cluster_id)
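

# A sketch of persisting the run_cluster_diagnostics output to disk, one
# directory per node (the ./debug layout is an arbitrary choice):
def save_diagnostics_sketch(cluster_output, destination="./debug"):
    import os
    from zipfile import ZipFile

    for node_output in cluster_output:
        node_output.output.seek(0)  # tempfile requires seek 0 before reading
        with ZipFile(node_output.output) as debug_zip:
            debug_zip.extractall(os.path.join(destination, node_output.id))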

View file

@@ -1,287 +0,0 @@
import pytest
from azure.batch.models import BatchErrorException
from tests.integration_tests.spark.sdk.get_client import (get_spark_client, get_test_suffix)
import aztk.spark
from aztk.error import AztkError
base_job_id = get_test_suffix("job")
spark_client = get_spark_client()
def test_submit_job():
test_id = "submit-"
app1 = aztk.spark.models.ApplicationConfiguration(
name="pipy100",
application="./examples/src/main/python/pi.py",
application_args=[10],
)
app2 = aztk.spark.models.ApplicationConfiguration(
name="pipy101",
application="./examples/src/main/python/pi.py",
application_args=[10],
)
job_configuration = aztk.spark.models.JobConfiguration(
id=test_id + base_job_id,
applications=[app1, app2],
vm_size="standard_f1",
spark_configuration=None,
toolkit=aztk.spark.models.SparkToolkit(version="2.3.0"),
max_dedicated_nodes=2,
max_low_pri_nodes=0)
try:
with pytest.warns(DeprecationWarning):
job = spark_client.submit_job(job_configuration=job_configuration)
with pytest.warns(DeprecationWarning):
spark_client.wait_until_job_finished(job_id=job_configuration.id)
assert job.id == job_configuration.id
assert job.state is not None
finally:
clean_up_job(job_configuration.id)
def test_list_jobs():
test_id = "list-"
app1 = aztk.spark.models.ApplicationConfiguration(
name="pipy100",
application="./examples/src/main/python/pi.py",
application_args=[10],
)
app2 = aztk.spark.models.ApplicationConfiguration(
name="pipy101",
application="./examples/src/main/python/pi.py",
application_args=[10],
)
job_configuration = aztk.spark.models.JobConfiguration(
id=test_id + base_job_id,
applications=[app1, app2],
vm_size="standard_f1",
spark_configuration=None,
toolkit=aztk.spark.models.SparkToolkit(version="2.3.0"),
max_dedicated_nodes=1,
max_low_pri_nodes=0,
worker_on_master=True)
try:
with pytest.warns(DeprecationWarning):
spark_client.submit_job(job_configuration=job_configuration)
with pytest.warns(DeprecationWarning):
spark_client.wait_until_job_finished(job_configuration.id)
jobs = spark_client.list_jobs()
assert jobs is not None
assert job_configuration.id in [job.id for job in jobs]
finally:
clean_up_job(job_configuration.id)
def test_list_applications():
test_id = "list-apps-"
app1 = aztk.spark.models.ApplicationConfiguration(
name="pipy100",
application="./examples/src/main/python/pi.py",
application_args=[10],
)
app2 = aztk.spark.models.ApplicationConfiguration(
name="pipy101",
application="./examples/src/main/python/pi.py",
application_args=[10],
)
job_configuration = aztk.spark.models.JobConfiguration(
id=test_id + base_job_id,
applications=[app1, app2],
vm_size="standard_f1",
spark_configuration=None,
toolkit=aztk.spark.models.SparkToolkit(version="2.3.0"),
max_dedicated_nodes=2,
max_low_pri_nodes=0)
try:
with pytest.warns(DeprecationWarning):
spark_client.submit_job(job_configuration=job_configuration)
with pytest.warns(DeprecationWarning):
spark_client.wait_until_job_finished(job_configuration.id)
applications = spark_client.list_applications(job_id=job_configuration.id)
assert applications not in (None, [])
assert len(applications) == 2
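        # list_applications apparently returns bare names (str) for applications
        # that have not been scheduled yet and full Application models otherwise,
        # hence the permissive isinstance check below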
for application in applications:
assert isinstance(application, (aztk.spark.models.Application, str))
finally:
clean_up_job(job_configuration.id)
def test_get_job():
test_id = "get-"
app1 = aztk.spark.models.ApplicationConfiguration(
name="pipy100",
application="./examples/src/main/python/pi.py",
application_args=[10],
)
app2 = aztk.spark.models.ApplicationConfiguration(
name="pipy101",
application="./examples/src/main/python/pi.py",
application_args=[10],
)
job_configuration = aztk.spark.models.JobConfiguration(
id=test_id + base_job_id,
applications=[app1, app2],
vm_size="standard_f1",
spark_configuration=None,
toolkit=aztk.spark.models.SparkToolkit(version="2.3.0"),
max_dedicated_nodes=1,
max_low_pri_nodes=0,
worker_on_master=True)
try:
with pytest.warns(DeprecationWarning):
spark_client.submit_job(job_configuration=job_configuration)
with pytest.warns(DeprecationWarning):
spark_client.wait_until_job_finished(job_configuration.id)
with pytest.warns(DeprecationWarning):
job = spark_client.get_job(job_id=job_configuration.id)
assert job.id == job_configuration.id
assert app1.name in [app.name for app in job.applications]
assert app2.name in [app.name for app in job.applications]
finally:
clean_up_job(job_configuration.id)
def test_get_application():
test_id = "get-app-"
app1 = aztk.spark.models.ApplicationConfiguration(
name="pipy100",
application="./examples/src/main/python/pi.py",
application_args=[10],
)
job_configuration = aztk.spark.models.JobConfiguration(
id=test_id + base_job_id,
applications=[app1],
vm_size="standard_f1",
spark_configuration=None,
toolkit=aztk.spark.models.SparkToolkit(version="2.3.0"),
max_dedicated_nodes=2,
max_low_pri_nodes=0)
try:
with pytest.warns(DeprecationWarning):
spark_client.submit_job(job_configuration=job_configuration)
with pytest.warns(DeprecationWarning):
spark_client.wait_until_job_finished(job_configuration.id)
with pytest.warns(DeprecationWarning):
application = spark_client.get_application(job_id=job_configuration.id, application_name=app1.name)
assert isinstance(application, aztk.spark.models.Application)
assert application.exit_code == 0
assert application.state == aztk.spark.models.ApplicationState.Completed
assert application.name == "pipy100"
finally:
clean_up_job(job_configuration.id)
def test_get_application_log():
test_id = "gal-"
app1 = aztk.spark.models.ApplicationConfiguration(
name="pipy100",
application="./examples/src/main/python/pi.py",
application_args=[10],
)
job_configuration = aztk.spark.models.JobConfiguration(
id=test_id + base_job_id,
applications=[app1],
vm_size="standard_f1",
spark_configuration=None,
toolkit=aztk.spark.models.SparkToolkit(version="2.3.0"),
max_dedicated_nodes=2,
max_low_pri_nodes=0)
try:
with pytest.warns(DeprecationWarning):
spark_client.submit_job(job_configuration=job_configuration)
with pytest.warns(DeprecationWarning):
spark_client.wait_until_job_finished(job_configuration.id)
with pytest.warns(DeprecationWarning):
application_log = spark_client.get_job_application_log(
job_id=job_configuration.id, application_name=app1.name)
assert isinstance(application_log, aztk.spark.models.ApplicationLog)
assert application_log.log is not None
assert application_log.exit_code == 0
assert application_log.name == "pipy100"
assert application_log.total_bytes != 0
finally:
clean_up_job(job_configuration.id)
def test_delete_job():
test_id = "delete-"
app1 = aztk.spark.models.ApplicationConfiguration(
name="pipy100",
application="./examples/src/main/python/pi.py",
application_args=[10],
)
job_configuration = aztk.spark.models.JobConfiguration(
id=test_id + base_job_id,
applications=[app1],
vm_size="standard_f1",
spark_configuration=None,
toolkit=aztk.spark.models.SparkToolkit(version="2.3.0"),
max_dedicated_nodes=1,
max_low_pri_nodes=0,
worker_on_master=True)
try:
with pytest.warns(DeprecationWarning):
spark_client.submit_job(job_configuration=job_configuration)
with pytest.warns(DeprecationWarning):
spark_client.wait_until_job_finished(job_configuration.id)
with pytest.warns(DeprecationWarning):
spark_client.delete_job(job_configuration.id)
with pytest.warns(DeprecationWarning):
assert job_configuration.id not in spark_client.list_jobs()
        # get_job must now fail because the job has been deleted; the original
        # try/except passed even when no exception was raised
        with pytest.raises(AztkError):
            with pytest.warns(DeprecationWarning):
                spark_client.get_job(job_configuration.id)
finally:
clean_up_job(job_configuration.id)
def clean_up_job(job_id):
try:
with pytest.warns(DeprecationWarning):
spark_client.delete_job(job_id)
except Exception:
pass