convert spark app from job to task

jiata 2017-04-13 03:52:19 +00:00
Parent 50073cf2c9
Commit c9441f3ab3
4 changed files with 65 additions and 53 deletions

View file

@@ -18,7 +18,7 @@ import azure.batch.models as batch_models
import azure.storage.blob as blob
# config file path
- _config_path = os.path.join(os.path.dirname(__file__), 'configuration.cfg')
+ _config_path = os.path.join(os.path.dirname(__file__), '../configuration.cfg')
if __name__ == '__main__':
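The config file path now resolves one level above the script's own directory (the same change is applied in the files below). A minimal sketch of reading the relocated file, assuming Python 3 and configparser (the parsing code itself is not part of this commit):

import os
import configparser

# Resolve ../configuration.cfg relative to this script, matching the new path above
_config_path = os.path.join(os.path.dirname(__file__), '../configuration.cfg')

config = configparser.ConfigParser()
with open(_config_path) as f:
    config.read_file(f)  # fails loudly if the file is missing, unlike config.read()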

View file

@@ -18,7 +18,7 @@ import azure.batch.models as batch_models
import azure.storage.blob as blob
# config file path
- _config_path = os.path.join(os.path.dirname(__file__), 'configuration.cfg')
+ _config_path = os.path.join(os.path.dirname(__file__), '../configuration.cfg')
# generate random number for deployment suffix
_deployment_suffix = str(random.randint(0,1000000))

View file

@@ -18,17 +18,18 @@ import azure.batch.models as batch_models
import azure.storage.blob as blob
# config file path
- _config_path = os.path.join(os.path.dirname(__file__), 'configuration.cfg')
+ _config_path = os.path.join(os.path.dirname(__file__), '../configuration.cfg')
# generate random number for deployment suffix
+ # TODO generate real GUID instead of a random number
_deployment_suffix = str(random.randint(0,1000000))
if __name__ == '__main__':
_pool_id = None
- _job_id = 'az-spark-' + _deployment_suffix
- _job_file_name = None
- _job_file_path = None
+ _app_id = 'az-spark-' + _deployment_suffix
+ _app_file_name = None
+ _app_file_path = None
_username = 'admin'
_password = 'pass123!'
@@ -37,14 +38,14 @@ if __name__ == '__main__':
parser.add_argument("--cluster-id", required=True,
help="the unique name of your spark cluster")
parser.add_argument("--job-id",
help="the unique name of your spark job")
parser.add_argument("--app-id",
help="the unique name of your spark app")
parser.add_argument("--file", required=True,
help="the relative path to your spark job in your directory")
help="the relative path to your spark app in your directory")
parser.add_argument("-u", "--user",
help="the relative path to your spark job in your directory")
help="the relative path to your spark app in your directory")
parser.add_argument("-p", "--password",
help="the relative path to your spark job in your directory")
help="the relative path to your spark app in your directory")
args = parser.parse_args()
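With the flag renamed from --job-id to --app-id, argparse exposes the value as args.app_id, which the next hunk relies on. A small self-contained check with made-up values (not part of the repo):

import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--cluster-id", required=True)
parser.add_argument("--app-id")
parser.add_argument("--file", required=True)

# argparse maps --app-id onto the attribute app_id
args = parser.parse_args(["--cluster-id", "spark-demo",
                          "--app-id", "pi-estimate",
                          "--file", "./pi.py"])
assert args.app_id == "pi-estimate"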
@@ -53,15 +54,15 @@ if __name__ == '__main__':
_pool_id = args.cluster_id
print("spark cluster id: %s" % _pool_id)
- if args.job_id is not None:
- _job_id = args.job_id
- print("spark job id: %s" % _job_id)
+ if args.app_id is not None:
+ _app_id = args.app_id
+ print("spark job id: %s" % _app_id)
if args.file is not None:
- _job_file_path = args.file
- _job_file_name = os.path.basename(_job_file_path)
- print("spark job file path: %s" % _job_file_path)
- print("spark job file name: %s" % _job_file_name)
+ _app_file_path = args.file
+ _app_file_name = os.path.basename(_app_file_path)
+ print("spark job file path: %s" % _app_file_path)
+ print("spark job file name: %s" % _app_file_name)
if args.user is not None:
_username = args.user
@@ -107,9 +108,10 @@ if __name__ == '__main__':
batch_client,
blob_client,
pool_id = _pool_id,
- job_id = _job_id,
- job_file_name = _job_file_name,
- job_file_path = os.path.join(os.path.dirname(__file__), _job_file_path),
+ app_id = _app_id,
+ app_file_name = _app_file_name,
+ # app_file_path = os.path.join(os.path.dirname(__file__), _app_file_path),
+ app_file_path = _app_file_path,
username = _username,
password = _password)
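The default app id in this script is still derived from random.randint, and the TODO in this file asks for a real GUID. A possible drop-in using only the standard library (a sketch, not part of this commit):

import uuid

# A proper GUID for the deployment suffix instead of a random integer
_deployment_suffix = str(uuid.uuid4())
_app_id = 'az-spark-' + _deployment_suffix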

View file

@@ -63,6 +63,16 @@ def create_cluster(
# Create the pool
util.create_pool_if_not_exist(batch_client, pool, wait=wait)
+ # Create job (reuse pool_id as job_id)
+ job_id = pool_id
+ job = batch_models.JobAddParameter(
+ id = job_id,
+ pool_info=batch_models.PoolInformation(pool_id = pool_id))
+ # Add job to batch
+ batch_client.job.add(job)
def delete_cluster(
batch_client,
pool_id):
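Because the job is created with the same id as the pool, later code can always recover the cluster's Batch job from the cluster id alone. A minimal illustration of that convention (the helper name is ours, not from the repo):

def get_cluster_job(batch_client, pool_id):
    # create_cluster added the job with id == pool_id, so look it up directly
    return batch_client.job.get(job_id=pool_id)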
@@ -77,8 +87,12 @@ def delete_cluster(
# delete pool by id
pool = batch_client.pool.get(pool_id)
+ # job id is equal to pool id
+ job_id = pool_id
if batch_client.pool.exists(pool_id) == True:
batch_client.pool.delete(pool_id)
+ batch_client.job.delete(job_id)
print("\nThe pool, '%s', is being deleted" % pool_id)
else:
print("\nThe pool, '%s', does not exist" % pool_id)
@@ -88,27 +102,27 @@ def submit_job(
batch_client,
blob_client,
pool_id,
- job_id,
- job_file_path,
- job_file_name,
+ app_id,
+ app_file_path,
+ app_file_name,
username,
password):
"""
- Submit a spark job
+ Submit a spark app
:param batch_client: the batch client to use
:type batch_client: 'batchserviceclient.BatchServiceClient'
:param block_blob_client: A blob service client.
:type block_blob_client: `azure.storage.blob.BlockBlobService`
- :param pool_id: The id of the pool to submit job to
+ :param pool_id: The id of the pool to submit app to
:type pool_id: string
- :param job_id: The id of the job
- :type job_id: string
- :param job_file_path: The path of the spark job to run
- :type job_file_path: string
- :param job_file_name: The name of the spark job file to run
- :type job_file_name: string
+ :param app_id: The id of the spark app (corresponds to batch task)
+ :type app_id: string
+ :param app_file_path: The path of the spark app to run
+ :type app_file_path: string
+ :param app_file_name: The name of the spark app file to run
+ :type app_file_name: string
:param username: The username to access the head node via ssh
:type username: string
:param password: The password to access the head node via ssh
@@ -117,22 +131,15 @@ def submit_job(
"""
# set multi-instance task id
# TODO create a real GUID instead of just a random number
deployment_suffix = str(random.randint(0,1000000))
multiinstance_task_id= 'az-spark-' + deployment_suffix
- # Upload job resource files to blob storage
- job_resource_file = \
+ # Upload app resource files to blob storage
+ app_resource_file = \
util.upload_file_to_container(
- blob_client, container_name = job_id, file_path = job_file_path)
+ blob_client, container_name = app_id, file_path = app_file_path)
- #create job
- job = batch_models.JobAddParameter(
- id = job_id,
- pool_info=batch_models.PoolInformation(pool_id = pool_id))
- # add job to batch
- batch_client.job.add(job)
# configure multi-instance task commands
coordination_commands = [
'echo CCP_NODES:',
@@ -165,40 +172,43 @@ def submit_job(
'export PATH=$PATH:$SPARK_HOME/bin',
# kick off start-all spark command as a bg process
'($SPARK_HOME/sbin/start-all.sh &)',
- # execute spark-submit on the specified job
+ # execute spark-submit on the specified app
'$SPARK_HOME/bin/spark-submit ' +
'--master spark://${AZ_BATCH_MASTER_NODE%:*}:7077 ' +
- '$AZ_BATCH_TASK_WORKING_DIR/' + job_file_name
+ '$AZ_BATCH_TASK_WORKING_DIR/' + app_file_name
]
- # get pool size
+ # Get pool size
pool = batch_client.pool.get(pool_id)
pool_size = pool.target_dedicated
- # create multi-instance task
+ # Create multi-instance task
task = batch_models.TaskAddParameter(
id = multiinstance_task_id,
command_line = util.wrap_commands_in_shell(application_commands),
- resource_files = [job_resource_file],
+ resource_files = [app_resource_file],
run_elevated = False,
multi_instance_settings = batch_models.MultiInstanceSettings(
number_of_instances = pool_size,
coordination_command_line = util.wrap_commands_in_shell(coordination_commands),
common_resource_files = []))
- # add task to job
+ # Add task to batch job (which has the same name as pool_id)
+ job_id = pool_id
batch_client.task.add(job_id = job_id, task = task)
- # wait for the job to finish
+ # Wait for the app to finish
util.wait_for_tasks_to_complete(
batch_client,
job_id,
datetime.timedelta(minutes=60))
- # get master node id from task
- master_node_id = batch_client.task.get(job_id, multiinstance_task_id).node_info.node_id
+ # Get master node id from task
+ master_node_id = batch_client.task \
+ .get(job_id=pool_id, task_id=multiinstance_task_id) \
+ .node_info.node_id
- # create new ssh user for the master node
+ # Create new ssh user for the master node
batch_client.compute_node.add_user(
pool_id,
master_node_id,
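Once the ssh user has been added to the master node, its public endpoint can be looked up through the Batch API so the caller knows where to connect. A short follow-on sketch (not part of this commit; get_remote_login_settings is the standard azure-batch call):

# Print an ssh hint for the Spark master node using the user created above
settings = batch_client.compute_node.get_remote_login_settings(
    pool_id, master_node_id)
print("ssh %s@%s -p %d" % (
    username, settings.remote_login_ip_address, settings.remote_login_port))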