# aztk/dtde/clusterlib.py

from datetime import datetime, timedelta
from subprocess import call

import azure.batch.models as batch_models

from dtde.core import CommandBuilder
from dtde.cli import Software
from . import azure_api, constants, upload_node_scripts, util

POOL_ADMIN_USER_IDENTITY = batch_models.UserIdentity(
    auto_user=batch_models.AutoUserSpecification(
        scope=batch_models.AutoUserScope.pool,
        elevation_level=batch_models.ElevationLevel.admin))


def cluster_install_cmd(zip_resource_file: batch_models.ResourceFile, custom_script_file):
    """
    This will return the command line to be run on the start task of the pool to set up spark.
    """
    ret = [
        # Set up spark home and permissions for the spark folder
        'export SPARK_HOME=/dsvm/tools/spark/current',
        'export PATH=$PATH:$SPARK_HOME/bin',
        'chmod -R 777 $SPARK_HOME',
        'chmod -R 777 /usr/local/share/jupyter/kernels',
        # To avoid error: "sudo: sorry, you must have a tty to run sudo"
        'sed -i -e "s/Defaults requiretty.*/ #Defaults requiretty/g" /etc/sudoers',
        'unzip $AZ_BATCH_TASK_WORKING_DIR/{0}'.format(zip_resource_file.file_path),
        'chmod 777 $AZ_BATCH_TASK_WORKING_DIR/main.sh',
        # Convert windows line endings to unix if applicable
        'dos2unix $AZ_BATCH_TASK_WORKING_DIR/main.sh',
        '$AZ_BATCH_TASK_WORKING_DIR/main.sh',
    ]

    if custom_script_file is not None:
        ret.extend([
            '/bin/sh -c {}'.format(custom_script_file),
        ])

    ret.extend(['exit 0'])

    return ret

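# Hedged usage sketch (comments only; nothing runs at import time). The list
# built above is combined into a single start-task command line by
# util.wrap_commands_in_shell before being attached to the pool; the local
# script path below is hypothetical:
#
#   commands = cluster_install_cmd(zip_resource_file, '/local/path/bootstrap.sh')
#   start_task_command_line = util.wrap_commands_in_shell(commands)

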
def generate_cluster_start_task(
        cluster_id: str,
        zip_resource_file: batch_models.ResourceFile,
        custom_script: str=None):
    """
    This will return the start task object for the pool to be created.
    :param cluster_id: Id of the cluster (used for uploading the resource files)
    :param zip_resource_file: Resource file object pointing to the zip file containing scripts to run on the node
    :param custom_script: Path to a local file to be uploaded to storage and run after spark has started.
    """
    resource_files = [zip_resource_file]

    # Upload custom script file if given
    if custom_script is not None:
        resource_files.append(
            util.upload_file_to_container(
                container_name=cluster_id,
                file_path=custom_script,
                use_full_path=True))

    # TODO: use certificate
    batch_config = azure_api.get_batch_config()
    environment_settings = [
        batch_models.EnvironmentSetting(
            name="BATCH_ACCOUNT_KEY", value=batch_config.account_key),
        batch_models.EnvironmentSetting(
            name="BATCH_ACCOUNT_URL", value=batch_config.account_url),
    ]

    # Start task command
    command = cluster_install_cmd(zip_resource_file, custom_script)

    return batch_models.StartTask(
        command_line=util.wrap_commands_in_shell(command),
        resource_files=resource_files,
        environment_settings=environment_settings,
        user_identity=POOL_ADMIN_USER_IDENTITY,
        wait_for_success=True)

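# Hedged sketch of how the start task is wired up (cluster name is illustrative):
#
#   zip_resource_file = upload_node_scripts.zip_and_upload()
#   start_task = generate_cluster_start_task('spark-demo', zip_resource_file)
#
# The start task runs as an elevated pool-scope auto user
# (POOL_ADMIN_USER_IDENTITY), and wait_for_success=True keeps nodes from
# accepting tasks until the setup script has succeeded.

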
def create_cluster(
        custom_script: str,
        cluster_id: str,
        vm_count,
        vm_low_pri_count,
        vm_size,
        username: str,
        password: str,
        wait=True):
    """
    Create a spark cluster
    :param custom_script: Path to a custom script to run on all the nodes of the cluster
    :param cluster_id: Id of the cluster
    :param vm_count: Number of dedicated nodes in the cluster
    :param vm_low_pri_count: Number of low priority nodes in the cluster
    :param vm_size: Tier of the nodes (standard_a2, standard_g2, etc.)
    :param username: Optional username of the user to add to the pool when ready (requires wait to be True)
    :param password: Optional password of the user to add to the pool when ready (requires wait to be True)
    :param wait: Whether this function should wait for the cluster to be ready (master and all slaves booted)
    """
    # Upload start task scripts
    zip_resource_file = upload_node_scripts.zip_and_upload()

    batch_client = azure_api.get_batch_client()

    # VM image
    publisher = 'microsoft-ads'
    offer = 'linux-data-science-vm'
    sku = 'linuxdsvm'

    # Reuse pool_id as job_id
    pool_id = cluster_id
    job_id = cluster_id

    # Get a verified node agent sku
    sku_to_use, image_ref_to_use = \
        util.select_latest_verified_vm_image_with_node_agent_sku(
            publisher, offer, sku)

    # Configure the pool
    pool = batch_models.PoolAddParameter(
        id=pool_id,
        virtual_machine_configuration=batch_models.VirtualMachineConfiguration(
            image_reference=image_ref_to_use,
            node_agent_sku_id=sku_to_use),
        vm_size=vm_size,
        target_dedicated_nodes=vm_count,
        target_low_priority_nodes=vm_low_pri_count,
        start_task=generate_cluster_start_task(
            pool_id, zip_resource_file, custom_script),
        enable_inter_node_communication=True,
        max_tasks_per_node=1,
        metadata=[
            batch_models.MetadataItem(
                name=constants.AZB_SOFTWARE_METADATA_KEY, value=Software.spark),
        ])

    # Create the pool (the user is added later, once the master is ready)
    util.create_pool_if_not_exist(
        pool,
        wait)

    # Create job
    job = batch_models.JobAddParameter(
        id=job_id,
        pool_info=batch_models.PoolInformation(pool_id=pool_id))

    # Add job to batch
    batch_client.job.add(job)

    # Wait for the master to be ready
    if wait:
        util.wait_for_master_to_be_ready(pool_id)

        if username is not None and password is not None:
            create_user(pool_id, username, password)

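# Hedged usage sketch (cluster name, size, and credentials are illustrative):
#
#   create_cluster(
#       custom_script=None,
#       cluster_id='spark-demo',
#       vm_count=2,
#       vm_low_pri_count=0,
#       vm_size='standard_a2',
#       username='sparkuser',
#       password='<password>',
#       wait=True)

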
def create_user(
        cluster_id: str,
        username: str,
        password: str):
    """
    Create a cluster user
    :param cluster_id: Id of the spark cluster
    :param username: Username of the user to add
    :param password: Password of the user to add
    """
    batch_client = azure_api.get_batch_client()

    # Create new ssh user for the master node
    batch_client.compute_node.add_user(
        cluster_id,
        util.get_master_node_id(cluster_id),
        batch_models.ComputeNodeUser(
            username,
            is_admin=True,
            password=password,
            expiry_time=datetime.now() + timedelta(days=365)))

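# Hedged usage sketch (names are illustrative): once the master is ready, add an
# admin ssh user, then print the ssh command without connecting:
#
#   create_user('spark-demo', 'sparkuser', '<password>')
#   ssh('spark-demo', username='sparkuser', webui='8080', connect=False)

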
def get_cluster_details(cluster_id: str):
    """
    Print the information for the given cluster
    :param cluster_id: Id of the cluster
    """
    batch_client = azure_api.get_batch_client()

    pool = batch_client.pool.get(cluster_id)
    if pool.state is batch_models.PoolState.deleting:
        print("Cluster is being deleted!")
        return

    nodes = batch_client.compute_node.list(pool_id=cluster_id)
    visible_state = pool.allocation_state.value if pool.state.value == 'active' else pool.state.value

    if pool.state.value == 'resizing' or (pool.state.value == 'deleting' and pool.allocation_state.value == 'resizing'):
        node_count = '{} -> {}'.format(
            pool.current_dedicated_nodes + pool.current_low_priority_nodes,
            pool.target_dedicated_nodes + pool.target_low_priority_nodes)
    else:
        node_count = '{}'.format(pool.current_dedicated_nodes)

    print()
    print('State: {}'.format(visible_state))
    print('Node Size: {}'.format(pool.vm_size))
    print('Nodes: {}'.format(node_count))
    print('| Dedicated: {}'.format(pool.current_dedicated_nodes))
    print('| Low priority: {}'.format(pool.current_low_priority_nodes))
    print()

    node_label = 'Nodes'
    print_format = '{:<36}| {:<15} | {:<21}| {:<8}'
    print_format_underline = '{:-<36}|{:-<17}|{:-<22}|{:-<8}'

    print(print_format.format(node_label, 'State', 'IP:Port', 'Master'))
    print(print_format_underline.format('', '', '', ''))

    master_node = util.get_master_node_id_from_pool(pool)
    for node in nodes:
        ip, port = util.get_connection_info(cluster_id, node.id)
        print(print_format.format(
            node.id, node.state.value, '{}:{}'.format(ip, port),
            '*' if node.id == master_node else ''))
    print()

def is_pool_running_spark(pool: batch_models.CloudPool):
    """
    Return True if the given pool is tagged as running spark
    """
    if pool.metadata is None:
        return False

    for metadata in pool.metadata:
        if metadata.name == constants.AZB_SOFTWARE_METADATA_KEY:
            return metadata.value == Software.spark

    return False

def list_clusters():
    """
    List all the clusters on your account.
    """
    batch_client = azure_api.get_batch_client()

    print_format = '{:<34}| {:<10}| {:<20}| {:<7}'
    print_format_underline = '{:-<34}|{:-<11}|{:-<21}|{:-<7}'

    pools = batch_client.pool.list()
    print(print_format.format('Cluster', 'State', 'VM Size', 'Nodes'))
    print(print_format_underline.format('', '', '', ''))

    for pool in pools:
        if not is_pool_running_spark(pool):
            continue

        pool_state = pool.allocation_state.value if pool.state.value == 'active' else pool.state.value

        target_nodes = util.get_cluster_total_target_nodes(pool)
        current_nodes = util.get_cluster_total_current_nodes(pool)
        node_count = current_nodes
        if pool_state == 'resizing' or (pool_state == 'deleting' and pool.allocation_state.value == 'resizing'):
            node_count = '{} -> {}'.format(current_nodes, target_nodes)

        print(print_format.format(pool.id,
                                  pool_state,
                                  pool.vm_size,
                                  node_count))

def delete_cluster(cluster_id: str):
    """
    Delete a spark cluster
    :param cluster_id: Id of the cluster to delete
    """
    batch_client = azure_api.get_batch_client()

    # Delete pool by id
    pool_id = cluster_id

    # Job id is equal to pool id
    job_id = pool_id
    job_exists = True

    try:
        batch_client.job.get(job_id)
    except batch_models.BatchErrorException:
        # The job does not exist; a bare except here would hide real errors
        job_exists = False

    pool_exists = batch_client.pool.exists(pool_id)

    if job_exists:
        batch_client.job.delete(job_id)
    if pool_exists:
        batch_client.pool.delete(pool_id)

    if job_exists or pool_exists:
        print("Deleting cluster {0}".format(cluster_id))

class ClusterNotReadyError(Exception):
    pass

def ssh(
        cluster_id: str,
        username: str=None,
        masterui: str=None,
        webui: str=None,
        jupyter: str=None,
        ports=None,
        connect: bool=True):
    """
    SSH into the master node of the spark cluster
    :param cluster_id: Id of the cluster to ssh into
    :param username: Username to use to ssh
    :param masterui: Local port to forward to the spark master ui
    :param webui: Local port to forward to the spark web ui
    :param jupyter: Local port to forward to jupyter
    :param ports: A list of local and remote port pairs
    :type ports: [[<local-port>, <remote-port>]]
    """
    batch_client = azure_api.get_batch_client()

    # Get master node id from task (job and task are both named pool_id)
    master_node_id = util.get_master_node_id(cluster_id)

    if master_node_id is None:
        raise ClusterNotReadyError("Master node has not yet been picked!")

    # Get remote login settings for the user
    remote_login_settings = batch_client.compute_node.get_remote_login_settings(
        cluster_id, master_node_id)

    master_node_ip = remote_login_settings.remote_login_ip_address
    master_node_port = remote_login_settings.remote_login_port

    ssh_command = CommandBuilder('ssh')

    ssh_command.add_option(
        "-L", "{0}:localhost:{1}".format(masterui, constants.SPARK_MASTER_UI_PORT),
        enable=bool(masterui))
    ssh_command.add_option(
        "-L", "{0}:localhost:{1}".format(webui, constants.SPARK_WEBUI_PORT),
        enable=bool(webui))
    ssh_command.add_option(
        "-L", "{0}:localhost:{1}".format(jupyter, constants.SPARK_JUPYTER_PORT),
        enable=bool(jupyter))

    if ports is not None:
        for port in ports:
            ssh_command.add_option(
                "-L", "{0}:localhost:{1}".format(port[0], port[1]))

    user = username if username is not None else '<username>'
    ssh_command.add_argument(
        "{0}@{1} -p {2}".format(user, master_node_ip, master_node_port))

    command = ssh_command.to_str()
    ssh_command_array = command.split()

    if not connect:
        print('\nuse the following command to connect to your spark head node:')
        print('\n\t{}\n'.format(command))
    else:
        call(ssh_command_array)

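# Hedged example (cluster name, username, and local ports are illustrative):
#
#   ssh('spark-demo', username='sparkuser', webui='8080', jupyter='8888',
#       connect=False)
#
# prints a command along the lines of
#
#   ssh -L 8080:localhost:<SPARK_WEBUI_PORT> -L 8888:localhost:<SPARK_JUPYTER_PORT> \
#       sparkuser@<master-ip> -p <master-port>
#
# where the remote ports come from constants and the ip/port pair from the
# node's remote login settings.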