From 6ea20a76ba89a632473e8acbe58e05c33dd04bf7 Mon Sep 17 00:00:00 2001 From: JS Tan Date: Mon, 17 Apr 2017 05:10:41 +0000 Subject: [PATCH] Merged PR 2: Merge ssh-update to master ## ssh update: enabled jupyter, attached user to the cluster lvl (as opposed to job level) ## features: * new cli cmd: spark-cluster-create-user (create user is needed because we cannot automatically create a user on pool creation since we don't necessarily wait for cluster creation to finish) * new cli cmd: spark-cluster-jupyter * new cli cmd: spark-cluster-webui * spark-cluster-ssh does not tunnel any ports by default --- bin/spark-cluster-create | 12 +- ...-app-jupyter => spark-cluster-create-user} | 14 +- bin/spark-cluster-jupyter | 78 ++++++ bin/spark-cluster-ssh | 22 +- bin/spark-cluster-webui | 78 ++++++ redbull/sparklib.py | 231 ++++++------------ redbull/util.py | 3 +- setup.py | 6 +- 8 files changed, 246 insertions(+), 198 deletions(-) rename bin/{spark-app-jupyter => spark-cluster-create-user} (91%) create mode 100755 bin/spark-cluster-jupyter create mode 100755 bin/spark-cluster-webui diff --git a/bin/spark-cluster-create b/bin/spark-cluster-create index 05fc17d6..dafc2013 100755 --- a/bin/spark-cluster-create +++ b/bin/spark-cluster-create @@ -25,6 +25,7 @@ if __name__ == '__main__': _pool_id = None _vm_count = None _vm_size = None + _wait = True # parse arguments @@ -45,21 +46,20 @@ if __name__ == '__main__': print() if args.cluster_id is not None: _pool_id = args.cluster_id - print("spark cluster id: %s" % _pool_id) + print("spark cluster id: %s" % _pool_id) if args.cluster_size is not None: _vm_count = args.cluster_size - print("spark cluster size: %i" % _vm_count) + print("spark cluster size: %i" % _vm_count) if args.cluster_vm_size is not None: _vm_size = args.cluster_vm_size - print("spark cluster vm size: %s" % _vm_size) - + print("spark cluster vm size: %s" % _vm_size) + if args.wait is not None: if args.wait == False: _wait = False - print("wait for cluster: %r" % 
_wait) - + print("wait for cluster: %r" % _wait) # Read config file global_config = configparser.ConfigParser() diff --git a/bin/spark-app-jupyter b/bin/spark-cluster-create-user similarity index 91% rename from bin/spark-app-jupyter rename to bin/spark-cluster-create-user index 6de00342..1d2df9fc 100755 --- a/bin/spark-app-jupyter +++ b/bin/spark-cluster-create-user @@ -50,7 +50,7 @@ if __name__ == '__main__': if args.password is not None: _password = args.password print("az_spark password: %s" % _password) - + # Read config file global_config = configparser.ConfigParser() global_config.read(_config_path) @@ -73,16 +73,12 @@ if __name__ == '__main__': # Set retry policy batch_client.config.retry_policy.retries = 5 - # set app_id to jupyter - app_id = "jupyter-" + _pool_id - # get ssh command - sparklib.jupyter( + sparklib.create_user( batch_client, - pool_id = _pool_id, - app_id = app_id, - username = _username, - password = _password) + _pool_id, + _username, + _password) diff --git a/bin/spark-cluster-jupyter b/bin/spark-cluster-jupyter new file mode 100755 index 00000000..83da3293 --- /dev/null +++ b/bin/spark-cluster-jupyter @@ -0,0 +1,78 @@ +#!/usr/bin/env python + +from redbull import sparklib + +try: + import configparser +except ImportError: + import ConfigParser as configparser + +import os +import datetime +import random +import argparse + +import azure.batch.batch_service_client as batch +import azure.batch.batch_auth as batch_auth +import azure.batch.models as batch_models +import azure.storage.blob as blob + +# config file path +_config_path = os.path.join(os.path.dirname(__file__), '../configuration.cfg') + +if __name__ == '__main__': + + _pool_id = None + _local_port = 7777 + + # parse arguments + parser = argparse.ArgumentParser(prog="az_spark") + + parser.add_argument("--cluster-id", required=True, + help="the unique name of your spark cluster") + parser.add_argument("--local-port", + help="the port you want your jupyter notebook session mapped 
to") + + args = parser.parse_args() + + print() + if args.cluster_id is not None: + _pool_id = args.cluster_id + print("spark cluster id: %s" % _pool_id) + + if args.local_port is not None: + _local_port = int(args.local_port) + print("local port: %i" % _local_port) + + # Read config file + global_config = configparser.ConfigParser() + global_config.read(_config_path) + + # Set up the configuration + batch_account_key = global_config.get('Batch', 'batchaccountkey') + batch_account_name = global_config.get('Batch', 'batchaccountname') + batch_service_url = global_config.get('Batch', 'batchserviceurl') + + # Set up SharedKeyCredentials + credentials = batch_auth.SharedKeyCredentials( + batch_account_name, + batch_account_key) + + # Set up Batch Client + batch_client = batch.BatchServiceClient( + credentials, + base_url=batch_service_url) + + # Set retry policy + batch_client.config.retry_policy.retries = 5 + + # print the ssh tunnel command for jupyter + sparklib.jupyter( + batch_client, + pool_id = _pool_id, + local_port = _local_port) + + + + + diff --git a/bin/spark-cluster-ssh b/bin/spark-cluster-ssh index 7fe6dd83..05d73861 100755 --- a/bin/spark-cluster-ssh +++ b/bin/spark-cluster-ssh @@ -23,20 +23,11 @@ _config_path = os.path.join(os.path.dirname(__file__), '../configuration.cfg') if __name__ == '__main__': _pool_id = None - _app_id = None - _username = 'admin' - _password = 'pass123!' 
- # parse arguments parser = argparse.ArgumentParser(prog="az_spark") parser.add_argument("--cluster-id", required=True, help="the unique name of your spark cluster") - parser.add_argument("-u", "--user", - help="the relative path to your spark app in your directory") - parser.add_argument("-p", "--password", - help="the relative path to your spark app in your directory") - args = parser.parse_args() print() @@ -44,14 +35,6 @@ if __name__ == '__main__': _pool_id = args.cluster_id print("spark cluster id: %s" % _pool_id) - if args.user is not None: - _username = args.user - print("az_spark username: %s" % _username) - - if args.password is not None: - _password = args.password - print("az_spark password: %s" % _password) - # Read config file global_config = configparser.ConfigParser() global_config.read(_config_path) @@ -77,10 +60,7 @@ if __name__ == '__main__': # get ssh command sparklib.ssh( batch_client, - pool_id = _pool_id, - username = _username, - password = _password, - ports = [8082]) + pool_id = _pool_id) diff --git a/bin/spark-cluster-webui b/bin/spark-cluster-webui new file mode 100755 index 00000000..54af921b --- /dev/null +++ b/bin/spark-cluster-webui @@ -0,0 +1,78 @@ +#!/usr/bin/env python + +from redbull import sparklib + +try: + import configparser +except ImportError: + import ConfigParser as configparser + +import os +import datetime +import random +import argparse + +import azure.batch.batch_service_client as batch +import azure.batch.batch_auth as batch_auth +import azure.batch.models as batch_models +import azure.storage.blob as blob + +# config file path +_config_path = os.path.join(os.path.dirname(__file__), '../configuration.cfg') + +if __name__ == '__main__': + + _pool_id = None + _local_port = 8080 + + # parse arguments + parser = argparse.ArgumentParser(prog="az_spark") + + parser.add_argument("--cluster-id", required=True, + help="the unique name of your spark cluster") + parser.add_argument("--local-port", + help="the port you want your 
spark webui session mapped to") + + args = parser.parse_args() + + print() + if args.cluster_id is not None: + _pool_id = args.cluster_id + print("spark cluster id: %s" % _pool_id) + + if args.local_port is not None: + _local_port = int(args.local_port) + print("local port: %i" % _local_port) + + # Read config file + global_config = configparser.ConfigParser() + global_config.read(_config_path) + + # Set up the configuration + batch_account_key = global_config.get('Batch', 'batchaccountkey') + batch_account_name = global_config.get('Batch', 'batchaccountname') + batch_service_url = global_config.get('Batch', 'batchserviceurl') + + # Set up SharedKeyCredentials + credentials = batch_auth.SharedKeyCredentials( + batch_account_name, + batch_account_key) + + # Set up Batch Client + batch_client = batch.BatchServiceClient( + credentials, + base_url=batch_service_url) + + # Set retry policy + batch_client.config.retry_policy.retries = 5 + + # print the ssh tunnel command for the spark web ui + sparklib.webui( + batch_client, + pool_id = _pool_id, + local_port = _local_port) + + + + + diff --git a/redbull/sparklib.py b/redbull/sparklib.py index dfd8c405..24d3384f 100644 --- a/redbull/sparklib.py +++ b/redbull/sparklib.py @@ -9,9 +9,6 @@ _WEBUI_PORT = 8082 _JUPYTER_PORT = 7777 def cluster_install_cmd(): - ''' - this command is run-elevated - ''' return [ 'export SPARK_HOME=/dsvm/tools/spark/current', 'export PATH=$PATH:$SPARK_HOME/bin', @@ -47,10 +44,9 @@ def cluster_connect_cmd(): 'cp $SPARK_HOME/conf/slaves $SPARK_HOME/conf/master', # add batch pool ips to newly created slaves files - # make file name master 'IFS="," read -r -a workerips <<< $AZ_BATCH_HOST_LIST', 'for index in "${!workerips[@]}"', - 'do echo "{workerips[index]}"', + 'do echo "${workerips[index]}"', 'if [ "${AZ_BATCH_MASTER_NODE%:*}" = "${workerips[index]}" ]', 'then echo "${workerips[index]}" >> $SPARK_HOME/conf/master', 'else echo "${workerips[index]}" >> $SPARK_HOME/conf/slaves', @@ -58,14 +54,31 @@ def cluster_connect_cmd(): 'done' ] -def 
cluster_start_cmd(webui_port): +def cluster_start_cmd(webui_port, jupyter_port): return [ # set SPARK_HOME environment vars 'export SPARK_HOME=/dsvm/tools/spark/current', 'export PATH=$PATH:$SPARK_HOME/bin', - # kick off start-all spark command as a bg process + # get master node ip + 'export MASTER_NODE=$(cat $SPARK_HOME/conf/master)', + + # kick off start-all spark command (which starts web ui) '($SPARK_HOME/sbin/start-all.sh --webui-port ' + str(webui_port) + ' &)', + + # jupyter setup: remove auth + '/anaconda/envs/py35/bin/jupyter notebook --generate-config', + 'echo >> $HOME/.jupyter/jupyter_notebook_config.py', + 'echo c.NotebookApp.token=\\\"\\\" >> $HOME/.jupyter/jupyter_notebook_config.py', + 'echo c.NotebookApp.password=\\\"\\\" >> $HOME/.jupyter/jupyter_notebook_config.py', + + # start jupyter notebook + '(PYSPARK_DRIVER_PYTHON=/anaconda/envs/py35/bin/jupyter ' + + 'PYSPARK_DRIVER_PYTHON_OPTS="notebook --no-browser --port=' + str(jupyter_port) + '" ' + + 'pyspark ' + + '--master spark://${MASTER_NODE%:*}:7077 ' + + '--executor-memory 6400M ' + + '--driver-memory 6400M &)' ] def app_submit_cmd(webui_port, app_file_name): @@ -87,32 +100,6 @@ def app_submit_cmd(webui_port, app_file_name): '$AZ_BATCH_TASK_WORKING_DIR/' + app_file_name ] -# TODO not working -def jupyter_cmd(webui_port, jupyter_port): - return [ - # set SPARK_HOME environment vars - 'export SPARK_HOME=/dsvm/tools/spark/current', - 'export PATH=$PATH:$SPARK_HOME/bin', - - # kick off start-all spark command as a bg process - '($SPARK_HOME/sbin/start-all.sh --webui-port ' + str(webui_port) + ' &)', - - # jupyter setup: remove auth - 'mkdir $HOME/.jupyter', - 'touch $HOME/.jupyter/jupyter_notebook_config.py', - 'echo >> $HOME/.jupyter/jupyter_notebook_config.py', - 'echo "c.NotebookApp.token=\'\'" >> $HOME/.jupyter/jupyter_notebook_config.py', - 'echo "c.NotebookApp.password=\'\'" >> $HOME/.jupyter/jupyter_notebook_config.py', - - # start jupyter notebook - 'PYSPARK_DRIVER_PYTHON=jupyter ' + - 
'PYSPARK_DRIVER_PYTHON_OPTS="notebook --no-browser --port=' + str(jupyter_port) + '" ' + - 'pyspark ' + - '--master spark://${AZ_BATCH_MASTER_NODE%:*}:7077 ' + - '--executor-memory 6400M ' + - '--driver-memory 6400M' - ] - def create_cluster( batch_client, pool_id, @@ -121,17 +108,6 @@ def create_cluster( wait = True): """ Create a spark cluster - - :param batch_client: the batch client to use - :type batch_client: 'batchserviceclient.BatchServiceClient' - :param pool_id: The id of the pool to create - :type pool_id: string - :param vm_count: the number of nodes in the pool - :type vm_count: int - :param vm_size: The vm size to use - :type vm_size: string - :param wait: whether or not to wait for pool creation to compelete - :type wait: boolean """ # vm image @@ -165,8 +141,11 @@ def create_cluster( enable_inter_node_communication = True, max_tasks_per_node = 1) - # Create the pool - util.create_pool_if_not_exist(batch_client, pool, wait=wait) + # Create the pool + create user for the pool + util.create_pool_if_not_exist( + batch_client, + pool, + wait) # Create job job = batch_models.JobAddParameter( @@ -178,7 +157,7 @@ def create_cluster( # create application/coordination commands coordination_cmd = cluster_connect_cmd() - application_cmd = cluster_start_cmd(_WEBUI_PORT) + application_cmd = cluster_start_cmd(_WEBUI_PORT, _JUPYTER_PORT) # reuse pool_id as multi-instance task id task_id = pool_id @@ -204,6 +183,29 @@ def create_cluster( job_id, datetime.timedelta(minutes=60)) +def create_user( + batch_client, + pool_id, + username, + password): + """ + Create a cluster user + """ + + # Get master node id from task (job and task are both named pool_id) + master_node_id = batch_client.task \ + .get(job_id=pool_id, task_id=pool_id) \ + .node_info.node_id + + # Create new ssh user for the master node + batch_client.compute_node.add_user( + pool_id, + master_node_id, + batch_models.ComputeNodeUser( + username, + is_admin = True, + password = password)) + def 
get_cluster_details( batch_client, @@ -275,11 +277,6 @@ def delete_cluster( pool_id): """ Delete a spark cluster - - :param batch_client: the batch client to use - :type batch_client: 'batchserviceclient.BatchServiceClient' - :param pool_id: The id of the pool to create - :type pool_id: string """ # delete pool by id pool = batch_client.pool.get(pool_id) @@ -306,19 +303,6 @@ def submit_app( """ Submit a spark app - - :param batch_client: the batch client to use - :type batch_client: 'batchserviceclient.BatchServiceClient' - :param block_blob_client: A blob service client. - :type block_blob_client: `azure.storage.blob.BlockBlobService` - :param pool_id: The id of the pool to submit app to - :type pool_id: string - :param app_id: The id of the spark app (corresponds to batch task) - :type app_id: string - :param app_file_path: The path of the spark app to run - :type app_file_path: string - :param app_file_name: The name of the spark app file to run - :type app_file_name: string """ # Upload app resource files to blob storage @@ -355,23 +339,12 @@ def submit_app( def ssh( batch_client, pool_id, - username, - password, ports = None): """ SSH into head node of spark-app - - :param batch_client: the batch client to use - :type batch_client: 'batchserviceclient.BatchServiceClient' - :param pool_id: The id of the pool to submit app to - :type pool_id: string - :param username: The username to access the head node via ssh - :type username: string - :param password: The password to access the head node via ssh - :type password: string - :param ports: A list of ports to open tunnels to - :type ports: [] + :param ports: a list of [local_port, remote_port] pairs to tunnel + :type ports: [[int, int]] """ # Get master node id from task (job and task are both named pool_id) @@ -379,18 +352,6 @@ def ssh( .get(job_id=pool_id, task_id=pool_id) \ .node_info.node_id - # Create new ssh user for the master node - batch_client.compute_node.add_user( - pool_id, - master_node_id, - batch_models.ComputeNodeUser( 
- username, - is_admin = True, - password = password)) - - print('\nuser "{}" added to node "{}"in pool "{}"'.format( - username, master_node_id, pool_id)) - # get remote login settings for the user remote_login_settings = batch_client.compute_node.get_remote_login_settings( pool_id, master_node_id) @@ -401,88 +362,40 @@ def ssh( # build ssh tunnel command ssh_command = "ssh " for port in ports: - ssh_command += "-L " + str(port) + ":localhost:" + str(port) + " " - ssh_command += username + "@" + str(master_node_ip) + " -p " + str(master_node_port) + ssh_command += "-L " + str(port[0]) + ":localhost:" + str(port[1]) + " " + ssh_command += "@" + str(master_node_ip) + " -p " + str(master_node_port) print('\nuse the following command to connect to your spark head node:') print() print('\t%s' % ssh_command) print() +def jupyter( + batch_client, + pool_id, + local_port): + """ + SSH tunnel for Jupyter + """ + ssh(batch_client, pool_id, [[local_port, _JUPYTER_PORT]]) + +def webui( + batch_client, + pool_id, + local_port): + """ + SSH tunnel for spark web-ui + """ + ssh(batch_client, pool_id, [[local_port, _WEBUI_PORT]]) + def list_apps( batch_client, pool_id): """ List all spark apps for a given cluster - - :param batch_client: the batch client to use - :type batch_client: 'batchserviceclient.BatchServiceClient' - :param pool_id: The id of the pool to submit app to - :type pool_id: string """ apps = batch_client.task.list(job_id=pool_id) #TODO actually print print(apps) -# TODO not working -def jupyter( - batch_client, - pool_id, - app_id, - username, - password): - """ - Install jupyter, create app_id and open ssh tunnel - - :param batch_client: the batch client to use - :type batch_client: 'batchserviceclient.BatchServiceClient' - :param pool_id: The id of the pool to submit app to - :type pool_id: string - :param username: The username to access the head node via ssh - :type username: string - :param password: The password to access the head node via ssh - :type 
password: string - - """ - - # create application/coordination commands - coordination_cmd = cluster_connect_cmd() - application_cmd = jupyter_cmd(_WEBUI_PORT, _JUPYTER_PORT) - - # Get pool size - pool = batch_client.pool.get(pool_id) - pool_size = pool.target_dedicated - - # Create multi-instance task - task = batch_models.TaskAddParameter( - id = app_id, - command_line = util.wrap_commands_in_shell(application_cmd), - resource_files = [], - run_elevated = False, - multi_instance_settings = batch_models.MultiInstanceSettings( - number_of_instances = pool_size, - coordination_command_line = util.wrap_commands_in_shell(coordination_cmd), - common_resource_files = [])) - - # Add task to batch job (which has the same name as pool_id) - job_id = pool_id - batch_client.task.add(job_id = job_id, task = task) - - # get job id (job id is the same as pool id) - job_id = pool_id - - # Wait for the app to finish - util.wait_for_tasks_to_complete( - batch_client, - job_id, - datetime.timedelta(minutes=60)) - - # print ssh command - ssh_app( - batch_client, - pool_id, - app_id, - username, - password, - ports = [_JUPYTER_PORT, _WEBUI_PORT]) - + diff --git a/redbull/util.py b/redbull/util.py index 15f4fbcc..41c79388 100644 --- a/redbull/util.py +++ b/redbull/util.py @@ -128,6 +128,7 @@ def wait_for_all_nodes_state(batch_client, pool, node_state): 'resize error encountered for pool {}: {!r}'.format( pool.id, pool.resize_error)) nodes = list(batch_client.compute_node.list(pool.id)) + if (len(nodes) >= pool.target_dedicated and all(node.state in node_state for node in nodes)): return nodes @@ -248,4 +249,4 @@ def get_connection_info(batch_client, pool_id, node_id): pool_id, node_id) remote_ip = rls.remote_login_ip_address ssh_port = str(rls.remote_login_port) - return (remote_ip, ssh_port) \ No newline at end of file + return (remote_ip, ssh_port) diff --git a/setup.py b/setup.py index 96d05389..533d8b4b 100644 --- a/setup.py +++ b/setup.py @@ -10,10 +10,12 @@ setup(name='redbull', 
packages=['redbull'], scripts=['bin/spark-cluster-create', 'bin/spark-cluster-delete', + 'bin/spark-cluster-create-user', 'bin/spark-cluster-ssh', + 'bin/spark-cluster-jupyter', + 'bin/spark-cluster-webui', 'bin/spark-cluster-get', 'bin/spark-cluster-list', 'bin/spark-app-submit', - 'bin/spark-app-list', - 'bin/spark-app-jupyter'], + 'bin/spark-app-list'], zip_safe=False)