diff --git a/bin/spark-app-jupyter b/bin/spark-app-jupyter
new file mode 100755
index 00000000..6de00342
--- /dev/null
+++ b/bin/spark-app-jupyter
@@ -0,0 +1,98 @@
+#!/usr/bin/env python
+
+from redbull import sparklib
+
+try:
+    import configparser
+except ImportError:
+    import ConfigParser as configparser
+
+import os
+import datetime
+import random
+import argparse
+
+import azure.batch.batch_service_client as batch
+import azure.batch.batch_auth as batch_auth
+import azure.batch.models as batch_models
+import azure.storage.blob as blob
+
+# config file path
+_config_path = os.path.join(os.path.dirname(__file__), '../configuration.cfg')
+
+if __name__ == '__main__':
+
+    _pool_id = None
+    _username = 'admin'
+    _password = 'pass123!'
+
+    # parse arguments
+    parser = argparse.ArgumentParser(prog="az_spark")
+
+    parser.add_argument("--cluster-id", required=True,
+                        help="the unique name of your spark cluster")
+    parser.add_argument("-u", "--user",
+                        help="the username to access your spark cluster's head node")
+    parser.add_argument("-p", "--password",
+                        help="the password to access your spark cluster's head node")
+
+    args = parser.parse_args()
+
+    print()
+    if args.cluster_id is not None:
+        _pool_id = args.cluster_id
+        print("spark cluster id: %s" % _pool_id)
+
+    if args.user is not None:
+        _username = args.user
+        print("az_spark username: %s" % _username)
+
+    if args.password is not None:
+        _password = args.password
+        print("az_spark password: %s" % _password)
+
+    # Read config file
+    global_config = configparser.ConfigParser()
+    global_config.read(_config_path)
+
+    # Set up the configuration
+    batch_account_key = global_config.get('Batch', 'batchaccountkey')
+    batch_account_name = global_config.get('Batch', 'batchaccountname')
+    batch_service_url = global_config.get('Batch', 'batchserviceurl')
+
+    # Set up SharedKeyCredentials
+    credentials = batch_auth.SharedKeyCredentials(
+        batch_account_name,
+        batch_account_key)
+
+    # Set up Batch Client
+    batch_client = batch.BatchServiceClient(
+        credentials,
+        base_url=batch_service_url)
+
+    # Set retry policy
+    batch_client.config.retry_policy.retries = 5
+
+    # set app_id to jupyter
+    app_id = "jupyter-" + _pool_id
+
+    # submit the jupyter app and print the ssh tunnel command
+    sparklib.jupyter(
+        batch_client,
+        pool_id = _pool_id,
+        app_id = app_id,
+        username = _username,
+        password = _password)
+
+
+
+
+
+
+
+
+
+
+
+
+
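Note: this script and spark-app-list below both read Batch credentials from a configuration.cfg located one directory above bin/. A minimal sketch of what that file is expected to contain, based only on the keys read here; every value is a placeholder, not a real credential:

    [Batch]
    batchaccountname = <your-batch-account-name>
    batchaccountkey = <your-batch-account-key>
    batchserviceurl = https://<account>.<region>.batch.azure.com
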
diff --git a/bin/spark-app-list b/bin/spark-app-list
new file mode 100755
index 00000000..dd15fd66
--- /dev/null
+++ b/bin/spark-app-list
@@ -0,0 +1,71 @@
+#!/usr/bin/env python
+
+from redbull import sparklib
+
+try:
+    import configparser
+except ImportError:
+    import ConfigParser as configparser
+
+import os
+import datetime
+import random
+import argparse
+
+import azure.batch.batch_service_client as batch
+import azure.batch.batch_auth as batch_auth
+import azure.batch.models as batch_models
+import azure.storage.blob as blob
+
+# config file path
+_config_path = os.path.join(os.path.dirname(__file__), '../configuration.cfg')
+
+if __name__ == '__main__':
+
+    _pool_id = None
+
+    # parse arguments
+    parser = argparse.ArgumentParser(prog="az_spark")
+
+    parser.add_argument("--cluster-id", required=True,
+                        help="the unique name of your spark cluster")
+
+    args = parser.parse_args()
+
+    print()
+    if args.cluster_id is not None:
+        _pool_id = args.cluster_id
+        print("spark cluster id: %s" % _pool_id)
+
+    # Read config file
+    global_config = configparser.ConfigParser()
+    global_config.read(_config_path)
+
+    # Set up the configuration
+    batch_account_key = global_config.get('Batch', 'batchaccountkey')
+    batch_account_name = global_config.get('Batch', 'batchaccountname')
+    batch_service_url = global_config.get('Batch', 'batchserviceurl')
+
+    # Set up SharedKeyCredentials
+    credentials = batch_auth.SharedKeyCredentials(
+        batch_account_name,
+        batch_account_key)
+
+    # Set up Batch Client
+    batch_client = batch.BatchServiceClient(
+        credentials,
+        base_url=batch_service_url)
+
+    # Set retry policy
+    batch_client.config.retry_policy.retries = 5
+
+    # list the spark apps submitted to this cluster
+    sparklib.list_apps(
+        batch_client,
+        pool_id = _pool_id)
+
+
+
+
+
+
diff --git a/bin/spark-app-ssh b/bin/spark-app-ssh
index 81727399..4f2b6167 100755
--- a/bin/spark-app-ssh
+++ b/bin/spark-app-ssh
@@ -87,6 +87,7 @@ if __name__ == '__main__':
         app_id = _app_id,
         username = _username,
-        password = _password)
+        password = _password,
+        ports = [8888])



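As a quick illustration of what the new ports argument does, here is a rough sketch (ordinary Python, not part of the patch) of the tunnel command that ssh_app builds below for ports=[8888]; the node address and port values are placeholders looked up from the Batch service at runtime:

    # mirrors the loop in sparklib.ssh_app; <master-node-ip> and
    # <remote-login-port> are placeholders, not real values
    ports = [8888]
    username = "admin"
    master_node_ip = "<master-node-ip>"
    master_node_port = "<remote-login-port>"

    ssh_command = "ssh "
    for port in ports:
        ssh_command += "-L " + str(port) + ":localhost:" + str(port) + " "
    ssh_command += username + "@" + master_node_ip + " -p " + master_node_port

    # -> ssh -L 8888:localhost:8888 admin@<master-node-ip> -p <remote-login-port>
    print(ssh_command)
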
diff --git a/redbull/sparklib.py b/redbull/sparklib.py
index 3a668b0d..61053163 100644
--- a/redbull/sparklib.py
+++ b/redbull/sparklib.py
@@ -5,6 +5,93 @@ import datetime
 
 import azure.batch.models as batch_models
 
+_WEBUI_PORT = 8082
+_JUPYTER_PORT = 7777
+
+def install_cmd():
+    '''
+    this command is run-elevated
+    '''
+    return [
+        'export SPARK_HOME=/dsvm/tools/spark/current',
+        'export PATH=$PATH:$SPARK_HOME/bin',
+        'chmod -R 777 $SPARK_HOME',
+        'exit 0'
+    ]
+
+def connect_cmd():
+    return [
+        # print env vars for debug
+        'echo CCP_NODES:',
+        'echo $CCP_NODES',
+        'echo AZ_BATCH_NODE_LIST:',
+        'echo $AZ_BATCH_NODE_LIST',
+        'echo AZ_BATCH_HOST_LIST:',
+        'echo $AZ_BATCH_HOST_LIST',
+        'echo AZ_BATCH_MASTER_NODE:',
+        'echo $AZ_BATCH_MASTER_NODE',
+        'echo AZ_BATCH_IS_CURRENT_NODE_MASTER:',
+        'echo $AZ_BATCH_IS_CURRENT_NODE_MASTER',
+
+        # set SPARK_HOME environment vars
+        'export SPARK_HOME=/dsvm/tools/spark/current',
+        'export PATH=$PATH:$SPARK_HOME/bin',
+
+        # copy a 'slaves' file from the slaves.template in $SPARK_HOME/conf
+        'cp $SPARK_HOME/conf/slaves.template $SPARK_HOME/conf/slaves',
+
+        # delete existing content & create a new line in the slaves file
+        'echo > $SPARK_HOME/conf/slaves',
+
+        # add batch pool ips to newly created slaves files
+        'IFS="," read -r -a workerips <<< $AZ_BATCH_HOST_LIST',
+        'for index in "${!workerips[@]}"',
+        'do echo "${workerips[index]}" >> $SPARK_HOME/conf/slaves', # TODO unless node is master
+        'echo "${workerips[index]}"',
+        'done'
+    ]
+
+def custom_app_cmd(webui_port, app_file_name):
+    return [
+        # set SPARK_HOME environment vars
+        'export SPARK_HOME=/dsvm/tools/spark/current',
+        'export PATH=$PATH:$SPARK_HOME/bin',
+
+        # kick off start-all spark command as a bg process
+        '($SPARK_HOME/sbin/start-all.sh --webui-port ' + str(webui_port) + ' &)',
+
+        # execute spark-submit on the specified app
+        '$SPARK_HOME/bin/spark-submit ' +
+            '--master spark://${AZ_BATCH_MASTER_NODE%:*}:7077 ' +
+            '$AZ_BATCH_TASK_WORKING_DIR/' + app_file_name
+    ]
+
+# TODO not working
+def jupyter_cmd(webui_port, jupyter_port):
+    return [
+        # set SPARK_HOME environment vars
+        'export SPARK_HOME=/dsvm/tools/spark/current',
+        'export PATH=$PATH:$SPARK_HOME/bin',
+
+        # kick off start-all spark command as a bg process
+        '($SPARK_HOME/sbin/start-all.sh --webui-port ' + str(webui_port) + ' &)',
+
+        # jupyter setup: remove auth
+        'mkdir $HOME/.jupyter',
+        'touch $HOME/.jupyter/jupyter_notebook_config.py',
+        'echo >> $HOME/.jupyter/jupyter_notebook_config.py',
+        'echo "c.NotebookApp.token=\'\'" >> $HOME/.jupyter/jupyter_notebook_config.py',
+        'echo "c.NotebookApp.password=\'\'" >> $HOME/.jupyter/jupyter_notebook_config.py',
+
+        # start jupyter notebook
+        'PYSPARK_DRIVER_PYTHON=jupyter ' +
+            'PYSPARK_DRIVER_PYTHON_OPTS="notebook --no-browser --port=' + str(jupyter_port) + '" ' +
+            'pyspark ' +
+            '--master spark://${AZ_BATCH_MASTER_NODE%:*}:7077 ' +
+            '--executor-memory 6400M ' +
+            '--driver-memory 6400M'
+    ]
+
 def create_cluster(
     batch_client,
     pool_id,
@@ -32,13 +119,7 @@ def create_cluster(
     _sku = 'linuxdsvm'
 
     # start task command
-    start_task_commands = [
-        'export SPARK_HOME=/dsvm/tools/spark/current',
-        'export PATH=$PATH:$SPARK_HOME/bin',
-        'chmod -R 777 $SPARK_HOME',
-        'exit 0'
-    ]
-
+    start_task_commands = install_cmd()
     # Get a verified node agent sku
     sku_to_use, image_ref_to_use = \
         util.select_latest_verified_vm_image_with_node_agent_sku(
@@ -104,6 +185,7 @@ def submit_app(
     app_id,
     app_file_path,
     app_file_name):
+    #TODO add 'wait' param
     """
     Submit a spark app
 
@@ -126,45 +208,11 @@ def submit_app(
     app_resource_file = \
         util.upload_file_to_container(
             blob_client, container_name = app_id, file_path = app_file_path)
-
-    # configure multi-instance task commands
-    coordination_commands = [
-        'echo CCP_NODES:',
-        'echo $CCP_NODES',
-        'echo AZ_BATCH_NODE_LIST:',
-        'echo $AZ_BATCH_NODE_LIST',
-        'echo AZ_BATCH_HOST_LIST:',
-        'echo $AZ_BATCH_HOST_LIST',
-        'echo AZ_BATCH_MASTER_NODE:',
-        'echo $AZ_BATCH_MASTER_NODE',
-        'echo AZ_BATCH_IS_CURRENT_NODE_MASTER:',
-        'echo $AZ_BATCH_IS_CURRENT_NODE_MASTER',
-        # set SPARK_HOME environment vars
-        'export SPARK_HOME=/dsvm/tools/spark/current',
-        'export PATH=$PATH:$SPARK_HOME/bin',
-        # create a 'slaves' file from the slaves.template in $SPARK_HOME/conf
-        'cp $SPARK_HOME/conf/slaves.template $SPARK_HOME/conf/slaves'
-        # create a new line in the slaves file
-        'echo >> $SPARK_HOME/conf/slaves',
-        # add batch pool ips to newly created slaves files
-        'IFS="," read -r -a workerips <<< $AZ_BATCH_HOST_LIST',
-        'for index in "${!workerips[@]}"',
-        'do echo "${workerips[index]}" >> $SPARK_HOME/conf/slaves',
-        'echo "${workerips[index]}"',
-        'done'
-    ]
-    application_commands = [
-        # set SPARK_HOME environment vars
-        'export SPARK_HOME=/dsvm/tools/spark/current',
-        'export PATH=$PATH:$SPARK_HOME/bin',
-        # kick off start-all spark command as a bg process
-        '($SPARK_HOME/sbin/start-all.sh &)',
-        # execute spark-submit on the specified app
-        '$SPARK_HOME/bin/spark-submit ' +
-        '--master spark://${AZ_BATCH_MASTER_NODE%:*}:7077 ' +
-        '$AZ_BATCH_TASK_WORKING_DIR/' + app_file_name
-    ]
+    # create application/coordination commands
+    coordination_cmd = connect_cmd()
+    application_cmd = custom_app_cmd(_WEBUI_PORT, app_file_name)
+
     # Get pool size
     pool = batch_client.pool.get(pool_id)
     pool_size = pool.target_dedicated
 
@@ -172,12 +220,12 @@ def submit_app(
     # Create multi-instance task
     task = batch_models.TaskAddParameter(
         id = app_id,
-        command_line = util.wrap_commands_in_shell(application_commands),
+        command_line = util.wrap_commands_in_shell(application_cmd),
         resource_files = [app_resource_file],
         run_elevated = False,
         multi_instance_settings = batch_models.MultiInstanceSettings(
             number_of_instances = pool_size,
-            coordination_command_line = util.wrap_commands_in_shell(coordination_commands),
+            coordination_command_line = util.wrap_commands_in_shell(coordination_cmd),
             common_resource_files = []))
 
     # Add task to batch job (which has the same name as pool_id)
@@ -195,7 +243,8 @@ def ssh_app(
     pool_id,
     app_id,
     username,
-    password):
+    password,
+    ports = None):
     """
     SSH into head node of spark-app
 
@@ -210,6 +259,8 @@
     :type username: string
     :param password: The password to access the head node via ssh
     :type password: string
+    :param ports: A list of ports to open tunnels to
+    :type ports: list
     """
 
     # Get master node id from task
@@ -237,12 +288,90 @@
     master_node_port = remote_login_settings.remote_login_port
 
     # build ssh tunnel command
-    ssh_tunnel_command = "ssh -L 8080:localhost:8080 " + \
-        username + "@" + str(master_node_ip) + " -p " + str(master_node_port)
+    ssh_command = "ssh "
+    for port in ports or []:
+        ssh_command += "-L " + str(port) + ":localhost:" + str(port) + " "
+    ssh_command += username + "@" + str(master_node_ip) + " -p " + str(master_node_port)
 
     print('\nuse the following command to connect to your spark head node:')
     print()
-    print('\t%s' % ssh_tunnel_command)
+    print('\t%s' % ssh_command)
     print()
 
 
+#TODO actually print
+def list_apps(
+    batch_client,
+    pool_id):
+    """
+    List all spark apps for a given cluster
+    :param batch_client: the batch client to use
+    :type batch_client: 'batchserviceclient.BatchServiceClient'
+    :param pool_id: The id of the pool to submit app to
+    :type pool_id: string
+    """
+    apps = batch_client.task.list(job_id=pool_id)
+    print(apps)
+
+# TODO not working
+def jupyter(
+    batch_client,
+    pool_id,
+    app_id,
+    username,
+    password):
+    """
+    Install jupyter, create app_id and open ssh tunnel
+
+    :param batch_client: the batch client to use
+    :type batch_client: 'batchserviceclient.BatchServiceClient'
+    :param pool_id: The id of the pool to submit app to
+    :type pool_id: string
+    :param username: The username to access the head node via ssh
+    :type username: string
+    :param password: The password to access the head node via ssh
+    :type password: string
+
+    """
+
+    # create application/coordination commands
+    coordination_cmd = connect_cmd()
+    application_cmd = jupyter_cmd(_WEBUI_PORT, _JUPYTER_PORT)
+
+    # Get pool size
+    pool = batch_client.pool.get(pool_id)
+    pool_size = pool.target_dedicated
+
+    # Create multi-instance task
+    task = batch_models.TaskAddParameter(
+        id = app_id,
+        command_line = util.wrap_commands_in_shell(application_cmd),
+        resource_files = [],
+        run_elevated = False,
+        multi_instance_settings = batch_models.MultiInstanceSettings(
+            number_of_instances = pool_size,
+            coordination_command_line = util.wrap_commands_in_shell(coordination_cmd),
+            common_resource_files = []))
+
+    # Add task to batch job (which has the same name as pool_id)
+    job_id = pool_id
+    batch_client.task.add(job_id = job_id, task = task)
+
+    # get job id (job id is the same as pool id)
+    job_id = pool_id
+
+    # Wait for the app to finish
+    util.wait_for_tasks_to_complete(
+        batch_client,
+        job_id,
+        datetime.timedelta(minutes=60))
+
+    # print ssh command
+    ssh_app(
+        batch_client,
+        pool_id,
+        app_id,
+        username,
+        password,
+        ports = [_JUPYTER_PORT, _WEBUI_PORT])
+
diff --git a/setup.py b/setup.py
index 8a1c2788..39cdb4cb 100644
--- a/setup.py
+++ b/setup.py
@@ -11,5 +11,7 @@ setup(name='redbull',
       scripts=['bin/spark-cluster-create',
                'bin/spark-cluster-delete',
                'bin/spark-app-submit',
-               'bin/spark-app-ssh'],
+               'bin/spark-app-ssh',
+               'bin/spark-app-list',
+               'bin/spark-app-jupyter'],
       zip_safe=False)
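One possible way to resolve the "#TODO actually print" in list_apps above is sketched here (not part of the patch). It uses the same batch_client.task.list(job_id=...) call the patch already makes, and assumes the returned CloudTask objects expose id and state attributes as in the azure-batch models used elsewhere in this change:

    # hedged sketch: print one line per task instead of the raw paged iterator;
    # task.id / task.state are assumed from the azure-batch CloudTask model
    def print_apps(batch_client, pool_id):
        for task in batch_client.task.list(job_id=pool_id):
            print("%s\t%s" % (task.id, task.state))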