diff --git a/bin/spark-cluster-ssh b/bin/spark-cluster-ssh index 0be182b5..e52347c0 100755 --- a/bin/spark-cluster-ssh +++ b/bin/spark-cluster-ssh @@ -1,7 +1,7 @@ #!/usr/bin/env python from dtde import clusterlib, util -import argparse +import argparse import os if __name__ == '__main__': @@ -18,13 +18,13 @@ if __name__ == '__main__': parser.add_argument('--id', dest="cluster_id", required=True, help='The unique id of your spark cluster') - parser.add_argument('--masterui', + parser.add_argument('--masterui', help='Local port to port spark\'s master UI to') - parser.add_argument('--webui', + parser.add_argument('--webui', help='Local port to port spark\'s webui to') - parser.add_argument('--jupyter', + parser.add_argument('--jupyter', help='Local port to port jupyter to') - parser.add_argument('-u', '--username', + parser.add_argument('-u', '--username', help='Username to spark cluster') parser.add_argument('--no-connect', action='store_false', @@ -33,7 +33,7 @@ if __name__ == '__main__': the command to run.') args = parser.parse_args() - + if (args.no_connect is True and args.username is None): print('You must specify a username in order to connect automatically.') exit() @@ -70,7 +70,7 @@ if __name__ == '__main__': # get ssh command clusterlib.ssh( - pool_id = pool_id, + cluster_id = pool_id, masterui = masterui, webui = webui, jupyter = jupyter, diff --git a/dtde/clusterlib.py b/dtde/clusterlib.py index 34d9ee9d..bdf14128 100644 --- a/dtde/clusterlib.py +++ b/dtde/clusterlib.py @@ -81,13 +81,13 @@ def generate_cluster_start_task( def create_cluster( - custom_script:str, - cluster_id:str, + custom_script: str, + cluster_id: str, vm_count, vm_low_pri_count, vm_size, - username:str, - password:str, + username: str, + password: str, wait=True): """ Create a spark cluster @@ -257,7 +257,13 @@ def delete_cluster(cluster_id: str): # job id is equal to pool id job_id = pool_id - job_exists = batch_client.job.exists(job_id) + job_exists = True + + try: + batch_client.job.get(job_id) + except: + job_exists = False + pool_exists = batch_client.pool.exists(pool_id) if job_exists: @@ -269,6 +275,7 @@ def delete_cluster(cluster_id: str): if job_exists or pool_exists: print("Deleting cluster {0}".format(cluster_id)) + def ssh( cluster_id: str, username: str=None, @@ -296,6 +303,7 @@ def ssh( remote_login_settings = batch_client.compute_node.get_remote_login_settings( cluster_id, master_node_id) + master_node_ip = remote_login_settings.remote_login_ip_address master_node_port = remote_login_settings.remote_login_port ssh_command = CommandBuilder('ssh') @@ -308,7 +316,7 @@ def ssh( ssh_command.add_option("-L", "{0}:localhost:{1}".format(port[0], port[1])) user = username if username is not None else '' - ssh_command.add_argument("{0}@{1} -p {2}".format(user, master_node_id, master_node_port)) + ssh_command.add_argument("{0}@{1} -p {2}".format(user, master_node_ip, master_node_port)) command = ssh_command.to_str() ssh_command_array = command.split()