This commit is contained in:
Timothee Guerin 2017-07-11 14:25:17 -07:00
Parent 222821cdb2
Commit e9c7f74e79
5 changed files: 99 additions and 66 deletions

View file

@ -23,7 +23,8 @@ def cluster_install_cmd(zip_resource_file: batch_models.ResourceFile, custom_scr
'sed -i -e "s/Defaults requiretty.*/ #Defaults requiretty/g" /etc/sudoers',
'unzip $AZ_BATCH_TASK_WORKING_DIR/%s' % zip_resource_file.file_path,
'chmod 777 $AZ_BATCH_TASK_WORKING_DIR/main.sh',
'dos2unix $AZ_BATCH_TASK_WORKING_DIR/main.sh', # Convert windows line ending to unix if applicable
# Convert windows line ending to unix if applicable
'dos2unix $AZ_BATCH_TASK_WORKING_DIR/main.sh',
'$AZ_BATCH_TASK_WORKING_DIR/main.sh'
]
@ -194,7 +195,8 @@ def create_cluster(
vm_size=vm_size,
target_dedicated_nodes=vm_count,
target_low_priority_nodes=vm_low_pri_count,
start_task=generate_cluster_start_task(pool_id, zip_resource_file, custom_script),
start_task=generate_cluster_start_task(
pool_id, zip_resource_file, custom_script),
enable_inter_node_communication=True,
max_tasks_per_node=1)
@ -203,51 +205,17 @@ def create_cluster(
pool,
wait)
return # TODO
# Create job
job = batch_models.JobAddParameter(
id=job_id,
pool_info=batch_models.PoolInformation(pool_id=pool_id))
# Add job to batch
batch_client.job.add(job)
# create application/coordination commands
coordination_cmd = cluster_connect_cmd()
application_cmd = cluster_start_cmd(
constants._WEBUI_PORT, constants._JUPYTER_PORT)
# reuse pool_id as multi-instance task id
task_id = pool_id
# Create multi-instance task
task = batch_models.TaskAddParameter(
id=task_id,
command_line=util.wrap_commands_in_shell(application_cmd),
resource_files=[],
multi_instance_settings=batch_models.MultiInstanceSettings(
number_of_instances=vm_count + vm_low_pri_count,
coordination_command_line=util.wrap_commands_in_shell(
coordination_cmd),
common_resource_files=[]))
# Add task to batch job (which has the same name as pool_id)
try:
batch_client.task.add(job_id=job_id, task=task)
except batch_models.batch_error.BatchErrorException as err:
util.print_batch_exception(err)
if err.error.code != 'JobExists':
raise
else:
print('Job {!r} already exists'.format(job_id))
# batch_client.job.add(job) # TODO
# Wait for the app to finish
if wait == True:
util.wait_for_tasks_to_complete(
job_id,
timedelta(minutes=60))
util.wait_for_master_to_be_ready(pool_id)
if username is not None and password is not None:
create_user(pool_id, username, password)
@ -263,7 +231,7 @@ def create_user(
batch_client = azure_api.get_batch_client()
# TODO wait for pool to be ready?
# Create new ssh user for the master node
batch_client.compute_node.add_user(
pool_id,

View file

@ -40,6 +40,24 @@ def wait_for_tasks_to_complete(job_id, timeout):
raise TimeoutError("Timed out waiting for tasks to complete")
def wait_for_master_to_be_ready(cluster_id: str):
    """Block until the cluster's master node is in a usable state.

    Polls until the pool reports an elected master node id, then polls
    that node until it is ``idle`` or ``running``.

    :param cluster_id: id of the cluster; doubles as the Batch pool id.
    """
    batch_client = azure_api.get_batch_client()
    master_node_id = None
    while True:
        # Master election may not have completed yet — keep asking the
        # pool metadata until it names a master node.
        if not master_node_id:
            master_node_id = get_master_node_id(cluster_id)
            if not master_node_id:
                time.sleep(5)
                continue
        master_node = batch_client.compute_node.get(cluster_id, master_node_id)
        if master_node.state == batch_models.ComputeNodeState.idle or master_node.state == batch_models.ComputeNodeState.running:
            break
        else:
            print("Still waiting on master")  # presumably absent in original; see NOTE
            time.sleep(10)
    # NOTE(review): extra grace period after the node turns ready —
    # presumably lets start-task services settle; confirm it is needed.
    time.sleep(5)
def upload_file_to_container(container_name, file_path, use_full_path) -> batch_models.ResourceFile:
"""
Uploads a local file to an Azure Blob storage container.
@ -92,6 +110,7 @@ def print_configuration(config):
print("\nConfiguration is:")
print(configuration_dict)
def get_master_node_id_from_pool(pool: batch_models.CloudPool):
"""
:returns: the id of the node that is the assigned master of this pool
@ -105,6 +124,7 @@ def get_master_node_id_from_pool(pool: batch_models.CloudPool):
return None
def get_master_node_id(pool_id):
    """Return the node id of the pool's designated master (or None)."""
    pool = azure_api.get_batch_client().pool.get(pool_id)
    return get_master_node_id_from_pool(pool)
@ -124,7 +144,7 @@ def create_pool_if_not_exist(pool, wait=True):
try:
batch_client.pool.add(pool)
if wait:
wait_for_all_nodes_state(batch_client, pool, frozenset(
wait_for_all_nodes_state(pool, frozenset(
(batch_models.ComputeNodeState.start_task_failed,
batch_models.ComputeNodeState.unusable,
batch_models.ComputeNodeState.idle)

View file

@ -13,11 +13,11 @@ def setup_node():
def setup_as_master():
    """Configure this node as the Spark master."""
    print("Setting up as master.")
    spark.setup_connection()
    # NOTE(review): both of the next two calls appear in the source; this
    # looks like an unresolved diff artifact — start_spark() is presumably
    # superseded by start_spark_master(). Confirm and keep exactly one.
    spark.start_spark()
    spark.start_spark_master()
    # spark.start_spark_worker()
def setup_as_worker():
    """Configure this node as a Spark worker attached to the elected master."""
    print("Setting up as worker.")
    spark.setup_connection()
    # spark.start_spark()
    spark.start_spark_worker()

View file

@ -61,14 +61,18 @@ def find_master(client: batch.BatchServiceClient) -> bool:
master = get_master_node_id(pool)
if master:
print("Pool already has a master '%s'. This node will be a worker" % master)
return False
if master == config.node_id:
print("Node is already the master '%s'" % master)
return True
else:
print("Pool already has a master '%s'. This node will be a worker" % master)
return False
else:
print("Pool has no master. Fighting for the throne! (%i/5)" % (i + 1))
result = try_assign_self_as_master(client, pool)
if result:
print("The battle has been won! Node %s is the new master.", config.node_id)
print("The battle has been won! Node %s is the new master." % config.node_id)
return True
raise CannotAllocateMasterError("Unable to assign node as a master in 5 tries")

View file

@ -20,6 +20,10 @@ def get_pool() -> batchmodels.CloudPool:
return batch_client.pool.get(config.pool_id)
def get_node(node_id: str) -> batchmodels.ComputeNode:
    """Fetch the compute node *node_id* from the configured pool."""
    pool_id = config.pool_id
    return batch_client.compute_node.get(pool_id, node_id)
def list_nodes() -> List[batchmodels.ComputeNode]:
"""
List all the nodes in the pool.
@ -49,8 +53,9 @@ def setup_connection():
"""
wait_for_pool_ready()
print("Pool is now steady. Setting up master")
master_node_ip = pick_master.get_master_node_id(batch_client.pool.get(config.pool_id))
master_node_ip = pick_master.get_master_node_id(
batch_client.pool.get(config.pool_id))
nodes = list_nodes()
master_file = open(os.path.join(spark_conf_folder, "master"), 'w')
@ -58,10 +63,12 @@ def setup_connection():
for node in nodes:
if node.id == master_node_ip:
print("Adding node %s as a master" % node.id)
print("Adding node %s as a master with ip %s" %
(node.id, node.ip_address))
master_file.write("%s\n" % node.ip_address)
else:
print("Adding node %s as a slave" % node.id)
print("Adding node %s as a slave with ip %s" %
(node.id, node.ip_address))
slaves_file.write("%s\n" % node.ip_address)
master_file.close()
@ -69,7 +76,8 @@ def setup_connection():
def generate_jupyter_config():
master_node_ip = pick_master.get_master_node_id(batch_client.pool.get(config.pool_id))
master_node_ip = pick_master.get_master_node_id(
batch_client.pool.get(config.pool_id))
return dict(
display_name="PySpark",
language="python",
@ -91,34 +99,67 @@ def generate_jupyter_config():
def setup_jupyter():
print("Setting up jupyter.")
call(["/anaconda/envs/py35/bin/jupyter", "notebook", "--generate-config"])
with open("test.txt", "a") as config_file:
jupyter_config_file = os.path.join(os.path.expanduser("~"), ".jupyter/jupyter_notebook_config.py")
with open(jupyter_config_file, "a") as config_file:
config_file.write('\n')
config_file.write('c.NotebookApp.token=""\n')
config_file.write('c.NotebookApp.password=""\n')
shutil.rmtree('/usr/local/share/jupyter/kernels')
os.makedirs('/usr/local/share/jupyter/kernels/pyspark', exist_ok=True)
os.makedirs('/usr/local/share/jupyter/kernels/pyspark', exist_ok = True)
with open('/usr/local/share/jupyter/kernels/pyspark/kernel.json', 'w') as outfile:
data = generate_jupyter_config()
data=generate_jupyter_config()
json.dump(data, outfile)
def start_jupyter():
jupyter_port = config.jupyter_port
my_env = os.environ.copy()
my_env["PYSPARK_DRIVER_PYTHON"] = "/anaconda/envs/py35/bin/jupyter"
my_env["PYSPARK_DRIVER_PYTHON_OPTS"] = "notebook --no-browser --port='%s'" % jupyter_port
jupyter_port=config.jupyter_port
# call("pyspark", "&", env=my_env)
Popen(["pyspark"], close_fds=True)
my_env=os.environ.copy()
my_env["PYSPARK_DRIVER_PYTHON"]="/anaconda/envs/py35/bin/jupyter"
my_env["PYSPARK_DRIVER_PYTHON_OPTS"]="notebook --no-browser --port='%s'" % jupyter_port
Popen(["pyspark"], close_fds = True)
def start_spark():
webui_port = config.webui_port
def wait_for_master():
print("Waiting for master to be ready.")
master_node_ip=pick_master.get_master_node_id(
batch_client.pool.get(config.pool_id))
while True:
master_node=get_node(master_node_ip)
exe = os.path.join(spark_home, "sbin", "start-all.sh")
call([exe, "--webui-port", str(webui_port), "&"])
if master_node.state == batchmodels.ComputeNodeState.idle or master_node.state == batchmodels.ComputeNodeState.running:
break
else:
print("Still waiting on master")
time.sleep(10)
def start_spark_master():
    """Start the Spark master daemon on this node, bound to its own ip."""
    # NOTE(review): webui_port is read but never used — presumably the
    # "--webui-port" flag was dropped when start_spark() was split into
    # master/worker variants; confirm whether it should be passed to cmd.
    webui_port=config.webui_port
    master_ip=get_node(config.node_id).ip_address
    exe=os.path.join(spark_home, "sbin", "start-master.sh")
    # -h binds the master to this node's ip so workers can reach it.
    cmd=[exe, "-h", master_ip]
    print("Starting master with '%s'" % " ".join(cmd))
    call(cmd)
    # NOTE(review): the two calls below may be leftovers from the removed
    # start_spark(); confirm Jupyter is meant to run on the master path.
    setup_jupyter()
    start_jupyter()
def start_spark_worker():
    """Start a Spark worker (slave) daemon pointed at the cluster master.

    Blocks in wait_for_master() until the master node is up before
    attempting to connect.
    """
    wait_for_master()
    exe=os.path.join(spark_home, "sbin", "start-slave.sh")
    master_node_id=pick_master.get_master_node_id(
        batch_client.pool.get(config.pool_id))
    master_node=get_node(master_node_id)
    # NOTE(review): my_env is built but never passed to call() below —
    # either pass env=my_env or delete these two lines; confirm intent.
    my_env=os.environ.copy()
    my_env["SPARK_MASTER_IP"]=master_node.ip_address
    # 7077 is Spark's default standalone-master port.
    cmd=[exe, "spark://%s:7077" % master_node.ip_address]
    print("Connecting to master with '%s'" % " ".join(cmd))
    call(cmd)