This commit is contained in:
jiata 2017-06-13 07:46:16 -07:00
Parent 5e2f053987
Commit 6bc5842dad
3 changed files with 55 additions and 6 deletions

View File

@@ -17,6 +17,7 @@ if __name__ == '__main__':
_pool_id = None
_vm_count = None
_vm_size = None
_custom_script = None
_wait = True
@@ -29,6 +30,8 @@ if __name__ == '__main__':
help="number of vms in your cluster")
parser.add_argument("--cluster-vm-size", required=True,
help="size of each vm in your cluster")
parser.add_argument("--custom-script",
help="absolute path of custom bash script (.sh) to run on each node")
parser.add_argument('--wait', dest='wait', action='store_true')
parser.add_argument('--no-wait', dest='wait', action='store_false')
parser.set_defaults(wait=False)
@@ -48,6 +51,10 @@ if __name__ == '__main__':
_vm_size = args.cluster_vm_size
print("spark cluster vm size: %s" % _vm_size)
if args.custom_script is not None:
_custom_script = args.custom_script
print("path to custom script: %s" % _custom_script)
if args.wait is not None:
if not args.wait:
_wait = False
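For readers unfamiliar with the paired-flag pattern above: both --wait and --no-wait write to the same wait destination, and set_defaults() supplies the value when neither flag is given. A minimal, self-contained sketch (the flag names mirror the diff; the rest is illustrative):

import argparse

parser = argparse.ArgumentParser()
# Both flags share one destination; whichever is parsed last wins.
parser.add_argument('--wait', dest='wait', action='store_true')
parser.add_argument('--no-wait', dest='wait', action='store_false')
parser.set_defaults(wait=False)

print(parser.parse_args([]).wait)             # False (default)
print(parser.parse_args(['--wait']).wait)     # True
print(parser.parse_args(['--no-wait']).wait)  # False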
@@ -62,15 +69,28 @@ if __name__ == '__main__':
batch_account_name = global_config.get('Batch', 'batchaccountname')
batch_service_url = global_config.get('Batch', 'batchserviceurl')
# Set up storage configuration
storage_account_key = global_config.get('Storage', 'storageaccountkey')
storage_account_name = global_config.get('Storage', 'storageaccountname')
storage_account_suffix = global_config.get('Storage', 'storageaccountsuffix')
# create batch client
batch_client = util.create_batch_client(
batch_account_key,
batch_account_name,
batch_service_url)
# create storage client
blob_client = util.create_blob_client(
storage_account_key,
storage_account_name,
storage_account_suffix)
# create spark cluster
clusterlib.create_cluster(
batch_client,
blob_client,
_custom_script,
_pool_id,
_vm_count,
_vm_size,
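util.create_batch_client and util.create_blob_client are defined outside this diff. A plausible sketch of their shape, assuming the 2017-era azure-batch and azure-storage SDKs this repository targets (argument order follows the call sites above):

import azure.batch as batch
import azure.batch.batch_auth as batch_auth
import azure.storage.blob as azureblob

# Assumed reconstruction of the util helpers, not shown in this diff.
def create_batch_client(account_key, account_name, service_url):
    credentials = batch_auth.SharedKeyCredentials(account_name, account_key)
    return batch.BatchServiceClient(credentials, base_url=service_url)

def create_blob_client(account_key, account_name, account_suffix):
    return azureblob.BlockBlobService(
        account_name=account_name,
        account_key=account_key,
        endpoint_suffix=account_suffix)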

View File

@@ -60,15 +60,18 @@ if __name__ == '__main__':
parser.add_argument("--jars",
help="Comma-separated list of local jars to include \
on the driver and executor classpaths")
on the driver and executor classpaths. Use \
absolute path to reference files.")
parser.add_argument("--py-files",
help="Comma-separated list of .zip, .egg, or .py files \
to place on the PYTHONPATH for Python apps.")
to place on the PYTHONPATH for Python apps. Use \
absolute path to reference files.")
parser.add_argument("--files",
help="Comma-separated list of .zip, .egg, or .py files \
to place on the PYTHONPATH for Python apps.")
to place on the PYTHONPATH for Python apps. Use \
absolute path to reference files.")
parser.add_argument("--driver-java-options",
help="Extra Java options to pass to the driver.")
@@ -95,7 +98,8 @@ if __name__ == '__main__':
available cores on the worker")
parser.add_argument("application", nargs='*',
help="App jar OR python file to execute")
help="App jar OR python file to execute. Use absolute \
path to reference file.")
args = parser.parse_args()
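Several of these options take comma-separated file lists and, per the help text, expect absolute paths. A hypothetical normalization helper (not part of this diff) that a caller could apply before forwarding the values:

import os

def to_absolute_paths(comma_separated):
    # Split a comma-separated file list and resolve each entry
    # against the current working directory.
    if not comma_separated:
        return []
    return [os.path.abspath(p.strip())
            for p in comma_separated.split(',') if p.strip()]

to_absolute_paths('app.py, lib/dep.py')  # e.g. ['/cwd/app.py', '/cwd/lib/dep.py']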

View File

@@ -5,12 +5,23 @@ from datetime import datetime, timedelta
import azure.batch.models as batch_models
from subprocess import call
def cluster_install_cmd():
def cluster_install_cmd(custom_script_file):
run_custom_script = ''
if custom_script_file is not None:
run_custom_script = '/bin/sh -c ' + custom_script_file
return [
# setup spark home and permissions for spark folder
'export SPARK_HOME=/dsvm/tools/spark/current',
'export PATH=$PATH:$SPARK_HOME/bin',
'chmod -R 777 $SPARK_HOME',
'chmod -R 777 /usr/local/share/jupyter/kernels',
# To avoid error: "sudo: sorry, you must have a tty to run sudo"
'sed -i -e "s/Defaults requiretty.*/ #Defaults requiretty/g" /etc/sudoers',
run_custom_script,
'exit 0'
]
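This command list is later collapsed into a single start-task command line by util.wrap_commands_in_shell, which the diff does not show. A sketch modeled on the Azure Batch Python samples, with one caveat: if the commands were joined with ';' as-is, the empty run_custom_script placeholder would produce ';;', which bash rejects, so empty entries are filtered here:

def wrap_commands_in_shell(commands):
    # Assumed shape, modeled on the Azure Batch samples: run all
    # commands in one /bin/bash -c invocation, aborting on failure.
    # Empty strings (e.g. when no custom script is set) are dropped.
    joined = '; '.join(c for c in commands if c)
    return "/bin/bash -c 'set -e; set -o pipefail; {}; wait'".format(joined)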
@@ -106,6 +117,8 @@ def cluster_start_cmd(webui_port, jupyter_port):
def create_cluster(
batch_client,
blob_client,
custom_script,
pool_id,
vm_count,
vm_size,
@@ -122,8 +135,19 @@ def create_cluster(
# reuse pool_id as job_id
job_id = pool_id
# Upload custom script file
custom_script_resource_file = None
if custom_script is not None:
custom_script_resource_file = \
util.upload_file_to_container(
blob_client,
container_name = pool_id,
file_path = custom_script,
use_full_path = True)
# start task command
start_task_commands = cluster_install_cmd()
start_task_commands = \
cluster_install_cmd(custom_script)
# Get a verified node agent sku
sku_to_use, image_ref_to_use = \
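util.upload_file_to_container is also outside this diff. A plausible reconstruction following the standard Azure Batch sample pattern (upload the blob, mint a read-only SAS, and return a ResourceFile the start task can download), again assuming the 2017-era SDKs:

import datetime
import os
import azure.batch.models as batch_models
import azure.storage.blob as azureblob

def upload_file_to_container(blob_client, container_name, file_path,
                             use_full_path=False):
    # The blob name doubles as the path the node saves the file to,
    # relative to the task working directory.
    blob_name = file_path if use_full_path else os.path.basename(file_path)
    blob_client.create_container(container_name, fail_on_exist=False)
    blob_client.create_blob_from_path(container_name, blob_name, file_path)
    # Read-only SAS so the Batch node can fetch the script.
    sas_token = blob_client.generate_blob_shared_access_signature(
        container_name, blob_name,
        permission=azureblob.BlobPermissions.READ,
        expiry=datetime.datetime.utcnow() + datetime.timedelta(hours=2))
    sas_url = blob_client.make_blob_url(container_name, blob_name,
                                        sas_token=sas_token)
    return batch_models.ResourceFile(file_path=blob_name,
                                     blob_source=sas_url)

Note that cluster_install_cmd(custom_script) above is still handed the local path, which assumes the resource file lands on the node at a matching location.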
@@ -140,6 +164,7 @@ def create_cluster(
target_dedicated = vm_count,
start_task = batch_models.StartTask(
command_line = util.wrap_commands_in_shell(start_task_commands),
resource_files = [custom_script_resource_file],
user_identity = batch_models.UserIdentity(
auto_user = batch_models.AutoUserSpecification(
scope=batch_models.AutoUserScope.pool,
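One caveat on the hunk above: when --custom-script is omitted, custom_script_resource_file stays None, so the start task receives resource_files=[None]. A defensive variant one might use instead:

# Avoid handing the Batch service a [None] entry when no custom
# script was uploaded.
resource_files = ([custom_script_resource_file]
                  if custom_script_resource_file is not None
                  else None)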