This commit is contained in:
jiata 2017-04-13 07:34:19 +00:00
Родитель e17d8776ff
Коммит 73b2b6d531
4 изменённых файлов: 131 добавлений и 34 удалений

102
bin/spark-app-ssh Executable file
Просмотреть файл

@ -0,0 +1,102 @@
#!/usr/bin/env python
from redbull import sparklib
try:
import configparser
except ImportError:
import ConfigParser as configparser
import os
import datetime
import random
import argparse
import azure.batch.batch_service_client as batch
import azure.batch.batch_auth as batch_auth
import azure.batch.models as batch_models
import azure.storage.blob as blob
# config file path
_config_path = os.path.join(os.path.dirname(__file__), '../configuration.cfg')
if __name__ == '__main__':
_pool_id = None
_app_id = None
_username = 'admin'
_password = 'pass123!'
# parse arguments
parser = argparse.ArgumentParser(prog="az_spark")
parser.add_argument("--cluster-id", required=True,
help="the unique name of your spark cluster")
parser.add_argument("--app-id", required=True,
help="the unique name of your spark app")
parser.add_argument("-u", "--user",
help="the relative path to your spark app in your directory")
parser.add_argument("-p", "--password",
help="the relative path to your spark app in your directory")
args = parser.parse_args()
print()
if args.cluster_id is not None:
_pool_id = args.cluster_id
print("spark cluster id: %s" % _pool_id)
if args.app_id is not None:
_app_id = args.app_id
print("spark job id: %s" % _app_id)
if args.user is not None:
_username = args.user
print("az_spark username: %s" % _username)
if args.password is not None:
_password = args.password
print("az_spark password: %s" % _password)
# Read config file
global_config = configparser.ConfigParser()
global_config.read(_config_path)
# Set up the configuration
batch_account_key = global_config.get('Batch', 'batchaccountkey')
batch_account_name = global_config.get('Batch', 'batchaccountname')
batch_service_url = global_config.get('Batch', 'batchserviceurl')
# Set up SharedKeyCredentials
credentials = batch_auth.SharedKeyCredentials(
batch_account_name,
batch_account_key)
# Set up Batch Client
batch_client = batch.BatchServiceClient(
credentials,
base_url=batch_service_url)
# Set retry policy
batch_client.config.retry_policy.retries = 5
# get ssh command
sparklib.ssh_app(
batch_client,
pool_id = _pool_id,
app_id = _app_id,
username = _username,
password = _password)

Просмотреть файл

@ -27,11 +27,9 @@ _deployment_suffix = str(random.randint(0,1000000))
if __name__ == '__main__':
_pool_id = None
_app_id = 'az-spark-' + _deployment_suffix
_app_id = None
_app_file_name = None
_app_file_path = None
_username = 'admin'
_password = 'pass123!'
# parse arguments
parser = argparse.ArgumentParser(prog="az_spark")
@ -42,10 +40,6 @@ if __name__ == '__main__':
help="the unique name of your spark app")
parser.add_argument("--file", required=True,
help="the relative path to your spark app in your directory")
parser.add_argument("-u", "--user",
help="the relative path to your spark app in your directory")
parser.add_argument("-p", "--password",
help="the relative path to your spark app in your directory")
args = parser.parse_args()
@ -64,14 +58,6 @@ if __name__ == '__main__':
print("spark job file path: %s" % _app_file_path)
print("spark job file name: %s" % _app_file_name)
if args.user is not None:
_username = args.user
print("az_spark username: %s" % _username)
if args.password is not None:
_password = args.password
print("az_spark password: %s" % _password)
# Read config file
global_config = configparser.ConfigParser()
global_config.read(_config_path)
@ -111,7 +97,5 @@ if __name__ == '__main__':
app_id = _app_id,
app_file_name = _app_file_name,
# app_file_path = os.path.join(os.path.dirname(__file__), _app_file_path),
app_file_path = _app_file_path,
username = _username,
password = _password)
app_file_path = _app_file_path)

Просмотреть файл

@ -103,9 +103,7 @@ def submit_app(
pool_id,
app_id,
app_file_path,
app_file_name,
username,
password):
app_file_name):
"""
Submit a spark app
@ -122,18 +120,8 @@ def submit_app(
:type app_file_path: string
:param app_file_name: The name of the spark app file to run
:type app_file_name: string
:param username: The username to access the head node via ssh
:type username: string
:param password: The password to access the head node via ssh
:type password: string
"""
# set multi-instance task id
# TODO create a real GUID instead of just a random number
deployment_suffix = str(random.randint(0,1000000))
multiinstance_task_id= 'az-spark-' + deployment_suffix
# Upload app resource files to blob storage
app_resource_file = \
util.upload_file_to_container(
@ -183,7 +171,7 @@ def submit_app(
# Create multi-instance task
task = batch_models.TaskAddParameter(
id = multiinstance_task_id,
id = app_id,
command_line = util.wrap_commands_in_shell(application_commands),
resource_files = [app_resource_file],
run_elevated = False,
@ -202,9 +190,31 @@ def submit_app(
job_id,
datetime.timedelta(minutes=60))
def ssh_app(
batch_client,
pool_id,
app_id,
username,
password):
"""
SSH into head node of spark-app
:param batch_client: the batch client to use
:type batch_client: 'batchserviceclient.BatchServiceClient'
:param pool_id: The id of the pool to submit app to
:type pool_id: string
:param app_id: The id of the spark app (corresponds to batch task)
:type app_id: string
:param username: The username to access the head node via ssh
:type username: string
:param password: The password to access the head node via ssh
:type password: string
"""
# Get master node id from task
master_node_id = batch_client.task \
.get(job_id=pool_id, task_id=multiinstance_task_id) \
.get(job_id=pool_id, task_id=app_id) \
.node_info.node_id
# Create new ssh user for the master node

Просмотреть файл

@ -10,5 +10,6 @@ setup(name='redbull',
packages=['redbull'],
scripts=['bin/spark-cluster-create',
'bin/spark-cluster-delete',
'bin/spark-app-submit'],
'bin/spark-app-submit',
'bin/spark-app-ssh'],
zip_safe=False)