From 8c2bf0c1a6ee67df70469b1cbe01bb9411438452 Mon Sep 17 00:00:00 2001 From: Jacob Freck Date: Fri, 26 Oct 2018 16:58:38 -0700 Subject: [PATCH] Feature: spark submit scheduling internal (#674) * add internal support for scheduling_target cluster submit * add internal support for scheduling target job submission * add cli flag --- .../node_scripts/scheduling/job_submission.py | 2 +- aztk/spark/client/cluster/helpers/submit.py | 21 ++++++++++++++----- aztk/spark/client/cluster/operations.py | 14 +++++++++++-- .../spark/endpoints/cluster/cluster_submit.py | 7 +++++++ 4 files changed, 36 insertions(+), 8 deletions(-) diff --git a/aztk/node_scripts/scheduling/job_submission.py b/aztk/node_scripts/scheduling/job_submission.py index f7153711..c6e24c09 100644 --- a/aztk/node_scripts/scheduling/job_submission.py +++ b/aztk/node_scripts/scheduling/job_submission.py @@ -73,7 +73,7 @@ def schedule_with_target(scheduling_target, task_sas_urls): format(task_working_dir, aztk_cluster_id, task_sas_url, constants.SPARK_SUBMIT_LOGS_FILE)) node_id = select_scheduling_target_node(config.spark_client.cluster, config.pool_id, scheduling_target) node_run_output = config.spark_client.cluster.node_run( - config.pool_id, node_id, task_cmd, timeout=120, block=False) + config.pool_id, node_id, task_cmd, timeout=120, block=False, internal=True) # block job_manager_task until scheduling_target task completion wait_until_tasks_complete(aztk_cluster_id) diff --git a/aztk/spark/client/cluster/helpers/submit.py b/aztk/spark/client/cluster/helpers/submit.py index e44f3e2c..7a4b97e2 100644 --- a/aztk/spark/client/cluster/helpers/submit.py +++ b/aztk/spark/client/cluster/helpers/submit.py @@ -40,7 +40,15 @@ def select_scheduling_target_node(spark_cluster_operations, cluster_id, scheduli return cluster.master_node_id -def schedule_with_target(core_cluster_operations, spark_cluster_operations, cluster_id, scheduling_target, task, wait): +def schedule_with_target( + core_cluster_operations, + spark_cluster_operations, + cluster_id, + scheduling_target, + task, + wait, + internal, +): # upload "real" task definition to storage serialized_task_resource_file = upload_serialized_task_to_storage(core_cluster_operations.blob_client, cluster_id, task) @@ -65,7 +73,8 @@ def schedule_with_target(core_cluster_operations, spark_cluster_operations, clus format(task_working_dir, cluster_id, serialized_task_resource_file.blob_source, constants.SPARK_SUBMIT_LOGS_FILE)) node_id = select_scheduling_target_node(spark_cluster_operations, cluster_id, scheduling_target) - node_run_output = spark_cluster_operations.node_run(cluster_id, node_id, task_cmd, timeout=120, block=wait) + node_run_output = spark_cluster_operations.node_run( + cluster_id, node_id, task_cmd, timeout=120, block=wait, internal=internal) def get_cluster_scheduling_target(core_cluster_operations, cluster_id): @@ -80,6 +89,7 @@ def submit_application( application, remote: bool = False, wait: bool = False, + internal: bool = False, ): """ Submit a spark app @@ -90,7 +100,7 @@ def submit_application( scheduling_target = get_cluster_scheduling_target(core_cluster_operations, cluster_id) if scheduling_target is not models.SchedulingTarget.Any: schedule_with_target(core_cluster_operations, spark_cluster_operations, cluster_id, scheduling_target, task, - wait) + wait, internal) else: # Add task to batch job (which has the same name as cluster_id) core_cluster_operations.batch_client.task.add(job_id=cluster_id, task=task) @@ -107,9 +117,10 @@ def submit( application: models.ApplicationConfiguration, remote: bool = False, wait: bool = False, - scheduling_target: str = None, + internal: bool = False, ): try: - submit_application(core_cluster_operations, spark_cluster_operations, cluster_id, application, remote, wait) + submit_application(core_cluster_operations, spark_cluster_operations, cluster_id, application, remote, wait, + internal) except BatchErrorException as e: raise error.AztkError(helpers.format_batch_exception(e)) diff --git a/aztk/spark/client/cluster/operations.py b/aztk/spark/client/cluster/operations.py index 54cbd7fc..01ad6d95 100644 --- a/aztk/spark/client/cluster/operations.py +++ b/aztk/spark/client/cluster/operations.py @@ -63,7 +63,14 @@ class ClusterOperations(SparkBaseOperations): """ return list.list_clusters(self._core_cluster_operations) - def submit(self, id: str, application: models.ApplicationConfiguration, remote: bool = False, wait: bool = False): + def submit( + self, + id: str, + application: models.ApplicationConfiguration, + remote: bool = False, + wait: bool = False, + internal: bool = False, + ): """Submit an application to a cluster. Args: @@ -72,13 +79,16 @@ class ClusterOperations(SparkBaseOperations): remote (:obj:`bool`): If True, the application file will not be uploaded, it is assumed to be reachable by the cluster already. This is useful when your application is stored in a mounted Azure File Share and not the client. Defaults to False. + internal (:obj:`bool`): if True, this will connect to the node using its internal IP. + Only use this if running within the same VNET as the cluster. This only applies if the cluster's + SchedulingTarget is not set to SchedulingTarget.Any. Defaults to False. wait (:obj:`bool`, optional): If True, this function blocks until the application has completed. Defaults to False. Returns: :obj:`None` """ - return submit.submit(self._core_cluster_operations, self, id, application, remote, wait) + return submit.submit(self._core_cluster_operations, self, id, application, remote, wait, internal) def create_user(self, id: str, username: str, password: str = None, ssh_key: str = None): """Create a user on every node in the cluster diff --git a/aztk_cli/spark/endpoints/cluster/cluster_submit.py b/aztk_cli/spark/endpoints/cluster/cluster_submit.py index 7253a597..473ea0a9 100644 --- a/aztk_cli/spark/endpoints/cluster/cluster_submit.py +++ b/aztk_cli/spark/endpoints/cluster/cluster_submit.py @@ -81,6 +81,12 @@ def setup_parser(parser: argparse.ArgumentParser): already accessible at the given path", ) + parser.add_argument( + "--internal", + action="store_true", + help="Connect using the local IP of the master node. Only use if using a VPN.", + ) + parser.add_argument( "app", help="App jar OR python file to execute. A path to a local " @@ -133,6 +139,7 @@ def execute(args: typing.NamedTuple): max_retry_count=args.max_retry_count, ), remote=args.remote, + internal=args.internal, wait=False, )