Setup spark from the start task

Timothee Guerin 2017-07-10 16:04:17 -07:00
Parent 0385df991f
Commit a4026fa702
4 changed files with 95 additions and 72 deletions
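For context on the title: in Azure Batch, a pool's start task runs on every node as it joins the pool, which is what lets the node-side scripts below configure Spark automatically. A hypothetical registration sketch (the script path is an assumption, not part of this diff):

import azure.batch.models as batch_models

# Hypothetical sketch: run the node setup script as the pool's start task so
# Spark is configured as soon as each node boots (path is an assumption).
start_task = batch_models.StartTask(
    command_line="/bin/bash -c 'python3 setup_node.py'",
    wait_for_success=True)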

View file

@@ -252,6 +252,21 @@ def create_cluster(
if username is not None and password is not None:
create_user(pool_id, username, password)
master_node_metadata_key = "_spark_master_node"
def get_master_node_id(pool: batch_models.CloudPool):
"""
:returns: the id of the node that is the assigned master of this pool
"""
if pool.metadata is None:
return None
for metadata in pool.metadata:
if metadata.name == master_node_metadata_key:
return metadata.value
return None
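# Not part of this diff — a sketch: get_master_node_id() only reads the tag, so
# wherever the master is elected the metadata would be written with a pool
# patch along these lines (uses the azure_api and batch_models helpers already
# imported in this module).
def set_master_node_id(pool_id, node_id):
    batch_client = azure_api.get_batch_client()
    batch_client.pool.patch(
        pool_id,
        batch_models.PoolPatchParameter(metadata=[
            batch_models.MetadataItem(
                name=master_node_metadata_key, value=node_id)
        ]))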
def create_user(
pool_id,
@@ -262,23 +277,24 @@ def create_user(
"""
batch_client = azure_api.get_batch_client()
# TODO wait
# Get master node id from task
master_node_id = None
try:
# job and task are both named pool_id
master_node_id = batch_client.task \
.get(job_id=pool_id, task_id=pool_id) \
.node_info.node_id
except AttributeError as err:
print('cluster, "{}", is still setting up - '.format(pool_id) +
'please wait for your cluster to be created ' +
'before creating a user')
exit()
# master_node_id = None
# try:
# # job and task are both named pool_id
# master_node_id = batch_client.task \
# .get(job_id=pool_id, task_id=pool_id) \
# .node_info.node_id
# except AttributeError as err:
# print('cluster, "{}", is still setting up - '.format(pool_id) +
# 'please wait for your cluster to be created ' +
# 'before creating a user')
# exit()
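# Not part of this change — a sketch: get_master_node_id() returns None until
# the start task has tagged the pool, so a guard like this hypothetical helper
# would preserve the old "still setting up" message for the lookup below.
def ensure_master_assigned(pool):
    if get_master_node_id(pool) is None:
        print('cluster, "{}", is still setting up - please wait for your '
              'cluster to be created before creating a user'.format(pool.id))
        exit()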
pool = batch_client.pool.get(pool_id)
# Create new ssh user for the master node
batch_client.compute_node.add_user(
pool_id,
master_node_id,
get_master_node_id(pool),
batch_models.ComputeNodeUser(
username,
is_admin=True,
@@ -391,9 +407,7 @@ def ssh(
batch_client = azure_api.get_batch_client()
# Get master node id from task (job and task are both named pool_id)
master_node_id = batch_client.task \
.get(job_id=pool_id, task_id=pool_id) \
.node_info.node_id
master_node_id = get_master_node_id(batch_client.pool.get(pool_id))
# get remote login settings for the user
remote_login_settings = batch_client.compute_node.get_remote_login_settings(
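# The diff hunk is cut off above. Not part of this diff — a sketch of how the
# remote login settings are typically consumed (username is assumed to be a
# parameter of ssh(); the attribute names come from the azure-batch models):
def print_ssh_target(batch_client, pool_id, node_id, username):
    settings = batch_client.compute_node.get_remote_login_settings(
        pool_id, node_id)
    print("ssh {}@{} -p {}".format(
        username, settings.remote_login_ip_address, settings.remote_login_port))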

View file

@@ -9,6 +9,10 @@ pool_id = os.environ["AZ_BATCH_POOL_ID"]
node_id = os.environ["AZ_BATCH_NODE_ID"]
is_dedicated = os.environ["AZ_BATCH_NODE_IS_DEDICATED"]
# TODO save this as env/metadata
master_ui_port = 8082
webui_port = 4040
jupyter_port = 7777
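# Not part of this diff — a sketch for the TODO above (the env var names are
# assumptions): let the start task override the hard-coded ports.
def port_from_env(name: str, default: int) -> int:
    return int(os.environ.get(name, str(default)))
# e.g. jupyter_port = port_from_env("JUPYTER_PORT", 7777)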
def get_client() -> batch.BatchServiceClient:
credentials = batchauth.SharedKeyCredentials(

View file

@@ -14,9 +14,9 @@ def setup_node():
def setup_as_master():
print("Setting up as master.")
spark.setup_connection()
# spark.start_spark()
spark.start_spark()
def setup_as_worker():
print("Setting up as worker.")
# spark.start_spark()
spark.start_spark()
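# setup_node() itself is outside this hunk. Not part of this diff — a
# hypothetical dispatch consistent with the master metadata above (both the
# node-side spark.get_master_node_id helper and the config import here are
# assumptions):
def setup_node():
    if config.node_id == spark.get_master_node_id(spark.get_pool()):
        setup_as_master()
    else:
        setup_as_worker()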

View file

@@ -3,12 +3,16 @@
"""
import time
import os
from subprocess import call, Popen
from typing import List
import azure.batch.batch_service_client as batch
import azure.batch.models as batchmodels
from core import config
import shutil
import json
batch_client = config.batch_client
spark_home = "/dsvm/tools/spark/current"
spark_conf_folder = os.path.join(spark_home, "conf")
def get_pool() -> batchmodels.CloudPool:
@@ -19,7 +23,8 @@ def list_nodes() -> List[batchmodels.ComputeNode]:
"""
List all the nodes in the pool.
"""
# TODO use continuation token & verify against current/target dedicated of pool
# TODO use continuation token & verify against current/target dedicated of
# pool
return batch_client.compute_node.list(config.pool_id)
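# Not part of this commit — a sketch for the TODO above: the azure-batch SDK
# returns a Paged result here, which fetches continuation pages as it is
# iterated, so draining it into a list visits every node in the pool.
def list_all_nodes() -> List[batchmodels.ComputeNode]:
    return [node for node in batch_client.compute_node.list(config.pool_id)]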
@@ -32,7 +37,8 @@ def wait_for_pool_ready() -> batchmodels.CloudPool:
if pool.allocation_state == batchmodels.AllocationState.steady:
return pool
else:
print("Waiting for pool to be steady. It is currently %s" % pool.allocation_state)
print("Waiting for pool to be steady. It is currently %s" %
pool.allocation_state)
time.sleep(5)  # Sleep for 5 seconds before trying again
@@ -43,9 +49,6 @@ def setup_connection():
wait_for_pool_ready()
print("Pool is now steady. Setting up master")
spark_home = "/dsvm/tools/spark/current"
spark_conf_folder = os.path.join(spark_home, "conf")
nodes = list_nodes()
master_file = open(os.path.join(spark_conf_folder, "master"), 'w')
@@ -55,61 +58,63 @@ def setup_connection():
if node.id == config.node_id:
print("Adding node %s as a master" % node.id)
master_file.write("%s\n" % node.ip_address)
else:
print("Adding node %s as a slave" % node.id)
slaves_file.write("%s\n" % node.ip_address)
master_file.close()
slaves_file.close()
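# Not part of this commit — a sketch of the same writes using context managers,
# so the conf files are closed even if a write raises.
def write_connection_files(master_ip: str, slave_ips: List[str]):
    with open(os.path.join(spark_conf_folder, "master"), 'w') as master_file:
        master_file.write("%s\n" % master_ip)
    with open(os.path.join(spark_conf_folder, "slaves"), 'w') as slaves_file:
        for ip in slave_ips:
            slaves_file.write("%s\n" % ip)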
def start_spark(webui_port, jupyter_port):
return [
# set SPARK_HOME environment vars
'export SPARK_HOME=/dsvm/tools/spark/current',
'export PATH=$PATH:$SPARK_HOME/bin',
# get master node ip
'export MASTER_NODE=$(cat $SPARK_HOME/conf/master)',
def generate_jupyter_config():
return dict(
display_name="PySpark",
language="python",
argv=[
"/usr/bin/python3",
"-m",
"ipykernel",
"-f",
"",
],
env=dict(
SPARK_HOME="/dsvm/tools/spark/current",
PYSPARK_PYTHON="/usr/bin/python3",
PYSPARK_SUBMIT_ARGS="--master spark://${MASTER_NODE%:*}:7077 pyspark-shell",
)
)
# kick off start-all spark command (which starts web ui)
'($SPARK_HOME/sbin/start-all.sh --webui-port ' + str(webui_port) + ' &)',
# jupyter setup: remove auth
'/anaconda/envs/py35/bin/jupyter notebook --generate-config',
'echo >> $HOME/.jupyter/jupyter_notebook_config.py',
'echo c.NotebookApp.token=\\\"\\\" >> $HOME/.jupyter/jupyter_notebook_config.py',
'echo c.NotebookApp.password=\\\"\\\" >> $HOME/.jupyter/jupyter_notebook_config.py',
def setup_jupyter():
print("Setting up jupyter.")
call(["/anaconda/envs/py35/bin/jupyter", "notebook", "--generate-config"])
with open("test.txt", "a") as config_file:
config_file.write('\n')
config_file.write('c.NotebookApp.token=""\n')
config_file.write('c.NotebookApp.password=""\n')
shutil.rmtree('/usr/local/share/jupyter/kernels')
os.makedirs('/usr/local/share/jupyter/kernels/pyspark', exist_ok=True)
# create jupyter kernel for pyspark
'rm -rf /usr/local/share/jupyter/kernels/*',
'mkdir /usr/local/share/jupyter/kernels/pyspark',
'touch /usr/local/share/jupyter/kernels/pyspark/kernel.json',
'echo { ' +
'\\\"display_name\\\": \\\"PySpark\\\", ' +
'\\\"language\\\": \\\"python\\\", ' +
'\\\"argv\\\": [ ' +
'\\\"/usr/bin/python3\\\", ' +
'\\\"-m\\\", ' +
'\\\"ipykernel\\\", ' +
'\\\"-f\\\", ' +
'\\\"{connection_file}\\\" ' +
'], ' +
'\\\"env\\\": { ' +
'\\\"SPARK_HOME\\\": \\\"/dsvm/tools/spark/current\\\", ' +
'\\\"PYSPARK_PYTHON\\\": \\\"/usr/bin/python3\\\", ' +
'\\\"PYSPARK_SUBMIT_ARGS\\\": ' +
'\\\"--master spark://${MASTER_NODE%:*}:7077 ' +
# '--executor-memory 6400M ' +
# '--driver-memory 6400M ' +
'pyspark-shell\\\" ' +
'}' +
'} >> /usr/local/share/jupyter/kernels/pyspark/kernel.json',
with open('/usr/local/share/jupyter/kernels/pyspark/kernel.json', 'w') as outfile:
data = generate_jupyter_config()
json.dump(data, outfile)
# start jupyter notebook
'(PYSPARK_DRIVER_PYTHON=/anaconda/envs/py35/bin/jupyter ' +
'PYSPARK_DRIVER_PYTHON_OPTS="notebook --no-browser --port=' + str(jupyter_port) + '" ' +
'pyspark &)' # +
# '--master spark://${MASTER_NODE%:*}:7077 ' +
# '--executor-memory 6400M ' +
# '--driver-memory 6400M &)'
]
def start_jupyter():
jupyter_port = config.jupyter_port
my_env = os.environ.copy()
my_env["PYSPARK_DRIVER_PYTHON"] = "/anaconda/envs/py35/bin/jupyter"
my_env["PYSPARK_DRIVER_PYTHON_OPTS"] = "notebook --no-browser --port='%s'" % jupyter_port
call("pyspark", "&", env=my_env)
def start_spark():
webui_port = config.webui_port
exe = os.path.join(spark_home, "sbin", "start-all.sh")
call([exe, "--webui-port", str(webui_port)])  # start-all.sh daemonizes the Spark processes itself
setup_jupyter()
start_jupyter()