Timothee Guerin 2017-07-12 12:33:18 -07:00
Parent 1ee0225f67
Commit f334de60c7
1 changed file with 32 additions and 107 deletions


@@ -3,7 +3,7 @@ from subprocess import call
 import azure.batch.models as batch_models
 from . import util, constants, azure_api, upload_node_scripts
 
-pool_admin_user = batch_models.UserIdentity(
+POOL_ADMIN_USER = batch_models.UserIdentity(
     auto_user=batch_models.AutoUserSpecification(
         scope=batch_models.AutoUserScope.pool,
         elevation_level=batch_models.ElevationLevel.admin))
@@ -38,87 +38,10 @@ def cluster_install_cmd(zip_resource_file: batch_models.ResourceFile, custom_scr
     return ret
 
 
-def cluster_connect_cmd():
-    return [
-        # set SPARK_HOME environment vars
-        'export SPARK_HOME=/dsvm/tools/spark/current',
-        'export PATH=$PATH:$SPARK_HOME/bin',
-        # copy a 'slaves' file from the slaves.template in $SPARK_HOME/conf
-        'cp $SPARK_HOME/conf/slaves.template $SPARK_HOME/conf/slaves'
-        # delete existing content & create a new line in the slaves file
-        'echo > $SPARK_HOME/conf/slaves',
-        # make empty 'master' file in $SPARK/conf
-        'cp $SPARK_HOME/conf/slaves $SPARK_HOME/conf/master',
-        # add batch pool ips to newly created slaves files
-        'IFS="," read -r -a workerips <<< $AZ_BATCH_HOST_LIST',
-        'for index in "${!workerips[@]}"',
-        'do echo "${workerips[index]}"',
-        'if [ "${AZ_BATCH_MASTER_NODE%:*}" = "${workerips[index]}" ]',
-        'then echo "${workerips[index]}" >> $SPARK_HOME/conf/master',
-        'else echo "${workerips[index]}" >> $SPARK_HOME/conf/slaves',
-        'fi',
-        'done'
-    ]
-
-def cluster_start_cmd(webui_port, jupyter_port):
-    return [
-        # set SPARK_HOME environment vars
-        'export SPARK_HOME=/dsvm/tools/spark/current',
-        'export PATH=$PATH:$SPARK_HOME/bin',
-        # get master node ip
-        'export MASTER_NODE=$(cat $SPARK_HOME/conf/master)',
-        # kick off start-all spark command (which starts web ui)
-        '($SPARK_HOME/sbin/start-all.sh --webui-port ' + str(webui_port) + ' &)',
-        # jupyter setup: remove auth
-        '/anaconda/envs/py35/bin/jupyter notebook --generate-config',
-        'echo >> $HOME/.jupyter/jupyter_notebook_config.py',
-        'echo c.NotebookApp.token=\\\"\\\" >> $HOME/.jupyter/jupyter_notebook_config.py',
-        'echo c.NotebookApp.password=\\\"\\\" >> $HOME/.jupyter/jupyter_notebook_config.py',
-        # create jupyter kernal for pyspark
-        'rm -rf /usr/local/share/jupyter/kernels/*',
-        'mkdir /usr/local/share/jupyter/kernels/pyspark',
-        'touch /usr/local/share/jupyter/kernels/pyspark/kernel.json',
-        'echo { ' +
-        '\\\"display_name\\\": \\\"PySpark\\\", ' +
-        '\\\"language\\\": \\\"python\\\", ' +
-        '\\\"argv\\\": [ ' +
-        '\\\"/usr/bin/python3\\\", ' +
-        '\\\"-m\\\", ' +
-        '\\\"ipykernel\\\", ' +
-        '\\\"-f\\\", ' +
-        '\\\"{connection_file}\\\" ' +
-        '], ' +
-        '\\\"env\\\": { ' +
-        '\\\"SPARK_HOME\\\": \\\"/dsvm/tools/spark/current\\\", ' +
-        '\\\"PYSPARK_PYTHON\\\": \\\"/usr/bin/python3\\\", ' +
-        '\\\"PYSPARK_SUBMIT_ARGS\\\": ' +
-        '\\\"--master spark://${MASTER_NODE%:*}:7077 ' +
-        # '--executor-memory 6400M ' +
-        # '--driver-memory 6400M ' +
-        'pyspark-shell\\\" ' +
-        '}' +
-        '} >> /usr/local/share/jupyter/kernels/pyspark/kernel.json',
-        # start jupyter notebook
-        '(PYSPARK_DRIVER_PYTHON=/anaconda/envs/py35/bin/jupyter ' +
-        'PYSPARK_DRIVER_PYTHON_OPTS="notebook --no-browser --port=' + str(jupyter_port) + '" ' +
-        'pyspark &)' # +
-        # '--master spark://${MASTER_NODE%:*}:7077 ' +
-        # '--executor-memory 6400M ' +
-        # '--driver-memory 6400M &)'
-    ]
-
-def generate_cluster_start_task(cluster_id: str, zip_resource_file: batch_models.ResourceFile, custom_script: str = None):
+def generate_cluster_start_task(
+        cluster_id: str,
+        zip_resource_file: batch_models.ResourceFile,
+        custom_script: str=None):
     """
     This will return the start task object for the pool to be created.
     :param custom_script str: Path to a local file to be uploaded to storage and run after spark started.
@@ -150,7 +73,7 @@ def generate_cluster_start_task(cluster_id: str, zip_resource_file: batch_models
         command_line=util.wrap_commands_in_shell(command),
         resource_files=resource_files,
         environment_settings=environment_settings,
-        user_identity=pool_admin_user,
+        user_identity=POOL_ADMIN_USER,
         wait_for_success=True)
@@ -210,10 +133,10 @@ def create_cluster(
         pool_info=batch_models.PoolInformation(pool_id=pool_id))
 
     # Add job to batch
-    batch_client.job.add(job)
+    # TODO batch_client.job.add(job)
 
     # Wait for the app to finish
-    if wait == True:
+    if wait:
         util.wait_for_master_to_be_ready(pool_id)
 
         if username is not None and password is not None:
@@ -221,20 +144,21 @@ def create_cluster(
 
 def create_user(
-        pool_id,
-        username,
-        password):
+        cluster_id: str,
+        username: str,
+        password: str):
     """
     Create a cluster user
+    :param cluster_id: id of the spark cluster
+    :param username: username of the user to add
+    :param password: password of the user to add
     """
     batch_client = azure_api.get_batch_client()
 
-    # TODO wait for pool to be ready?
     # Create new ssh user for the master node
     batch_client.compute_node.add_user(
-        pool_id,
-        util.get_master_node_id(pool_id),
+        cluster_id,
+        util.get_master_node_id(cluster_id),
         batch_models.ComputeNodeUser(
             username,
             is_admin=True,
@@ -242,16 +166,17 @@ def create_user(
             expiry_time=datetime.now() + timedelta(days=365)))
 
-def get_cluster_details(pool_id: str):
+def get_cluster_details(cluster_id: str):
     """
-    print out specified cluster info
+    Print the information for the given cluster
+    :param cluster_id: Id of the cluster
     """
     batch_client = azure_api.get_batch_client()
-    pool = batch_client.pool.get(pool_id)
+    pool = batch_client.pool.get(cluster_id)
     if (pool.state == batch_models.PoolState.deleting):
-        print
-    nodes = batch_client.compute_node.list(pool_id=pool_id)
+        print("Cluster is being deleted!")
+    nodes = batch_client.compute_node.list(pool_id=cluster_id)
     visible_state = pool.allocation_state.value if pool.state.value is 'active' else pool.state.value
     node_count = '{} -> {}'.format(
         pool.current_dedicated_nodes + pool.current_low_priority_nodes,
@@ -275,10 +200,10 @@ def get_cluster_details(pool_id: str):
     print(print_format.format(node_label, 'State', 'IP:Port', 'Master'))
     print(print_format_underline.format('', '', '', ''))
-    master_node = util.get_master_node_id(pool_id)
+    master_node = util.get_master_node_id(cluster_id)
     for node in nodes:
-        ip, port = util.get_connection_info(pool_id, node.id)
+        ip, port = util.get_connection_info(cluster_id, node.id)
         print(print_format.format(node.id, node.state.value, '{}:{}'.format(ip, port),
                                   '*' if node.id == master_node else ''))
     print()
@@ -286,7 +211,7 @@ def get_cluster_details(pool_id: str):
 
 def list_clusters():
     """
-    print out all clusters
+    List all the cluster on your account.
     """
     batch_client = azure_api.get_batch_client()
@@ -311,19 +236,19 @@ def list_clusters():
             node_count))
 
-def delete_cluster(pool_id: str):
+def delete_cluster(cluster_id: str):
     """
     Delete a spark cluster
     """
     batch_client = azure_api.get_batch_client()
 
     # delete pool by id
-    pool = batch_client.pool.get(pool_id)
+    pool_id = cluster_id
 
     # job id is equal to pool id
     job_id = pool_id
 
-    if batch_client.pool.exists(pool_id) == True:
+    if batch_client.pool.exists(pool_id):
         batch_client.pool.delete(pool_id)
         batch_client.job.delete(job_id)
         print('The pool, \'{}\', is being deleted'.format(pool_id))
@@ -340,9 +265,9 @@ def ssh(
         ports=None,
         connect=True):
     """
     SSH into head node of spark-app
     :param ports: an list of local and remote ports
     :type ports: [[<local-port>, <remote-port>]]
     """
     batch_client = azure_api.get_batch_client()
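
For orientation, a minimal sketch of how the helpers touched by this commit might be called after the pool_id to cluster_id rename. The module name clusterlib, the cluster id, and the Batch credential setup inside azure_api are assumptions and are not part of this diff:

# Hypothetical usage sketch (not part of this commit); assumes azure_api has
# already been configured with Batch credentials and that this module is
# importable as `clusterlib`.
import clusterlib

CLUSTER_ID = 'spark-demo'   # example id; the pool id and job id both equal this value

# Add an SSH user on the cluster's master node (parameter renamed from pool_id to cluster_id).
clusterlib.create_user(cluster_id=CLUSTER_ID, username='spark', password='secret')

# Print the node table for the cluster; the master node is marked with '*'.
clusterlib.get_cluster_details(CLUSTER_ID)

# List every cluster on the account, then tear this one down
# (delete_cluster removes both the pool and the job that share the id).
clusterlib.list_clusters()
clusterlib.delete_cluster(CLUSTER_ID)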