зеркало из https://github.com/Azure/aztk.git
attach user to the cluster
This commit is contained in:
Родитель
0de0450843
Коммит
4954d6997d
|
@ -25,6 +25,7 @@ if __name__ == '__main__':
|
|||
_pool_id = None
|
||||
_vm_count = None
|
||||
_vm_size = None
|
||||
|
||||
_wait = True
|
||||
|
||||
# parse arguments
|
||||
|
@ -45,21 +46,20 @@ if __name__ == '__main__':
|
|||
print()
|
||||
if args.cluster_id is not None:
|
||||
_pool_id = args.cluster_id
|
||||
print("spark cluster id: %s" % _pool_id)
|
||||
print("spark cluster id: %s" % _pool_id)
|
||||
|
||||
if args.cluster_size is not None:
|
||||
_vm_count = args.cluster_size
|
||||
print("spark cluster size: %i" % _vm_count)
|
||||
print("spark cluster size: %i" % _vm_count)
|
||||
|
||||
if args.cluster_vm_size is not None:
|
||||
_vm_size = args.cluster_vm_size
|
||||
print("spark cluster vm size: %s" % _vm_size)
|
||||
|
||||
print("spark cluster vm size: %s" % _vm_size)
|
||||
|
||||
if args.wait is not None:
|
||||
if args.wait == False:
|
||||
_wait = False
|
||||
print("wait for cluster: %r" % _wait)
|
||||
|
||||
print("wait for cluster: %r" % _wait)
|
||||
|
||||
# Read config file
|
||||
global_config = configparser.ConfigParser()
|
||||
|
|
|
@ -50,7 +50,7 @@ if __name__ == '__main__':
|
|||
if args.password is not None:
|
||||
_password = args.password
|
||||
print("az_spark password: %s" % _password)
|
||||
|
||||
|
||||
# Read config file
|
||||
global_config = configparser.ConfigParser()
|
||||
global_config.read(_config_path)
|
||||
|
@ -73,16 +73,12 @@ if __name__ == '__main__':
|
|||
# Set retry policy
|
||||
batch_client.config.retry_policy.retries = 5
|
||||
|
||||
# set app_id to jupyter
|
||||
app_id = "jupyter-" + _pool_id
|
||||
|
||||
# get ssh command
|
||||
sparklib.jupyter(
|
||||
sparklib.create_user(
|
||||
batch_client,
|
||||
pool_id = _pool_id,
|
||||
app_id = app_id,
|
||||
username = _username,
|
||||
password = _password)
|
||||
_pool_id,
|
||||
_username,
|
||||
_password)
|
||||
|
||||
|
||||
|
|
@ -0,0 +1,78 @@
|
|||
#!/usr/bin/env python
"""CLI: open an SSH tunnel mapping a local port to the Jupyter notebook
running on a spark cluster's master node."""

from redbull import sparklib

try:
    import configparser
except ImportError:
    # Python 2 fallback
    import ConfigParser as configparser

import os
import datetime
import random
import argparse

import azure.batch.batch_service_client as batch
import azure.batch.batch_auth as batch_auth
import azure.batch.models as batch_models
import azure.storage.blob as blob

# config file path
_config_path = os.path.join(os.path.dirname(__file__), '../configuration.cfg')

if __name__ == '__main__':

    _pool_id = None
    _local_port = 7777

    # parse arguments
    parser = argparse.ArgumentParser(prog="az_spark")

    parser.add_argument("--cluster-id", required=True,
                        help="the unique name of your spark cluster")
    # type=int fixes a TypeError: argparse yields a str by default, and the
    # "%i" print below requires an integer
    parser.add_argument("--local-port", type=int,
                        help="the port you want your jupyter notebook session mapped to")

    args = parser.parse_args()

    print()
    if args.cluster_id is not None:
        _pool_id = args.cluster_id
        print("spark cluster id: %s" % _pool_id)

    if args.local_port is not None:
        _local_port = args.local_port
        print("local port: %i" % _local_port)

    # Read config file
    global_config = configparser.ConfigParser()
    global_config.read(_config_path)

    # Set up the configuration
    batch_account_key = global_config.get('Batch', 'batchaccountkey')
    batch_account_name = global_config.get('Batch', 'batchaccountname')
    batch_service_url = global_config.get('Batch', 'batchserviceurl')

    # Set up SharedKeyCredentials
    credentials = batch_auth.SharedKeyCredentials(
        batch_account_name,
        batch_account_key)

    # Set up Batch Client
    batch_client = batch.BatchServiceClient(
        credentials,
        base_url=batch_service_url)

    # Set retry policy
    batch_client.config.retry_policy.retries = 5

    # open the ssh tunnel for jupyter
    sparklib.jupyter(
        batch_client,
        pool_id=_pool_id,
        local_port=_local_port)
|
||||
|
||||
|
||||
|
||||
|
||||
|
|
@ -23,20 +23,11 @@ _config_path = os.path.join(os.path.dirname(__file__), '../configuration.cfg')
|
|||
if __name__ == '__main__':
|
||||
|
||||
_pool_id = None
|
||||
_app_id = None
|
||||
_username = 'admin'
|
||||
_password = 'pass123!'
|
||||
|
||||
# parse arguments
|
||||
parser = argparse.ArgumentParser(prog="az_spark")
|
||||
|
||||
parser.add_argument("--cluster-id", required=True,
|
||||
help="the unique name of your spark cluster")
|
||||
parser.add_argument("-u", "--user",
|
||||
help="the relative path to your spark app in your directory")
|
||||
parser.add_argument("-p", "--password",
|
||||
help="the relative path to your spark app in your directory")
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
print()
|
||||
|
@ -44,14 +35,6 @@ if __name__ == '__main__':
|
|||
_pool_id = args.cluster_id
|
||||
print("spark cluster id: %s" % _pool_id)
|
||||
|
||||
if args.user is not None:
|
||||
_username = args.user
|
||||
print("az_spark username: %s" % _username)
|
||||
|
||||
if args.password is not None:
|
||||
_password = args.password
|
||||
print("az_spark password: %s" % _password)
|
||||
|
||||
# Read config file
|
||||
global_config = configparser.ConfigParser()
|
||||
global_config.read(_config_path)
|
||||
|
@ -77,10 +60,7 @@ if __name__ == '__main__':
|
|||
# get ssh command
|
||||
sparklib.ssh(
|
||||
batch_client,
|
||||
pool_id = _pool_id,
|
||||
username = _username,
|
||||
password = _password,
|
||||
ports = [8082])
|
||||
pool_id = _pool_id)
|
||||
|
||||
|
||||
|
||||
|
|
|
@ -0,0 +1,78 @@
|
|||
#!/usr/bin/env python
"""CLI: open an SSH tunnel mapping a local port to the Spark web UI
running on a spark cluster's master node."""

from redbull import sparklib

try:
    import configparser
except ImportError:
    # Python 2 fallback
    import ConfigParser as configparser

import os
import datetime
import random
import argparse

import azure.batch.batch_service_client as batch
import azure.batch.batch_auth as batch_auth
import azure.batch.models as batch_models
import azure.storage.blob as blob

# config file path
_config_path = os.path.join(os.path.dirname(__file__), '../configuration.cfg')

if __name__ == '__main__':

    _pool_id = None
    _local_port = 8080

    # parse arguments
    parser = argparse.ArgumentParser(prog="az_spark")

    parser.add_argument("--cluster-id", required=True,
                        help="the unique name of your spark cluster")
    # type=int fixes a TypeError: argparse yields a str by default, and the
    # "%i" print below requires an integer
    parser.add_argument("--local-port", type=int,
                        help="the port you want your spark webui session mapped to")

    args = parser.parse_args()

    print()
    if args.cluster_id is not None:
        _pool_id = args.cluster_id
        print("spark cluster id: %s" % _pool_id)

    if args.local_port is not None:
        _local_port = args.local_port
        print("local port: %i" % _local_port)

    # Read config file
    global_config = configparser.ConfigParser()
    global_config.read(_config_path)

    # Set up the configuration
    batch_account_key = global_config.get('Batch', 'batchaccountkey')
    batch_account_name = global_config.get('Batch', 'batchaccountname')
    batch_service_url = global_config.get('Batch', 'batchserviceurl')

    # Set up SharedKeyCredentials
    credentials = batch_auth.SharedKeyCredentials(
        batch_account_name,
        batch_account_key)

    # Set up Batch Client
    batch_client = batch.BatchServiceClient(
        credentials,
        base_url=batch_service_url)

    # Set retry policy
    batch_client.config.retry_policy.retries = 5

    # open the ssh tunnel for the spark web ui
    sparklib.webui(
        batch_client,
        pool_id=_pool_id,
        local_port=_local_port)
|
||||
|
||||
|
||||
|
||||
|
||||
|
|
@ -63,7 +63,7 @@ def cluster_start_cmd(webui_port, jupyter_port):
|
|||
# get master node ip
|
||||
'export MASTER_NODE=$(cat $SPARK_HOME/conf/master)',
|
||||
|
||||
# kick off start-all spark command as a bg process
|
||||
# kick off start-all spark command (which starts web ui)
|
||||
'($SPARK_HOME/sbin/start-all.sh --webui-port ' + str(webui_port) + ' &)',
|
||||
|
||||
# jupyter setup: remove auth
|
||||
|
@ -73,12 +73,12 @@ def cluster_start_cmd(webui_port, jupyter_port):
|
|||
'echo c.NotebookApp.password=\\\"\\\" >> $HOME/.jupyter/jupyter_notebook_config.py',
|
||||
|
||||
# start jupyter notebook
|
||||
'PYSPARK_DRIVER_PYTHON=/anaconda/envs/py35/bin/jupyter ' +
|
||||
'(PYSPARK_DRIVER_PYTHON=/anaconda/envs/py35/bin/jupyter ' +
|
||||
'PYSPARK_DRIVER_PYTHON_OPTS="notebook --no-browser --port=' + str(jupyter_port) + '" ' +
|
||||
'pyspark ' +
|
||||
'--master spark://${MASTER_NODE%:*}:7077 ' +
|
||||
'--executor-memory 6400M ' +
|
||||
'--driver-memory 6400M'
|
||||
'--driver-memory 6400M &)'
|
||||
]
|
||||
|
||||
def app_submit_cmd(webui_port, app_file_name):
|
||||
|
@ -141,8 +141,11 @@ def create_cluster(
|
|||
enable_inter_node_communication = True,
|
||||
max_tasks_per_node = 1)
|
||||
|
||||
# Create the pool
|
||||
util.create_pool_if_not_exist(batch_client, pool, wait=wait)
|
||||
# Create the pool + create user for the pool
|
||||
util.create_pool_if_not_exist(
|
||||
batch_client,
|
||||
pool,
|
||||
wait)
|
||||
|
||||
# Create job
|
||||
job = batch_models.JobAddParameter(
|
||||
|
@ -180,6 +183,29 @@ def create_cluster(
|
|||
job_id,
|
||||
datetime.timedelta(minutes=60))
|
||||
|
||||
def create_user(
        batch_client,
        pool_id,
        username,
        password):
    """
    Add an admin SSH user to the cluster's master node.

    The cluster's Batch job and its coordinating task both share the pool
    id, so the master node is found by looking up that task's node info.
    """
    # locate the master node via the coordinating task (job id == task id == pool id)
    master_task = batch_client.task.get(job_id=pool_id, task_id=pool_id)
    master_node_id = master_task.node_info.node_id

    # register the new admin user on that node
    node_user = batch_models.ComputeNodeUser(
        username,
        is_admin=True,
        password=password)
    batch_client.compute_node.add_user(pool_id, master_node_id, node_user)
|
||||
|
||||
|
||||
def get_cluster_details(
|
||||
batch_client,
|
||||
|
@ -313,12 +339,12 @@ def submit_app(
|
|||
def ssh(
|
||||
batch_client,
|
||||
pool_id,
|
||||
username,
|
||||
password,
|
||||
ports = None):
|
||||
|
||||
"""
|
||||
SSH into head node of spark-app
|
||||
:param ports: an list of local and remote ports
|
||||
:type ports: [[<local-port>, <remote-port>]]
|
||||
"""
|
||||
|
||||
# Get master node id from task (job and task are both named pool_id)
|
||||
|
@ -326,18 +352,6 @@ def ssh(
|
|||
.get(job_id=pool_id, task_id=pool_id) \
|
||||
.node_info.node_id
|
||||
|
||||
# Create new ssh user for the master node
|
||||
batch_client.compute_node.add_user(
|
||||
pool_id,
|
||||
master_node_id,
|
||||
batch_models.ComputeNodeUser(
|
||||
username,
|
||||
is_admin = True,
|
||||
password = password))
|
||||
|
||||
print('\nuser "{}" added to node "{}"in pool "{}"'.format(
|
||||
username, master_node_id, pool_id))
|
||||
|
||||
# get remote login settings for the user
|
||||
remote_login_settings = batch_client.compute_node.get_remote_login_settings(
|
||||
pool_id, master_node_id)
|
||||
|
@ -348,14 +362,32 @@ def ssh(
|
|||
# build ssh tunnel command
|
||||
ssh_command = "ssh "
|
||||
for port in ports:
|
||||
ssh_command += "-L " + str(port) + ":localhost:" + str(port) + " "
|
||||
ssh_command += username + "@" + str(master_node_ip) + " -p " + str(master_node_port)
|
||||
ssh_command += "-L " + str(port[0]) + ":localhost:" + str(port[1]) + " "
|
||||
ssh_command += "<username>@" + str(master_node_ip) + " -p " + str(master_node_port)
|
||||
|
||||
print('\nuse the following command to connect to your spark head node:')
|
||||
print()
|
||||
print('\t%s' % ssh_command)
|
||||
print()
|
||||
|
||||
def jupyter(
        batch_client,
        pool_id,
        local_port):
    """
    Print the SSH tunnel command forwarding local_port to the Jupyter
    notebook port on the cluster's master node.

    NOTE(review): passes the port pairs positionally as the third argument
    to ssh() — assumes ssh's current signature is (batch_client, pool_id,
    ports); confirm against the ssh() definition in this module.
    """
    # [[local, remote]] pair: map the caller's local port to Jupyter's port
    ssh(batch_client, pool_id, [[local_port, _JUPYTER_PORT]])
|
||||
|
||||
def webui(
        batch_client,
        pool_id,
        local_port):
    """
    Print the SSH tunnel command forwarding local_port to the Spark web UI
    port on the cluster's master node.

    NOTE(review): passes the port pairs positionally as the third argument
    to ssh() — assumes ssh's current signature is (batch_client, pool_id,
    ports); confirm against the ssh() definition in this module.
    """
    # [[local, remote]] pair: map the caller's local port to the web UI port
    ssh(batch_client, pool_id, [[local_port, _WEBUI_PORT]])
|
||||
|
||||
def list_apps(
|
||||
batch_client,
|
||||
pool_id):
|
||||
|
@ -366,55 +398,4 @@ def list_apps(
|
|||
#TODO actually print
|
||||
print(apps)
|
||||
|
||||
# TODO not working
def jupyter(
        batch_client,
        pool_id,
        app_id,
        username,
        password):
    """
    Launch Jupyter on the cluster as a multi-instance Batch task, wait for
    it to finish, then print the SSH tunnel command.

    NOTE(review): marked "TODO not working" upstream. Depends on helpers
    not visible here (cluster_connect_cmd, jupyter_cmd, ssh_app, util.*) —
    behavior unverified.
    """

    # create application/coordination commands
    coordination_cmd = cluster_connect_cmd()
    application_cmd = jupyter_cmd(_WEBUI_PORT, _JUPYTER_PORT)

    # Get pool size (one task instance per dedicated node)
    pool = batch_client.pool.get(pool_id)
    pool_size = pool.target_dedicated

    # Create multi-instance task
    task = batch_models.TaskAddParameter(
        id = app_id,
        command_line = util.wrap_commands_in_shell(application_cmd),
        resource_files = [],
        run_elevated = False,
        multi_instance_settings = batch_models.MultiInstanceSettings(
            number_of_instances = pool_size,
            coordination_command_line = util.wrap_commands_in_shell(coordination_cmd),
            common_resource_files = []))

    # Add task to batch job (which has the same name as pool_id)
    job_id = pool_id
    batch_client.task.add(job_id = job_id, task = task)

    # get job id (job id is the same as pool id)
    # NOTE(review): redundant — job_id was already set to pool_id above
    job_id = pool_id

    # Wait for the app to finish (60-minute timeout)
    util.wait_for_tasks_to_complete(
        batch_client,
        job_id,
        datetime.timedelta(minutes=60))

    # print ssh command forwarding both the Jupyter and web UI ports
    ssh_app(
        batch_client,
        pool_id,
        app_id,
        username,
        password,
        ports = [_JUPYTER_PORT, _WEBUI_PORT])
|
||||
|
||||
|
||||
|
|
|
@ -128,6 +128,7 @@ def wait_for_all_nodes_state(batch_client, pool, node_state):
|
|||
'resize error encountered for pool {}: {!r}'.format(
|
||||
pool.id, pool.resize_error))
|
||||
nodes = list(batch_client.compute_node.list(pool.id))
|
||||
|
||||
if (len(nodes) >= pool.target_dedicated and
|
||||
all(node.state in node_state for node in nodes)):
|
||||
return nodes
|
||||
|
@ -248,4 +249,4 @@ def get_connection_info(batch_client, pool_id, node_id):
|
|||
pool_id, node_id)
|
||||
remote_ip = rls.remote_login_ip_address
|
||||
ssh_port = str(rls.remote_login_port)
|
||||
return (remote_ip, ssh_port)
|
||||
return (remote_ip, ssh_port)
|
||||
|
|
6
setup.py
6
setup.py
|
@ -10,10 +10,12 @@ setup(name='redbull',
|
|||
packages=['redbull'],
|
||||
scripts=['bin/spark-cluster-create',
|
||||
'bin/spark-cluster-delete',
|
||||
'bin/spark-cluster-create-user',
|
||||
'bin/spark-cluster-ssh',
|
||||
'bin/spark-cluster-jupyter',
|
||||
'bin/spark-cluster-webui',
|
||||
'bin/spark-cluster-get',
|
||||
'bin/spark-cluster-list',
|
||||
'bin/spark-app-submit',
|
||||
'bin/spark-app-list',
|
||||
'bin/spark-app-jupyter'],
|
||||
'bin/spark-app-list']
|
||||
zip_safe=False)
|
||||
|
|
Загрузка…
Ссылка в новой задаче