Internal: verify code formatting in build (#633)

* format all files, enforce formatting in travis build

* add yapf to vsts build

* update vsts build

* fix

* fix

* fix

* change queue to ubuntu

* revert

* temporarily enable builds on pushes to this branch

* change to non preview

* revert

* update yapf version, rerun

* update pytest parallelism

* add retry to arm call to avoid failures

* remove non-master trigger

* update builds, formatting style
This commit is contained in:
Jacob Freck 2018-08-06 15:29:06 -07:00 коммит произвёл GitHub
Родитель b18eb695a1
Коммит 7730c46ee4
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
127 изменённых файлов: 1047 добавлений и 1221 удалений

Просмотреть файл

@ -1,7 +1,8 @@
[style]
based_on_style=pep8
based_on_style=google
spaces_before_comment=4
split_before_logical_operator=True
indent_width=4
column_limit=120
split_arguments_when_comma_terminated=True
blank_line_before_nested_class_or_def=False

Просмотреть файл

@ -8,6 +8,7 @@ install:
- pip install -e .
script:
- yapf -dpr aztk/ aztk_cli/
- pylint -E aztk
- pytest --ignore=tests/integration_tests

4
.vscode/settings.json поставляемый
Просмотреть файл

@ -15,4 +15,8 @@
"python.venvPath": "${workspaceFolder}/.venv/",
"python.pythonPath": "${workspaceFolder}/.venv/Scripts/python.exe",
"python.unitTest.pyTestEnabled": true,
"editor.formatOnSave": true,
"editor.codeActionsOnSave": {
"source.organizeImports": true
}
}

Просмотреть файл

@ -1,20 +1,29 @@
trigger:
- master
steps:
- task: UsePythonVersion@0
phases:
- phase: Test
queue: Hosted Linux Preview
steps:
- task: UsePythonVersion@0
inputs:
versionSpec: '>= 3.5'
addToPath: true
architecture: 'x64'
- script: |
- script: |
pip install -r requirements.txt
pip install -e .
condition: and(succeeded(), eq(variables['agent.os'], 'linux'))
condition: succeeded()
displayName: install aztk
- script: |
pytest -n 50
condition: and(succeeded(), in(variables['agent.os'], 'linux'))
- script: |
yapf -dpr aztk/ aztk_cli/
condition: succeeded()
displayName: yapf
- script: |
pytest -n 102
condition: succeeded()
displayName: pytest

Просмотреть файл

@ -56,12 +56,11 @@ def create_resource_group(credentials, subscription_id, **kwargs):
resource_group_name=kwargs.get("resource_group", DefaultSettings.resource_group),
parameters={
'location': kwargs.get("region", DefaultSettings.region),
}
)
})
except CloudError as e:
if i == 2:
raise AccountSetupError(
"Unable to create resource group in region {}".format(kwargs.get("region", DefaultSettings.region)))
raise AccountSetupError("Unable to create resource group in region {}".format(
kwargs.get("region", DefaultSettings.region)))
print(e.message)
print("Please try again.")
kwargs["resource_group"] = prompt_with_default("Azure Region", DefaultSettings.region)
@ -82,15 +81,10 @@ def create_storage_account(credentials, subscription_id, **kwargs):
resource_group_name=kwargs.get("resource_group", DefaultSettings.resource_group),
account_name=kwargs.get("storage_account", DefaultSettings.storage_account),
parameters=StorageAccountCreateParameters(
sku=Sku(SkuName.standard_lrs),
kind=Kind.storage,
location=kwargs.get('region', DefaultSettings.region)
)
)
sku=Sku(SkuName.standard_lrs), kind=Kind.storage, location=kwargs.get('region', DefaultSettings.region)))
return storage_account.result().id
def create_batch_account(credentials, subscription_id, **kwargs):
"""
Create a Batch account
@ -108,10 +102,7 @@ def create_batch_account(credentials, subscription_id, **kwargs):
parameters=BatchAccountCreateParameters(
location=kwargs.get('region', DefaultSettings.region),
auto_storage=AutoStorageBaseProperties(
storage_account_id=kwargs.get('storage_account_id', DefaultSettings.region)
)
)
)
storage_account_id=kwargs.get('storage_account_id', DefaultSettings.region))))
return batch_account.result().id
@ -151,19 +142,13 @@ def create_vnet(credentials, subscription_id, **kwargs):
resource_group_name=resource_group_name,
virtual_network_name=kwargs.get("virtual_network_name", DefaultSettings.virtual_network_name),
parameters=VirtualNetwork(
location=kwargs.get("region", DefaultSettings.region),
address_space=AddressSpace(["10.0.0.0/24"])
)
)
location=kwargs.get("region", DefaultSettings.region), address_space=AddressSpace(["10.0.0.0/24"])))
virtual_network = virtual_network.result()
subnet = network_client.subnets.create_or_update(
resource_group_name=resource_group_name,
virtual_network_name=virtual_network_name,
subnet_name=subnet_name,
subnet_parameters=Subnet(
address_prefix='10.0.0.0/24'
)
)
subnet_parameters=Subnet(address_prefix='10.0.0.0/24'))
return subnet.result().id
@ -175,10 +160,7 @@ def create_aad_user(credentials, tenant_id, **kwargs):
:param **application_name: str
"""
graph_rbac_client = GraphRbacManagementClient(
credentials,
tenant_id,
base_url=AZURE_PUBLIC_CLOUD.endpoints.active_directory_graph_resource_id
)
credentials, tenant_id, base_url=AZURE_PUBLIC_CLOUD.endpoints.active_directory_graph_resource_id)
application_credential = uuid.uuid4()
try:
display_name = kwargs.get("application_name", DefaultSettings.application_name)
@ -192,42 +174,32 @@ def create_aad_user(credentials, tenant_id, **kwargs):
start_date=datetime(2000, 1, 1, 0, 0, 0, 0, tzinfo=timezone.utc),
end_date=datetime(2299, 12, 31, 0, 0, 0, 0, tzinfo=timezone.utc),
value=application_credential,
key_id=uuid.uuid4()
)
]
)
)
key_id=uuid.uuid4())
]))
service_principal = graph_rbac_client.service_principals.create(
ServicePrincipalCreateParameters(
app_id=application.app_id,
account_enabled=True
)
)
ServicePrincipalCreateParameters(app_id=application.app_id, account_enabled=True))
except GraphErrorException as e:
if e.inner_exception.code == "Request_BadRequest":
application = next(graph_rbac_client.applications.list(
application = next(
graph_rbac_client.applications.list(
filter="identifierUris/any(c:c eq 'http://{}.com')".format(display_name)))
confirmation_prompt = "Previously created application with name {} found. "\
"Would you like to use it? (y/n): ".format(application.display_name)
prompt_for_confirmation(confirmation_prompt, e, ValueError("Response not recognized. Please try again."))
password_credentials = list(graph_rbac_client.applications.list_password_credentials(application_object_id=application.object_id))
password_credentials = list(
graph_rbac_client.applications.list_password_credentials(application_object_id=application.object_id))
password_credentials.append(
PasswordCredential(
start_date=datetime(2000, 1, 1, 0, 0, 0, 0, tzinfo=timezone.utc),
end_date=datetime(2299, 12, 31, 0, 0, 0, 0, tzinfo=timezone.utc),
value=application_credential,
key_id=uuid.uuid4()
)
)
key_id=uuid.uuid4()))
graph_rbac_client.applications.patch(
application_object_id=application.object_id,
parameters=ApplicationUpdateParameters(
password_credentials=password_credentials
)
)
service_principal = next(graph_rbac_client.service_principals.list(
filter="appId eq '{}'".format(application.app_id)))
parameters=ApplicationUpdateParameters(password_credentials=password_credentials))
service_principal = next(
graph_rbac_client.service_principals.list(filter="appId eq '{}'".format(application.app_id)))
else:
raise e
@ -244,21 +216,15 @@ def create_role_assignment(credentials, subscription_id, scope, principal_id):
"""
authorization_client = AuthorizationManagementClient(credentials, subscription_id)
role_name = 'Contributor'
roles = list(authorization_client.role_definitions.list(
scope,
filter="roleName eq '{}'".format(role_name)
))
roles = list(authorization_client.role_definitions.list(scope, filter="roleName eq '{}'".format(role_name)))
contributor_role = roles[0]
for i in range(10):
try:
authorization_client.role_assignments.create(
scope,
uuid.uuid4(),
authorization_client.role_assignments.create(scope, uuid.uuid4(),
{
'role_definition_id': contributor_role.id,
'principal_id': principal_id
}
)
})
break
except CloudError as e:
# ignore error if service principal has not yet been created
@ -321,7 +287,6 @@ def prompt_tenant_selection(tenant_ids):
raise AccountSetupError("Tenant selection not recognized after 3 attempts.")
class Spinner:
busy = False
delay = 0.1
@ -329,7 +294,8 @@ class Spinner:
@staticmethod
def spinning_cursor():
while 1:
for cursor in '|/-\\': yield cursor
for cursor in '|/-\\':
yield cursor
def __init__(self, delay=None):
self.spinner_generator = self.spinning_cursor()
@ -358,7 +324,6 @@ class Spinner:
time.sleep(self.delay)
if __name__ == "__main__":
print("\nGetting credentials.")
# get credentials and tenant_id
@ -374,15 +339,22 @@ if __name__ == "__main__":
"Default values are provided in the brackets. "\
"Hit enter to use default.")
kwargs = {
"region": prompt_with_default("Azure Region", DefaultSettings.region),
"resource_group": prompt_with_default("Resource Group Name", DefaultSettings.resource_group),
"storage_account": prompt_with_default("Storage Account Name", DefaultSettings.storage_account),
"batch_account": prompt_with_default("Batch Account Name", DefaultSettings.batch_account),
"region":
prompt_with_default("Azure Region", DefaultSettings.region),
"resource_group":
prompt_with_default("Resource Group Name", DefaultSettings.resource_group),
"storage_account":
prompt_with_default("Storage Account Name", DefaultSettings.storage_account),
"batch_account":
prompt_with_default("Batch Account Name", DefaultSettings.batch_account),
# "virtual_network_name": prompt_with_default("Virtual Network Name", DefaultSettings.virtual_network_name),
# "subnet_name": prompt_with_default("Subnet Name", DefaultSettings.subnet_name),
"application_name": prompt_with_default("Active Directory Application Name", DefaultSettings.application_name),
"application_credential_name": prompt_with_default("Active Directory Application Credential Name", DefaultSettings.resource_group),
"service_principal": prompt_with_default("Service Principal Name", DefaultSettings.service_principal)
"application_name":
prompt_with_default("Active Directory Application Name", DefaultSettings.application_name),
"application_credential_name":
prompt_with_default("Active Directory Application Credential Name", DefaultSettings.resource_group),
"service_principal":
prompt_with_default("Service Principal Name", DefaultSettings.service_principal)
}
print("Creating the Azure resources.")
@ -410,9 +382,9 @@ if __name__ == "__main__":
with Spinner():
profile = credentials.get_cli_profile()
aad_cred, subscription_id, tenant_id = profile.get_login_credentials(
resource=AZURE_PUBLIC_CLOUD.endpoints.active_directory_graph_resource_id
)
application_id, service_principal_object_id, application_credential = create_aad_user(aad_cred, tenant_id, **kwargs)
resource=AZURE_PUBLIC_CLOUD.endpoints.active_directory_graph_resource_id)
application_id, service_principal_object_id, application_credential = create_aad_user(
aad_cred, tenant_id, **kwargs)
print("Created Azure Active Directory service principal.")
@ -428,7 +400,6 @@ if __name__ == "__main__":
# "subnet_id": subnet_id,
"batch_account_resource_id": batch_account_id,
"storage_account_resource_id": storage_account_id
}
)
})
print("\n# Copy the following into your .aztk/secrets.yaml file\n{}".format(secrets))

Просмотреть файл

@ -32,8 +32,7 @@ def __create_user(self, id: str, node_id: str, username: str, password: str = No
def create_user_on_node(base_client, id, node_id, username, ssh_key=None, password=None):
try:
__create_user(
base_client, id=id, node_id=node_id, username=username, ssh_key=ssh_key, password=password)
__create_user(base_client, id=id, node_id=node_id, username=username, ssh_key=ssh_key, password=password)
except batch_error.BatchErrorException as error:
try:
base_client.delete_user_on_node(id, node_id, username)

Просмотреть файл

@ -1,5 +1,6 @@
import concurrent.futures
#TODO: remove nodes param
def delete_user_on_cluster(base_client, id, nodes, username):
with concurrent.futures.ThreadPoolExecutor() as executor:

Просмотреть файл

@ -102,13 +102,9 @@ def get_log(batch_client, blob_client, cluster_id: str, application_name: str, t
exit_code=task.execution_info.exit_code)
def get_application_log(base_operations,
cluster_id: str,
application_name: str,
tail=False,
current_bytes: int = 0):
def get_application_log(base_operations, cluster_id: str, application_name: str, tail=False, current_bytes: int = 0):
try:
return get_log(base_operations.batch_client, base_operations.blob_client, cluster_id,
application_name, tail, current_bytes)
return get_log(base_operations.batch_client, base_operations.blob_client, cluster_id, application_name, tail,
current_bytes)
except batch_error.BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))

Просмотреть файл

@ -2,7 +2,14 @@ import aztk.models as models
from aztk.utils import ssh as ssh_lib
def ssh_into_node(base_client, pool_id, node_id, username, ssh_key=None, password=None, port_forward_list=None, internal=False):
def ssh_into_node(base_client,
pool_id,
node_id,
username,
ssh_key=None,
password=None,
port_forward_list=None,
internal=False):
if internal:
result = base_client.batch_client.compute_node.get(pool_id=pool_id, node_id=node_id)
rls = models.RemoteLogin(ip_address=result.ip_address, port="22")

Просмотреть файл

@ -26,6 +26,7 @@ class CoreClient:
should be used.**
"""
def _get_context(self, secrets_configuration: models.SecretsConfiguration):
self.secrets_configuration = secrets_configuration
@ -86,7 +87,8 @@ class CoreClient:
return job_exists or pool_exists
@deprecated("0.10.0")
def __create_pool_and_job(self, cluster_conf: models.ClusterConfiguration, software_metadata_key: str, start_task, VmImageModel):
def __create_pool_and_job(self, cluster_conf: models.ClusterConfiguration, software_metadata_key: str, start_task,
VmImageModel):
"""
Create a pool and job
:param cluster_conf: the configuration object used to create the cluster
@ -108,8 +110,7 @@ class CoreClient:
network_conf = None
if cluster_conf.subnet_id is not None:
network_conf = batch_models.NetworkConfiguration(
subnet_id=cluster_conf.subnet_id)
network_conf = batch_models.NetworkConfiguration(subnet_id=cluster_conf.subnet_id)
auto_scale_formula = "$TargetDedicatedNodes={0}; $TargetLowPriorityNodes={1}".format(
cluster_conf.size, cluster_conf.size_low_priority)
@ -117,8 +118,7 @@ class CoreClient:
pool = batch_models.PoolAddParameter(
id=pool_id,
virtual_machine_configuration=batch_models.VirtualMachineConfiguration(
image_reference=image_ref_to_use,
node_agent_sku_id=sku_to_use),
image_reference=image_ref_to_use, node_agent_sku_id=sku_to_use),
vm_size=cluster_conf.vm_size,
enable_auto_scale=True,
auto_scale_formula=auto_scale_formula,
@ -128,8 +128,7 @@ class CoreClient:
max_tasks_per_node=4,
network_configuration=network_conf,
metadata=[
batch_models.MetadataItem(
name=constants.AZTK_SOFTWARE_METADATA_KEY, value=software_metadata_key),
batch_models.MetadataItem(name=constants.AZTK_SOFTWARE_METADATA_KEY, value=software_metadata_key),
batch_models.MetadataItem(
name=constants.AZTK_MODE_METADATA_KEY, value=constants.AZTK_CLUSTER_MODE_METADATA)
])
@ -138,9 +137,7 @@ class CoreClient:
helpers.create_pool_if_not_exist(pool, self.batch_client)
# Create job
job = batch_models.JobAddParameter(
id=job_id,
pool_info=batch_models.PoolInformation(pool_id=pool_id))
job = batch_models.JobAddParameter(id=job_id, pool_info=batch_models.PoolInformation(pool_id=pool_id))
# Add job to batch
self.batch_client.job.add(job)
@ -164,10 +161,8 @@ class CoreClient:
List all the cluster on your account.
"""
pools = self.batch_client.pool.list()
software_metadata = (
constants.AZTK_SOFTWARE_METADATA_KEY, software_metadata_key)
cluster_metadata = (
constants.AZTK_MODE_METADATA_KEY, constants.AZTK_CLUSTER_MODE_METADATA)
software_metadata = (constants.AZTK_SOFTWARE_METADATA_KEY, software_metadata_key)
cluster_metadata = (constants.AZTK_MODE_METADATA_KEY, constants.AZTK_CLUSTER_MODE_METADATA)
aztk_pools = []
for pool in [pool for pool in pools if pool.metadata]:
@ -177,7 +172,8 @@ class CoreClient:
return aztk_pools
@deprecated("0.10.0")
def __create_user(self, pool_id: str, node_id: str, username: str, password: str = None, ssh_key: str = None) -> str:
def __create_user(self, pool_id: str, node_id: str, username: str, password: str = None,
ssh_key: str = None) -> str:
"""
Create a pool user
:param pool: the pool to add the user to
@ -188,14 +184,12 @@ class CoreClient:
"""
# Create new ssh user for the given node
self.batch_client.compute_node.add_user(
pool_id,
node_id,
pool_id, node_id,
batch_models.ComputeNodeUser(
name=username,
is_admin=True,
password=password,
ssh_public_key=get_ssh_key.get_user_public_key(
ssh_key, self.secrets_configuration),
ssh_public_key=get_ssh_key.get_user_public_key(ssh_key, self.secrets_configuration),
expiry_time=datetime.now(timezone.utc) + timedelta(days=365)))
@deprecated("0.10.0")
@ -217,8 +211,7 @@ class CoreClient:
:param node_id
:returns aztk.models.RemoteLogin
"""
result = self.batch_client.compute_node.get_remote_login_settings(
pool_id, node_id)
result = self.batch_client.compute_node.get_remote_login_settings(pool_id, node_id)
return models.RemoteLogin(ip_address=result.remote_login_ip_address, port=str(result.remote_login_port))
@deprecated("0.10.0")
@ -246,11 +239,10 @@ class CoreClient:
ssh_key = RSA.generate(2048)
ssh_pub_key = ssh_key.publickey().exportKey('OpenSSH').decode('utf-8')
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = {executor.submit(self.__create_user_on_node,
generated_username,
pool_id,
node.id,
ssh_pub_key): node for node in nodes}
futures = {
executor.submit(self.__create_user_on_node, generated_username, pool_id, node.id, ssh_pub_key): node
for node in nodes
}
concurrent.futures.wait(futures)
return generated_username, ssh_key
@ -258,12 +250,10 @@ class CoreClient:
@deprecated("0.10.0")
def __create_user_on_pool(self, username, pool_id, nodes, ssh_pub_key=None, password=None):
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = {executor.submit(self.__create_user_on_node,
username,
pool_id,
node.id,
ssh_pub_key,
password): node for node in nodes}
futures = {
executor.submit(self.__create_user_on_node, username, pool_id, node.id, ssh_pub_key, password): node
for node in nodes
}
concurrent.futures.wait(futures)
@deprecated("0.10.0")
@ -295,8 +285,7 @@ class CoreClient:
node_rls.port,
ssh_key=ssh_key.exportKey().decode('utf-8'),
container_name=container_name,
timeout=timeout
)
timeout=timeout)
return output
finally:
self.__delete_user(cluster_id, node.id, generated_username)
@ -319,9 +308,7 @@ class CoreClient:
cluster_nodes,
ssh_key=ssh_key.exportKey().decode('utf-8'),
container_name=container_name,
timeout=timeout
)
)
timeout=timeout))
return output
except OSError as exc:
raise exc
@ -329,7 +316,14 @@ class CoreClient:
self.__delete_user_on_pool(generated_username, pool.id, nodes)
@deprecated("0.10.0")
def __cluster_copy(self, cluster_id, source_path, destination_path=None, container_name=None, internal=False, get=False, timeout=None):
def __cluster_copy(self,
cluster_id,
source_path,
destination_path=None,
container_name=None,
internal=False,
get=False,
timeout=None):
pool, nodes = self.__get_pool_details(cluster_id)
nodes = list(nodes)
if internal:
@ -348,9 +342,7 @@ class CoreClient:
destination_path=destination_path,
ssh_key=ssh_key.exportKey().decode('utf-8'),
get=get,
timeout=timeout
)
)
timeout=timeout))
return output
except (OSError, batch_error.BatchErrorException) as exc:
raise exc
@ -358,7 +350,14 @@ class CoreClient:
self.__delete_user_on_pool(generated_username, pool.id, nodes)
@deprecated("0.10.0")
def __ssh_into_node(self, pool_id, node_id, username, ssh_key=None, password=None, port_forward_list=None, internal=False):
def __ssh_into_node(self,
pool_id,
node_id,
username,
ssh_key=None,
password=None,
port_forward_list=None,
internal=False):
if internal:
result = self.batch_client.compute_node.get(pool_id=pool_id, node_id=node_id)
rls = models.RemoteLogin(ip_address=result.ip_address, port="22")
@ -376,14 +375,8 @@ class CoreClient:
)
@deprecated("0.10.0")
def __submit_job(self,
job_configuration,
start_task,
job_manager_task,
autoscale_formula,
software_metadata_key: str,
vm_image_model,
application_metadata):
def __submit_job(self, job_configuration, start_task, job_manager_task, autoscale_formula,
software_metadata_key: str, vm_image_model, application_metadata):
"""
Job Submission
:param job_configuration -> aztk_sdk.spark.models.JobConfiguration
@ -404,8 +397,7 @@ class CoreClient:
# set up subnet if necessary
network_conf = None
if job_configuration.subnet_id:
network_conf = batch_models.NetworkConfiguration(
subnet_id=job_configuration.subnet_id)
network_conf = batch_models.NetworkConfiguration(subnet_id=job_configuration.subnet_id)
# set up a schedule for a recurring job
auto_pool_specification = batch_models.AutoPoolSpecification(
@ -415,8 +407,7 @@ class CoreClient:
pool=batch_models.PoolSpecification(
display_name=job_configuration.id,
virtual_machine_configuration=batch_models.VirtualMachineConfiguration(
image_reference=image_ref_to_use,
node_agent_sku_id=sku_to_use),
image_reference=image_ref_to_use, node_agent_sku_id=sku_to_use),
vm_size=job_configuration.vm_size,
enable_auto_scale=True,
auto_scale_formula=autoscale_formula,
@ -426,13 +417,10 @@ class CoreClient:
network_configuration=network_conf,
max_tasks_per_node=4,
metadata=[
batch_models.MetadataItem(
name=constants.AZTK_SOFTWARE_METADATA_KEY, value=software_metadata_key),
batch_models.MetadataItem(name=constants.AZTK_SOFTWARE_METADATA_KEY, value=software_metadata_key),
batch_models.MetadataItem(
name=constants.AZTK_MODE_METADATA_KEY, value=constants.AZTK_JOB_MODE_METADATA)
]
)
)
]))
# define job specification
job_spec = batch_models.JobSpecification(
@ -440,25 +428,15 @@ class CoreClient:
display_name=job_configuration.id,
on_all_tasks_complete=batch_models.OnAllTasksComplete.terminate_job,
job_manager_task=job_manager_task,
metadata=[
batch_models.MetadataItem(
name='applications', value=application_metadata)
]
)
metadata=[batch_models.MetadataItem(name='applications', value=application_metadata)])
# define schedule
schedule = batch_models.Schedule(
do_not_run_until=None,
do_not_run_after=None,
start_window=None,
recurrence_interval=None
)
do_not_run_until=None, do_not_run_after=None, start_window=None, recurrence_interval=None)
# create job schedule and add task
setup = batch_models.JobScheduleAddParameter(
id=job_configuration.id,
schedule=schedule,
job_specification=job_spec)
id=job_configuration.id, schedule=schedule, job_specification=job_spec)
self.batch_client.job_schedule.add(setup)

Просмотреть файл

@ -8,7 +8,14 @@ from aztk.utils import ssh as ssh_lib
from aztk.utils import helpers
def cluster_copy(cluster_operations, cluster_id, source_path, destination_path=None, container_name=None, internal=False, get=False, timeout=None):
def cluster_copy(cluster_operations,
cluster_id,
source_path,
destination_path=None,
container_name=None,
internal=False,
get=False,
timeout=None):
cluster = cluster_operations.get(cluster_id)
pool, nodes = cluster.pool, list(cluster.nodes)
if internal:
@ -31,9 +38,7 @@ def cluster_copy(cluster_operations, cluster_id, source_path, destination_path=N
destination_path=destination_path,
ssh_key=ssh_key.exportKey().decode('utf-8'),
get=get,
timeout=timeout
)
)
timeout=timeout))
return output
except (OSError, batch_error.BatchErrorException) as exc:
raise exc

Просмотреть файл

@ -5,7 +5,8 @@ from aztk import models
from aztk.utils import helpers, constants
def create_pool_and_job(core_cluster_operations, cluster_conf: models.ClusterConfiguration, software_metadata_key: str, start_task, VmImageModel):
def create_pool_and_job(core_cluster_operations, cluster_conf: models.ClusterConfiguration, software_metadata_key: str,
start_task, VmImageModel):
"""
Create a pool and job
:param cluster_conf: the configuration object used to create the cluster
@ -27,8 +28,7 @@ def create_pool_and_job(core_cluster_operations, cluster_conf: models.ClusterCon
network_conf = None
if cluster_conf.subnet_id is not None:
network_conf = batch_models.NetworkConfiguration(
subnet_id=cluster_conf.subnet_id)
network_conf = batch_models.NetworkConfiguration(subnet_id=cluster_conf.subnet_id)
auto_scale_formula = "$TargetDedicatedNodes={0}; $TargetLowPriorityNodes={1}".format(
cluster_conf.size, cluster_conf.size_low_priority)
@ -36,8 +36,7 @@ def create_pool_and_job(core_cluster_operations, cluster_conf: models.ClusterCon
pool = batch_models.PoolAddParameter(
id=pool_id,
virtual_machine_configuration=batch_models.VirtualMachineConfiguration(
image_reference=image_ref_to_use,
node_agent_sku_id=sku_to_use),
image_reference=image_ref_to_use, node_agent_sku_id=sku_to_use),
vm_size=cluster_conf.vm_size,
enable_auto_scale=True,
auto_scale_formula=auto_scale_formula,
@ -47,8 +46,7 @@ def create_pool_and_job(core_cluster_operations, cluster_conf: models.ClusterCon
max_tasks_per_node=4,
network_configuration=network_conf,
metadata=[
batch_models.MetadataItem(
name=constants.AZTK_SOFTWARE_METADATA_KEY, value=software_metadata_key),
batch_models.MetadataItem(name=constants.AZTK_SOFTWARE_METADATA_KEY, value=software_metadata_key),
batch_models.MetadataItem(
name=constants.AZTK_MODE_METADATA_KEY, value=constants.AZTK_CLUSTER_MODE_METADATA)
])
@ -57,9 +55,7 @@ def create_pool_and_job(core_cluster_operations, cluster_conf: models.ClusterCon
helpers.create_pool_if_not_exist(pool, core_cluster_operations.batch_client)
# Create job
job = batch_models.JobAddParameter(
id=job_id,
pool_info=batch_models.PoolInformation(pool_id=pool_id))
job = batch_models.JobAddParameter(id=job_id, pool_info=batch_models.PoolInformation(pool_id=pool_id))
# Add job to batch
core_cluster_operations.batch_client.job.add(job)

Просмотреть файл

@ -1,5 +1,3 @@
#TODO: return Cluster instead of (pool, nodes)
from aztk import models

Просмотреть файл

@ -7,10 +7,8 @@ def list_clusters(cluster_client, software_metadata_key):
List all the cluster on your account.
"""
pools = cluster_client.batch_client.pool.list()
software_metadata = (
constants.AZTK_SOFTWARE_METADATA_KEY, software_metadata_key)
cluster_metadata = (
constants.AZTK_MODE_METADATA_KEY, constants.AZTK_CLUSTER_MODE_METADATA)
software_metadata = (constants.AZTK_SOFTWARE_METADATA_KEY, software_metadata_key)
cluster_metadata = (constants.AZTK_MODE_METADATA_KEY, constants.AZTK_CLUSTER_MODE_METADATA)
aztk_clusters = []
for pool in [pool for pool in pools if pool.metadata]:

Просмотреть файл

@ -4,15 +4,8 @@ import azure.batch.models as batch_models
from aztk.utils import helpers, constants
def submit_job(
job_client,
job_configuration,
start_task,
job_manager_task,
autoscale_formula,
software_metadata_key: str,
vm_image_model,
application_metadata):
def submit_job(job_client, job_configuration, start_task, job_manager_task, autoscale_formula,
software_metadata_key: str, vm_image_model, application_metadata):
"""
Job Submission
:param job_configuration -> aztk_sdk.spark.models.JobConfiguration
@ -54,7 +47,8 @@ def submit_job(
max_tasks_per_node=4,
metadata=[
batch_models.MetadataItem(name=constants.AZTK_SOFTWARE_METADATA_KEY, value=software_metadata_key),
batch_models.MetadataItem(name=constants.AZTK_MODE_METADATA_KEY, value=constants.AZTK_JOB_MODE_METADATA)
batch_models.MetadataItem(
name=constants.AZTK_MODE_METADATA_KEY, value=constants.AZTK_JOB_MODE_METADATA)
]))
# define job specification
@ -66,7 +60,8 @@ def submit_job(
metadata=[batch_models.MetadataItem(name='applications', value=application_metadata)])
# define schedule
schedule = batch_models.Schedule(do_not_run_until=None, do_not_run_after=None, start_window=None, recurrence_interval=None)
schedule = batch_models.Schedule(
do_not_run_until=None, do_not_run_after=None, start_window=None, recurrence_interval=None)
# create job schedule and add task
setup = batch_models.JobScheduleAddParameter(id=job_configuration.id, schedule=schedule, job_specification=job_spec)

Просмотреть файл

@ -2,8 +2,10 @@ import collections
import enum
from aztk.error import InvalidModelFieldError
from . import validators as aztk_validators
class ModelMergeStrategy(enum.Enum):
Override = 1
"""
@ -14,6 +16,7 @@ class ModelMergeStrategy(enum.Enum):
Try to merge value nested
"""
class ListMergeStrategy(enum.Enum):
Replace = 1
"""
@ -24,11 +27,13 @@ class ListMergeStrategy(enum.Enum):
Append all the values of the new list
"""
# pylint: disable=W0212
class Field:
"""
Base class for all model fields
"""
def __init__(self, *validators, **kwargs):
self.default = kwargs.get('default')
self.required = 'default' not in kwargs
@ -99,6 +104,7 @@ class Integer(Field):
"""
Model Integer field
"""
def __init__(self, *args, **kwargs):
super().__init__(aztk_validators.Integer(), *args, **kwargs)
@ -132,8 +138,7 @@ class List(Field):
self.merge_strategy = kwargs.get('merge_strategy', ListMergeStrategy.Append)
self.skip_none = kwargs.get('skip_none', True)
super().__init__(
aztk_validators.List(*kwargs.get('inner_validators', [])), **kwargs)
super().__init__(aztk_validators.List(*kwargs.get('inner_validators', [])), **kwargs)
def __set__(self, instance, value):
if isinstance(value, collections.MutableSequence):
@ -176,6 +181,7 @@ class List(Field):
output.append(item)
return output
class Model(Field):
"""
Field is another model
@ -214,10 +220,12 @@ class Model(Field):
else:
return None
class Enum(Field):
"""
Field that should be an enum
"""
def __init__(self, model, *args, **kwargs):
super().__init__(aztk_validators.InstanceOf(model), *args, **kwargs)
@ -232,7 +240,6 @@ class Enum(Field):
raise InvalidModelFieldError("{0} is not a valid option. Use one of {1}".format(value, available))
super().__set__(instance, value)
def serialize(self, instance):
val = super().serialize(instance)
if val is not None:

Просмотреть файл

@ -9,6 +9,7 @@ class Validator:
To write your validator extend this class and implement the validate method.
To raise an error raise InvalidModelFieldError
"""
def __call__(self, value):
self.validate(value)
@ -16,7 +17,6 @@ class Validator:
raise NotImplementedError()
class Required(Validator):
"""
Validate the field value is not `None`
@ -77,7 +77,6 @@ class Boolean(Validator):
raise InvalidModelFieldError('{0} should be a boolean'.format(value))
class In(Validator):
"""
Validate the field value is in the list of allowed choices
@ -93,6 +92,7 @@ class In(Validator):
if value not in self.choices:
raise InvalidModelFieldError('{0} should be in {1}'.format(value, self.choices))
class InstanceOf(Validator):
"""
Check if the field is an instance of the given type
@ -106,8 +106,7 @@ class InstanceOf(Validator):
return
if not isinstance(value, self.type):
raise InvalidModelFieldError(
"should be an instance of '{}'".format(self.type.__name__))
raise InvalidModelFieldError("should be an instance of '{}'".format(self.type.__name__))
class Model(Validator):
@ -123,8 +122,7 @@ class Model(Validator):
return
if not isinstance(value, self.model):
raise InvalidModelFieldError(
"should be an instance of '{}'".format(self.model.__name__))
raise InvalidModelFieldError("should be an instance of '{}'".format(self.model.__name__))
value.validate()

Просмотреть файл

@ -11,15 +11,19 @@ class AztkError(Exception):
class AztkAttributeError(AztkError):
pass
class ClusterNotReadyError(AztkError):
pass
class AzureApiInitError(AztkError):
pass
class InvalidPluginConfigurationError(AztkError):
pass
class InvalidModelError(AztkError):
def __init__(self, message: str, model=None):
super().__init__()
@ -34,12 +38,15 @@ class InvalidModelError(AztkError):
class MissingRequiredAttributeError(InvalidModelError):
pass
class InvalidCustomScriptError(InvalidModelError):
pass
class InvalidPluginReferenceError(InvalidModelError):
pass
class InvalidModelFieldError(InvalidModelError):
def __init__(self, message: str, model=None, field=None):
super().__init__(message, model)

Просмотреть файл

@ -2,17 +2,18 @@ import azure.batch.models as batch_models
import datetime
from azure.storage.blob import BlockBlobService, BlobPermissions
class BlobData:
"""
Object mapping to a blob entry. Can generate resource files for batch
"""
def __init__(self, blob_client: BlockBlobService, container: str, blob: str):
self.container = container
self.blob = blob
self.dest = blob
self.blob_client = blob_client
def to_resource_file(self, dest: str = None) -> batch_models.ResourceFile:
sas_token = self.blob_client.generate_blob_shared_access_signature(
self.container,
@ -20,7 +21,6 @@ class BlobData:
permission=BlobPermissions.READ,
expiry=datetime.datetime.utcnow() + datetime.timedelta(days=365))
sas_url = self.blob_client.make_blob_url(
self.container, self.blob, sas_token=sas_token)
sas_url = self.blob_client.make_blob_url(self.container, self.blob, sas_token=sas_token)
return batch_models.ResourceFile(file_path=dest or self.dest, blob_source=sas_url)

Просмотреть файл

@ -70,7 +70,7 @@ class NodeData:
relative_folder = os.path.relpath(base, path)
for file in files:
if self._includeFile(file, exclude):
self.add_file(os.path.join(base, file), os.path.join(dest, relative_folder), binary = False)
self.add_file(os.path.join(base, file), os.path.join(dest, relative_folder), binary=False)
def _add_custom_scripts(self):
data = []
@ -90,7 +90,8 @@ class NodeData:
raise InvalidCustomScriptError("Custom script '{0}' doesn't exists.".format(custom_script.script))
elif isinstance(custom_script.script, models.File):
new_file_name = str(index) + '_' + custom_script.script.name
self.zipf.writestr(os.path.join('custom-scripts', new_file_name), custom_script.script.payload.getvalue())
self.zipf.writestr(
os.path.join('custom-scripts', new_file_name), custom_script.script.payload.getvalue())
self.zipf.writestr(
os.path.join(CUSTOM_SCRIPT_FOLDER, CUSTOM_SCRIPT_METADATA_FILE), yaml.dump(data, default_flow_style=False))
@ -108,8 +109,8 @@ class NodeData:
binary=False)
# add ssh keys for passwordless ssh
self.zipf.writestr( 'id_rsa.pub', spark_configuration.ssh_key_pair['pub_key'])
self.zipf.writestr( 'id_rsa', spark_configuration.ssh_key_pair['priv_key'])
self.zipf.writestr('id_rsa.pub', spark_configuration.ssh_key_pair['pub_key'])
self.zipf.writestr('id_rsa', spark_configuration.ssh_key_pair['priv_key'])
if spark_configuration.jars:
for jar in spark_configuration.jars:
@ -141,7 +142,8 @@ class NodeData:
for file in plugin.files:
zipf = self.zipf.writestr('plugins/{0}/{1}'.format(plugin.name, file.target), file.content())
if plugin.execute:
data.append(dict(
data.append(
dict(
name=plugin.name,
execute='{0}/{1}'.format(plugin.name, plugin.execute),
args=plugin.args,

Просмотреть файл

@ -1,6 +1,7 @@
import yaml
from aztk.error import AztkError, InvalidModelError
class ConfigurationBase:
"""
Base class for any configuration.
@ -19,7 +20,6 @@ class ConfigurationBase:
pretty_args = yaml.dump(args, default_flow_style=False)
raise AztkError("{0} {1}\n{2}".format(cls.__name__, str(e), pretty_args))
@classmethod
def _from_dict(cls, args: dict):
clean = dict((k, v) for k, v in args.items() if v)

Просмотреть файл

@ -1,6 +1,7 @@
import os
from aztk.utils.command_builder import CommandBuilder
class DockerCmd:
"""
Class helping to write a docker command
@ -17,7 +18,6 @@ class DockerCmd:
self.cmd.add_argument(docker_repo)
self.cmd.add_argument(cmd)
def add_env(self, env: str, value: str):
self.cmd.add_option('-e', '{0}={1}'.format(env, value))
@ -33,6 +33,5 @@ class DockerCmd:
def open_port(self, port: int):
self.cmd.add_option('-p', '{0}:{0}'.format(port)) # Spark Master UI
def to_str(self):
return self.cmd.to_str()

Просмотреть файл

@ -1,9 +1,8 @@
import azure.batch.models as batch_models
class Cluster:
def __init__(self,
pool: batch_models.CloudPool,
nodes: batch_models.ComputeNodePaged = None):
def __init__(self, pool: batch_models.CloudPool, nodes: batch_models.ComputeNodePaged = None):
self.id = pool.id
self.pool = pool
self.nodes = nodes
@ -20,4 +19,3 @@ class Cluster:
self.current_low_pri_nodes = pool.current_low_priority_nodes
self.target_dedicated_nodes = pool.target_dedicated_nodes
self.target_low_pri_nodes = pool.target_low_priority_nodes

Просмотреть файл

@ -1,6 +1,6 @@
import aztk.error as error
from aztk.core.models import Model, fields
from aztk.utils import deprecated,deprecate, helpers
from aztk.utils import deprecated, deprecate, helpers
from .custom_script import CustomScript
from .file_share import FileShare
@ -9,6 +9,7 @@ from .toolkit import Toolkit
from .user_configuration import UserConfiguration
from .scheduling_target import SchedulingTarget
class ClusterConfiguration(Model):
"""
Cluster configuration model
@ -45,7 +46,8 @@ class ClusterConfiguration(Model):
kwargs['size'] = kwargs.pop('vm_count')
if 'vm_low_pri_count' in kwargs:
deprecate("vm_low_pri_count is deprecated for ClusterConfiguration.", "Please use size_low_priority instead.")
deprecate("vm_low_pri_count is deprecated for ClusterConfiguration.",
"Please use size_low_priority instead.")
kwargs['size_low_priority'] = kwargs.pop('vm_low_pri_count')
super().__init__(*args, **kwargs)
@ -77,7 +79,6 @@ class ClusterConfiguration(Model):
"""
return self.size > 0 and self.size_low_priority > 0
def gpu_enabled(self):
return helpers.is_gpu_enabled(self.vm_size)
@ -92,8 +93,7 @@ class ClusterConfiguration(Model):
if self.vm_size is None:
raise error.InvalidModelError(
"Please supply a vm_size in either the cluster.yaml configuration file or with a parameter (--vm-size)"
)
"Please supply a vm_size in either the cluster.yaml configuration file or with a parameter (--vm-size)")
if self.mixed_mode() and not self.subnet_id:
raise error.InvalidModelError(
@ -101,7 +101,8 @@ class ClusterConfiguration(Model):
)
if self.custom_scripts:
deprecate("0.9.0", "Custom scripts are DEPRECATED.", "Use plugins instead. See https://aztk.readthedocs.io/en/v0.7.0/15-plugins.html.")
deprecate("0.9.0", "Custom scripts are DEPRECATED.",
"Use plugins instead. See https://aztk.readthedocs.io/en/v0.7.0/15-plugins.html.")
if self.scheduling_target == SchedulingTarget.Dedicated and self.size == 0:
raise error.InvalidModelError("Scheduling target cannot be Dedicated if dedicated vm size is 0")

Просмотреть файл

@ -1,5 +1,6 @@
from aztk.core.models import Model, fields
class CustomScript(Model):
name = fields.String()
script = fields.String()

Просмотреть файл

@ -1,5 +1,6 @@
import io
class File:
def __init__(self, name: str, payload: io.StringIO):
self.name = name

Просмотреть файл

@ -1,5 +1,6 @@
from aztk.core.models import Model, fields
class FileShare(Model):
storage_account_name = fields.String()
storage_account_key = fields.String()

Просмотреть файл

@ -1,6 +1,7 @@
from tempfile import SpooledTemporaryFile
from typing import Union
class NodeOutput:
def __init__(self, id: str, output: Union[SpooledTemporaryFile, str] = None, error: Exception = None):
self.id = id

Просмотреть файл

@ -28,8 +28,7 @@ class PluginManager:
nvblas=plugins.NvBLASPlugin,
apt_get=plugins.AptGetPlugin,
pip_install=plugins.PipPlugin,
conda_install=plugins.CondaPlugin
)
conda_install=plugins.CondaPlugin)
def __init__(self):
self.loaded = False
@ -51,7 +50,8 @@ class PluginManager:
args = dict()
for key, param in signature.parameters.items():
if param.kind == param.POSITIONAL_OR_KEYWORD or param.kind == param.KEYWORD_ONLY:
args[key] = PluginArgument(key, default=param.default, required=param.default is inspect.Parameter.empty)
args[key] = PluginArgument(
key, default=param.default, required=param.default is inspect.Parameter.empty)
return args
@ -66,17 +66,14 @@ class PluginManager:
for arg in plugin_args.values():
if args.get(arg.name) is None:
if arg.required:
message = "Missing a required argument {0} for plugin {1}".format(
arg.name, plugin_cls.__name__)
message = "Missing a required argument {0} for plugin {1}".format(arg.name, plugin_cls.__name__)
raise InvalidPluginReferenceError(message)
args[arg.name] = arg.default
def _validate_no_extra_args(self, plugin_cls, plugin_args: dict, args: dict):
for name in args:
if not name in plugin_args:
message = "Plugin {0} doesn't have an argument called '{1}'".format(
plugin_cls.__name__, name)
message = "Plugin {0} doesn't have an argument called '{1}'".format(plugin_cls.__name__, name)
raise InvalidPluginReferenceError(message)

Просмотреть файл

@ -42,6 +42,7 @@ class PluginPort(Model):
return self.public
return None
class PluginConfiguration(Model):
"""
Plugin manifest that should be returned in the main.py of your plugin

Просмотреть файл

@ -2,6 +2,7 @@ import io
from typing import Union
from aztk.core.models import Model, fields
class PluginFile(Model):
"""
Reference to a file for a plugin.
@ -29,7 +30,7 @@ class TextPluginFile(Model):
target = fields.String()
def __init__(self, target: str, content: Union[str,io.StringIO]):
def __init__(self, target: str, content: Union[str, io.StringIO]):
super().__init__(target=target)
if isinstance(content, str):
self._content = content

Просмотреть файл

@ -1,5 +1,6 @@
from aztk.core.models import Model, fields
class PortForwardingSpecification(Model):
remote_port = fields.Integer()
local_port = fields.Integer()

Просмотреть файл

@ -1,5 +1,6 @@
from enum import Enum
class SchedulingTarget(Enum):
"""
Target where task will get scheduled.

Просмотреть файл

@ -1,6 +1,7 @@
from aztk.core.models import Model, fields
from aztk.error import InvalidModelError
class ServicePrincipalConfiguration(Model):
"""
Container class for AAD authentication
@ -11,6 +12,7 @@ class ServicePrincipalConfiguration(Model):
batch_account_resource_id = fields.String()
storage_account_resource_id = fields.String()
class SharedKeyConfiguration(Model):
"""
Container class for shared key authentication
@ -46,14 +48,10 @@ class SecretsConfiguration(Model):
def __validate__(self):
if self.service_principal and self.shared_key:
raise InvalidModelError(
"Both service_principal and shared_key auth are configured, must use only one"
)
raise InvalidModelError("Both service_principal and shared_key auth are configured, must use only one")
if not self.service_principal and not self.shared_key:
raise InvalidModelError(
"Neither service_principal and shared_key auth are configured, must use only one"
)
raise InvalidModelError("Neither service_principal and shared_key auth are configured, must use only one")
def is_aad(self):
return self.service_principal is not None

Просмотреть файл

@ -2,16 +2,19 @@ from aztk.error import InvalidModelError
from aztk.utils import constants, deprecate
from aztk.core.models import Model, fields
class ToolkitDefinition:
def __init__(self, versions, environments):
self.versions = versions
self.environments = environments
class ToolkitEnvironmentDefinition:
def __init__(self, versions=None, default=""):
self.versions = versions or [""]
self.default = default
TOOLKIT_MAP = dict(
spark=ToolkitDefinition(
versions=["1.6.3", "2.1.0", "2.2.0", "2.3.0"],
@ -20,8 +23,7 @@ TOOLKIT_MAP = dict(
r=ToolkitEnvironmentDefinition(),
miniconda=ToolkitEnvironmentDefinition(),
anaconda=ToolkitEnvironmentDefinition(),
)
),
)),
)
@ -69,7 +71,6 @@ class Toolkit(Model):
"Environment '{0}' version '{1}' for toolkit '{2}' is not available. Use one of: {3}".format(
self.environment, self.environment_version, self.software, env_def.versions))
def get_docker_repo(self, gpu: bool):
if self.docker_repo:
return self.docker_repo
@ -97,7 +98,6 @@ class Toolkit(Model):
return '-'.join(array)
def _get_environment_definition(self) -> ToolkitEnvironmentDefinition:
toolkit = TOOLKIT_MAP.get(self.software)

Просмотреть файл

@ -1,5 +1,6 @@
from aztk.core.models import Model, fields
class UserConfiguration(Model):
username = fields.String()
ssh_key = fields.String(default=None)

Просмотреть файл

@ -10,8 +10,6 @@ from azure.mgmt.batch import BatchManagementClient
from azure.mgmt.storage import StorageManagementClient
from azure.storage.common import CloudStorageAccount
RESOURCE_ID_PATTERN = re.compile('^/subscriptions/(?P<subscription>[^/]+)'
'/resourceGroups/(?P<resourcegroup>[^/]+)'
'/providers/[^/]+'
@ -39,52 +37,42 @@ storage_account_name = os.environ.get("STORAGE_ACCOUNT_NAME")
storage_account_key = os.environ.get("STORAGE_ACCOUNT_KEY")
storage_account_suffix = os.environ.get("STORAGE_ACCOUNT_SUFFIX")
def get_blob_client() -> blob.BlockBlobService:
if not storage_resource_id:
return blob.BlockBlobService(
account_name=storage_account_name,
account_key=storage_account_key,
endpoint_suffix=storage_account_suffix)
account_name=storage_account_name, account_key=storage_account_key, endpoint_suffix=storage_account_suffix)
else:
credentials = ServicePrincipalCredentials(
client_id=client_id,
secret=credential,
tenant=tenant_id,
resource='https://management.core.windows.net/')
client_id=client_id, secret=credential, tenant=tenant_id, resource='https://management.core.windows.net/')
m = RESOURCE_ID_PATTERN.match(storage_resource_id)
accountname = m.group('account')
subscription = m.group('subscription')
resourcegroup = m.group('resourcegroup')
mgmt_client = StorageManagementClient(credentials, subscription)
key = mgmt_client.storage_accounts.list_keys(resource_group_name=resourcegroup,
account_name=accountname).keys[0].value
key = mgmt_client.storage_accounts.list_keys(
resource_group_name=resourcegroup, account_name=accountname).keys[0].value
storage_client = CloudStorageAccount(accountname, key)
return storage_client.create_block_blob_service()
def get_batch_client() -> batch.BatchServiceClient:
if not batch_resource_id:
base_url = batch_service_url
credentials = batchauth.SharedKeyCredentials(
batch_account_name,
batch_account_key)
credentials = batchauth.SharedKeyCredentials(batch_account_name, batch_account_key)
else:
credentials = ServicePrincipalCredentials(
client_id=client_id,
secret=credential,
tenant=tenant_id,
resource='https://management.core.windows.net/')
client_id=client_id, secret=credential, tenant=tenant_id, resource='https://management.core.windows.net/')
m = RESOURCE_ID_PATTERN.match(batch_resource_id)
batch_client = BatchManagementClient(credentials, m.group('subscription'))
account = batch_client.batch_account.get(m.group('resourcegroup'), m.group('account'))
base_url = 'https://%s/' % account.account_endpoint
credentials = ServicePrincipalCredentials(
client_id=client_id,
secret=credential,
tenant=tenant_id,
resource='https://batch.core.windows.net/')
client_id=client_id, secret=credential, tenant=tenant_id, resource='https://batch.core.windows.net/')
return batch.BatchServiceClient(credentials, base_url=base_url)
batch_client = get_batch_client()
blob_client = get_blob_client()

Просмотреть файл

@ -5,6 +5,7 @@ log = logging.getLogger("aztk.node-agent")
DEFAULT_FORMAT = '%(message)s'
def setup_logging():
for handler in logging.root.handlers[:]:
logging.root.removeHandler(handler)

Просмотреть файл

@ -9,6 +9,7 @@ import yaml
Creates a user if the user configuration file at $AZTK_WORKING_DIR/user.yaml exists
'''
def create_user(batch_client):
path = os.path.join(os.environ['AZTK_WORKING_DIR'], "user.yaml")
@ -30,12 +31,11 @@ def create_user(batch_client):
is_admin=True,
password=password,
ssh_public_key=str(user_conf['ssh-key']),
expiry_time=datetime.now(timezone.utc) + timedelta(days=365)
)
)
expiry_time=datetime.now(timezone.utc) + timedelta(days=365)))
except batch_error.BatchErrorException as e:
print(e)
def decrypt_password(user_conf):
cipher_text = user_conf['password']
encrypted_aes_session_key = user_conf['aes_session_key']

Просмотреть файл

@ -13,6 +13,7 @@ def read_cluster_config():
print("Got cluster config", cluster_config)
return cluster_config
def setup_host(docker_repo: str):
"""
Code to be run on the node(NOT in a container)

Просмотреть файл

@ -2,6 +2,7 @@ from azure import batch
from aztk.models import ClusterConfiguration, SchedulingTarget
from core import config, log
def disable_scheduling(batch_client: batch.BatchServiceClient):
"""
Disable scheduling for the current node
@ -16,6 +17,7 @@ def disable_scheduling(batch_client: batch.BatchServiceClient):
else:
log.info("Task scheduling is already disabled for this node")
def enable_scheduling(batch_client: batch.BatchServiceClient):
"""
Disable scheduling for the current node
@ -31,9 +33,7 @@ def enable_scheduling(batch_client: batch.BatchServiceClient):
log.info("Task scheduling is already enabled for this node")
def setup_node_scheduling(
batch_client: batch.BatchServiceClient,
cluster_config: ClusterConfiguration,
def setup_node_scheduling(batch_client: batch.BatchServiceClient, cluster_config: ClusterConfiguration,
is_master: bool):
is_dedicated = config.is_dedicated

Просмотреть файл

@ -10,9 +10,11 @@ from core import config
MASTER_NODE_METADATA_KEY = "_spark_master_node"
class CannotAllocateMasterError(Exception):
pass
def get_master_node_id(pool: batchmodels.CloudPool):
"""
:returns: the id of the node that is the assigned master of this pool
@ -26,15 +28,16 @@ def get_master_node_id(pool: batchmodels.CloudPool):
return None
def try_assign_self_as_master(client: batch.BatchServiceClient, pool: batchmodels.CloudPool):
current_metadata = pool.metadata or []
new_metadata = current_metadata + [{"name": MASTER_NODE_METADATA_KEY, "value": config.node_id}]
try:
client.pool.patch(config.pool_id, batchmodels.PoolPatchParameter(
metadata=new_metadata
), batchmodels.PoolPatchOptions(
if_match=pool.e_tag,
client.pool.patch(
config.pool_id,
batchmodels.PoolPatchParameter(metadata=new_metadata),
batchmodels.PoolPatchOptions(if_match=pool.e_tag,
))
return True
except (batcherror.BatchErrorException, ClientRequestError):

Просмотреть файл

@ -5,8 +5,8 @@ import subprocess
from pathlib import Path
from aztk.models.plugins import PluginTarget, PluginTargetRole
log_folder = os.path.join(os.environ['AZTK_WORKING_DIR'], 'logs', 'plugins')
log_folder = os.path.join(os.environ['AZTK_WORKING_DIR'], 'logs','plugins')
def _read_manifest_file(path=None):
if not os.path.isfile(path):
@ -19,12 +19,10 @@ def _read_manifest_file(path=None):
print("Error in plugins manifest: {0}".format(err))
def setup_plugins(target: PluginTarget, is_master: bool = False, is_worker: bool = False):
plugins_dir = _plugins_dir()
plugins_manifest = _read_manifest_file(
os.path.join(plugins_dir, 'plugins-manifest.yaml'))
plugins_manifest = _read_manifest_file(os.path.join(plugins_dir, 'plugins-manifest.yaml'))
if not os.path.exists(log_folder):
os.makedirs(log_folder)
@ -39,14 +37,12 @@ def _plugins_dir():
def _run_on_this_node(plugin_obj, target: PluginTarget, is_master, is_worker):
print("Loading plugin {} in {} on {}".format(
plugin_obj["execute"],
plugin_obj['target'],
plugin_obj['target_role']
))
print("Loading plugin {} in {} on {}".format(plugin_obj["execute"], plugin_obj['target'],
plugin_obj['target_role']))
if plugin_obj['target'] != target.value:
print("Ignoring ", plugin_obj["execute"], "as target is for ", plugin_obj['target'], "but is currently running in ", target.value)
print("Ignoring ", plugin_obj["execute"], "as target is for ", plugin_obj['target'],
"but is currently running in ", target.value)
return False
if plugin_obj['target_role'] == PluginTargetRole.Master.value and is_master is True:
@ -56,7 +52,8 @@ def _run_on_this_node(plugin_obj, target: PluginTarget, is_master, is_worker):
if plugin_obj['target_role'] == PluginTargetRole.All.value:
return True
print("Ignoring plugin", plugin_obj["execute"], "as target role is ", plugin_obj['target_role'], "and node is master: ", is_master, is_worker)
print("Ignoring plugin", plugin_obj["execute"], "as target role is ", plugin_obj['target_role'],
"and node is master: ", is_master, is_worker)
return False
@ -72,8 +69,7 @@ def _setup_plugins(plugins_manifest, target: PluginTarget, is_master, is_worker)
def _run_script(name: str, script_path: str = None, args: dict = None, env: dict = None):
if not os.path.isfile(script_path):
print("Cannot run plugin script: {0} file does not exist".format(
script_path))
print("Cannot run plugin script: {0} file does not exist".format(script_path))
return
file_stat = os.stat(script_path)
os.chmod(script_path, file_stat.st_mode | 0o777)
@ -90,11 +86,7 @@ def _run_script(name: str, script_path: str = None, args: dict = None, env: dict
out_file = open(os.path.join(log_folder, '{0}.txt'.format(name)), 'w', encoding='UTF-8')
try:
subprocess.call(
[script_path] + args,
env=my_env,
stdout=out_file,
stderr=out_file)
subprocess.call([script_path] + args, env=my_env, stdout=out_file, stderr=out_file)
print("Finished running")
print("------------------------------------------------------------------")
except Exception as e:

Просмотреть файл

@ -47,10 +47,11 @@ def _run_script(script_path: str = None):
os.chmod(script_path, file_stat.st_mode | 0o777)
print("Running custom script:", script_path)
try:
subprocess.call([script_path], shell = True)
subprocess.call([script_path], shell=True)
except Exception as e:
print(e)
def _run_scripts_dir(root: str = None):
try:
for path, subdirs, files in os.walk(root):

Просмотреть файл

@ -29,6 +29,7 @@ def setup_as_worker():
setup_connection()
start_spark_worker()
def get_pool() -> batchmodels.CloudPool:
return batch_client.pool.get(config.pool_id)
@ -50,15 +51,13 @@ def setup_connection():
"""
This setup spark config with which nodes are slaves and which are master
"""
master_node_id = pick_master.get_master_node_id(
batch_client.pool.get(config.pool_id))
master_node_id = pick_master.get_master_node_id(batch_client.pool.get(config.pool_id))
master_node = get_node(master_node_id)
master_config_file = os.path.join(spark_conf_folder, "master")
master_file = open(master_config_file, 'w', encoding='UTF-8')
print("Adding master node ip {0} to config file '{1}'".format(
master_node.ip_address, master_config_file))
print("Adding master node ip {0} to config file '{1}'".format(master_node.ip_address, master_config_file))
master_file.write("{0}\n".format(master_node.ip_address))
master_file.close()
@ -66,8 +65,7 @@ def setup_connection():
def wait_for_master():
print("Waiting for master to be ready.")
master_node_id = pick_master.get_master_node_id(
batch_client.pool.get(config.pool_id))
master_node_id = pick_master.get_master_node_id(batch_client.pool.get(config.pool_id))
if master_node_id == config.node_id:
return
@ -85,8 +83,7 @@ def wait_for_master():
def start_spark_master():
master_ip = get_node(config.node_id).ip_address
exe = os.path.join(spark_home, "sbin", "start-master.sh")
cmd = [exe, "-h", master_ip, "--webui-port",
str(config.spark_web_ui_port)]
cmd = [exe, "-h", master_ip, "--webui-port", str(config.spark_web_ui_port)]
print("Starting master with '{0}'".format(" ".join(cmd)))
call(cmd)
try:
@ -99,12 +96,10 @@ def start_spark_master():
def start_spark_worker():
wait_for_master()
exe = os.path.join(spark_home, "sbin", "start-slave.sh")
master_node_id = pick_master.get_master_node_id(
batch_client.pool.get(config.pool_id))
master_node_id = pick_master.get_master_node_id(batch_client.pool.get(config.pool_id))
master_node = get_node(master_node_id)
cmd = [exe, "spark://{0}:7077".format(master_node.ip_address),
"--webui-port", str(config.spark_worker_ui_port)]
cmd = [exe, "spark://{0}:7077".format(master_node.ip_address), "--webui-port", str(config.spark_worker_ui_port)]
print("Connecting to master with '{0}'".format(" ".join(cmd)))
call(cmd)

Просмотреть файл

@ -3,11 +3,8 @@ import subprocess
from aztk.internal import DockerCmd
from aztk.utils import constants
def start_spark_container(
docker_repo: str=None,
gpu_enabled: bool=False,
file_mounts=None,
plugins=None):
def start_spark_container(docker_repo: str = None, gpu_enabled: bool = False, file_mounts=None, plugins=None):
cmd = DockerCmd(
name=constants.DOCKER_SPARK_CONTAINER_NAME,
@ -62,10 +59,10 @@ def start_spark_container(
for port in plugin.ports:
cmd.open_port(port.internal)
print("="*60)
print("=" * 60)
print(" Starting docker container")
print("-"*60)
print("-" * 60)
print(cmd.to_str())
print("="*60)
print("=" * 60)
subprocess.call(['/bin/bash', '-c', 'echo Is master?: $AZTK_IS_MASTER _ $AZTK_IS_WORKER'])
subprocess.call(['/bin/bash', '-c', cmd.to_str()])

Просмотреть файл

@ -3,7 +3,6 @@ from install import install
from core import logger
def run():
if len(sys.argv) < 2:
print("Error: Expected at least one argument")

Просмотреть файл

@ -12,7 +12,6 @@ from core import config
# limit azure.storage logging
logging.getLogger("azure.storage").setLevel(logging.CRITICAL)
'''
Submit helper methods
'''
@ -46,12 +45,9 @@ def upload_file_to_container(container_name,
if not node_path:
node_path = blob_name
blob_client.create_container(container_name,
fail_on_exist=False)
blob_client.create_container(container_name, fail_on_exist=False)
blob_client.create_blob_from_path(container_name,
blob_path,
file_path)
blob_client.create_blob_from_path(container_name, blob_path, file_path)
sas_token = blob_client.generate_blob_shared_access_signature(
container_name,
@ -59,32 +55,17 @@ def upload_file_to_container(container_name,
permission=blob.BlobPermissions.READ,
expiry=datetime.datetime.utcnow() + datetime.timedelta(days=7))
sas_url = blob_client.make_blob_url(container_name,
blob_path,
sas_token=sas_token)
sas_url = blob_client.make_blob_url(container_name, blob_path, sas_token=sas_token)
return batch_models.ResourceFile(file_path=node_path,
blob_source=sas_url)
return batch_models.ResourceFile(file_path=node_path, blob_source=sas_url)
def __app_submit_cmd(
name: str,
app: str,
app_args: List[str],
main_class: str,
jars: List[str],
py_files: List[str],
files: List[str],
driver_java_options: str,
driver_library_path: str,
driver_class_path: str,
driver_memory: str,
executor_memory: str,
driver_cores: int,
executor_cores: int):
def __app_submit_cmd(name: str, app: str, app_args: List[str], main_class: str, jars: List[str], py_files: List[str],
files: List[str], driver_java_options: str, driver_library_path: str, driver_class_path: str,
driver_memory: str, executor_memory: str, driver_cores: int, executor_cores: int):
cluster_id = os.environ['AZ_BATCH_POOL_ID']
spark_home = os.environ['SPARK_HOME']
with open (os.path.join(spark_home, 'conf', 'master')) as f:
with open(os.path.join(spark_home, 'conf', 'master')) as f:
master_ip = f.read().rstrip()
# set file paths to correct path on container
@ -94,10 +75,8 @@ def __app_submit_cmd(
files = [os.path.join(files_path, os.path.basename(f)) for f in files]
# 2>&1 redirect stdout and stderr to be in the same file
spark_submit_cmd = CommandBuilder(
'{0}/bin/spark-submit'.format(spark_home))
spark_submit_cmd.add_option(
'--master', 'spark://{0}:7077'.format(master_ip))
spark_submit_cmd = CommandBuilder('{0}/bin/spark-submit'.format(spark_home))
spark_submit_cmd.add_option('--master', 'spark://{0}:7077'.format(master_ip))
spark_submit_cmd.add_option('--name', name)
spark_submit_cmd.add_option('--class', main_class)
spark_submit_cmd.add_option('--jars', jars and ','.join(jars))
@ -114,8 +93,7 @@ def __app_submit_cmd(
spark_submit_cmd.add_option('--executor-cores', str(executor_cores))
spark_submit_cmd.add_argument(
os.path.expandvars(app) + ' ' +
' '.join(['\'' + str(app_arg) + '\'' for app_arg in (app_args or [])]))
os.path.expandvars(app) + ' ' + ' '.join(['\'' + str(app_arg) + '\'' for app_arg in (app_args or [])]))
with open("spark-submit.txt", mode="w", encoding="UTF-8") as stream:
stream.write(spark_submit_cmd.to_str())
@ -146,7 +124,6 @@ def upload_log(blob_client, application):
def receive_submit_request(application_file_path):
'''
Handle the request to submit a task
'''

Просмотреть файл

@ -18,5 +18,6 @@ def main():
print(e)
time.sleep(1)
if __name__ == "__main__":
main()

Просмотреть файл

@ -1,7 +1,6 @@
import time
import os
while not os.path.exists('/tmp/setup_complete'):
time.sleep(1)

Просмотреть файл

@ -44,8 +44,9 @@ class SparkBaseOperations:
Returns:
:obj:`azure.batch.models.StartTask`: the StartTask definition to provision the cluster.
"""
return generate_cluster_start_task.generate_cluster_start_task(
core_base_operations, zip_resource_file, id, gpu_enabled, docker_repo, file_shares, plugins, mixed_mode, worker_on_master)
return generate_cluster_start_task.generate_cluster_start_task(core_base_operations, zip_resource_file, id,
gpu_enabled, docker_repo, file_shares, plugins,
mixed_mode, worker_on_master)
#TODO: make this private or otherwise not public
def _generate_application_task(self, core_base_operations, container_id, application, remote=False):
@ -61,4 +62,5 @@ class SparkBaseOperations:
Returns:
:obj:`azure.batch.models.TaskAddParameter`: the Task definition for the Application.
"""
return generate_application_task.generate_application_task(core_base_operations, container_id, application, remote)
return generate_application_task.generate_application_task(core_base_operations, container_id, application,
remote)

Просмотреть файл

@ -26,11 +26,14 @@ class Client(CoreClient):
cluster (:obj:`aztk.spark.client.cluster.ClusterOperations`): Cluster
job (:obj:`aztk.spark.client.job.JobOperations`): Job
"""
def __init__(self, secrets_configuration: models.SecretsConfiguration = None, **kwargs):
self.secrets_configuration = None
context = None
if kwargs.get("secrets_config"):
deprecate(version="0.10.0", message="secrets_config key is deprecated in secrets.yaml",
deprecate(
version="0.10.0",
message="secrets_config key is deprecated in secrets.yaml",
advice="Please use secrets_configuration key instead.")
context = self._get_context(kwargs.get("secrets_config"))
else:
@ -38,7 +41,6 @@ class Client(CoreClient):
self.cluster = ClusterOperations(context)
self.job = JobOperations(context)
# ALL THE FOLLOWING METHODS ARE DEPRECATED AND WILL BE REMOVED IN 0.10.0
@deprecated("0.10.0")
@ -171,7 +173,8 @@ class Client(CoreClient):
password=None,
port_forward_list=None,
internal=False):
return self.cluster._core_cluster_operations.ssh_into_node(cluster_id, node_id, username, ssh_key, password, port_forward_list, internal)
return self.cluster._core_cluster_operations.ssh_into_node(cluster_id, node_id, username, ssh_key, password,
port_forward_list, internal)
'''
job submission

Просмотреть файл

@ -4,7 +4,13 @@ from aztk import error
from aztk.utils import helpers
def cluster_copy(core_cluster_operations, cluster_id: str, source_path: str, destination_path: str, host: bool = False, internal: bool = False, timeout: int = None):
def cluster_copy(core_cluster_operations,
cluster_id: str,
source_path: str,
destination_path: str,
host: bool = False,
internal: bool = False,
timeout: int = None):
try:
container_name = None if host else 'spark'
return core_cluster_operations.copy(

Просмотреть файл

@ -12,6 +12,7 @@ POOL_ADMIN_USER_IDENTITY = batch_models.UserIdentity(
auto_user=batch_models.AutoUserSpecification(
scope=batch_models.AutoUserScope.pool, elevation_level=batch_models.ElevationLevel.admin))
def _default_scheduling_target(vm_count: int):
if vm_count == 0:
return models.SchedulingTarget.Any
@ -27,7 +28,10 @@ def _apply_default_for_cluster_config(configuration: models.ClusterConfiguration
return cluster_conf
def create_cluster(core_cluster_operations, spark_cluster_operations, cluster_conf: models.ClusterConfiguration, wait: bool = False):
def create_cluster(core_cluster_operations,
spark_cluster_operations,
cluster_conf: models.ClusterConfiguration,
wait: bool = False):
"""
Create a new aztk spark cluster
@ -47,14 +51,15 @@ def create_cluster(core_cluster_operations, spark_cluster_operations, cluster_co
node_data = NodeData(cluster_conf).add_core().done()
zip_resource_files = cluster_data.upload_node_data(node_data).to_resource_file()
start_task = spark_cluster_operations._generate_cluster_start_task(core_cluster_operations, zip_resource_files, cluster_conf.cluster_id,
cluster_conf.gpu_enabled(), cluster_conf.get_docker_repo(),
cluster_conf.file_shares, cluster_conf.plugins,
cluster_conf.mixed_mode(), cluster_conf.worker_on_master)
start_task = spark_cluster_operations._generate_cluster_start_task(
core_cluster_operations, zip_resource_files, cluster_conf.cluster_id, cluster_conf.gpu_enabled(),
cluster_conf.get_docker_repo(), cluster_conf.file_shares, cluster_conf.plugins, cluster_conf.mixed_mode(),
cluster_conf.worker_on_master)
software_metadata_key = base_models.Software.spark
cluster = core_cluster_operations.create(cluster_conf, software_metadata_key, start_task, constants.SPARK_VM_IMAGE)
cluster = core_cluster_operations.create(cluster_conf, software_metadata_key, start_task,
constants.SPARK_VM_IMAGE)
# Wait for the master to be ready
if wait:

Просмотреть файл

@ -4,7 +4,12 @@ from aztk import error
from aztk.utils import helpers
def create_user(core_cluster_operations, spark_cluster_operations, cluster_id: str, username: str, password: str = None, ssh_key: str = None) -> str:
def create_user(core_cluster_operations,
spark_cluster_operations,
cluster_id: str,
username: str,
password: str = None,
ssh_key: str = None) -> str:
try:
cluster = spark_cluster_operations.get(cluster_id)
master_node_id = cluster.master_node_id

Просмотреть файл

@ -1,6 +1,3 @@
import os
from azure.batch.models import batch_error
@ -11,7 +8,8 @@ from aztk.utils import helpers
def _run(spark_cluster_operations, cluster_id, output_directory=None):
# copy debug program to each node
output = spark_cluster_operations.copy(cluster_id, os.path.abspath("./aztk/spark/utils/debug.py"), "/tmp/debug.py", host=True)
output = spark_cluster_operations.copy(
cluster_id, os.path.abspath("./aztk/spark/utils/debug.py"), "/tmp/debug.py", host=True)
ssh_cmd = _build_diagnostic_ssh_command()
run_output = spark_cluster_operations.run(cluster_id, ssh_cmd, host=True)
remote_path = "/tmp/debug.zip"

Просмотреть файл

@ -1,14 +1,20 @@
import azure.batch.models.batch_error as batch_error
from aztk import error
from aztk.utils import helpers
def cluster_download(core_cluster_operations, cluster_id: str, source_path: str, destination_path: str = None, host: bool = False, internal: bool = False, timeout: int = None):
def cluster_download(core_cluster_operations,
cluster_id: str,
source_path: str,
destination_path: str = None,
host: bool = False,
internal: bool = False,
timeout: int = None):
try:
container_name = None if host else 'spark'
return core_cluster_operations.copy(cluster_id,
return core_cluster_operations.copy(
cluster_id,
source_path,
destination_path=destination_path,
container_name=container_name,

Просмотреть файл

@ -1,7 +1,10 @@
from aztk.spark import models
def get_application_log(core_base_operations, cluster_id: str, application_name: str, tail=False, current_bytes: int = 0):
base_application_log = core_base_operations.get_application_log(
cluster_id, application_name, tail, current_bytes)
def get_application_log(core_base_operations,
cluster_id: str,
application_name: str,
tail=False,
current_bytes: int = 0):
base_application_log = core_base_operations.get_application_log(cluster_id, application_name, tail, current_bytes)
return models.ApplicationLog(base_application_log)

Просмотреть файл

@ -4,7 +4,12 @@ from aztk import error
from aztk.utils import helpers
def cluster_run(core_cluster_operations, cluster_id: str, command: str, host=False, internal: bool = False, timeout=None):
def cluster_run(core_cluster_operations,
cluster_id: str,
command: str,
host=False,
internal: bool = False,
timeout=None):
try:
return core_cluster_operations.run(
cluster_id, command, internal, container_name='spark' if not host else None, timeout=timeout)

Просмотреть файл

@ -1,12 +1,19 @@
import azure.batch.models.batch_error as batch_error
from aztk import error
from aztk.utils import helpers
def cluster_ssh_into_master(spark_cluster_operations, cluster_id, node_id, username, ssh_key=None, password=None, port_forward_list=None, internal=False):
def cluster_ssh_into_master(spark_cluster_operations,
cluster_id,
node_id,
username,
ssh_key=None,
password=None,
port_forward_list=None,
internal=False):
try:
spark_cluster_operations.ssh_into_node(cluster_id, node_id, username, ssh_key, password, port_forward_list, internal)
spark_cluster_operations.ssh_into_node(cluster_id, node_id, username, ssh_key, password, port_forward_list,
internal)
except batch_error.BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))

Просмотреть файл

@ -15,12 +15,18 @@ def affinitize_task_to_master(core_cluster_operations, spark_cluster_operations,
cluster = spark_cluster_operations.get(cluster_id)
if cluster.master_node_id is None:
raise AztkError("Master has not yet been selected. Please wait until the cluster is finished provisioning.")
master_node = core_cluster_operations.batch_client.compute_node.get(pool_id=cluster_id, node_id=cluster.master_node_id)
master_node = core_cluster_operations.batch_client.compute_node.get(
pool_id=cluster_id, node_id=cluster.master_node_id)
task.affinity_info = batch_models.AffinityInformation(affinity_id=master_node.affinity_id)
return task
def submit_application(core_cluster_operations, spark_cluster_operations, cluster_id, application, remote: bool = False, wait: bool = False):
def submit_application(core_cluster_operations,
spark_cluster_operations,
cluster_id,
application,
remote: bool = False,
wait: bool = False):
"""
Submit a spark app
"""
@ -32,7 +38,8 @@ def submit_application(core_cluster_operations, spark_cluster_operations, cluste
core_cluster_operations.batch_client.task.add(job_id=job_id, task=task)
if wait:
helpers.wait_for_task_to_complete(job_id=job_id, task_id=task.id, batch_client=core_cluster_operations.batch_client)
helpers.wait_for_task_to_complete(
job_id=job_id, task_id=task.id, batch_client=core_cluster_operations.batch_client)
def submit(core_cluster_operations,

Просмотреть файл

@ -3,6 +3,7 @@ import azure.batch.models.batch_error as batch_error
from aztk import error
from aztk.utils import helpers
def wait_for_application_to_complete(core_cluster_operations, id, application_name):
try:
return core_cluster_operations.wait(id, application_name)

Просмотреть файл

@ -163,7 +163,8 @@ class ClusterOperations(SparkBaseOperations):
Returns:
:obj:`List[aztk.spark.models.NodeOutput]`: A list of NodeOutput objects representing the output of the copy command.
"""
return copy.cluster_copy(self._core_cluster_operations, id, source_path, destination_path, host, internal, timeout)
return copy.cluster_copy(self._core_cluster_operations, id, source_path, destination_path, host, internal,
timeout)
def download(self,
id: str,
@ -190,8 +191,8 @@ class ClusterOperations(SparkBaseOperations):
Returns:
:obj:`List[aztk.spark.models.NodeOutput]`: A list of NodeOutput objects representing the output of the copy command.
"""
return download.cluster_download(self._core_cluster_operations, id, source_path, destination_path, host, internal,
timeout)
return download.cluster_download(self._core_cluster_operations, id, source_path, destination_path, host,
internal, timeout)
def diagnostics(self, id, output_directory=None):
"""Download a file from every node in a cluster.
@ -221,7 +222,8 @@ class ClusterOperations(SparkBaseOperations):
Returns:
:obj:`aztk.spark.models.ApplicationLog`: a model representing the output of the application.
"""
return get_application_log.get_application_log(self._core_cluster_operations, id, application_name, tail, current_bytes)
return get_application_log.get_application_log(self._core_cluster_operations, id, application_name, tail,
current_bytes)
def get_remote_login_settings(self, id: str, node_id: str):
"""Get the remote login information for a node in a cluster

Просмотреть файл

@ -10,7 +10,8 @@ from .get_recent_job import get_recent_job
def _get_job(core_job_operations, job_id):
job = core_job_operations.batch_client.job_schedule.get(job_id)
job_apps = [
app for app in core_job_operations.batch_client.task.list(job_id=job.execution_info.recent_job.id) if app.id != job_id
app for app in core_job_operations.batch_client.task.list(job_id=job.execution_info.recent_job.id)
if app.id != job_id
]
recent_run_job = get_recent_job(core_job_operations, job_id)
pool_prefix = recent_run_job.pool_info.auto_pool_specification.auto_pool_id_prefix

Просмотреть файл

@ -12,7 +12,8 @@ def _get_application(spark_job_operations, job_id, application_name):
# info about the app
recent_run_job = get_recent_job(spark_job_operations._core_job_operations, job_id)
try:
return spark_job_operations._core_job_operations.batch_client.task.get(job_id=recent_run_job.id, task_id=application_name)
return spark_job_operations._core_job_operations.batch_client.task.get(
job_id=recent_run_job.id, task_id=application_name)
except batch_models.batch_error.BatchErrorException:
raise error.AztkError(
"The Spark application {0} is still being provisioned or does not exist.".format(application_name))

Просмотреть файл

@ -5,6 +5,7 @@ from aztk.spark import models
from aztk.utils import helpers
from .get_recent_job import get_recent_job
def stop_app(core_job_operations, job_id, application_name):
recent_run_job = get_recent_job(core_job_operations, job_id)

Просмотреть файл

@ -64,7 +64,10 @@ def _apply_default_for_job_config(job_conf: models.JobConfiguration):
return job_conf
def submit_job(core_job_operations, spark_job_operations, job_configuration: models.JobConfiguration, wait: bool = False):
def submit_job(core_job_operations,
spark_job_operations,
job_configuration: models.JobConfiguration,
wait: bool = False):
try:
job_configuration = _apply_default_for_job_config(job_configuration)
job_configuration.validate()
@ -84,8 +87,8 @@ def submit_job(core_job_operations, spark_job_operations, job_configuration: mod
application_tasks = []
for application in job_configuration.applications:
application_tasks.append((application,
spark_job_operations._generate_application_task(core_job_operations, job_configuration.id,
application)))
spark_job_operations._generate_application_task(
core_job_operations, job_configuration.id, application)))
job_manager_task = generate_job_manager_task(core_job_operations, job_configuration, application_tasks)

Просмотреть файл

@ -17,7 +17,6 @@ class JobOperations(SparkBaseOperations):
self._core_job_operations = CoreJobOperations(context)
# self._spark_base_cluster_operations = SparkBaseOperations()
def list(self):
"""List all jobs.

Просмотреть файл

@ -1,2 +1 @@
# ALL FILES IN THIS DIRECTORY ARE DEPRECATED, WILL BE REMOTE IN v0.9.0

Просмотреть файл

@ -4,9 +4,11 @@ from aztk.utils.command_builder import CommandBuilder
from aztk import models as aztk_models
import azure.batch.models as batch_models
def run(spark_client, cluster_id, output_directory=None):
# copy debug program to each node
output = spark_client.cluster_copy(cluster_id, os.path.abspath("./aztk/spark/utils/debug.py"), "/tmp/debug.py", host=True)
output = spark_client.cluster_copy(
cluster_id, os.path.abspath("./aztk/spark/utils/debug.py"), "/tmp/debug.py", host=True)
ssh_cmd = _build_diagnostic_ssh_command()
run_output = spark_client.cluster_run(cluster_id, ssh_cmd, host=True)
remote_path = "/tmp/debug.zip"

Просмотреть файл

@ -8,30 +8,27 @@ import azure.batch.models as batch_models
POOL_ADMIN_USER_IDENTITY = batch_models.UserIdentity(
auto_user=batch_models.AutoUserSpecification(
scope=batch_models.AutoUserScope.pool,
elevation_level=batch_models.ElevationLevel.admin))
scope=batch_models.AutoUserScope.pool, elevation_level=batch_models.ElevationLevel.admin))
def _get_aztk_environment(cluster_id, worker_on_master, mixed_mode):
envs = []
envs.append(batch_models.EnvironmentSetting(name="AZTK_MIXED_MODE", value=helpers.bool_env(mixed_mode)))
envs.append(batch_models.EnvironmentSetting(
name="AZTK_WORKER_ON_MASTER", value=helpers.bool_env(worker_on_master)))
envs.append(batch_models.EnvironmentSetting(name="AZTK_WORKER_ON_MASTER", value=helpers.bool_env(worker_on_master)))
envs.append(batch_models.EnvironmentSetting(name="AZTK_CLUSTER_ID", value=cluster_id))
return envs
def __get_docker_credentials(spark_client):
creds = []
docker = spark_client.secrets_config.docker
if docker:
if docker.endpoint:
creds.append(batch_models.EnvironmentSetting(
name="DOCKER_ENDPOINT", value=docker.endpoint))
creds.append(batch_models.EnvironmentSetting(name="DOCKER_ENDPOINT", value=docker.endpoint))
if docker.username:
creds.append(batch_models.EnvironmentSetting(
name="DOCKER_USERNAME", value=docker.username))
creds.append(batch_models.EnvironmentSetting(name="DOCKER_USERNAME", value=docker.username))
if docker.password:
creds.append(batch_models.EnvironmentSetting(
name="DOCKER_PASSWORD", value=docker.password))
creds.append(batch_models.EnvironmentSetting(name="DOCKER_PASSWORD", value=docker.password))
return creds
@ -41,25 +38,17 @@ def __get_secrets_env(spark_client):
service_principal = spark_client.secrets_config.service_principal
if shared_key:
return [
batch_models.EnvironmentSetting(
name="BATCH_SERVICE_URL", value=shared_key.batch_service_url),
batch_models.EnvironmentSetting(
name="BATCH_ACCOUNT_KEY", value=shared_key.batch_account_key),
batch_models.EnvironmentSetting(
name="STORAGE_ACCOUNT_NAME", value=shared_key.storage_account_name),
batch_models.EnvironmentSetting(
name="STORAGE_ACCOUNT_KEY", value=shared_key.storage_account_key),
batch_models.EnvironmentSetting(
name="STORAGE_ACCOUNT_SUFFIX", value=shared_key.storage_account_suffix),
batch_models.EnvironmentSetting(name="BATCH_SERVICE_URL", value=shared_key.batch_service_url),
batch_models.EnvironmentSetting(name="BATCH_ACCOUNT_KEY", value=shared_key.batch_account_key),
batch_models.EnvironmentSetting(name="STORAGE_ACCOUNT_NAME", value=shared_key.storage_account_name),
batch_models.EnvironmentSetting(name="STORAGE_ACCOUNT_KEY", value=shared_key.storage_account_key),
batch_models.EnvironmentSetting(name="STORAGE_ACCOUNT_SUFFIX", value=shared_key.storage_account_suffix),
]
else:
return [
batch_models.EnvironmentSetting(
name="SP_TENANT_ID", value=service_principal.tenant_id),
batch_models.EnvironmentSetting(
name="SP_CLIENT_ID", value=service_principal.client_id),
batch_models.EnvironmentSetting(
name="SP_CREDENTIAL", value=service_principal.credential),
batch_models.EnvironmentSetting(name="SP_TENANT_ID", value=service_principal.tenant_id),
batch_models.EnvironmentSetting(name="SP_CLIENT_ID", value=service_principal.client_id),
batch_models.EnvironmentSetting(name="SP_CREDENTIAL", value=service_principal.credential),
batch_models.EnvironmentSetting(
name="SP_BATCH_RESOURCE_ID", value=service_principal.batch_account_resource_id),
batch_models.EnvironmentSetting(
@ -70,9 +59,9 @@ def __get_secrets_env(spark_client):
def __cluster_install_cmd(zip_resource_file: batch_models.ResourceFile,
gpu_enabled: bool,
docker_repo: str = None,
plugins = None,
plugins=None,
worker_on_master: bool = True,
file_mounts = None,
file_mounts=None,
mixed_mode: bool = False):
"""
For Docker on ubuntu 16.04 - return the command line
@ -89,12 +78,9 @@ def __cluster_install_cmd(zip_resource_file: batch_models.ResourceFile,
shares.append('mkdir -p {0}'.format(mount.mount_path))
# Mount the file share
shares.append('mount -t cifs //{0}.file.core.windows.net/{2} {3} -o vers=3.0,username={0},password={1},dir_mode=0777,file_mode=0777,sec=ntlmssp'.format(
mount.storage_account_name,
mount.storage_account_key,
mount.file_share_path,
mount.mount_path
))
shares.append(
'mount -t cifs //{0}.file.core.windows.net/{2} {3} -o vers=3.0,username={0},password={1},dir_mode=0777,file_mode=0777,sec=ntlmssp'.
format(mount.storage_account_name, mount.storage_account_key, mount.file_share_path, mount.mount_path))
setup = [
'time('\
@ -112,8 +98,8 @@ def __cluster_install_cmd(zip_resource_file: batch_models.ResourceFile,
commands = shares + setup
return commands
def generate_cluster_start_task(
spark_client,
def generate_cluster_start_task(spark_client,
zip_resource_file: batch_models.ResourceFile,
cluster_id: str,
gpu_enabled: bool,
@ -138,22 +124,17 @@ def generate_cluster_start_task(
# TODO use certificate
environment_settings = __get_secrets_env(spark_client) + [
batch_models.EnvironmentSetting(
name="SPARK_WEB_UI_PORT", value=spark_web_ui_port),
batch_models.EnvironmentSetting(
name="SPARK_WORKER_UI_PORT", value=spark_worker_ui_port),
batch_models.EnvironmentSetting(
name="SPARK_JOB_UI_PORT", value=spark_job_ui_port),
batch_models.EnvironmentSetting(
name="SPARK_CONTAINER_NAME", value=spark_container_name),
batch_models.EnvironmentSetting(
name="SPARK_SUBMIT_LOGS_FILE", value=spark_submit_logs_file),
batch_models.EnvironmentSetting(
name="AZTK_GPU_ENABLED", value=helpers.bool_env(gpu_enabled)),
batch_models.EnvironmentSetting(name="SPARK_WEB_UI_PORT", value=spark_web_ui_port),
batch_models.EnvironmentSetting(name="SPARK_WORKER_UI_PORT", value=spark_worker_ui_port),
batch_models.EnvironmentSetting(name="SPARK_JOB_UI_PORT", value=spark_job_ui_port),
batch_models.EnvironmentSetting(name="SPARK_CONTAINER_NAME", value=spark_container_name),
batch_models.EnvironmentSetting(name="SPARK_SUBMIT_LOGS_FILE", value=spark_submit_logs_file),
batch_models.EnvironmentSetting(name="AZTK_GPU_ENABLED", value=helpers.bool_env(gpu_enabled)),
] + __get_docker_credentials(spark_client) + _get_aztk_environment(cluster_id, worker_on_master, mixed_mode)
# start task command
command = __cluster_install_cmd(zip_resource_file, gpu_enabled, docker_repo, plugins, worker_on_master, file_shares, mixed_mode)
command = __cluster_install_cmd(zip_resource_file, gpu_enabled, docker_repo, plugins, worker_on_master, file_shares,
mixed_mode)
return batch_models.StartTask(
command_line=helpers.wrap_commands_in_shell(command),

Просмотреть файл

@ -12,14 +12,15 @@ from aztk.utils import constants, helpers
output_file = constants.TASK_WORKING_DIR + \
"/" + constants.SPARK_SUBMIT_LOGS_FILE
def __check_task_node_exist(batch_client, cluster_id: str, task: batch_models.CloudTask) -> bool:
try:
batch_client.compute_node.get(
cluster_id, task.node_info.node_id)
batch_client.compute_node.get(cluster_id, task.node_info.node_id)
return True
except batch_error.BatchErrorException:
return False
def __wait_for_app_to_be_running(batch_client, cluster_id: str, application_name: str) -> batch_models.CloudTask:
"""
Wait for the batch task to leave the waiting state into running(or completed if it was fast enough)
@ -33,11 +34,11 @@ def __wait_for_app_to_be_running(batch_client, cluster_id: str, application_name
else:
return task
def __get_output_file_properties(batch_client, cluster_id: str, application_name: str):
while True:
try:
file = helpers.get_file_properties(
cluster_id, application_name, output_file, batch_client)
file = helpers.get_file_properties(cluster_id, application_name, output_file, batch_client)
return file
except batch_error.BatchErrorException as e:
if e.response.status_code == 404:
@ -79,8 +80,7 @@ def get_log(batch_client, blob_client, cluster_id: str, application_name: str, t
ocp_range = None
if tail:
ocp_range = "bytes={0}-{1}".format(
current_bytes, target_bytes - 1)
ocp_range = "bytes={0}-{1}".format(current_bytes, target_bytes - 1)
stream = batch_client.file.get_from_task(
job_id, task_id, output_file, batch_models.FileGetFromTaskOptions(ocp_range=ocp_range))

Просмотреть файл

@ -9,11 +9,11 @@ import yaml
import aztk.error as error
from aztk.utils import constants, helpers
from aztk.utils.command_builder import CommandBuilder
'''
Job Submission helper methods
'''
def __app_cmd():
docker_exec = CommandBuilder("sudo docker exec")
docker_exec.add_argument("-i")
@ -30,7 +30,8 @@ def __app_cmd():
def generate_task(spark_client, job, application_tasks):
resource_files = []
for application, task in application_tasks:
task_definition_resource_file = helpers.upload_text_to_container(container_name=job.id,
task_definition_resource_file = helpers.upload_text_to_container(
container_name=job.id,
application_name=application.name + '.yaml',
file_path=application.name + '.yaml',
content=yaml.dump(task),
@ -48,9 +49,7 @@ def generate_task(spark_client, job, application_tasks):
allow_low_priority_node=True,
user_identity=batch_models.UserIdentity(
auto_user=batch_models.AutoUserSpecification(
scope=batch_models.AutoUserScope.task,
elevation_level=batch_models.ElevationLevel.admin))
)
scope=batch_models.AutoUserScope.task, elevation_level=batch_models.ElevationLevel.admin)))
return task
@ -83,8 +82,9 @@ def list_applications(spark_client, job_id):
def get_job(spark_client, job_id):
job = spark_client.batch_client.job_schedule.get(job_id)
job_apps = [app for app in
spark_client.batch_client.task.list(job_id=job.execution_info.recent_job.id) if app.id != job_id]
job_apps = [
app for app in spark_client.batch_client.task.list(job_id=job.execution_info.recent_job.id) if app.id != job_id
]
recent_run_job = __get_recent_job(spark_client, job_id)
pool_prefix = recent_run_job.pool_info.auto_pool_specification.auto_pool_id_prefix
pool = nodes = None
@ -101,8 +101,8 @@ def disable(spark_client, job_id):
# disable the currently running job from the job schedule if exists
recent_run_job = __get_recent_job(spark_client, job_id)
if recent_run_job.id and recent_run_job.state == batch_models.JobState.active:
spark_client.batch_client.job.disable(job_id=recent_run_job.id,
disable_tasks=batch_models.DisableJobOption.requeue)
spark_client.batch_client.job.disable(
job_id=recent_run_job.id, disable_tasks=batch_models.DisableJobOption.requeue)
# disable the job_schedule
spark_client.batch_client.job_schedule.disable(job_id)
@ -156,7 +156,8 @@ def get_application(spark_client, job_id, application_name):
try:
return spark_client.batch_client.task.get(job_id=recent_run_job.id, task_id=application_name)
except batch_models.batch_error.BatchErrorException:
raise error.AztkError("The Spark application {0} is still being provisioned or does not exist.".format(application_name))
raise error.AztkError(
"The Spark application {0} is still being provisioned or does not exist.".format(application_name))
def get_application_log(spark_client, job_id, application_name):
@ -176,7 +177,8 @@ def get_application_log(spark_client, job_id, application_name):
raise error.AztkError("The application {0} has not yet been created.".format(application))
raise error.AztkError("The application {0} does not exist".format(application_name))
else:
if task.state in (batch_models.TaskState.active, batch_models.TaskState.running, batch_models.TaskState.preparing):
if task.state in (batch_models.TaskState.active, batch_models.TaskState.running,
batch_models.TaskState.preparing):
raise error.AztkError("The application {0} has not yet finished executing.".format(application_name))
return spark_client.get_application_log(job_id, application_name)
@ -192,6 +194,7 @@ def stop_app(spark_client, job_id, application_name):
except batch_models.batch_error.BatchErrorException:
return False
def wait_until_job_finished(spark_client, job_id):
job_state = spark_client.batch_client.job_schedule.get(job_id).state

Просмотреть файл

@ -6,8 +6,6 @@ import azure.batch.models as batch_models
from aztk.error import AztkError
from aztk.utils import constants, helpers
from aztk.utils.command_builder import CommandBuilder
'''
Submit helper methods
'''
@ -22,7 +20,8 @@ def generate_task(spark_client, container_id, application, remote=False):
# The application provided is not hosted remotely and therefore must be uploaded
if not remote:
app_resource_file = helpers.upload_file_to_container(container_name=container_id,
app_resource_file = helpers.upload_file_to_container(
container_name=container_id,
application_name=application.name,
file_path=application.application,
blob_client=spark_client.blob_client,
@ -36,7 +35,8 @@ def generate_task(spark_client, container_id, application, remote=False):
# Upload dependent JARS
jar_resource_file_paths = []
for jar in application.jars:
current_jar_resource_file_path = helpers.upload_file_to_container(container_name=container_id,
current_jar_resource_file_path = helpers.upload_file_to_container(
container_name=container_id,
application_name=application.name,
file_path=jar,
blob_client=spark_client.blob_client,
@ -47,19 +47,20 @@ def generate_task(spark_client, container_id, application, remote=False):
# Upload dependent python files
py_files_resource_file_paths = []
for py_file in application.py_files:
current_py_files_resource_file_path = helpers.upload_file_to_container(container_name=container_id,
current_py_files_resource_file_path = helpers.upload_file_to_container(
container_name=container_id,
application_name=application.name,
file_path=py_file,
blob_client=spark_client.blob_client,
use_full_path=False)
py_files_resource_file_paths.append(
current_py_files_resource_file_path)
py_files_resource_file_paths.append(current_py_files_resource_file_path)
resource_files.append(current_py_files_resource_file_path)
# Upload other dependent files
files_resource_file_paths = []
for file in application.files:
files_resource_file_path = helpers.upload_file_to_container(container_name=container_id,
files_resource_file_path = helpers.upload_file_to_container(
container_name=container_id,
application_name=application.name,
file_path=file,
blob_client=spark_client.blob_client,
@ -95,13 +96,10 @@ def generate_task(spark_client, container_id, application, remote=False):
id=application.name,
command_line=helpers.wrap_commands_in_shell([task_cmd.to_str()]),
resource_files=resource_files,
constraints=batch_models.TaskConstraints(
max_task_retry_count=application.max_retry_count),
constraints=batch_models.TaskConstraints(max_task_retry_count=application.max_retry_count),
user_identity=batch_models.UserIdentity(
auto_user=batch_models.AutoUserSpecification(
scope=batch_models.AutoUserScope.task,
elevation_level=batch_models.ElevationLevel.admin))
)
scope=batch_models.AutoUserScope.task, elevation_level=batch_models.ElevationLevel.admin)))
return task
@ -122,7 +120,6 @@ def submit_application(spark_client, cluster_id, application, remote: bool = Fal
task = generate_task(spark_client, cluster_id, application, remote)
task = affinitize_task_to_master(spark_client, cluster_id, task)
# Add task to batch job (which has the same name as cluster_id)
job_id = cluster_id
spark_client.batch_client.task.add(job_id=job_id, task=task)

Просмотреть файл

@ -6,6 +6,7 @@ from aztk import error
from aztk.utils import constants, helpers
from aztk.core.models import Model, fields
class SparkToolkit(aztk.models.Toolkit):
def __init__(self, version: str, environment: str = None, environment_version: str = None):
super().__init__(
@ -54,6 +55,7 @@ class RemoteLogin(aztk.models.RemoteLogin):
class PortForwardingSpecification(aztk.models.PortForwardingSpecification):
pass
class File(aztk.models.File):
pass
@ -105,10 +107,12 @@ class PluginConfiguration(aztk.models.PluginConfiguration):
SchedulingTarget = aztk.models.SchedulingTarget
class ClusterConfiguration(aztk.models.ClusterConfiguration):
spark_configuration = fields.Model(SparkConfiguration, default=None)
worker_on_master = fields.Boolean(default=True)
class SecretsConfiguration(aztk.models.SecretsConfiguration):
pass
@ -118,8 +122,7 @@ class VmImage(aztk.models.VmImage):
class ApplicationConfiguration:
def __init__(
self,
def __init__(self,
name=None,
application=None,
application_args=None,
@ -185,11 +188,10 @@ class Application:
class JobConfiguration:
def __init__(
self,
id = None,
applications = None,
vm_size = None,
def __init__(self,
id=None,
applications=None,
vm_size=None,
custom_scripts=None,
spark_configuration=None,
toolkit=None,
@ -240,8 +242,7 @@ class JobConfiguration:
Raises: Error if invalid
"""
if self.toolkit is None:
raise error.InvalidModelError(
"Please supply a toolkit in the cluster configuration")
raise error.InvalidModelError("Please supply a toolkit in the cluster configuration")
self.toolkit.validate()
@ -254,9 +255,7 @@ class JobConfiguration:
)
if self.vm_size is None:
raise error.AztkError(
"Please supply a vm_size in your configuration."
)
raise error.AztkError("Please supply a vm_size in your configuration.")
if self.mixed_mode() and not self.subnet_id:
raise error.AztkError(
@ -277,7 +276,8 @@ class JobState():
class Job():
def __init__(self, cloud_job_schedule: batch_models.CloudJobSchedule,
def __init__(self,
cloud_job_schedule: batch_models.CloudJobSchedule,
cloud_tasks: List[batch_models.CloudTask] = None,
pool: batch_models.CloudPool = None,
nodes: batch_models.ComputeNodePaged = None):

Просмотреть файл

@ -6,9 +6,6 @@ from aztk.utils import constants
dir_path = os.path.dirname(os.path.realpath(__file__))
def AptGetPlugin(packages=None):
return InstallPlugin(
name="apt-get",
command="apt-get update && apt-get install -y",
packages=packages
)
return InstallPlugin(name="apt-get", command="apt-get update && apt-get install -y", packages=packages)

Просмотреть файл

@ -6,9 +6,6 @@ from aztk.utils import constants
dir_path = os.path.dirname(os.path.realpath(__file__))
def CondaPlugin(packages=None):
return InstallPlugin(
name="conda",
command="conda install -y",
packages=packages
)
return InstallPlugin(name="conda", command="conda install -y", packages=packages)

Просмотреть файл

@ -5,14 +5,12 @@ from aztk.utils import constants
dir_path = os.path.dirname(os.path.realpath(__file__))
def InstallPlugin(name, command, packages=None):
return PluginConfiguration(
name=name,
target_role=PluginTargetRole.All,
execute="install.sh",
files=[
PluginFile("install.sh", os.path.join(dir_path, "install.sh"))
],
files=[PluginFile("install.sh", os.path.join(dir_path, "install.sh"))],
args=packages,
env=dict(COMMAND=command)
)
env=dict(COMMAND=command))

Просмотреть файл

@ -6,9 +6,6 @@ from aztk.utils import constants
dir_path = os.path.dirname(os.path.realpath(__file__))
def PipPlugin(packages=None):
return InstallPlugin(
name="pip",
command="pip install",
packages=packages
)
return InstallPlugin(name="pip", command="pip install", packages=packages)

Просмотреть файл

@ -4,6 +4,7 @@ from aztk.models.plugins.plugin_file import PluginFile
dir_path = os.path.dirname(os.path.realpath(__file__))
def JupyterPlugin():
return PluginConfiguration(
name="jupyter",

Просмотреть файл

@ -5,6 +5,7 @@ from aztk.utils import constants
dir_path = os.path.dirname(os.path.realpath(__file__))
def JupyterLabPlugin():
return PluginConfiguration(
name="jupyterlab",

Просмотреть файл

@ -14,5 +14,4 @@ def NvBLASPlugin():
execute="nvblas.sh",
files=[
PluginFile("nvblas.sh", os.path.join(dir_path, "nvblas.sh")),
]
)
])

Просмотреть файл

@ -5,6 +5,7 @@ from aztk.utils import constants
dir_path = os.path.dirname(os.path.realpath(__file__))
class ResourceMonitorPlugin(PluginConfiguration):
def __init__(self):
super().__init__(
@ -22,5 +23,4 @@ class ResourceMonitorPlugin(PluginConfiguration):
PluginFile("start_monitor.sh", os.path.join(dir_path, "start_monitor.sh")),
PluginFile("etc/telegraf.conf", os.path.join(dir_path, "telegraf.conf")),
PluginFile("docker-compose.yml", os.path.join(dir_path, "docker-compose.yml")),
]
)
])

Просмотреть файл

@ -5,6 +5,7 @@ from aztk.utils import constants
dir_path = os.path.dirname(os.path.realpath(__file__))
class SimplePlugin(PluginConfiguration):
def __init__(self):
super().__init__(

Просмотреть файл

@ -10,12 +10,7 @@ class SparkUIProxyPlugin(PluginConfiguration):
def __init__(self):
super().__init__(
name="spark_ui_proxy",
ports=[
PluginPort(
internal=9999,
public=True
)
],
ports=[PluginPort(internal=9999, public=True)],
target_role=PluginTargetRole.Master,
execute="spark_ui_proxy.sh",
args=["localhost:8080", "9999"],

Просмотреть файл

@ -41,7 +41,6 @@ def wait_for_master_to_be_ready(core_operations, spark_operations, cluster_id: s
delta = now - start_time
if delta.total_seconds() > constants.WAIT_FOR_MASTER_TIMEOUT:
raise MasterInvalidStateError(
"Master didn't become ready before timeout.")
raise MasterInvalidStateError("Master didn't become ready before timeout.")
time.sleep(10)

Просмотреть файл

@ -1,15 +1,16 @@
import re
import azure.batch.batch_service_client as batch
from typing import Optional
import azure.batch.batch_auth as batch_auth
import azure.batch.batch_service_client as batch
import azure.storage.blob as blob
from aztk import error
from aztk.version import __version__
from azure.common.credentials import ServicePrincipalCredentials
from azure.mgmt.batch import BatchManagementClient
from azure.mgmt.storage import StorageManagementClient
from azure.storage.common import CloudStorageAccount
from typing import Optional
from aztk import error
from aztk.version import __version__
RESOURCE_ID_PATTERN = re.compile('^/subscriptions/(?P<subscription>[^/]+)'
'/resourceGroups/(?P<resourcegroup>[^/]+)'
@ -39,8 +40,7 @@ def make_batch_client(secrets):
if secrets.shared_key:
# Set up SharedKeyCredentials
base_url = secrets.shared_key.batch_service_url
credentials = batch_auth.SharedKeyCredentials(
secrets.shared_key.batch_account_name,
credentials = batch_auth.SharedKeyCredentials(secrets.shared_key.batch_account_name,
secrets.shared_key.batch_account_key)
else:
# Set up ServicePrincipalCredentials
@ -60,9 +60,7 @@ def make_batch_client(secrets):
resource='https://batch.core.windows.net/')
# Set up Batch Client
batch_client = batch.BatchServiceClient(
credentials,
base_url=base_url)
batch_client = batch.BatchServiceClient(credentials, base_url=base_url)
# Set retry policy
batch_client.config.retry_policy.retries = 5
@ -97,8 +95,25 @@ def make_blob_client(secrets):
subscription = m.group('subscription')
resourcegroup = m.group('resourcegroup')
mgmt_client = StorageManagementClient(arm_credentials, subscription)
key = mgmt_client.storage_accounts.list_keys(resource_group_name=resourcegroup, account_name=accountname).keys[0].value
key = retry_function(
mgmt_client.storage_accounts.list_keys,
10,
1,
Exception,
resource_group_name=resourcegroup,
account_name=accountname).keys[0].value
storage_client = CloudStorageAccount(accountname, key)
blob_client = storage_client.create_block_blob_service()
return blob_client
def retry_function(function, retry_attempts: int, retry_interval: int, exception: Exception, *args, **kwargs):
import time
for i in range(retry_attempts):
try:
return function(*args, **kwargs)
except exception as e:
if i == retry_attempts - 1:
raise e
time.sleep(retry_interval)

Просмотреть файл

@ -1,8 +1,9 @@
class CommandOption():
def __init__(self, name:str, value: str):
def __init__(self, name: str, value: str):
self.name = name
self.value = value
class CommandBuilder:
"""
Helper class to build a command line
@ -16,7 +17,7 @@ class CommandBuilder:
self.options = []
self.arguments = []
def add_option(self, name: str, value: str = None, enable: bool=None):
def add_option(self, name: str, value: str = None, enable: bool = None):
"""
Add an option to the command line.

Просмотреть файл

@ -2,6 +2,7 @@ import warnings
import functools
import inspect
def deprecated(version: str, advice: str = None):
"""
This is a decorator which can be used to mark functions
@ -23,6 +24,7 @@ def deprecated(version: str, advice: str = None):
def new_func(*args, **kwargs):
deprecate(version=version, message=msg.format(name=func.__name__, advice=advice), advice=advice)
return func(*args, **kwargs)
return new_func
return decorator
@ -38,7 +40,8 @@ def deprecate(version: str, message: str, advice: str = ""):
"""
warnings.simplefilter('always', DeprecationWarning) # turn off filter
warnings.warn("{0} It will be removed in Aztk version {1}. {2}".format(message, version, advice),
warnings.warn(
"{0} It will be removed in Aztk version {1}. {2}".format(message, version, advice),
category=DeprecationWarning,
stacklevel=2)
warnings.simplefilter('default', DeprecationWarning) # reset filter

Просмотреть файл

@ -1,5 +1,6 @@
import os
def ensure_dir(file_path):
directory = os.path.dirname(file_path)
if not os.path.exists(directory):

Просмотреть файл

@ -1,5 +1,6 @@
import os
def get_user_public_key(key_or_path: str = None, secrets_config=None):
"""
Return the ssh key.

Просмотреть файл

@ -41,10 +41,7 @@ def wait_for_tasks_to_complete(job_id, batch_client):
while True:
tasks = batch_client.task.list(job_id)
incomplete_tasks = [
task for task in tasks
if task.state != batch_models.TaskState.completed
]
incomplete_tasks = [task for task in tasks if task.state != batch_models.TaskState.completed]
if not incomplete_tasks:
return
time.sleep(5)
@ -66,10 +63,7 @@ def wait_for_task_to_complete(job_id: str, task_id: str, batch_client):
return
def upload_text_to_container(container_name: str,
application_name: str,
content: str,
file_path: str,
def upload_text_to_container(container_name: str, application_name: str, content: str, file_path: str,
blob_client=None) -> batch_models.ResourceFile:
blob_name = file_path
blob_path = application_name + '/' + blob_name # + '/' + time_stamp + '/' + blob_name
@ -82,8 +76,7 @@ def upload_text_to_container(container_name: str,
permission=blob.BlobPermissions.READ,
expiry=datetime.datetime.utcnow() + datetime.timedelta(days=365))
sas_url = blob_client.make_blob_url(
container_name, blob_path, sas_token=sas_token)
sas_url = blob_client.make_blob_url(container_name, blob_path, sas_token=sas_token)
return batch_models.ResourceFile(file_path=blob_name, blob_source=sas_url)
@ -126,8 +119,7 @@ def upload_file_to_container(container_name,
permission=blob.BlobPermissions.READ,
expiry=datetime.datetime.utcnow() + datetime.timedelta(days=7))
sas_url = blob_client.make_blob_url(
container_name, blob_path, sas_token=sas_token)
sas_url = blob_client.make_blob_url(container_name, blob_path, sas_token=sas_token)
return batch_models.ResourceFile(file_path=node_path, blob_source=sas_url)
@ -145,8 +137,7 @@ def create_pool_if_not_exist(pool, batch_client):
except batch_models.BatchErrorException as e:
if e.error.code == "PoolExists":
raise error.AztkError(
"A cluster with the same id already exists. Use a different id or delete the existing cluster"
)
"A cluster with the same id already exists. Use a different id or delete the existing cluster")
else:
raise
return True
@ -167,20 +158,16 @@ def wait_for_all_nodes_state(pool, node_state, batch_client):
# refresh pool to ensure that there is no resize error
pool = batch_client.pool.get(pool.id)
if pool.resize_errors is not None:
raise RuntimeError(
'resize error encountered for pool {}: {!r}'.format(
pool.id, pool.resize_errors))
raise RuntimeError('resize error encountered for pool {}: {!r}'.format(pool.id, pool.resize_errors))
nodes = list(batch_client.compute_node.list(pool.id))
totalNodes = pool.target_dedicated_nodes + pool.target_low_priority_nodes
if (len(nodes) >= totalNodes
and all(node.state in node_state for node in nodes)):
if (len(nodes) >= totalNodes and all(node.state in node_state for node in nodes)):
return nodes
time.sleep(1)
def select_latest_verified_vm_image_with_node_agent_sku(
publisher, offer, sku_starts_with, batch_client):
def select_latest_verified_vm_image_with_node_agent_sku(publisher, offer, sku_starts_with, batch_client):
"""
Select the latest verified image that Azure Batch supports given
a publisher, offer and sku (starts with filter).
@ -196,25 +183,18 @@ def select_latest_verified_vm_image_with_node_agent_sku(
node_agent_skus = batch_client.account.list_node_agent_skus()
# pick the latest supported sku
skus_to_use = [
(sku, image_ref) for sku in node_agent_skus for image_ref in sorted(
sku.verified_image_references, key=lambda item: item.sku)
if image_ref.publisher.lower() == publisher.lower()
and image_ref.offer.lower() == offer.lower()
and image_ref.sku.startswith(sku_starts_with)
]
skus_to_use = [(sku, image_ref)
for sku in node_agent_skus
for image_ref in sorted(sku.verified_image_references, key=lambda item: item.sku)
if image_ref.publisher.lower() == publisher.lower() and image_ref.offer.lower() == offer.lower()
and image_ref.sku.startswith(sku_starts_with)]
# skus are listed in reverse order, pick first for latest
sku_to_use, image_ref_to_use = skus_to_use[0]
return (sku_to_use.id, image_ref_to_use)
def create_sas_token(container_name,
blob_name,
permission,
blob_client,
expiry=None,
timeout=None):
def create_sas_token(container_name, blob_name, permission, blob_client, expiry=None, timeout=None):
"""
Create a blob sas token
:param blob_client: The storage block blob client to use.
@ -231,18 +211,12 @@ def create_sas_token(container_name,
if expiry is None:
if timeout is None:
timeout = 30
expiry = datetime.datetime.utcnow() + datetime.timedelta(
minutes=timeout)
expiry = datetime.datetime.utcnow() + datetime.timedelta(minutes=timeout)
return blob_client.generate_blob_shared_access_signature(
container_name, blob_name, permission=permission, expiry=expiry)
def upload_blob_and_create_sas(container_name,
blob_name,
file_name,
expiry,
blob_client,
timeout=None):
def upload_blob_and_create_sas(container_name, blob_name, file_name, expiry, blob_client, timeout=None):
"""
Uploads a file from local disk to Azure Storage and creates a SAS for it.
:param blob_client: The storage block blob client to use.
@ -269,8 +243,7 @@ def upload_blob_and_create_sas(container_name,
expiry=expiry,
timeout=timeout)
sas_url = blob_client.make_blob_url(
container_name, blob_name, sas_token=sas_token)
sas_url = blob_client.make_blob_url(container_name, blob_name, sas_token=sas_token)
return sas_url
@ -283,8 +256,7 @@ def wrap_commands_in_shell(commands):
:rtype: str
:return: a shell wrapping commands
"""
return '/bin/bash -c \'set -e; set -o pipefail; {}; wait\''.format(
';'.join(commands))
return '/bin/bash -c \'set -e; set -o pipefail; {}; wait\''.format(';'.join(commands))
def get_connection_info(pool_id, node_id, batch_client):
@ -328,10 +300,8 @@ def normalize_path(path: str) -> str:
return path
def get_file_properties(job_id: str, task_id: str, file_path: str,
batch_client):
raw = batch_client.file.get_properties_from_task(
job_id, task_id, file_path, raw=True)
def get_file_properties(job_id: str, task_id: str, file_path: str, batch_client):
raw = batch_client.file.get_properties_from_task(job_id, task_id, file_path, raw=True)
return batch_models.FileProperties(
content_length=raw.headers["Content-Length"],
@ -393,13 +363,9 @@ def read_cluster_config(cluster_id: str, blob_client: blob.BlockBlobService):
result = blob_client.get_blob_to_text(cluster_id, blob_path)
return yaml.load(result.content)
except azure.common.AzureMissingResourceHttpError:
logging.warn(
"Cluster %s doesn't have cluster configuration in storage",
cluster_id)
logging.warn("Cluster %s doesn't have cluster configuration in storage", cluster_id)
except yaml.YAMLError:
logging.warn(
"Cluster %s contains invalid cluster configuration in blob",
cluster_id)
logging.warn("Cluster %s contains invalid cluster configuration in blob", cluster_id)
def bool_env(value: bool):

Просмотреть файл

@ -14,27 +14,27 @@ from concurrent.futures import ThreadPoolExecutor
from aztk.error import AztkError
from aztk.models import NodeOutput
class ForwardServer(SocketServer.ThreadingTCPServer):
daemon_threads = True
allow_reuse_address = True
# pylint: disable=no-member
class Handler(SocketServer.BaseRequestHandler):
def handle(self):
try:
channel = self.ssh_transport.open_channel('direct-tcpip',
(self.chain_host, self.chain_port),
self.request.getpeername())
(self.chain_host, self.chain_port), self.request.getpeername())
except Exception as e:
logging.debug('Incoming request to %s:%d failed: %s', self.chain_host,
self.chain_port,
repr(e))
logging.debug('Incoming request to %s:%d failed: %s', self.chain_host, self.chain_port, repr(e))
return
if channel is None:
logging.debug('Incoming request to %s:%d was rejected by the SSH server.', self.chain_host, self.chain_port)
return
logging.debug('Connected! Tunnel open %r -> %r -> %r', self.request.getpeername(), channel.getpeername(), (self.chain_host, self.chain_port))
logging.debug('Connected! Tunnel open %r -> %r -> %r', self.request.getpeername(), channel.getpeername(),
(self.chain_host, self.chain_port))
while True:
r, w, x = select.select([self.request, channel], [], [])
if self.request in r:
@ -59,17 +59,13 @@ def forward_tunnel(local_port, remote_host, remote_port, transport):
chain_host = remote_host
chain_port = remote_port
ssh_transport = transport
thread = threading.Thread(target=ForwardServer(('', local_port), SubHandler).serve_forever, daemon=True)
thread.start()
return thread
def connect(hostname,
port=22,
username=None,
password=None,
pkey=None,
timeout=None):
def connect(hostname, port=22, username=None, password=None, pkey=None, timeout=None):
import paramiko
client = paramiko.SSHClient()
@ -96,23 +92,28 @@ def forward_ports(client, port_forward_list):
for port_forwarding_specification in port_forward_list:
threads.append(
forward_tunnel(
port_forwarding_specification.remote_port,
"127.0.0.1",
port_forwarding_specification.local_port,
client.get_transport()
)
)
forward_tunnel(port_forwarding_specification.remote_port, "127.0.0.1",
port_forwarding_specification.local_port, client.get_transport()))
return threads
def node_exec_command(node_id, command, username, hostname, port, ssh_key=None, password=None, container_name=None, timeout=None):
def node_exec_command(node_id,
command,
username,
hostname,
port,
ssh_key=None,
password=None,
container_name=None,
timeout=None):
try:
client = connect(hostname=hostname, port=port, username=username, password=password, pkey=ssh_key, timeout=timeout)
client = connect(
hostname=hostname, port=port, username=username, password=password, pkey=ssh_key, timeout=timeout)
except AztkError as e:
return NodeOutput(node_id, e)
if container_name:
cmd = 'sudo docker exec 2>&1 -t {0} /bin/bash -c \'set -e; set -o pipefail; {1}; wait\''.format(container_name, command)
cmd = 'sudo docker exec 2>&1 -t {0} /bin/bash -c \'set -e; set -o pipefail; {1}; wait\''.format(
container_name, command)
else:
cmd = '/bin/bash 2>&1 -c \'set -e; set -o pipefail; {0}; wait\''.format(command)
stdin, stdout, stderr = client.exec_command(cmd, get_pty=True)
@ -121,38 +122,48 @@ def node_exec_command(node_id, command, username, hostname, port, ssh_key=None,
return NodeOutput(node_id, output, None)
async def clus_exec_command(command, username, nodes, ports=None, ssh_key=None, password=None, container_name=None, timeout=None):
return await asyncio.gather(
*[asyncio.get_event_loop().run_in_executor(ThreadPoolExecutor(),
node_exec_command,
node.id,
command,
async def clus_exec_command(command,
username,
node_rls.ip_address,
node_rls.port,
ssh_key,
password,
container_name,
timeout) for node, node_rls in nodes]
)
nodes,
ports=None,
ssh_key=None,
password=None,
container_name=None,
timeout=None):
return await asyncio.gather(*[
asyncio.get_event_loop()
.run_in_executor(ThreadPoolExecutor(), node_exec_command, node.id, command, username, node_rls.ip_address,
node_rls.port, ssh_key, password, container_name, timeout) for node, node_rls in nodes
])
def copy_from_node(node_id, source_path, destination_path, username, hostname, port, ssh_key=None, password=None, container_name=None, timeout=None):
def copy_from_node(node_id,
source_path,
destination_path,
username,
hostname,
port,
ssh_key=None,
password=None,
container_name=None,
timeout=None):
try:
client = connect(hostname=hostname, port=port, username=username, password=password, pkey=ssh_key, timeout=timeout)
client = connect(
hostname=hostname, port=port, username=username, password=password, pkey=ssh_key, timeout=timeout)
except AztkError as e:
return NodeOutput(node_id, False, e)
sftp_client = client.open_sftp()
try:
if destination_path:
destination_path = os.path.join(os.path.dirname(destination_path), node_id, os.path.basename(destination_path))
destination_path = os.path.join(
os.path.dirname(destination_path), node_id, os.path.basename(destination_path))
os.makedirs(os.path.dirname(destination_path), exist_ok=True)
with open(destination_path, 'wb') as f:
sftp_client.getfo(source_path, f)
else:
import tempfile
# create 2mb temporary file
f = tempfile.SpooledTemporaryFile(2*1024**3)
f = tempfile.SpooledTemporaryFile(2 * 1024**3)
sftp_client.getfo(source_path, f)
return NodeOutput(node_id, f, None)
@ -163,9 +174,19 @@ def copy_from_node(node_id, source_path, destination_path, username, hostname, p
client.close()
def node_copy(node_id, source_path, destination_path, username, hostname, port, ssh_key=None, password=None, container_name=None, timeout=None):
def node_copy(node_id,
source_path,
destination_path,
username,
hostname,
port,
ssh_key=None,
password=None,
container_name=None,
timeout=None):
try:
client = connect(hostname=hostname, port=port, username=username, password=password, pkey=ssh_key, timeout=timeout)
client = connect(
hostname=hostname, port=port, username=username, password=password, pkey=ssh_key, timeout=timeout)
except AztkError as e:
return NodeOutput(node_id, None, e)
sftp_client = client.open_sftp()
@ -193,33 +214,27 @@ def node_copy(node_id, source_path, destination_path, username, hostname, port,
#TODO: progress bar
async def clus_copy(username, nodes, source_path, destination_path, ssh_key=None, password=None, container_name=None, get=False, timeout=None):
return await asyncio.gather(
*[asyncio.get_event_loop().run_in_executor(ThreadPoolExecutor(),
copy_from_node if get else node_copy,
node.id,
async def clus_copy(username,
nodes,
source_path,
destination_path,
username,
node_rls.ip_address,
node_rls.port,
ssh_key,
password,
container_name,
timeout) for node, node_rls in nodes]
)
ssh_key=None,
password=None,
container_name=None,
get=False,
timeout=None):
return await asyncio.gather(*[
asyncio.get_event_loop()
.run_in_executor(ThreadPoolExecutor(), copy_from_node
if get else node_copy, node.id, source_path, destination_path, username, node_rls.ip_address,
node_rls.port, ssh_key, password, container_name, timeout) for node, node_rls in nodes
])
def node_ssh(username, hostname, port, ssh_key=None, password=None, port_forward_list=None, timeout=None):
try:
client = connect(
hostname=hostname,
port=port,
username=username,
password=password,
pkey=ssh_key,
timeout=timeout
)
hostname=hostname, port=port, username=username, password=password, pkey=ssh_key, timeout=timeout)
threads = forward_ports(client=client, port_forward_list=port_forward_list)
except AztkError as e:
raise e

Просмотреть файл

@ -10,6 +10,7 @@ from aztk.utils import deprecate
from aztk.models import Toolkit
from aztk.models.plugins.internal import PluginReference
def load_aztk_secrets() -> SecretsConfiguration:
"""
Loads aztk from .aztk/secrets.yaml files(local and global)
@ -32,6 +33,7 @@ def load_aztk_secrets() -> SecretsConfiguration:
secrets.validate()
return secrets
def _load_config_file(path: str):
if not os.path.isfile(path):
return None
@ -40,27 +42,27 @@ def _load_config_file(path: str):
try:
return yaml.load(stream)
except yaml.YAMLError as err:
raise aztk.error.AztkError(
"Error in {0}:\n {1}".format(path, err))
raise aztk.error.AztkError("Error in {0}:\n {1}".format(path, err))
def _merge_secrets_dict(secrets: SecretsConfiguration, secrets_config):
if 'default' in secrets_config:
deprecate("0.9.0", "default key in secrets.yaml is deprecated.", "Place all child parameters directly at the root")
deprecate("0.9.0", "default key in secrets.yaml is deprecated.",
"Place all child parameters directly at the root")
secrets_config = dict(**secrets_config, **secrets_config.pop('default'))
other = SecretsConfiguration.from_dict(secrets_config)
secrets.merge(other)
def read_cluster_config(
path: str = aztk.utils.constants.DEFAULT_CLUSTER_CONFIG_PATH
) -> ClusterConfiguration:
def read_cluster_config(path: str = aztk.utils.constants.DEFAULT_CLUSTER_CONFIG_PATH) -> ClusterConfiguration:
"""
Reads the config file in the .aztk/ directory (.aztk/cluster.yaml)
"""
config_dict = _load_config_file(path)
return cluster_config_from_dict(config_dict)
def cluster_config_from_dict(config: dict):
wait = False
if config.get('plugins') not in [[None], None]:
@ -92,8 +94,7 @@ class SshConfig:
self.job_history_ui_port = '18080'
self.web_ui_port = '8080'
def _read_config_file(
self, path: str = aztk.utils.constants.DEFAULT_SSH_CONFIG_PATH):
def _read_config_file(self, path: str = aztk.utils.constants.DEFAULT_SSH_CONFIG_PATH):
"""
Reads the config file in the .aztk/ directory (.aztk/ssh.yaml)
"""
@ -104,8 +105,7 @@ class SshConfig:
try:
config = yaml.load(stream)
except yaml.YAMLError as err:
raise aztk.error.AztkError(
"Error in ssh.yaml: {0}".format(err))
raise aztk.error.AztkError("Error in ssh.yaml: {0}".format(err))
if config is None:
return
@ -137,14 +137,11 @@ class SshConfig:
if config.get('internal') is not None:
self.internal = config['internal']
def merge(self, cluster_id, username, job_ui_port, job_history_ui_port,
web_ui_port, host, connect, internal):
def merge(self, cluster_id, username, job_ui_port, job_history_ui_port, web_ui_port, host, connect, internal):
"""
Merges fields with args object
"""
self._read_config_file(
os.path.join(aztk.utils.constants.HOME_DIRECTORY_PATH, '.aztk',
'ssh.yaml'))
self._read_config_file(os.path.join(aztk.utils.constants.HOME_DIRECTORY_PATH, '.aztk', 'ssh.yaml'))
self._read_config_file()
self._merge_dict(
dict(
@ -164,8 +161,7 @@ class SshConfig:
if self.username is None:
raise aztk.error.AztkError(
"Please supply a username either in the ssh.yaml configuration file or with a parameter (--username)"
)
"Please supply a username either in the ssh.yaml configuration file or with a parameter (--username)")
class JobConfig():
@ -206,7 +202,6 @@ class JobConfig():
if scheduling_target:
self.scheduling_target = SchedulingTarget(scheduling_target)
applications = config.get('applications')
if applications:
self.applications = []
@ -226,9 +221,7 @@ class JobConfig():
driver_memory=application.get('driver_memory'),
executor_memory=application.get('executor_memory'),
driver_cores=application.get('driver_cores'),
executor_cores=application.get('executor_cores')
)
)
executor_cores=application.get('executor_cores')))
spark_configuration = config.get('spark_configuration')
if spark_configuration:
@ -241,13 +234,10 @@ class JobConfig():
if str_path:
abs_path = os.path.abspath(os.path.expanduser(str_path))
if not os.path.exists(abs_path):
raise aztk.error.AztkError(
"Could not find file: {0}\nCheck your configuration file".
format(str_path))
raise aztk.error.AztkError("Could not find file: {0}\nCheck your configuration file".format(str_path))
return abs_path
def _read_config_file(
self, path: str = aztk.utils.constants.DEFAULT_SPARK_JOB_CONFIG):
def _read_config_file(self, path: str = aztk.utils.constants.DEFAULT_SPARK_JOB_CONFIG):
"""
Reads the Job config file in the .aztk/ directory (.aztk/job.yaml)
"""
@ -258,8 +248,7 @@ class JobConfig():
try:
config = yaml.load(stream)
except yaml.YAMLError as err:
raise aztk.error.AztkError(
"Error in job.yaml: {0}".format(err))
raise aztk.error.AztkError("Error in job.yaml: {0}".format(err))
if config is None:
return
@ -278,15 +267,12 @@ class JobConfig():
raise aztk.error.AztkError(
"Application specified with no name. Please verify your configuration in job.yaml")
if entry.application is None:
raise aztk.error.AztkError(
"No path to application specified for {} in job.yaml".format(entry.name))
raise aztk.error.AztkError("No path to application specified for {} in job.yaml".format(entry.name))
def get_file_if_exists(file):
local_conf_file = os.path.join(
aztk.utils.constants.DEFAULT_SPARK_CONF_SOURCE, file)
global_conf_file = os.path.join(aztk.utils.constants.GLOBAL_CONFIG_PATH,
file)
local_conf_file = os.path.join(aztk.utils.constants.DEFAULT_SPARK_CONF_SOURCE, file)
global_conf_file = os.path.join(aztk.utils.constants.GLOBAL_CONFIG_PATH, file)
if os.path.exists(local_conf_file):
return local_conf_file
@ -309,16 +295,14 @@ def load_jars():
# try load global
try:
jars_src = os.path.join(aztk.utils.constants.GLOBAL_CONFIG_PATH,
'jars')
jars_src = os.path.join(aztk.utils.constants.GLOBAL_CONFIG_PATH, 'jars')
jars = [os.path.join(jars_src, jar) for jar in os.listdir(jars_src)]
except FileNotFoundError:
pass
# try load local, overwrite if found
try:
jars_src = os.path.join(aztk.utils.constants.DEFAULT_SPARK_CONF_SOURCE,
'jars')
jars_src = os.path.join(aztk.utils.constants.DEFAULT_SPARK_CONF_SOURCE, 'jars')
jars = [os.path.join(jars_src, jar) for jar in os.listdir(jars_src)]
except FileNotFoundError:
pass

Просмотреть файл

@ -24,15 +24,11 @@ def main():
setup_common_args(parser)
subparsers = parser.add_subparsers(
title="Available Softwares", dest="software", metavar="<software>")
subparsers = parser.add_subparsers(title="Available Softwares", dest="software", metavar="<software>")
subparsers.required = True
spark_parser = subparsers.add_parser(
"spark", help="Commands to run spark jobs")
plugins_parser = subparsers.add_parser(
"plugins", help="Commands to list and view plugins")
toolkit_parser = subparsers.add_parser(
"toolkit", help="List current toolkit information and browse available ones")
spark_parser = subparsers.add_parser("spark", help="Commands to run spark jobs")
plugins_parser = subparsers.add_parser("plugins", help="Commands to list and view plugins")
toolkit_parser = subparsers.add_parser("toolkit", help="List current toolkit information and browse available ones")
spark.setup_parser(spark_parser)
plugins.setup_parser(plugins_parser)
@ -50,10 +46,8 @@ def main():
def setup_common_args(parser: argparse.ArgumentParser):
parser.add_argument('--version', action='version',
version=aztk.version.__version__)
parser.add_argument("--verbose", action='store_true',
help="Enable verbose logging.")
parser.add_argument('--version', action='version', version=aztk.version.__version__)
parser.add_argument("--verbose", action='store_true', help="Enable verbose logging.")
def parse_common_args(args: NamedTuple):

Просмотреть файл

@ -74,6 +74,7 @@ def add_coloring_to_emit_windows(fn):
args[0]._set_color(FOREGROUND_WHITE)
# print "after"
return ret
return new
@ -98,6 +99,7 @@ def add_coloring_to_emit_ansi(fn):
args[1].msg = color + args[1].msg + '\x1b[0m' # normal
# print "after"
return fn(*args)
return new
@ -108,10 +110,10 @@ else:
# all non-Windows platforms are supporting ANSI escapes so we use them
logging.StreamHandler.emit = add_coloring_to_emit_ansi(logging.StreamHandler.emit)
logging.PRINT = 19
logging.addLevelName(logging.PRINT, "PRINT")
def print_level(self, message, *args, **kwargs):
self._log(logging.PRINT, message, args, **kwargs)

Просмотреть файл

@ -11,7 +11,7 @@ def setup_parser(parser: argparse.ArgumentParser):
def execute(args: typing.NamedTuple):
plugins = plugin_manager.plugins
log.info("------------------------------------------------------")
log.info(" Plugins (%i available)",len(plugins))
log.info(" Plugins (%i available)", len(plugins))
log.info("------------------------------------------------------")
for name, plugin in plugins.items():
log.info("- %s", name)

Просмотреть файл

@ -28,30 +28,19 @@ class ClusterAction:
def setup_parser(parser: argparse.ArgumentParser):
subparsers = parser.add_subparsers(
title="Actions", dest="cluster_action", metavar="<action>")
subparsers = parser.add_subparsers(title="Actions", dest="cluster_action", metavar="<action>")
subparsers.required = True
create_parser = subparsers.add_parser(
ClusterAction.create, help="Create a new cluster")
add_user_parser = subparsers.add_parser(
ClusterAction.add_user, help="Add a user to the given cluster")
delete_parser = subparsers.add_parser(
ClusterAction.delete, help="Delete a cluster")
get_parser = subparsers.add_parser(
ClusterAction.get, help="Get information about a cluster")
list_parser = subparsers.add_parser(
ClusterAction.list, help="List clusters in your account")
app_logs_parser = subparsers.add_parser(
"app-logs", help="Get the logs from a submitted app")
ssh_parser = subparsers.add_parser(
ClusterAction.ssh, help="SSH into the master node of a cluster")
submit_parser = subparsers.add_parser(
"submit", help="Submit a new spark job to a cluster")
run_parser = subparsers.add_parser(
ClusterAction.run, help="Run a command on all nodes in your spark cluster")
copy_parser = subparsers.add_parser(
ClusterAction.copy, help="Copy files to all nodes in your spark cluster")
create_parser = subparsers.add_parser(ClusterAction.create, help="Create a new cluster")
add_user_parser = subparsers.add_parser(ClusterAction.add_user, help="Add a user to the given cluster")
delete_parser = subparsers.add_parser(ClusterAction.delete, help="Delete a cluster")
get_parser = subparsers.add_parser(ClusterAction.get, help="Get information about a cluster")
list_parser = subparsers.add_parser(ClusterAction.list, help="List clusters in your account")
app_logs_parser = subparsers.add_parser("app-logs", help="Get the logs from a submitted app")
ssh_parser = subparsers.add_parser(ClusterAction.ssh, help="SSH into the master node of a cluster")
submit_parser = subparsers.add_parser("submit", help="Submit a new spark job to a cluster")
run_parser = subparsers.add_parser(ClusterAction.run, help="Run a command on all nodes in your spark cluster")
copy_parser = subparsers.add_parser(ClusterAction.copy, help="Copy files to all nodes in your spark cluster")
debug_parser = subparsers.add_parser(
ClusterAction.debug, help="Debugging tool that aggregates logs and output from the cluster.")

Просмотреть файл

@ -6,16 +6,19 @@ from aztk_cli import config, log, utils
def setup_parser(parser: argparse.ArgumentParser):
parser.add_argument('--id', dest='cluster_id', required=True,
help='The unique id of your spark cluster')
parser.add_argument('-u', '--username',
help='The username to access your spark cluster\'s head node')
parser.add_argument('--id', dest='cluster_id', required=True, help='The unique id of your spark cluster')
parser.add_argument('-u', '--username', help='The username to access your spark cluster\'s head node')
auth_group = parser.add_mutually_exclusive_group()
auth_group.add_argument('-p', '--password',
auth_group.add_argument(
'-p',
'--password',
help="The password to access your spark cluster's master node. If not provided will use ssh public key.")
auth_group.add_argument('--ssh-key',
help="The ssh public key to access your spark cluster's master node. You can also set the ssh-key in the configuration file.")
auth_group.add_argument(
'--ssh-key',
help=
"The ssh public key to access your spark cluster's master node. You can also set the ssh-key in the configuration file."
)
parser.set_defaults(username="admin")
@ -32,14 +35,10 @@ def execute(args: typing.NamedTuple):
else:
ssh_key = spark_client.secrets_configuration.ssh_pub_key
ssh_key, password = utils.get_ssh_key_or_prompt(ssh_key, args.username, args.password, spark_client.secrets_configuration)
ssh_key, password = utils.get_ssh_key_or_prompt(ssh_key, args.username, args.password,
spark_client.secrets_configuration)
spark_client.cluster.create_user(
id=args.cluster_id,
username=args.username,
password=password,
ssh_key=ssh_key
)
spark_client.cluster.create_user(id=args.cluster_id, username=args.username, password=password, ssh_key=ssh_key)
if password:
log.info('password: %s', '*' * len(password))

Просмотреть файл

@ -7,18 +7,13 @@ from aztk_cli import config, utils, log
def setup_parser(parser: argparse.ArgumentParser):
parser.add_argument('--id',
dest='cluster_id',
required=True,
help='The unique id of your spark cluster')
parser.add_argument('--name',
dest='app_name',
required=True,
help='The unique id of your job name')
parser.add_argument('--id', dest='cluster_id', required=True, help='The unique id of your spark cluster')
parser.add_argument('--name', dest='app_name', required=True, help='The unique id of your job name')
output_group = parser.add_mutually_exclusive_group()
output_group.add_argument('--output',
output_group.add_argument(
'--output',
help='Path to the file you wish to output to. If not \
specified, output is printed to stdout')
output_group.add_argument('--tail', dest='tail', action='store_true')

Некоторые файлы не были показаны из-за слишком большого количества измененных файлов Показать больше