2017-05-13 01:23:33 +03:00
|
|
|
from datetime import datetime, timedelta
|
2017-07-14 06:57:04 +03:00
|
|
|
from subprocess import call
|
2017-07-14 19:00:47 +03:00
|
|
|
import azure.batch.models as batch_models
|
|
|
|
from . import azure_api, constants, upload_node_scripts, util
|
|
|
|
from dtde.core import CommandBuilder
|
|
|
|
|
|
|
|
# User identity attached to the pool start task: the Batch auto-user,
# scoped to the pool and elevated to admin.
POOL_ADMIN_USER_IDENTITY = batch_models.UserIdentity(
    auto_user=batch_models.AutoUserSpecification(
        scope=batch_models.AutoUserScope.pool,
        elevation_level=batch_models.ElevationLevel.admin,
    ),
)
|
|
|
|
|
2017-07-10 19:03:34 +03:00
|
|
|
|
2017-07-14 19:00:47 +03:00
|
|
|
def cluster_install_cmd(zip_resource_file: batch_models.ResourceFile, custom_script_file):
    """
        Build the list of shell commands the pool start task runs to set up spark.
        :param zip_resource_file: Resource file whose ``file_path`` names the zip to unpack in the task working dir
        :param custom_script_file: Optional script run through ``/bin/sh -c`` after main.sh (skipped when None)
        :return: List of command strings, always terminated by ``exit 0``
    """
    # Spark home, PATH and wide-open permissions for the spark/jupyter folders
    environment_setup = [
        'export SPARK_HOME=/dsvm/tools/spark/current',
        'export PATH=$PATH:$SPARK_HOME/bin',
        'chmod -R 777 $SPARK_HOME',
        'chmod -R 777 /usr/local/share/jupyter/kernels',
        # Avoids the error: "sudo: sorry, you must have a tty to run sudo"
        'sed -i -e "s/Defaults requiretty.*/ #Defaults requiretty/g" /etc/sudoers',
    ]

    # Unpack the uploaded node scripts and run their entry point
    run_node_scripts = [
        'unzip $AZ_BATCH_TASK_WORKING_DIR/{0}'.format(zip_resource_file.file_path),
        'chmod 777 $AZ_BATCH_TASK_WORKING_DIR/main.sh',
        # Normalise windows line endings to unix if there are any
        'dos2unix $AZ_BATCH_TASK_WORKING_DIR/main.sh',
        '$AZ_BATCH_TASK_WORKING_DIR/main.sh',
    ]

    commands = environment_setup + run_node_scripts

    if custom_script_file is not None:
        commands.append('/bin/sh -c {}'.format(custom_script_file))

    commands.append('exit 0')
    return commands
|
2017-04-14 09:04:12 +03:00
|
|
|
|
2017-07-10 19:03:34 +03:00
|
|
|
|
2017-07-14 19:00:47 +03:00
|
|
|
def generate_cluster_start_task(
        cluster_id: str,
        zip_resource_file: batch_models.ResourceFile,
        custom_script: str=None):
    """
        Build the start task object for the pool that is about to be created.
        :param cluster_id str: Id of the cluster (used as the container name when uploading the custom script)
        :param zip_resource_file: Resource file object pointing to the zip file containing scripts to run on the node
        :param custom_script str: Path to a local file to be uploaded to storage and run after main.sh
        :return: A ``batch_models.StartTask`` run with the pool admin identity
    """
    files_for_node = [zip_resource_file]

    # The custom script, when supplied, is uploaded and shipped next to the zip
    if custom_script is not None:
        uploaded_script = util.upload_file_to_container(
            container_name=cluster_id,
            file_path=custom_script,
            use_full_path=True)
        files_for_node.append(uploaded_script)

    # TODO use certificate
    batch_config = azure_api.get_batch_config()
    node_environment = [
        batch_models.EnvironmentSetting(name=setting_name, value=setting_value)
        for setting_name, setting_value in (
            ("BATCH_ACCOUNT_KEY", batch_config.account_key),
            ("BATCH_ACCOUNT_URL", batch_config.account_url),
        )
    ]

    # Commands executed by the start task, wrapped into a single shell command line
    start_commands = cluster_install_cmd(zip_resource_file, custom_script)
    command_line = util.wrap_commands_in_shell(start_commands)

    return batch_models.StartTask(
        command_line=command_line,
        resource_files=files_for_node,
        environment_settings=node_environment,
        user_identity=POOL_ADMIN_USER_IDENTITY,
        wait_for_success=True)
|
2017-07-10 19:03:34 +03:00
|
|
|
|
|
|
|
|
2017-04-11 04:48:15 +03:00
|
|
|
def create_cluster(
        custom_script: str,
        cluster_id: str,
        vm_count,
        vm_low_pri_count,
        vm_size,
        username: str,
        password: str,
        wait=True):
    """
        Create a spark cluster
        :param custom_script: Path to a custom script to run on all the node of the cluster
        :param cluster_id: Id of the cluster (used as both the pool id and the job id)
        :param vm_count: Number of dedicated node in the cluster
        :param vm_low_pri_count: Number of low pri node in the cluster
        :param vm_size: Tier of the node(standard_a2, standard_g2, etc.)
        :param username: Optional username of user to add to the pool when ready(Need wait to be True)
        :param password: Optional password of user to add to the pool when ready(Need wait to be True)
        :param wait: If this function should wait for the cluster to be ready(Master and all slave booted)
    """
    # Upload the start task scripts before anything else talks to Batch
    zip_resource_file = upload_node_scripts.zip_and_upload()

    batch_client = azure_api.get_batch_client()

    # The pool and its job share the cluster id
    pool_id = job_id = cluster_id

    # Resolve a verified node agent sku for the data science VM image
    node_agent_sku, image_reference = \
        util.select_latest_verified_vm_image_with_node_agent_sku(
            'microsoft-ads', 'linux-data-science-vm', 'linuxdsvm')

    vm_configuration = batch_models.VirtualMachineConfiguration(
        image_reference=image_reference,
        node_agent_sku_id=node_agent_sku)
    start_task = generate_cluster_start_task(
        pool_id, zip_resource_file, custom_script)

    # Configure the pool
    pool_spec = batch_models.PoolAddParameter(
        id=pool_id,
        virtual_machine_configuration=vm_configuration,
        vm_size=vm_size,
        target_dedicated_nodes=vm_count,
        target_low_priority_nodes=vm_low_pri_count,
        start_task=start_task,
        enable_inter_node_communication=True,
        max_tasks_per_node=1)

    # Create the pool
    util.create_pool_if_not_exist(pool_spec, wait)

    # Create the job bound to that pool and add it to batch
    job_spec = batch_models.JobAddParameter(
        id=job_id,
        pool_info=batch_models.PoolInformation(pool_id=pool_id))
    batch_client.job.add(job_spec)

    # Without waiting there is no master yet, so no user can be added
    if not wait:
        return

    util.wait_for_master_to_be_ready(pool_id)

    if username is not None and password is not None:
        create_user(pool_id, username, password)
|
2017-04-16 06:34:41 +03:00
|
|
|
|
2017-07-14 19:00:47 +03:00
|
|
|
|
2017-04-17 02:47:18 +03:00
|
|
|
def create_user(
        cluster_id: str,
        username: str,
        password: str):
    """
        Create a cluster user on the master node
        :param cluster_id: id of the spark cluster
        :param username: username of the user to add
        :param password: password of the user to add
    """
    batch_client = azure_api.get_batch_client()

    master_node_id = util.get_master_node_id(cluster_id)

    # Admin account valid for one year from now.
    # NOTE(review): datetime.now() is naive local time - confirm how the SDK
    # serialises it if the exact expiry instant matters.
    new_user = batch_models.ComputeNodeUser(
        username,
        is_admin=True,
        password=password,
        expiry_time=datetime.now() + timedelta(days=365))

    # Create new ssh user for the master node
    batch_client.compute_node.add_user(cluster_id, master_node_id, new_user)
|
2017-04-17 02:47:18 +03:00
|
|
|
|
2017-04-16 06:34:41 +03:00
|
|
|
|
2017-07-14 19:00:47 +03:00
|
|
|
def get_cluster_details(cluster_id: str):
    """
        Print the information for the given cluster
        :param cluster_id: Id of the cluster

        Prints a summary (state, node size, node counts) followed by one row
        per node with its state, ip:port and a ``*`` marker on the master.
        Prints only a notice and returns when the pool is being deleted.
    """
    batch_client = azure_api.get_batch_client()

    pool = batch_client.pool.get(cluster_id)
    if pool.state is batch_models.PoolState.deleting:
        print("Cluster is being deleted!")
        return

    nodes = batch_client.compute_node.list(pool_id=cluster_id)

    # An active pool is described by its allocation state (e.g. resizing);
    # any other pool state is shown as is. Strings are compared with ==,
    # identity comparison only worked by accident of interning.
    if pool.state.value == 'active':
        visible_state = pool.allocation_state.value
    else:
        visible_state = pool.state.value

    # "Nodes" is the total of dedicated and low priority nodes, matching the
    # breakdown printed right below it.
    current_nodes = pool.current_dedicated_nodes + pool.current_low_priority_nodes
    target_nodes = pool.target_dedicated_nodes + pool.target_low_priority_nodes

    # A deleting pool already returned above, so a resize can only show up
    # through the visible (allocation) state here.
    if visible_state == 'resizing':
        node_count = '{} -> {}'.format(current_nodes, target_nodes)
    else:
        node_count = '{}'.format(current_nodes)

    print()
    print('State:          {}'.format(visible_state))
    print('Node Size:      {}'.format(pool.vm_size))
    print('Nodes:          {}'.format(node_count))
    print('| Dedicated:    {}'.format(pool.current_dedicated_nodes))
    print('| Low priority: {}'.format(pool.current_low_priority_nodes))
    print()

    node_label = 'Nodes'
    print_format = '{:<36}| {:<15} | {:<21}| {:<8}'
    print_format_underline = '{:-<36}|{:-<17}|{:-<22}|{:-<8}'
    print(print_format.format(node_label, 'State', 'IP:Port', 'Master'))
    print(print_format_underline.format('', '', '', ''))

    master_node = util.get_master_node_id_from_pool(pool)

    for node in nodes:
        ip, port = util.get_connection_info(cluster_id, node.id)
        print(print_format.format(node.id, node.state.value, '{}:{}'.format(ip, port),
                                  '*' if node.id == master_node else ''))
    print()
|
|
|
|
|
2017-07-14 19:00:47 +03:00
|
|
|
|
2017-07-07 00:25:44 +03:00
|
|
|
def list_clusters():
    """
        List all the cluster on your account.

        Prints one row per pool: id, state, vm size and node count. The node
        count is shown as ``current -> target`` while the pool is resizing.
    """
    batch_client = azure_api.get_batch_client()

    print_format = '{:<34}| {:<10}| {:<20}| {:<7}'
    print_format_underline = '{:-<34}|{:-<11}|{:-<21}|{:-<7}'

    pools = batch_client.pool.list()
    print(print_format.format('Cluster', 'State', 'VM Size', 'Nodes'))
    print(print_format_underline.format('', '', '', ''))

    for pool in pools:
        # An active pool is described by its allocation state; otherwise the
        # pool state itself is shown. Compare strings with ==, not identity.
        if pool.state.value == 'active':
            pool_state = pool.allocation_state.value
        else:
            pool_state = pool.state.value

        target_nodes = util.get_cluster_total_target_nodes(pool)
        current_nodes = util.get_cluster_total_current_nodes(pool)

        is_resizing = pool_state == 'resizing' or (
            pool_state == 'deleting'
            and pool.allocation_state.value == 'resizing')

        if is_resizing:
            node_count = '{} -> {}'.format(current_nodes, target_nodes)
        else:
            node_count = current_nodes

        print(print_format.format(pool.id,
                                  pool_state,
                                  pool.vm_size,
                                  node_count))
|
|
|
|
|
|
|
|
|
|
|
|
def delete_cluster(cluster_id: str):
    """
        Delete a spark cluster
        :param cluster_id: Id of the cluster to delete

        Deletes the job and the pool that share the cluster id, each only if
        it exists. Prints a message when at least one of them was deleted.
    """
    batch_client = azure_api.get_batch_client()

    # The pool and its job are both named after the cluster
    pool_id = cluster_id
    job_id = pool_id

    job_exists = True
    try:
        batch_client.job.get(job_id)
    except batch_models.BatchErrorException:
        # The service refused the lookup: treat the job as absent. Only Batch
        # service errors are handled here so that KeyboardInterrupt,
        # SystemExit and programming errors are no longer swallowed.
        job_exists = False

    pool_exists = batch_client.pool.exists(pool_id)

    if job_exists:
        batch_client.job.delete(job_id)

    if pool_exists:
        batch_client.pool.delete(pool_id)

    if job_exists or pool_exists:
        print("Deleting cluster {0}".format(cluster_id))
|
2017-07-14 06:57:04 +03:00
|
|
|
|
2017-07-14 19:00:47 +03:00
|
|
|
|
|
|
|
def ssh(
        cluster_id: str,
        username: str=None,
        masterui: str=None,
        webui: str=None,
        jupyter: str=None,
        ports=None,
        connect: bool=True):
    """
        SSH into head node of spark-app
        :param cluster_id: Id of the cluster to ssh in
        :param username: Username to use to ssh ('<username>' placeholder is used when None)
        :param masterui: Port for the master ui(Local port)
        :param webui: Port for the spark web ui(Local port)
        :param jupyter: Port for jupyter(Local port)
        :param ports: an list of local and remote ports
        :type ports: [[<local-port>, <remote-port>]]
        :param connect: When True run the ssh command, otherwise only print it
    """
    batch_client = azure_api.get_batch_client()

    # Locate the master node and ask Batch how to log into it
    master_node_id = util.get_master_node_id(cluster_id)
    login = batch_client.compute_node.get_remote_login_settings(
        cluster_id, master_node_id)
    master_ip = login.remote_login_ip_address
    master_port = login.remote_login_port

    builder = CommandBuilder('ssh')

    # Well-known forwards: each one is enabled only when a local port was given
    well_known_forwards = (
        (masterui, constants.MASTER_UI_PORT),
        (webui, constants.WEBUI_PORT),
        (jupyter, constants.JUPYTER_PORT),
    )
    for local_port, remote_port in well_known_forwards:
        builder.add_option(
            "-L",
            "{0}:localhost:{1}".format(local_port, remote_port),
            enable=local_port)

    # Extra user supplied [local, remote] forwards
    if ports is not None:
        for port in ports:
            builder.add_option("-L", "{0}:localhost:{1}".format(port[0], port[1]))

    if username is not None:
        user = username
    else:
        user = '<username>'
    builder.add_argument("{0}@{1} -p {2}".format(user, master_ip, master_port))

    command = builder.to_str()
    command_parts = command.split()

    if connect:
        call(command_parts)
    else:
        print('\nuse the following command to connect to your spark head node:')
        print('\n\t{}\n'.format(command))
|