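"""
Create, manage, and connect to Spark clusters running on Azure Batch pools.

Example usage (a sketch; assumes `batch_client` is an authenticated
`azure.batch.BatchServiceClient` and the cluster/user names are your own):

    create_cluster(batch_client, 'spark-demo', vm_count=3,
                   vm_size='STANDARD_D2_V2', os='ubuntu', wait=True)
    create_user(batch_client, 'spark-demo', 'sparkuser', '<password>')
    ssh(batch_client, 'spark-demo', username='sparkuser',
        webui=8080, jupyter=8888, connect=False)
    delete_cluster(batch_client, 'spark-demo')
"""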
from datetime import datetime, timedelta
from subprocess import call

import azure.batch.models as batch_models

from . import util, constants
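

# Marketplace image references for the Linux Data Science VM, keyed by OS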
VM_IMAGES = {
    'centos': {
        'publisher': 'microsoft-ads',
        'offer': 'linux-data-science-vm',
        'sku': 'linuxdsvm'
    },
    'ubuntu': {
        'publisher': 'microsoft-ads',
        'offer': 'linux-data-science-vm-ubuntu',
        'sku': 'linuxdsvmubuntu'
    }
}


def cluster_install_cmd():
    """
    Start-task commands run on each node as it joins the pool.
    """
    return [
        'export SPARK_HOME=/dsvm/tools/spark/current',
        'export PATH=$PATH:$SPARK_HOME/bin',
        'chmod -R 777 $SPARK_HOME',
        'chmod -R 777 /usr/local/share/jupyter/kernels',
        'exit 0'
    ]


def cluster_connect_cmd():
    """
    Coordination commands run on every node of the multi-instance task:
    writes Spark's standalone-mode 'master' and 'slaves' config files.
    """
    return [
        # print env vars for debug
        'echo CCP_NODES:',
        'echo $CCP_NODES',
        'echo AZ_BATCH_NODE_LIST:',
        'echo $AZ_BATCH_NODE_LIST',
        'echo AZ_BATCH_HOST_LIST:',
        'echo $AZ_BATCH_HOST_LIST',
        'echo AZ_BATCH_MASTER_NODE:',
        'echo $AZ_BATCH_MASTER_NODE',
        'echo AZ_BATCH_IS_CURRENT_NODE_MASTER:',
        'echo $AZ_BATCH_IS_CURRENT_NODE_MASTER',

        # set SPARK_HOME environment vars
        'export SPARK_HOME=/dsvm/tools/spark/current',
        'export PATH=$PATH:$SPARK_HOME/bin',

        # copy a 'slaves' file from the slaves.template in $SPARK_HOME/conf
        'cp $SPARK_HOME/conf/slaves.template $SPARK_HOME/conf/slaves',

        # delete existing content & create a new line in the slaves file
        'echo > $SPARK_HOME/conf/slaves',

        # make an empty 'master' file in $SPARK_HOME/conf
        'cp $SPARK_HOME/conf/slaves $SPARK_HOME/conf/master',

        # add batch pool ips to the newly created master/slaves files
        'IFS="," read -r -a workerips <<< $AZ_BATCH_HOST_LIST',
        'for index in "${!workerips[@]}"',
        'do echo "${workerips[index]}"',
        'if [ "${AZ_BATCH_MASTER_NODE%:*}" = "${workerips[index]}" ]',
        'then echo "${workerips[index]}" >> $SPARK_HOME/conf/master',
        'else echo "${workerips[index]}" >> $SPARK_HOME/conf/slaves',
        'fi',
        'done'
    ]


def cluster_start_cmd(webui_port, jupyter_port):
    """
    Application commands run on the master node: start the Spark
    standalone cluster and an unauthenticated Jupyter notebook server.
    """
    return [
        # set SPARK_HOME environment vars
        'export SPARK_HOME=/dsvm/tools/spark/current',
        'export PATH=$PATH:$SPARK_HOME/bin',

        # get master node ip
        'export MASTER_NODE=$(cat $SPARK_HOME/conf/master)',

        # kick off start-all spark command (which starts web ui)
        '($SPARK_HOME/sbin/start-all.sh --webui-port ' + str(webui_port) + ' &)',

        # jupyter setup: remove auth
        '/anaconda/envs/py35/bin/jupyter notebook --generate-config',
        'echo >> $HOME/.jupyter/jupyter_notebook_config.py',
        'echo c.NotebookApp.token=\\\"\\\" >> $HOME/.jupyter/jupyter_notebook_config.py',
        'echo c.NotebookApp.password=\\\"\\\" >> $HOME/.jupyter/jupyter_notebook_config.py',

        # create jupyter kernel for pyspark
        'rm -rf /usr/local/share/jupyter/kernels/*',
        'mkdir /usr/local/share/jupyter/kernels/pyspark',
        'touch /usr/local/share/jupyter/kernels/pyspark/kernel.json',
        'echo { ' +
            '\\\"display_name\\\": \\\"PySpark\\\", ' +
            '\\\"language\\\": \\\"python\\\", ' +
            '\\\"argv\\\": [ ' +
            '\\\"/usr/bin/python3\\\", ' +
            '\\\"-m\\\", ' +
            '\\\"ipykernel\\\", ' +
            '\\\"-f\\\", ' +
            '\\\"{connection_file}\\\" ' +
            '], ' +
            '\\\"env\\\": { ' +
            '\\\"SPARK_HOME\\\": \\\"/dsvm/tools/spark/current\\\", ' +
            '\\\"PYSPARK_PYTHON\\\": \\\"/usr/bin/python3\\\", ' +
            '\\\"PYSPARK_SUBMIT_ARGS\\\": ' +
            '\\\"--master spark://${MASTER_NODE%:*}:7077 ' +
            # '--executor-memory 6400M ' +
            # '--driver-memory 6400M ' +
            'pyspark-shell\\\" ' +
            '}' +
            '} >> /usr/local/share/jupyter/kernels/pyspark/kernel.json',
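
        # For reference, the echo above writes roughly this kernel.json on the
        # node (the master address is expanded by the shell at write time):
        #   {
        #     "display_name": "PySpark",
        #     "language": "python",
        #     "argv": ["/usr/bin/python3", "-m", "ipykernel", "-f", "{connection_file}"],
        #     "env": {
        #       "SPARK_HOME": "/dsvm/tools/spark/current",
        #       "PYSPARK_PYTHON": "/usr/bin/python3",
        #       "PYSPARK_SUBMIT_ARGS": "--master spark://<master-ip>:7077 pyspark-shell"
        #     }
        #   }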

        # start jupyter notebook
        '(PYSPARK_DRIVER_PYTHON=/anaconda/envs/py35/bin/jupyter ' +
            'PYSPARK_DRIVER_PYTHON_OPTS="notebook --no-browser --port=' + str(jupyter_port) + '" ' +
            'pyspark &)' # +
            # '--master spark://${MASTER_NODE%:*}:7077 ' +
            # '--executor-memory 6400M ' +
            # '--driver-memory 6400M &)'
    ]


def create_cluster(
        batch_client,
        pool_id,
        vm_count,
        vm_size,
        os,
        wait=True):
    """
    Create a spark cluster
    """

    # set OS
    if os == 'ubuntu':
        image = VM_IMAGES['ubuntu']
    elif os == 'centos':
        image = VM_IMAGES['centos']
    else:
        raise ValueError("Invalid OS: '{}' (expected 'ubuntu' or 'centos')".format(os))

    # vm image
    _publisher = image['publisher']
    _offer = image['offer']
    _sku = image['sku']

    # reuse pool_id as job_id
    job_id = pool_id

    # start task command
    start_task_commands = cluster_install_cmd()

    # Get a verified node agent sku
    sku_to_use, image_ref_to_use = \
        util.select_latest_verified_vm_image_with_node_agent_sku(
            batch_client, _publisher, _offer, _sku)

    # Configure the pool
    pool = batch_models.PoolAddParameter(
        id=pool_id,
        virtual_machine_configuration=batch_models.VirtualMachineConfiguration(
            image_reference=image_ref_to_use,
            node_agent_sku_id=sku_to_use),
        vm_size=vm_size,
        target_dedicated=vm_count,
        start_task=batch_models.StartTask(
            command_line=util.wrap_commands_in_shell(start_task_commands),
            user_identity=batch_models.UserIdentity(
                auto_user=batch_models.AutoUserSpecification(
                    scope=batch_models.AutoUserScope.pool,
                    elevation_level=batch_models.ElevationLevel.admin)),
            wait_for_success=True),
        enable_inter_node_communication=True,
        max_tasks_per_node=1)

    # Create the pool + create user for the pool
    util.create_pool_if_not_exist(
        batch_client,
        pool,
        wait)

    # Create job
    job = batch_models.JobAddParameter(
        id=job_id,
        pool_info=batch_models.PoolInformation(pool_id=pool_id))

    # Add job to batch
    batch_client.job.add(job)

    # create application/coordination commands
    coordination_cmd = cluster_connect_cmd()
    application_cmd = cluster_start_cmd(constants._WEBUI_PORT, constants._JUPYTER_PORT)

    # reuse pool_id as multi-instance task id
    task_id = pool_id
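
    # In a Batch multi-instance task, the coordination command line runs on
    # every node, while the application command line runs only on the primary
    # node; Spark and Jupyter are therefore launched by application_cmd.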
    # Create multi-instance task
    task = batch_models.TaskAddParameter(
        id=task_id,
        command_line=util.wrap_commands_in_shell(application_cmd),
        resource_files=[],
        multi_instance_settings=batch_models.MultiInstanceSettings(
            number_of_instances=vm_count,
            coordination_command_line=util.wrap_commands_in_shell(coordination_cmd),
            common_resource_files=[]))

    # Add task to batch job (which has the same name as pool_id)
    try:
        batch_client.task.add(job_id=job_id, task=task)
    except batch_models.batch_error.BatchErrorException as err:
        util.print_batch_exception(err)
        if err.error.code != "JobExists":
            raise
        print("Job {!r} already exists".format(job_id))

    # Wait for the app to finish
    if wait:
        util.wait_for_tasks_to_complete(
            batch_client,
            job_id,
            timedelta(minutes=60))


def create_user(
        batch_client,
        pool_id,
        username,
        password):
    """
    Create a cluster user
    """

    # Get master node id from task (job and task are both named pool_id)
    master_node_id = batch_client.task \
        .get(job_id=pool_id, task_id=pool_id) \
        .node_info.node_id

    # Create new ssh user for the master node
    batch_client.compute_node.add_user(
        pool_id,
        master_node_id,
        batch_models.ComputeNodeUser(
            username,
            is_admin=True,
            password=password,
            expiry_time=datetime.now() + timedelta(days=365)))


def get_cluster_details(
        batch_client,
        pool_id):
    """
    Print the state, size and nodes of a cluster
    """
    pool = batch_client.pool.get(pool_id)
    if pool.state == batch_models.PoolState.deleting:
        print()
    nodes = batch_client.compute_node.list(pool_id=pool_id)
    visible_state = pool.allocation_state.value if pool.state.value == "active" else pool.state.value
    if pool.state.value == "resizing" or (pool.state.value == "deleting" and pool.allocation_state.value == "resizing"):
        node_count = '{} -> {}'.format(pool.current_dedicated, pool.target_dedicated)
    else:
        node_count = '{}'.format(pool.current_dedicated)

    print("State: {}".format(visible_state))
    print("Node Size: {}".format(pool.vm_size))
    print("Nodes: {}".format(node_count))
    print()

    node_label = "Nodes"
    print_format = '{:<34}| {:<15} | {:<21}| {:<8}'
    print_format_underline = '{:-<34}|{:-<17}|{:-<22}|{:-<8}'
    print(print_format.format(node_label, 'State', 'IP:Port', 'Master'))
    print(print_format_underline.format('', '', '', ''))

    master_node = util.get_master_node_id(batch_client, pool_id)

    for node in nodes:
        ip, port = util.get_connection_info(batch_client, pool_id, node.id)
        print(print_format.format(node.id, node.state.value, "{}:{}".format(ip, port),
                                  "*" if node.id == master_node else ""))
    print()


def list_clusters(
        batch_client):
    """
    Print the id, state, vm size and node count of every pool
    """
    print_format = '{:<34}| {:<10}| {:<20}| {:<7}'
    print_format_underline = '{:-<34}|{:-<11}|{:-<21}|{:-<7}'

    pools = batch_client.pool.list()
    print(print_format.format('Cluster', 'State', 'VM Size', 'Nodes'))
    print(print_format_underline.format('', '', '', ''))
    for pool in pools:
        pool_state = pool.allocation_state.value if pool.state.value == "active" else pool.state.value

        node_count = pool.current_dedicated
        if pool_state == "resizing" or (pool_state == "deleting" and pool.allocation_state.value == "resizing"):
            node_count = '{} -> {}'.format(pool.current_dedicated, pool.target_dedicated)

        print(print_format.format(pool.id,
                                  pool_state,
                                  pool.vm_size,
                                  node_count))


def delete_cluster(
        batch_client,
        pool_id):
    """
    Delete a spark cluster
    """
    # job id is equal to pool id
    job_id = pool_id

    # delete pool (and its job) by id
    if batch_client.pool.exists(pool_id):
        batch_client.pool.delete(pool_id)
        batch_client.job.delete(job_id)
        print("\nThe pool, '%s', is being deleted" % pool_id)
    else:
        print("\nThe pool, '%s', does not exist" % pool_id)


def ssh(
        batch_client,
        pool_id,
        username=None,
        masterui=None,
        webui=None,
        jupyter=None,
        ports=None,
        connect=True):
    """
    SSH into the head node of a spark cluster

    :param ports: a list of local and remote ports
    :type ports: [[<local-port>, <remote-port>]]
    """

    # Get master node id from task (job and task are both named pool_id)
    master_node_id = batch_client.task \
        .get(job_id=pool_id, task_id=pool_id) \
        .node_info.node_id

    # get remote login settings for the user
    remote_login_settings = batch_client.compute_node.get_remote_login_settings(
        pool_id, master_node_id)

    master_node_ip = remote_login_settings.remote_login_ip_address
    master_node_port = remote_login_settings.remote_login_port

    # build ssh tunnel command
    ssh_command = "ssh "
    if masterui is not None:
        ssh_command += "-L " + str(masterui) + ":localhost:" + str(constants._MASTER_UI_PORT) + " "
    if webui is not None:
        ssh_command += "-L " + str(webui) + ":localhost:" + str(constants._WEBUI_PORT) + " "
    if jupyter is not None:
        ssh_command += "-L " + str(jupyter) + ":localhost:" + str(constants._JUPYTER_PORT) + " "
    if ports is not None:
        for port in ports:
            ssh_command += "-L " + str(port[0]) + ":localhost:" + str(port[1]) + " "

    user = username if username is not None else "<username>"
    ssh_command += user + "@" + str(master_node_ip) + " -p " + str(master_node_port)
    ssh_command_array = ssh_command.split()
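
    # For example, with webui=8080 and jupyter=8888 this yields something like:
    #   ssh -L 8080:localhost:<webui-port> -L 8888:localhost:<jupyter-port> <username>@<master-ip> -p <node-port>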

    if not connect:
        print('\nuse the following command to connect to your spark head node:')
        print()
        print('\t%s' % ssh_command)
        print()
    else:
        call(ssh_command_array)