зеркало из https://github.com/Azure/aztk.git
mpi task in pool creation + regular task to submit
This commit is contained in:
Родитель
8b80fff5a3
Коммит
aaa1cad029
|
@ -18,11 +18,7 @@ import azure.batch.models as batch_models
|
|||
import azure.storage.blob as blob
|
||||
|
||||
# config file path
|
||||
_config_path = os.path.join(os.path.dirname(__file__), '../configuration.cfg')
|
||||
|
||||
# generate random number for deployment suffix
|
||||
# TODO generate real GUID instead of a random number
|
||||
_deployment_suffix = str(random.randint(0,1000000))
|
||||
_config_path = os.path.join(os.path.dirname(__file__), '../configuration.cfg')
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
||||
|
@ -30,6 +26,7 @@ if __name__ == '__main__':
|
|||
_app_id = None
|
||||
_app_file_name = None
|
||||
_app_file_path = None
|
||||
_wait = True
|
||||
|
||||
# parse arguments
|
||||
parser = argparse.ArgumentParser(prog="az_spark")
|
||||
|
@ -40,6 +37,9 @@ if __name__ == '__main__':
|
|||
help="the unique name of your spark app")
|
||||
parser.add_argument("--file", required=True,
|
||||
help="the relative path to your spark app in your directory")
|
||||
parser.add_argument('--wait', dest='wait', action='store_true')
|
||||
parser.add_argument('--no-wait', dest='wait', action='store_false')
|
||||
parser.set_defaults(wait=True)
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
|
@ -58,6 +58,11 @@ if __name__ == '__main__':
|
|||
print("spark job file path: %s" % _app_file_path)
|
||||
print("spark job file name: %s" % _app_file_name)
|
||||
|
||||
if args.wait is not None:
|
||||
if args.wait == False:
|
||||
_wait = False
|
||||
print("wait for cluster: %r" % _wait)
|
||||
|
||||
# Read config file
|
||||
global_config = configparser.ConfigParser()
|
||||
global_config.read(_config_path)
|
||||
|
@ -97,5 +102,6 @@ if __name__ == '__main__':
|
|||
app_id = _app_id,
|
||||
app_file_name = _app_file_name,
|
||||
# app_file_path = os.path.join(os.path.dirname(__file__), _app_file_path),
|
||||
app_file_path = _app_file_path)
|
||||
app_file_path = _app_file_path,
|
||||
wait = _wait)
|
||||
|
||||
|
|
|
@ -32,8 +32,6 @@ if __name__ == '__main__':
|
|||
|
||||
parser.add_argument("--cluster-id", required=True,
|
||||
help="the unique name of your spark cluster")
|
||||
parser.add_argument("--app-id", required=True,
|
||||
help="the unique name of your spark app")
|
||||
parser.add_argument("-u", "--user",
|
||||
help="the relative path to your spark app in your directory")
|
||||
parser.add_argument("-p", "--password",
|
||||
|
@ -46,10 +44,6 @@ if __name__ == '__main__':
|
|||
_pool_id = args.cluster_id
|
||||
print("spark cluster id: %s" % _pool_id)
|
||||
|
||||
if args.app_id is not None:
|
||||
_app_id = args.app_id
|
||||
print("spark job id: %s" % _app_id)
|
||||
|
||||
if args.user is not None:
|
||||
_username = args.user
|
||||
print("az_spark username: %s" % _username)
|
||||
|
@ -81,10 +75,9 @@ if __name__ == '__main__':
|
|||
batch_client.config.retry_policy.retries = 5
|
||||
|
||||
# get ssh command
|
||||
sparklib.ssh_app(
|
||||
sparklib.ssh(
|
||||
batch_client,
|
||||
pool_id = _pool_id,
|
||||
app_id = _app_id,
|
||||
username = _username,
|
||||
password = _password,
|
||||
ports = [8082])
|
|
@ -8,7 +8,7 @@ import azure.batch.models as batch_models
|
|||
_WEBUI_PORT = 8082
|
||||
_JUPYTER_PORT = 7777
|
||||
|
||||
def install_cmd():
|
||||
def cluster_install_cmd():
|
||||
'''
|
||||
this command is run-elevated
|
||||
'''
|
||||
|
@ -19,7 +19,7 @@ def install_cmd():
|
|||
'exit 0'
|
||||
]
|
||||
|
||||
def connect_cmd():
|
||||
def cluster_connect_cmd():
|
||||
return [
|
||||
# print env vars for debug
|
||||
'echo CCP_NODES:',
|
||||
|
@ -43,15 +43,22 @@ def connect_cmd():
|
|||
# delete existing content & create a new line in the slaves file
|
||||
'echo > $SPARK_HOME/conf/slaves',
|
||||
|
||||
# make empty 'master' file in $SPARK_HOME/conf
|
||||
'cp $SPARK_HOME/conf/slaves $SPARK_HOME/conf/master',
|
||||
|
||||
# add batch pool ips to newly created slaves files
|
||||
# make file name master
|
||||
'IFS="," read -r -a workerips <<< $AZ_BATCH_HOST_LIST',
|
||||
'for index in "${!workerips[@]}"',
|
||||
'do echo "${workerips[index]}" >> $SPARK_HOME/conf/slaves', # TODO unless node is master
|
||||
'echo "${workerips[index]}"',
|
||||
'do echo "{workerips[index]}"',
|
||||
'if [ "${AZ_BATCH_MASTER_NODE%:*}" = "${workerips[index]}" ]',
|
||||
'then echo "${workerips[index]}" >> $SPARK_HOME/conf/master',
|
||||
'else echo "${workerips[index]}" >> $SPARK_HOME/conf/slaves',
|
||||
'fi',
|
||||
'done'
|
||||
]
|
||||
|
||||
def custom_app_cmd(webui_port, app_file_name):
|
||||
def cluster_start_cmd(webui_port):
|
||||
return [
|
||||
# set SPARK_HOME environment vars
|
||||
'export SPARK_HOME=/dsvm/tools/spark/current',
|
||||
|
@ -59,14 +66,24 @@ def custom_app_cmd(webui_port, app_file_name):
|
|||
|
||||
# kick off start-all spark command as a bg process
|
||||
'($SPARK_HOME/sbin/start-all.sh --webui-port ' + str(webui_port) + ' &)',
|
||||
]
|
||||
|
||||
def app_submit_cmd(webui_port, app_file_name):
|
||||
return [
|
||||
# set SPARK_HOME environment vars
|
||||
'export SPARK_HOME=/dsvm/tools/spark/current',
|
||||
'export PATH=$PATH:$SPARK_HOME/bin',
|
||||
|
||||
# set the runtime to python 3
|
||||
'export PYSPARK_PYTHON=/usr/bin/python3',
|
||||
'export PYSPARK_DRIVER_PYTHON=python3',
|
||||
|
||||
# get master node ip
|
||||
'export MASTER_NODE=$(cat $SPARK_HOME/conf/master)',
|
||||
|
||||
# execute spark-submit on the specified app
|
||||
'$SPARK_HOME/bin/spark-submit ' +
|
||||
'--master spark://${AZ_BATCH_MASTER_NODE%:*}:7077 ' +
|
||||
'--master spark://${MASTER_NODE%:*}:7077 ' +
|
||||
'$AZ_BATCH_TASK_WORKING_DIR/' + app_file_name
|
||||
]
|
||||
|
||||
|
@ -122,8 +139,12 @@ def create_cluster(
|
|||
_offer = 'linux-data-science-vm'
|
||||
_sku = 'linuxdsvm'
|
||||
|
||||
# reuse pool_id as job_id
|
||||
job_id = pool_id
|
||||
|
||||
# start task command
|
||||
start_task_commands = install_cmd()
|
||||
start_task_commands = cluster_install_cmd()
|
||||
|
||||
# Get a verified node agent sku
|
||||
sku_to_use, image_ref_to_use = \
|
||||
util.select_latest_verified_vm_image_with_node_agent_sku(
|
||||
|
@ -147,8 +168,7 @@ def create_cluster(
|
|||
# Create the pool
|
||||
util.create_pool_if_not_exist(batch_client, pool, wait=wait)
|
||||
|
||||
# Create job (reuse pool_id as job_id)
|
||||
job_id = pool_id
|
||||
# Create job
|
||||
job = batch_models.JobAddParameter(
|
||||
id = job_id,
|
||||
pool_info=batch_models.PoolInformation(pool_id = pool_id))
|
||||
|
@ -156,6 +176,35 @@ def create_cluster(
|
|||
# Add job to batch
|
||||
batch_client.job.add(job)
|
||||
|
||||
# create application/coordination commands
|
||||
coordination_cmd = cluster_connect_cmd()
|
||||
application_cmd = cluster_start_cmd(_WEBUI_PORT)
|
||||
|
||||
# reuse pool_id as multi-instance task id
|
||||
task_id = pool_id
|
||||
|
||||
# Create multi-instance task
|
||||
task = batch_models.TaskAddParameter(
|
||||
id = task_id,
|
||||
command_line = util.wrap_commands_in_shell(application_cmd),
|
||||
resource_files = [],
|
||||
run_elevated = False,
|
||||
multi_instance_settings = batch_models.MultiInstanceSettings(
|
||||
number_of_instances = vm_count,
|
||||
coordination_command_line = util.wrap_commands_in_shell(coordination_cmd),
|
||||
common_resource_files = []))
|
||||
|
||||
# Add task to batch job (which has the same name as pool_id)
|
||||
batch_client.task.add(job_id = job_id, task = task)
|
||||
|
||||
# Wait for the app to finish
|
||||
if wait == True:
|
||||
util.wait_for_tasks_to_complete(
|
||||
batch_client,
|
||||
job_id,
|
||||
datetime.timedelta(minutes=60))
|
||||
|
||||
|
||||
def get_cluster_details(
|
||||
batch_client,
|
||||
pool_id):
|
||||
|
@ -207,7 +256,7 @@ def get_master_node_id(batch_client, pool_id):
|
|||
return ""
|
||||
|
||||
def list_clusters(
|
||||
batch_client):
|
||||
batch_client):
|
||||
print_format = '{:<34}| {:<10}| {:<20}| {:<7}'
|
||||
print_format_underline = '{:-<34}|{:-<11}|{:-<21}|{:-<7}'
|
||||
|
||||
|
@ -252,8 +301,8 @@ def submit_app(
|
|||
pool_id,
|
||||
app_id,
|
||||
app_file_path,
|
||||
app_file_name):
|
||||
#TODO add 'wait' param
|
||||
app_file_name,
|
||||
wait):
|
||||
|
||||
"""
|
||||
Submit a spark app
|
||||
|
@ -277,39 +326,35 @@ def submit_app(
|
|||
util.upload_file_to_container(
|
||||
blob_client, container_name = app_id, file_path = app_file_path)
|
||||
|
||||
# create application/coordination commands
|
||||
coordination_cmd = connect_cmd()
|
||||
application_cmd = custom_app_cmd(_WEBUI_PORT, app_file_name)
|
||||
# create command to submit task
|
||||
cmd = app_submit_cmd(_WEBUI_PORT, app_file_name)
|
||||
|
||||
# Get pool size
|
||||
pool = batch_client.pool.get(pool_id)
|
||||
pool_size = pool.target_dedicated
|
||||
|
||||
# Create multi-instance task
|
||||
# Create task
|
||||
task = batch_models.TaskAddParameter(
|
||||
id = app_id,
|
||||
command_line = util.wrap_commands_in_shell(application_cmd),
|
||||
id=app_id,
|
||||
command_line=util.wrap_commands_in_shell(cmd),
|
||||
resource_files = [app_resource_file],
|
||||
run_elevated = False,
|
||||
multi_instance_settings = batch_models.MultiInstanceSettings(
|
||||
number_of_instances = pool_size,
|
||||
coordination_command_line = util.wrap_commands_in_shell(coordination_cmd),
|
||||
common_resource_files = []))
|
||||
run_elevated = False
|
||||
)
|
||||
|
||||
# Add task to batch job (which has the same name as pool_id)
|
||||
job_id = pool_id
|
||||
batch_client.task.add(job_id = job_id, task = task)
|
||||
|
||||
# Wait for the app to finish
|
||||
util.wait_for_tasks_to_complete(
|
||||
batch_client,
|
||||
job_id,
|
||||
datetime.timedelta(minutes=60))
|
||||
if wait == True:
|
||||
util.wait_for_tasks_to_complete(
|
||||
batch_client,
|
||||
job_id,
|
||||
datetime.timedelta(minutes=60))
|
||||
|
||||
def ssh_app(
|
||||
def ssh(
|
||||
batch_client,
|
||||
pool_id,
|
||||
app_id,
|
||||
username,
|
||||
password,
|
||||
ports = None):
|
||||
|
@ -321,8 +366,6 @@ def ssh_app(
|
|||
:type batch_client: 'batchserviceclient.BatchServiceClient'
|
||||
:param pool_id: The id of the pool to submit app to
|
||||
:type pool_id: string
|
||||
:param app_id: The id of the spark app (corresponds to batch task)
|
||||
:type app_id: string
|
||||
:param username: The username to access the head node via ssh
|
||||
:type username: string
|
||||
:param password: The password to access the head node via ssh
|
||||
|
@ -331,9 +374,9 @@ def ssh_app(
|
|||
:type ports: [<int>]
|
||||
"""
|
||||
|
||||
# Get master node id from task
|
||||
# Get master node id from task (job and task are both named pool_id)
|
||||
master_node_id = batch_client.task \
|
||||
.get(job_id=pool_id, task_id=app_id) \
|
||||
.get(job_id=pool_id, task_id=pool_id) \
|
||||
.node_info.node_id
|
||||
|
||||
# Create new ssh user for the master node
|
||||
|
@ -403,7 +446,7 @@ def jupyter(
|
|||
"""
|
||||
|
||||
# create application/coordination commands
|
||||
coordination_cmd = connect_cmd()
|
||||
coordination_cmd = cluster_connect_cmd()
|
||||
application_cmd = jupyter_cmd(_WEBUI_PORT, _JUPYTER_PORT)
|
||||
|
||||
# Get pool size
|
||||
|
|
4
setup.py
4
setup.py
|
@ -10,8 +10,10 @@ setup(name='redbull',
|
|||
packages=['redbull'],
|
||||
scripts=['bin/spark-cluster-create',
|
||||
'bin/spark-cluster-delete',
|
||||
'bin/spark-cluster-ssh',
|
||||
'bin/spark-cluster-get',
|
||||
'bin/spark-cluster-list',
|
||||
'bin/spark-app-submit',
|
||||
'bin/spark-app-ssh',
|
||||
'bin/spark-app-list',
|
||||
'bin/spark-app-jupyter'],
|
||||
zip_safe=False)
|
||||
|
|
Загрузка…
Ссылка в новой задаче