diff --git a/bin/az_spark_start b/bin/az_spark_start
index 01d68833..59bcd536 100755
--- a/bin/az_spark_start
+++ b/bin/az_spark_start
@@ -18,7 +18,7 @@ import azure.batch.models as batch_models
 import azure.storage.blob as blob

 # config file path
-_config_path = os.path.join(os.path.dirname(__file__), 'configuration.cfg')
+_config_path = os.path.join(os.path.dirname(__file__), '../configuration.cfg')

 if __name__ == '__main__':
diff --git a/bin/az_spark_stop b/bin/az_spark_stop
index 3e8888c5..3113c779 100755
--- a/bin/az_spark_stop
+++ b/bin/az_spark_stop
@@ -18,7 +18,7 @@ import azure.batch.models as batch_models
 import azure.storage.blob as blob

 # config file path
-_config_path = os.path.join(os.path.dirname(__file__), 'configuration.cfg')
+_config_path = os.path.join(os.path.dirname(__file__), '../configuration.cfg')

 # generate random number for deployment suffix
 _deployment_suffix = str(random.randint(0,1000000))
diff --git a/bin/az_spark_submit b/bin/az_spark_submit
index 5846c667..ed010f4d 100755
--- a/bin/az_spark_submit
+++ b/bin/az_spark_submit
@@ -18,17 +18,18 @@ import azure.batch.models as batch_models
 import azure.storage.blob as blob

 # config file path
-_config_path = os.path.join(os.path.dirname(__file__), 'configuration.cfg')
+_config_path = os.path.join(os.path.dirname(__file__), '../configuration.cfg')

 # generate random number for deployment suffix
+# TODO generate real GUID instead of a random number
 _deployment_suffix = str(random.randint(0,1000000))

 if __name__ == '__main__':

     _pool_id = None
-    _job_id = 'az-spark-' + _deployment_suffix
-    _job_file_name = None
-    _job_file_path = None
+    _app_id = 'az-spark-' + _deployment_suffix
+    _app_file_name = None
+    _app_file_path = None
     _username = 'admin'
     _password = 'pass123!'
@@ -37,14 +38,14 @@ if __name__ == '__main__':
     parser.add_argument("--cluster-id", required=True,
                         help="the unique name of your spark cluster")
-    parser.add_argument("--job-id",
-                        help="the unique name of your spark job")
+    parser.add_argument("--app-id",
+                        help="the unique name of your spark app")
     parser.add_argument("--file", required=True,
-                        help="the relative path to your spark job in your directory")
+                        help="the relative path to your spark app in your directory")
     parser.add_argument("-u", "--user",
-                        help="the relative path to your spark job in your directory")
+                        help="the username to access your spark cluster's head node")
     parser.add_argument("-p", "--password",
-                        help="the relative path to your spark job in your directory")
+                        help="the password to access your spark cluster's head node")

     args = parser.parse_args()
@@ -53,15 +54,15 @@ if __name__ == '__main__':
     _pool_id = args.cluster_id
     print("spark cluster id: %s" % _pool_id)

-    if args.job_id is not None:
-        _job_id = args.job_id
-        print("spark job id: %s" % _job_id)
+    if args.app_id is not None:
+        _app_id = args.app_id
+        print("spark app id: %s" % _app_id)

     if args.file is not None:
-        _job_file_path = args.file
-        _job_file_name = os.path.basename(_job_file_path)
-        print("spark job file path: %s" % _job_file_path)
-        print("spark job file name: %s" % _job_file_name)
+        _app_file_path = args.file
+        _app_file_name = os.path.basename(_app_file_path)
+        print("spark app file path: %s" % _app_file_path)
+        print("spark app file name: %s" % _app_file_name)

     if args.user is not None:
         _username = args.user
@@ -107,9 +108,10 @@ if __name__ == '__main__':
         batch_client,
         blob_client,
         pool_id = _pool_id,
-        job_id = _job_id,
-        job_file_name = _job_file_name,
-        job_file_path = os.path.join(os.path.dirname(__file__), _job_file_path),
+        app_id = _app_id,
+        app_file_name = _app_file_name,
+        # app_file_path = os.path.join(os.path.dirname(__file__), _app_file_path),
+        app_file_path = _app_file_path,
         username = _username,
         password = _password)
diff --git a/redbull/sparklib.py b/redbull/sparklib.py
index 9bb67fdd..c68f86ed 100644
--- a/redbull/sparklib.py
+++ b/redbull/sparklib.py
@@ -63,6 +63,16 @@ def create_cluster(
     # Create the pool
     util.create_pool_if_not_exist(batch_client, pool, wait=wait)

+    # Create job (reuse pool_id as job_id)
+    job_id = pool_id
+    job = batch_models.JobAddParameter(
+        id = job_id,
+        pool_info=batch_models.PoolInformation(pool_id = pool_id))
+
+    # Add job to batch
+    batch_client.job.add(job)
+
+
 def delete_cluster(
         batch_client,
         pool_id):
@@ -77,8 +87,12 @@ def delete_cluster(
     # delete pool by id
     pool = batch_client.pool.get(pool_id)

+    # job id is equal to pool id
+    job_id = pool_id
+
     if batch_client.pool.exists(pool_id) == True:
         batch_client.pool.delete(pool_id)
+        batch_client.job.delete(job_id)
         print("\nThe pool, '%s', is being deleted" % pool_id)
     else:
         print("\nThe pool, '%s', does not exist" % pool_id)
@@ -88,27 +102,27 @@ def submit_job(
         batch_client,
         blob_client,
         pool_id,
-        job_id,
-        job_file_path,
-        job_file_name,
+        app_id,
+        app_file_path,
+        app_file_name,
         username,
         password):
     """
-    Submit a spark job
+    Submit a spark app

     :param batch_client: the batch client to use
     :type batch_client: 'batchserviceclient.BatchServiceClient'
     :param block_blob_client: A blob service client.
     :type block_blob_client: `azure.storage.blob.BlockBlobService`
-    :param pool_id: The id of the pool to submit job to
+    :param pool_id: The id of the pool to submit app to
     :type pool_id: string
-    :param job_id: The id of the job
-    :type job_id: string
-    :param job_file_path: The path of the spark job to run
-    :type job_file_path: string
-    :param job_file_name: The name of the spark job file to run
-    :type job_file_name: string
+    :param app_id: The id of the spark app (corresponds to batch task)
+    :type app_id: string
+    :param app_file_path: The path of the spark app to run
+    :type app_file_path: string
+    :param app_file_name: The name of the spark app file to run
+    :type app_file_name: string
     :param username: The username to access the head node via ssh
     :type username: string
     :param password: The password to access the head node via ssh
@@ -117,22 +131,15 @@ def submit_job(
     """

     # set multi-instance task id
+    # TODO create a real GUID instead of just a random number
     deployment_suffix = str(random.randint(0,1000000))
     multiinstance_task_id= 'az-spark-' + deployment_suffix

-    # Upload job resource files to blob storage
-    job_resource_file = \
+    # Upload app resource files to blob storage
+    app_resource_file = \
         util.upload_file_to_container(
-            blob_client, container_name = job_id, file_path = job_file_path)
+            blob_client, container_name = app_id, file_path = app_file_path)

-    #create job
-    job = batch_models.JobAddParameter(
-        id = job_id,
-        pool_info=batch_models.PoolInformation(pool_id = pool_id))
-
-    # add job to batch
-    batch_client.job.add(job)
-
     # configure multi-instance task commands
     coordination_commands = [
         'echo CCP_NODES:',
@@ -165,40 +172,43 @@ def submit_job(
         'export PATH=$PATH:$SPARK_HOME/bin',
         # kick off start-all spark command as a bg process
         '($SPARK_HOME/sbin/start-all.sh &)',
-        # execute spark-submit on the specified job
+        # execute spark-submit on the specified app
         '$SPARK_HOME/bin/spark-submit ' +
             '--master spark://${AZ_BATCH_MASTER_NODE%:*}:7077 ' +
-            '$AZ_BATCH_TASK_WORKING_DIR/' + job_file_name
+            '$AZ_BATCH_TASK_WORKING_DIR/' + app_file_name
     ]

-    # get pool size
+    # Get pool size
     pool = batch_client.pool.get(pool_id)
     pool_size = pool.target_dedicated

-    # create multi-instance task
+    # Create multi-instance task
     task = batch_models.TaskAddParameter(
         id = multiinstance_task_id,
         command_line = util.wrap_commands_in_shell(application_commands),
-        resource_files = [job_resource_file],
+        resource_files = [app_resource_file],
         run_elevated = False,
         multi_instance_settings = batch_models.MultiInstanceSettings(
             number_of_instances = pool_size,
             coordination_command_line = util.wrap_commands_in_shell(coordination_commands),
             common_resource_files = []))

-    # add task to job
+    # Add task to batch job (the job id is the same as pool_id)
+    job_id = pool_id
     batch_client.task.add(job_id = job_id, task = task)

-    # wait for the job to finish
+    # Wait for the app to finish
     util.wait_for_tasks_to_complete(
         batch_client,
         job_id,
         datetime.timedelta(minutes=60))

-    # get master node id from task
-    master_node_id = batch_client.task.get(job_id, multiinstance_task_id).node_info.node_id
+    # Get master node id from task
+    master_node_id = batch_client.task \
+        .get(job_id=pool_id, task_id=multiinstance_task_id) \
+        .node_info.node_id

-    # create new ssh user for the master node
+    # Create new ssh user for the master node
     batch_client.compute_node.add_user(
         pool_id,
         master_node_id,
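
For reviewers, here is a rough sketch of a call site once this change lands: `create_cluster` now owns the Batch job (its id reused from `pool_id`), so `submit_job` only uploads the app file and adds a multi-instance task. Everything below is illustrative and not part of this diff: the account names, keys, region, and wordcount file are placeholders, and the client construction assumes the same `azure-batch` and `azure-storage` SDKs the bin scripts already import.

    import azure.batch as batch
    import azure.batch.batch_auth as batch_auth
    import azure.storage.blob as blob
    from redbull import sparklib

    # Placeholder credentials -- illustrative only
    batch_client = batch.BatchServiceClient(
        batch_auth.SharedKeyCredentials('mybatchaccount', 'batch-account-key'),
        base_url = 'https://mybatchaccount.westus.batch.azure.com')
    blob_client = blob.BlockBlobService(
        account_name = 'mystorageaccount',
        account_key = 'storage-account-key')

    # The Batch job already exists (created with the cluster), so this call
    # uploads wordcount.py to blob storage and adds one multi-instance task
    # to the job whose id matches the pool id.
    sparklib.submit_job(
        batch_client,
        blob_client,
        pool_id = 'my-spark-cluster',
        app_id = 'az-spark-wordcount',
        app_file_path = './wordcount.py',
        app_file_name = 'wordcount.py',
        username = 'admin',
        password = 'pass123!')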
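One design consequence worth noting: because the Batch job is now created once in `create_cluster` and keyed by `pool_id`, every `submit_job` call against the same cluster adds its task to that single long-lived job, and `delete_cluster` has to remove both the pool and the job, as this diff does. The previous code created a fresh job per submission that `delete_cluster` never cleaned up.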