Merged PR 2: Merge ssh-update to master

## ssh update:
Enabled Jupyter; attached the user at the cluster level (as opposed to the job level).

## features:
* new cli cmd: spark-cluster-create-user (a separate create-user command is needed because we cannot automatically create a user on pool creation, since we don't necessarily wait for cluster creation to finish)
* new cli cmd: spark-cluster-jupyter
* new cli cmd: spark-cluster-webui
* spark-cluster-ssh does not tunnel any ports by default
This commit is contained in:
JS Tan 2017-04-17 05:10:41 +00:00 committed by paselem
Parent aaa1cad029
Commit 6ea20a76ba
8 changed files: 246 additions and 198 deletions

View file

@ -25,6 +25,7 @@ if __name__ == '__main__':
_pool_id = None
_vm_count = None
_vm_size = None
_wait = True
# parse arguments
@ -60,7 +61,6 @@ if __name__ == '__main__':
_wait = False
print("wait for cluster: %r" % _wait)
# Read config file
global_config = configparser.ConfigParser()
global_config.read(_config_path)

View file

@ -73,16 +73,12 @@ if __name__ == '__main__':
# Set retry policy
batch_client.config.retry_policy.retries = 5
# set app_id to jupyter
app_id = "jupyter-" + _pool_id
# get ssh command
sparklib.jupyter(
sparklib.create_user(
batch_client,
pool_id = _pool_id,
app_id = app_id,
username = _username,
password = _password)
_pool_id,
_username,
_password)

78
bin/spark-cluster-jupyter Executable file
View file

@ -0,0 +1,78 @@
#!/usr/bin/env python
"""CLI entry point: print an ssh tunnel command mapping a local port to the
Jupyter notebook running on a spark cluster's master node."""
from redbull import sparklib

try:
    import configparser
except ImportError:
    import ConfigParser as configparser

import os
import datetime
import random
import argparse

import azure.batch.batch_service_client as batch
import azure.batch.batch_auth as batch_auth
import azure.batch.models as batch_models
import azure.storage.blob as blob

# config file path
_config_path = os.path.join(os.path.dirname(__file__), '../configuration.cfg')

if __name__ == '__main__':

    _pool_id = None
    _local_port = 7777

    # parse arguments
    parser = argparse.ArgumentParser(prog="az_spark")

    parser.add_argument("--cluster-id", required=True,
                        help="the unique name of your spark cluster")
    # BUG FIX: argparse yields strings by default, and the '%i' format below
    # raises TypeError on a str — parse the port as an int.
    parser.add_argument("--local-port", type=int,
                        help="the port you want your jupyter notebook session mapped to")

    args = parser.parse_args()

    print()
    if args.cluster_id is not None:
        _pool_id = args.cluster_id
        print("spark cluster id: %s" % _pool_id)
    if args.local_port is not None:
        _local_port = args.local_port
        print("local port: %i" % _local_port)

    # Read config file
    global_config = configparser.ConfigParser()
    global_config.read(_config_path)

    # Set up the configuration
    batch_account_key = global_config.get('Batch', 'batchaccountkey')
    batch_account_name = global_config.get('Batch', 'batchaccountname')
    batch_service_url = global_config.get('Batch', 'batchserviceurl')

    # Set up SharedKeyCredentials
    credentials = batch_auth.SharedKeyCredentials(
        batch_account_name,
        batch_account_key)

    # Set up Batch Client
    batch_client = batch.BatchServiceClient(
        credentials,
        base_url=batch_service_url)

    # Set retry policy
    batch_client.config.retry_policy.retries = 5

    # get ssh command
    sparklib.jupyter(
        batch_client,
        pool_id=_pool_id,
        local_port=_local_port)

View file

@ -23,20 +23,11 @@ _config_path = os.path.join(os.path.dirname(__file__), '../configuration.cfg')
if __name__ == '__main__':
_pool_id = None
_app_id = None
_username = 'admin'
_password = 'pass123!'
# parse arguments
parser = argparse.ArgumentParser(prog="az_spark")
parser.add_argument("--cluster-id", required=True,
help="the unique name of your spark cluster")
parser.add_argument("-u", "--user",
help="the relative path to your spark app in your directory")
parser.add_argument("-p", "--password",
help="the relative path to your spark app in your directory")
args = parser.parse_args()
print()
@ -44,14 +35,6 @@ if __name__ == '__main__':
_pool_id = args.cluster_id
print("spark cluster id: %s" % _pool_id)
if args.user is not None:
_username = args.user
print("az_spark username: %s" % _username)
if args.password is not None:
_password = args.password
print("az_spark password: %s" % _password)
# Read config file
global_config = configparser.ConfigParser()
global_config.read(_config_path)
@ -77,10 +60,7 @@ if __name__ == '__main__':
# get ssh command
sparklib.ssh(
batch_client,
pool_id = _pool_id,
username = _username,
password = _password,
ports = [8082])
pool_id = _pool_id)

78
bin/spark-cluster-webui Executable file
View file

@ -0,0 +1,78 @@
#!/usr/bin/env python
"""CLI entry point: print an ssh tunnel command mapping a local port to the
spark web UI running on a spark cluster's master node."""
from redbull import sparklib

try:
    import configparser
except ImportError:
    import ConfigParser as configparser

import os
import datetime
import random
import argparse

import azure.batch.batch_service_client as batch
import azure.batch.batch_auth as batch_auth
import azure.batch.models as batch_models
import azure.storage.blob as blob

# config file path
_config_path = os.path.join(os.path.dirname(__file__), '../configuration.cfg')

if __name__ == '__main__':

    _pool_id = None
    _local_port = 8080

    # parse arguments
    parser = argparse.ArgumentParser(prog="az_spark")

    parser.add_argument("--cluster-id", required=True,
                        help="the unique name of your spark cluster")
    # BUG FIX: argparse yields strings by default, and the '%i' format below
    # raises TypeError on a str — parse the port as an int.
    parser.add_argument("--local-port", type=int,
                        help="the port you want your spark webui session mapped to")

    args = parser.parse_args()

    print()
    if args.cluster_id is not None:
        _pool_id = args.cluster_id
        print("spark cluster id: %s" % _pool_id)
    if args.local_port is not None:
        _local_port = args.local_port
        print("local port: %i" % _local_port)

    # Read config file
    global_config = configparser.ConfigParser()
    global_config.read(_config_path)

    # Set up the configuration
    batch_account_key = global_config.get('Batch', 'batchaccountkey')
    batch_account_name = global_config.get('Batch', 'batchaccountname')
    batch_service_url = global_config.get('Batch', 'batchserviceurl')

    # Set up SharedKeyCredentials
    credentials = batch_auth.SharedKeyCredentials(
        batch_account_name,
        batch_account_key)

    # Set up Batch Client
    batch_client = batch.BatchServiceClient(
        credentials,
        base_url=batch_service_url)

    # Set retry policy
    batch_client.config.retry_policy.retries = 5

    # get ssh command
    sparklib.webui(
        batch_client,
        pool_id=_pool_id,
        local_port=_local_port)

View file

@ -9,9 +9,6 @@ _WEBUI_PORT = 8082
_JUPYTER_PORT = 7777
def cluster_install_cmd():
'''
this command is run-elevated
'''
return [
'export SPARK_HOME=/dsvm/tools/spark/current',
'export PATH=$PATH:$SPARK_HOME/bin',
@ -47,10 +44,9 @@ def cluster_connect_cmd():
'cp $SPARK_HOME/conf/slaves $SPARK_HOME/conf/master',
# add batch pool ips to newly created slaves files
# make file name master
'IFS="," read -r -a workerips <<< $AZ_BATCH_HOST_LIST',
'for index in "${!workerips[@]}"',
'do echo "{workerips[index]}"',
'do echo "${workerips[index]}"',
'if [ "${AZ_BATCH_MASTER_NODE%:*}" = "${workerips[index]}" ]',
'then echo "${workerips[index]}" >> $SPARK_HOME/conf/master',
'else echo "${workerips[index]}" >> $SPARK_HOME/conf/slaves',
@ -58,14 +54,31 @@ def cluster_connect_cmd():
'done'
]
def cluster_start_cmd(webui_port):
def cluster_start_cmd(webui_port, jupyter_port):
    """
    Shell commands run as the cluster's application task: start the spark
    master/workers (web UI served on *webui_port*) and a token-less Jupyter
    notebook on *jupyter_port*, both as background processes.
    """
    spark_cmds = [
        # set SPARK_HOME environment vars
        'export SPARK_HOME=/dsvm/tools/spark/current',
        'export PATH=$PATH:$SPARK_HOME/bin',
        # get master node ip
        'export MASTER_NODE=$(cat $SPARK_HOME/conf/master)',
        # kick off start-all spark command (which starts web ui)
        '($SPARK_HOME/sbin/start-all.sh --webui-port %s &)' % str(webui_port),
    ]
    jupyter_cmds = [
        # jupyter setup: remove auth
        '/anaconda/envs/py35/bin/jupyter notebook --generate-config',
        'echo >> $HOME/.jupyter/jupyter_notebook_config.py',
        'echo c.NotebookApp.token=\\"\\" >> $HOME/.jupyter/jupyter_notebook_config.py',
        'echo c.NotebookApp.password=\\"\\" >> $HOME/.jupyter/jupyter_notebook_config.py',
        # start jupyter notebook in the background, driven through pyspark
        '(PYSPARK_DRIVER_PYTHON=/anaconda/envs/py35/bin/jupyter '
        'PYSPARK_DRIVER_PYTHON_OPTS="notebook --no-browser --port=%s" '
        'pyspark '
        '--master spark://${MASTER_NODE%%:*}:7077 '
        '--executor-memory 6400M '
        '--driver-memory 6400M &)' % str(jupyter_port),
    ]
    return spark_cmds + jupyter_cmds
def app_submit_cmd(webui_port, app_file_name):
@ -87,32 +100,6 @@ def app_submit_cmd(webui_port, app_file_name):
'$AZ_BATCH_TASK_WORKING_DIR/' + app_file_name
]
# TODO not working
def jupyter_cmd(webui_port, jupyter_port):
    """
    Shell commands that start spark (web UI on *webui_port*) and then run a
    passwordless Jupyter notebook on *jupyter_port* through pyspark.
    """
    config_file = '$HOME/.jupyter/jupyter_notebook_config.py'
    return [
        # set SPARK_HOME environment vars
        'export SPARK_HOME=/dsvm/tools/spark/current',
        'export PATH=$PATH:$SPARK_HOME/bin',
        # kick off start-all spark command as a bg process
        '($SPARK_HOME/sbin/start-all.sh --webui-port %s &)' % str(webui_port),
        # jupyter setup: remove auth
        'mkdir $HOME/.jupyter',
        'touch ' + config_file,
        'echo >> ' + config_file,
        'echo "c.NotebookApp.token=\'\'" >> ' + config_file,
        'echo "c.NotebookApp.password=\'\'" >> ' + config_file,
        # start jupyter notebook (foreground)
        'PYSPARK_DRIVER_PYTHON=jupyter '
        'PYSPARK_DRIVER_PYTHON_OPTS="notebook --no-browser --port=%s" '
        'pyspark '
        '--master spark://${AZ_BATCH_MASTER_NODE%%:*}:7077 '
        '--executor-memory 6400M '
        '--driver-memory 6400M' % str(jupyter_port),
    ]
def create_cluster(
batch_client,
pool_id,
@ -121,17 +108,6 @@ def create_cluster(
wait = True):
"""
Create a spark cluster
:param batch_client: the batch client to use
:type batch_client: 'batchserviceclient.BatchServiceClient'
:param pool_id: The id of the pool to create
:type pool_id: string
:param vm_count: the number of nodes in the pool
:type vm_count: int
:param vm_size: The vm size to use
:type vm_size: string
:param wait: whether or not to wait for pool creation to compelete
:type wait: boolean
"""
# vm image
@ -165,8 +141,11 @@ def create_cluster(
enable_inter_node_communication = True,
max_tasks_per_node = 1)
# Create the pool
util.create_pool_if_not_exist(batch_client, pool, wait=wait)
# Create the pool + create user for the pool
util.create_pool_if_not_exist(
batch_client,
pool,
wait)
# Create job
job = batch_models.JobAddParameter(
@ -178,7 +157,7 @@ def create_cluster(
# create application/coordination commands
coordination_cmd = cluster_connect_cmd()
application_cmd = cluster_start_cmd(_WEBUI_PORT)
application_cmd = cluster_start_cmd(_WEBUI_PORT, _JUPYTER_PORT)
# reuse pool_id as multi-instance task id
task_id = pool_id
@ -204,6 +183,29 @@ def create_cluster(
job_id,
datetime.timedelta(minutes=60))
def create_user(
        batch_client,
        pool_id,
        username,
        password):
    """
    Create an admin ssh user on the cluster's master node.

    :param batch_client: the batch client to use
    :param pool_id: id of the pool (job and task share this id)
    :param username: ssh username to create
    :param password: ssh password for the new user
    """
    # The multi-instance task is named after the pool (job and task are both
    # pool_id); its node_info points at the master node.
    master_task = batch_client.task.get(job_id=pool_id, task_id=pool_id)
    master_node_id = master_task.node_info.node_id

    # Attach a new admin ssh user to the master node.
    new_user = batch_models.ComputeNodeUser(
        username,
        is_admin=True,
        password=password)
    batch_client.compute_node.add_user(pool_id, master_node_id, new_user)
def get_cluster_details(
batch_client,
@ -275,11 +277,6 @@ def delete_cluster(
pool_id):
"""
Delete a spark cluster
:param batch_client: the batch client to use
:type batch_client: 'batchserviceclient.BatchServiceClient'
:param pool_id: The id of the pool to create
:type pool_id: string
"""
# delete pool by id
pool = batch_client.pool.get(pool_id)
@ -306,19 +303,6 @@ def submit_app(
"""
Submit a spark app
:param batch_client: the batch client to use
:type batch_client: 'batchserviceclient.BatchServiceClient'
:param block_blob_client: A blob service client.
:type block_blob_client: `azure.storage.blob.BlockBlobService`
:param pool_id: The id of the pool to submit app to
:type pool_id: string
:param app_id: The id of the spark app (corresponds to batch task)
:type app_id: string
:param app_file_path: The path of the spark app to run
:type app_file_path: string
:param app_file_name: The name of the spark app file to run
:type app_file_name: string
"""
# Upload app resource files to blob storage
@ -355,23 +339,12 @@ def submit_app(
def ssh(
batch_client,
pool_id,
username,
password,
ports = None):
"""
SSH into head node of spark-app
:param batch_client: the batch client to use
:type batch_client: 'batchserviceclient.BatchServiceClient'
:param pool_id: The id of the pool to submit app to
:type pool_id: string
:param username: The username to access the head node via ssh
:type username: string
:param password: The password to access the head node via ssh
:type password: string
:param ports: A list of ports to open tunnels to
:type ports: [<int>]
:param ports: an list of local and remote ports
:type ports: [[<local-port>, <remote-port>]]
"""
# Get master node id from task (job and task are both named pool_id)
@ -379,18 +352,6 @@ def ssh(
.get(job_id=pool_id, task_id=pool_id) \
.node_info.node_id
# Create new ssh user for the master node
batch_client.compute_node.add_user(
pool_id,
master_node_id,
batch_models.ComputeNodeUser(
username,
is_admin = True,
password = password))
print('\nuser "{}" added to node "{}"in pool "{}"'.format(
username, master_node_id, pool_id))
# get remote login settings for the user
remote_login_settings = batch_client.compute_node.get_remote_login_settings(
pool_id, master_node_id)
@ -401,88 +362,40 @@ def ssh(
# build ssh tunnel command
ssh_command = "ssh "
for port in ports:
ssh_command += "-L " + str(port) + ":localhost:" + str(port) + " "
ssh_command += username + "@" + str(master_node_ip) + " -p " + str(master_node_port)
ssh_command += "-L " + str(port[0]) + ":localhost:" + str(port[1]) + " "
ssh_command += "<username>@" + str(master_node_ip) + " -p " + str(master_node_port)
print('\nuse the following command to connect to your spark head node:')
print()
print('\t%s' % ssh_command)
print()
def jupyter(
        batch_client,
        pool_id,
        local_port):
    """
    Print an ssh command tunnelling *local_port* to the cluster's Jupyter port.
    """
    # Delegate to the generic ssh helper with a single local->remote mapping.
    port_map = [[local_port, _JUPYTER_PORT]]
    ssh(batch_client, pool_id, port_map)
def webui(
        batch_client,
        pool_id,
        local_port):
    """
    Print an ssh command tunnelling *local_port* to the spark web UI port.
    """
    # Delegate to the generic ssh helper with a single local->remote mapping.
    port_map = [[local_port, _WEBUI_PORT]]
    ssh(batch_client, pool_id, port_map)
def list_apps(
        batch_client,
        pool_id):
    """
    List all spark apps for a given cluster.

    :param batch_client: the batch client to use
    :param pool_id: id of the pool whose job holds the app tasks
    """
    # Every submitted app is a task on the job named after the pool.
    apps = batch_client.task.list(job_id=pool_id)
    # TODO actually print
    print(apps)
# TODO not working
def jupyter(
        batch_client,
        pool_id,
        app_id,
        username,
        password):
    """
    Install jupyter on the cluster as a multi-instance task named *app_id*,
    wait for it to finish, then print the ssh tunnel command.

    :param batch_client: the batch client to use
    :param pool_id: id of the pool (and of its job) to run on
    :param app_id: id for the jupyter task
    :param username: ssh username for the head node
    :param password: ssh password for the head node
    """
    # create application/coordination commands
    coordination_cmd = cluster_connect_cmd()
    application_cmd = jupyter_cmd(_WEBUI_PORT, _JUPYTER_PORT)

    # Pool size determines the task's instance count.
    pool = batch_client.pool.get(pool_id)
    pool_size = pool.target_dedicated

    # Create a multi-instance task spanning the whole pool.
    task = batch_models.TaskAddParameter(
        id=app_id,
        command_line=util.wrap_commands_in_shell(application_cmd),
        resource_files=[],
        run_elevated=False,
        multi_instance_settings=batch_models.MultiInstanceSettings(
            number_of_instances=pool_size,
            coordination_command_line=util.wrap_commands_in_shell(
                coordination_cmd),
            common_resource_files=[]))

    # Add task to the batch job (the job shares its id with the pool).
    job_id = pool_id
    batch_client.task.add(job_id=job_id, task=task)

    # Wait for the app to finish (up to an hour).
    util.wait_for_tasks_to_complete(
        batch_client,
        job_id,
        datetime.timedelta(minutes=60))

    # print ssh command
    ssh_app(
        batch_client,
        pool_id,
        app_id,
        username,
        password,
        ports=[_JUPYTER_PORT, _WEBUI_PORT])

View file

@ -128,6 +128,7 @@ def wait_for_all_nodes_state(batch_client, pool, node_state):
'resize error encountered for pool {}: {!r}'.format(
pool.id, pool.resize_error))
nodes = list(batch_client.compute_node.list(pool.id))
if (len(nodes) >= pool.target_dedicated and
all(node.state in node_state for node in nodes)):
return nodes

View file

@ -10,10 +10,12 @@ setup(name='redbull',
packages=['redbull'],
scripts=['bin/spark-cluster-create',
'bin/spark-cluster-delete',
'bin/spark-cluster-create-user',
'bin/spark-cluster-ssh',
'bin/spark-cluster-jupyter',
'bin/spark-cluster-webui',
'bin/spark-cluster-get',
'bin/spark-cluster-list',
'bin/spark-app-submit',
'bin/spark-app-list',
'bin/spark-app-jupyter'],
'bin/spark-app-list']
zip_safe=False)