Feature: spark submit scheduling internal (#674)

* add internal support for scheduling_target cluster submit

* add internal support for scheduling target job submission

* add cli flag
This commit is contained in:
Jacob Freck 2018-10-26 16:58:38 -07:00 коммит произвёл GitHub
Родитель 18b74e47d5
Коммит 8c2bf0c1a6
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
4 изменённых файлов: 36 добавлений и 8 удалений

Просмотреть файл

@@ -73,7 +73,7 @@ def schedule_with_target(scheduling_target, task_sas_urls):
format(task_working_dir, aztk_cluster_id, task_sas_url, constants.SPARK_SUBMIT_LOGS_FILE)) format(task_working_dir, aztk_cluster_id, task_sas_url, constants.SPARK_SUBMIT_LOGS_FILE))
node_id = select_scheduling_target_node(config.spark_client.cluster, config.pool_id, scheduling_target) node_id = select_scheduling_target_node(config.spark_client.cluster, config.pool_id, scheduling_target)
node_run_output = config.spark_client.cluster.node_run( node_run_output = config.spark_client.cluster.node_run(
config.pool_id, node_id, task_cmd, timeout=120, block=False) config.pool_id, node_id, task_cmd, timeout=120, block=False, internal=True)
# block job_manager_task until scheduling_target task completion # block job_manager_task until scheduling_target task completion
wait_until_tasks_complete(aztk_cluster_id) wait_until_tasks_complete(aztk_cluster_id)

Просмотреть файл

@@ -40,7 +40,15 @@ def select_scheduling_target_node(spark_cluster_operations, cluster_id, scheduli
return cluster.master_node_id return cluster.master_node_id
def schedule_with_target(core_cluster_operations, spark_cluster_operations, cluster_id, scheduling_target, task, wait): def schedule_with_target(
core_cluster_operations,
spark_cluster_operations,
cluster_id,
scheduling_target,
task,
wait,
internal,
):
# upload "real" task definition to storage # upload "real" task definition to storage
serialized_task_resource_file = upload_serialized_task_to_storage(core_cluster_operations.blob_client, cluster_id, serialized_task_resource_file = upload_serialized_task_to_storage(core_cluster_operations.blob_client, cluster_id,
task) task)
@@ -65,7 +73,8 @@ def schedule_with_target(core_cluster_operations, spark_cluster_operations, clus
format(task_working_dir, cluster_id, serialized_task_resource_file.blob_source, format(task_working_dir, cluster_id, serialized_task_resource_file.blob_source,
constants.SPARK_SUBMIT_LOGS_FILE)) constants.SPARK_SUBMIT_LOGS_FILE))
node_id = select_scheduling_target_node(spark_cluster_operations, cluster_id, scheduling_target) node_id = select_scheduling_target_node(spark_cluster_operations, cluster_id, scheduling_target)
node_run_output = spark_cluster_operations.node_run(cluster_id, node_id, task_cmd, timeout=120, block=wait) node_run_output = spark_cluster_operations.node_run(
cluster_id, node_id, task_cmd, timeout=120, block=wait, internal=internal)
def get_cluster_scheduling_target(core_cluster_operations, cluster_id): def get_cluster_scheduling_target(core_cluster_operations, cluster_id):
@@ -80,6 +89,7 @@ def submit_application(
application, application,
remote: bool = False, remote: bool = False,
wait: bool = False, wait: bool = False,
internal: bool = False,
): ):
""" """
Submit a spark app Submit a spark app
@@ -90,7 +100,7 @@ def submit_application(
scheduling_target = get_cluster_scheduling_target(core_cluster_operations, cluster_id) scheduling_target = get_cluster_scheduling_target(core_cluster_operations, cluster_id)
if scheduling_target is not models.SchedulingTarget.Any: if scheduling_target is not models.SchedulingTarget.Any:
schedule_with_target(core_cluster_operations, spark_cluster_operations, cluster_id, scheduling_target, task, schedule_with_target(core_cluster_operations, spark_cluster_operations, cluster_id, scheduling_target, task,
wait) wait, internal)
else: else:
# Add task to batch job (which has the same name as cluster_id) # Add task to batch job (which has the same name as cluster_id)
core_cluster_operations.batch_client.task.add(job_id=cluster_id, task=task) core_cluster_operations.batch_client.task.add(job_id=cluster_id, task=task)
@@ -107,9 +117,10 @@ def submit(
application: models.ApplicationConfiguration, application: models.ApplicationConfiguration,
remote: bool = False, remote: bool = False,
wait: bool = False, wait: bool = False,
scheduling_target: str = None, internal: bool = False,
): ):
try: try:
submit_application(core_cluster_operations, spark_cluster_operations, cluster_id, application, remote, wait) submit_application(core_cluster_operations, spark_cluster_operations, cluster_id, application, remote, wait,
internal)
except BatchErrorException as e: except BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e)) raise error.AztkError(helpers.format_batch_exception(e))

Просмотреть файл

@@ -63,7 +63,14 @@ class ClusterOperations(SparkBaseOperations):
""" """
return list.list_clusters(self._core_cluster_operations) return list.list_clusters(self._core_cluster_operations)
def submit(self, id: str, application: models.ApplicationConfiguration, remote: bool = False, wait: bool = False): def submit(
self,
id: str,
application: models.ApplicationConfiguration,
remote: bool = False,
wait: bool = False,
internal: bool = False,
):
"""Submit an application to a cluster. """Submit an application to a cluster.
Args: Args:
@@ -72,13 +79,16 @@ class ClusterOperations(SparkBaseOperations):
remote (:obj:`bool`): If True, the application file will not be uploaded, it is assumed to be reachable remote (:obj:`bool`): If True, the application file will not be uploaded, it is assumed to be reachable
by the cluster already. This is useful when your application is stored in a mounted Azure File Share by the cluster already. This is useful when your application is stored in a mounted Azure File Share
and not the client. Defaults to False. and not the client. Defaults to False.
internal (:obj:`bool`): if True, this will connect to the node using its internal IP.
Only use this if running within the same VNET as the cluster. This only applies if the cluster's
SchedulingTarget is not set to SchedulingTarget.Any. Defaults to False.
wait (:obj:`bool`, optional): If True, this function blocks until the application has completed. wait (:obj:`bool`, optional): If True, this function blocks until the application has completed.
Defaults to False. Defaults to False.
Returns: Returns:
:obj:`None` :obj:`None`
""" """
return submit.submit(self._core_cluster_operations, self, id, application, remote, wait) return submit.submit(self._core_cluster_operations, self, id, application, remote, wait, internal)
def create_user(self, id: str, username: str, password: str = None, ssh_key: str = None): def create_user(self, id: str, username: str, password: str = None, ssh_key: str = None):
"""Create a user on every node in the cluster """Create a user on every node in the cluster

Просмотреть файл

@@ -81,6 +81,12 @@ def setup_parser(parser: argparse.ArgumentParser):
already accessible at the given path", already accessible at the given path",
) )
parser.add_argument(
"--internal",
action="store_true",
help="Connect using the local IP of the master node. Only use if using a VPN.",
)
parser.add_argument( parser.add_argument(
"app", "app",
help="App jar OR python file to execute. A path to a local " help="App jar OR python file to execute. A path to a local "
@@ -133,6 +139,7 @@ def execute(args: typing.NamedTuple):
max_retry_count=args.max_retry_count, max_retry_count=args.max_retry_count,
), ),
remote=args.remote, remote=args.remote,
internal=args.internal,
wait=False, wait=False,
) )