This commit is contained in:
Timothee Guerin 2017-07-10 12:33:49 -07:00
Родитель d59acb69f7
Коммит 58201d1b0d
12 изменённых файлов: 225 добавлений и 95 удалений

7
.vscode/settings.json поставляемый
Просмотреть файл

@ -1,7 +1,10 @@
{
"python.linting.pylintEnabled": false,
// "python.linting.pylintEnabled": false,
"search.exclude": {
"build/**": true,
"bin/**": true
}
},
"python.autoComplete.extraPaths": [
"${workspaceRoot}/node"
]
}

Просмотреть файл

@ -1,9 +1,7 @@
from . import util, constants, azure_api, upload_node_scripts
import random
from datetime import datetime, timedelta
import azure.batch.models as batch_models
from subprocess import call
import sys
import azure.batch.models as batch_models
from . import util, constants, azure_api, upload_node_scripts
pool_admin_user = batch_models.UserIdentity(
auto_user=batch_models.AutoUserSpecification(
@ -24,9 +22,9 @@ def cluster_install_cmd(zip_resource_file: batch_models.ResourceFile, custom_scr
# To avoid error: "sudo: sorry, you must have a tty to run sudo"
'sed -i -e "s/Defaults requiretty.*/ #Defaults requiretty/g" /etc/sudoers',
'/bin/sh -c "unzip $AZ_BATCH_TASK_WORKING_DIR/%s"' % zip_resource_file.file_path,
'chmod 777 $AZ_BATCH_TASK_WORKING_DIR/setup_node.sh',
'dos2unix $AZ_BATCH_TASK_WORKING_DIR/setup_node.sh', # Convert windows line ending to unix if applicable
'/bin/bash -c "$AZ_BATCH_TASK_WORKING_DIR/setup_node.sh"'
'chmod 777 $AZ_BATCH_TASK_WORKING_DIR/main.sh',
'dos2unix $AZ_BATCH_TASK_WORKING_DIR/main.sh', # Convert windows line ending to unix if applicable
'/bin/bash -c "$AZ_BATCH_TASK_WORKING_DIR/main.sh"'
]
if custom_script_file is not None:
@ -119,7 +117,7 @@ def cluster_start_cmd(webui_port, jupyter_port):
]
def generate_cluster_start_task(zip_resource_file: batch_models.ResourceFile, custom_script: str = None):
def generate_cluster_start_task(cluster_id: str, zip_resource_file: batch_models.ResourceFile, custom_script: str = None):
"""
This will return the start task object for the pool to be created.
:param custom_script str: Path to a local file to be uploaded to storage and run after spark started.
@ -131,7 +129,7 @@ def generate_cluster_start_task(zip_resource_file: batch_models.ResourceFile, cu
if custom_script is not None:
resource_files.append(
util.upload_file_to_container(
container_name=pool_id,
container_name=cluster_id,
file_path=custom_script,
use_full_path=True))
@ -196,7 +194,7 @@ def create_cluster(
vm_size=vm_size,
target_dedicated_nodes=vm_count,
target_low_priority_nodes=vm_low_pri_count,
start_task=generate_cluster_start_task(zip_resource_file, custom_script),
start_task=generate_cluster_start_task(pool_id, zip_resource_file, custom_script),
enable_inter_node_communication=True,
max_tasks_per_node=1)

Просмотреть файл

@ -1,39 +1,45 @@
import os
import zipfile
from . import constants, util
from . import util
root = os.path.join(os.path.dirname(os.path.realpath(__file__)), "..")
local_tmp_zipfile = "tmp/node-scripts.zip";
local_tmp_zipfile = "tmp/node-scripts.zip"
def ensure_dir(file_path):
directory = os.path.dirname(file_path)
if not os.path.exists(directory):
os.makedirs(directory)
def zipdir(path, ziph):
"""
Zip all the files in the given directory into the zip file handler
"""
for root, dirs, files in os.walk(path):
for base, _, files in os.walk(path):
relative_folder = os.path.relpath(base, path)
for file in files:
ziph.write(os.path.join(root, file), file)
ziph.write(os.path.join(base, file),
os.path.join(relative_folder, file))
def zip():
def create_zip():
ensure_dir(local_tmp_zipfile)
zipf = zipfile.ZipFile(local_tmp_zipfile, 'w', zipfile.ZIP_DEFLATED)
zipdir(os.path.join(root, "node"), zipf)
zipdir(os.path.join(root, "node_scripts"), zipf)
zipf.close()
print("Ziped file")
def upload():
print("Uploading node scripts...")
return util.upload_file_to_container(
container_name = "spark-node-scripts",
file_path = local_tmp_zipfile,
use_full_path = False)
container_name="spark-node-scripts",
file_path=local_tmp_zipfile,
use_full_path=False)
def zip_and_upload():
zip()
return upload()
create_zip()
return upload()

Просмотреть файл

@ -1,30 +0,0 @@
import os
import azure.batch.batch_service_client as batch
import azure.batch.batch_auth as batchauth
import azure.batch.models as batchmodels
pool_id = "sparktest"
node_id = "abc"
master_node_metadata_key = "_spark_master_node"
def get_client():
# account_name = os.environ["AZ_BATCH_ACCOUNT_NAME"]
# account_key = os.environ["ACCOUNT_KEY"]
# account_url = os.environ["ACCOUNT_URL"]
account_name = "prodtest1"
account_key = "o7qjfiRHz1CrjeDLIRUGTZfepWkdJUTc1OHcNEuD4QKUdWFzJ2rAXZlLv9MCyL8ZPXsBTSxash3q+sLdQLEJXA=="
account_url = "https://prodtest1.brazilsouth.batch.azure.com"
credentials = batchauth.SharedKeyCredentials(
account_name,
account_key)
return batch.BatchServiceClient(credentials, base_url=account_url)
if __name__ == "__main__":
client = get_client()
client.pool.patch(pool_id, batchmodels.PoolPatchParameter(
metadata=[],
))
print("Cleared metadata!")

Просмотреть файл

Просмотреть файл

@ -0,0 +1,25 @@
import os
import azure.batch.batch_service_client as batch
import azure.batch.batch_auth as batchauth
account_name = os.environ["AZ_BATCH_ACCOUNT_NAME"]
account_key = os.environ["ACCOUNT_KEY"]
account_url = os.environ["ACCOUNT_URL"]
pool_id = os.environ["AZ_BATCH_POOL_ID"]
node_id = os.environ["AZ_BATCH_NODE_ID"]
is_dedicated = os.environ["AZ_BATCH_NODE_IS_DEDICATED"]
def get_client() -> batch.BatchServiceClient:
credentials = batchauth.SharedKeyCredentials(
account_name,
account_key)
return batch.BatchServiceClient(credentials, base_url=account_url)
batch_client = get_client()
print("Pool id is", node_id)
print("Node id is", node_id)
print("Account name", account_name)
print("Is dedicated", is_dedicated)

Просмотреть файл

Просмотреть файл

@ -2,34 +2,18 @@
This is the code that all nodes will run in their start task to try to allocate the master
"""
import os
import azure.batch.batch_service_client as batch
import azure.batch.batch_auth as batchauth
import azure.batch.models as batchmodels
import azure.batch.models.batch_error as batcherror
from core import config
pool_id = os.environ["AZ_BATCH_POOL_ID"]
node_id = os.environ["AZ_BATCH_NODE_ID"]
account_name = os.environ["AZ_BATCH_ACCOUNT_NAME"]
account_key = os.environ["ACCOUNT_KEY"]
account_url = os.environ["ACCOUNT_URL"]
is_dedicated = os.environ["AZ_BATCH_NODE_IS_DEDICATED"]
print("Pool id is", node_id)
print("Node id is", node_id)
print("Account name", account_name)
print("Is dedicated", is_dedicated)
master_node_metadata_key = "_spark_master_node"
def get_client():
credentials = batchauth.SharedKeyCredentials(
account_name,
account_key)
return batch.BatchServiceClient(credentials, base_url=account_url)
class CannotAllocateMasterError(Exception):
pass
def get_master_node_id(pool: batchmodels.CloudPool):
@ -49,10 +33,10 @@ def get_master_node_id(pool: batchmodels.CloudPool):
def try_assign_self_as_master(client: batch.BatchServiceClient, pool: batchmodels.CloudPool):
currentMetadata = pool.metadata or []
newMetadata = currentMetadata + \
[{"name": master_node_metadata_key, "value": node_id}]
[{"name": master_node_metadata_key, "value": config.node_id}]
try:
client.pool.patch(pool_id, batchmodels.PoolPatchParameter(
client.pool.patch(config.pool_id, batchmodels.PoolPatchParameter(
metadata=newMetadata
), batchmodels.PoolPatchOptions(
if_match=pool.e_tag,
@ -63,12 +47,16 @@ def try_assign_self_as_master(client: batch.BatchServiceClient, pool: batchmodel
return False
def find_master(client: batch.BatchServiceClient):
def find_master(client: batch.BatchServiceClient) -> bool:
"""
Try to set a master for the cluster. If the node is dedicated it will try to assign itself if none already claimed it.
:returns bool: If the node is the master it returns true otherwise returns false
"""
# If not dedicated the node cannot be a master
if not is_dedicated:
return True
if not config.is_dedicated:
return False
for i in range(0, 5):
pool = client.pool.get(pool_id)
pool = client.pool.get(config.pool_id)
master = get_master_node_id(pool)
if master:
@ -79,20 +67,7 @@ def find_master(client: batch.BatchServiceClient):
result = try_assign_self_as_master(client, pool)
if result:
print("The battle has been won! Node %s is the new master.", node_id)
print("The battle has been won! Node %s is the new master.", config.node_id)
return True
print("Unable to assign node as a master in 5 tries")
return False
def run():
client = get_client()
result = find_master(client)
if not result:
exit(1)
if __name__ == "__main__":
run()
raise CannotAllocateMasterError("Unable to assign node as a master in 5 tries")

Просмотреть файл

@ -0,0 +1,116 @@
"""
Code that handle spark configuration
"""
import time
import azure.batch.batch_service_client as batch
import azure.batch.models as batchmodels
from core import config
batch_client = config.batch_client
def get_pool() -> batchmodels.CloudPool:
return batch_client.pool.get(config.pool_id)
def list_nodes() -> List[batchmodels.ComputeNode]:
return batch_client.compute_node.list(config.pool_id)
def wait_for_pool_ready() -> batchmodels.CloudPool:
while True:
pool = get_pool()
if pool.state == batchmodels.AllocationState.steady
return pool
else:
print("Waiting for pool to be steady.")
time.sleep(5) # Sleep for 10 seconds before trying again
def connect_node(batch_client: batch.BatchServiceClient):
pool = wait_for_pool_ready()
print("Pool is now steady. Setting up master")
spark_home = "/dsvm/tools/spark/current"
spark_conf_folder = os.path.join(spark_home, "conf")
nodes = list_nodes()
for node in nodes:
return [
# set SPARK_HOME environment vars
'export PATH=$PATH:$SPARK_HOME/bin',
# copy a 'slaves' file from the slaves.template in $SPARK_HOME/conf
'cp $SPARK_HOME/conf/slaves.template $SPARK_HOME/conf/slaves'
# delete existing content & create a new line in the slaves file
'echo > $SPARK_HOME/conf/slaves',
# make empty 'master' file in $SPARK/conf
'cp $SPARK_HOME/conf/slaves $SPARK_HOME/conf/master',
# add batch pool ips to newly created slaves files
'IFS="," read -r -a workerips <<< $AZ_BATCH_HOST_LIST',
'for index in "${!workerips[@]}"',
'do echo "${workerips[index]}"',
'if [ "${AZ_BATCH_MASTER_NODE%:*}" = "${workerips[index]}" ]',
'then echo "${workerips[index]}" >> $SPARK_HOME/conf/master',
'else echo "${workerips[index]}" >> $SPARK_HOME/conf/slaves',
'fi',
'done'
]
def start_cmd(webui_port, jupyter_port):
return [
# set SPARK_HOME environment vars
'export SPARK_HOME=/dsvm/tools/spark/current',
'export PATH=$PATH:$SPARK_HOME/bin',
# get master node ip
'export MASTER_NODE=$(cat $SPARK_HOME/conf/master)',
# kick off start-all spark command (which starts web ui)
'($SPARK_HOME/sbin/start-all.sh --webui-port ' + str(webui_port) + ' &)',
# jupyter setup: remove auth
'/anaconda/envs/py35/bin/jupyter notebook --generate-config',
'echo >> $HOME/.jupyter/jupyter_notebook_config.py',
'echo c.NotebookApp.token=\\\"\\\" >> $HOME/.jupyter/jupyter_notebook_config.py',
'echo c.NotebookApp.password=\\\"\\\" >> $HOME/.jupyter/jupyter_notebook_config.py',
# create jupyter kernal for pyspark
'rm -rf /usr/local/share/jupyter/kernels/*',
'mkdir /usr/local/share/jupyter/kernels/pyspark',
'touch /usr/local/share/jupyter/kernels/pyspark/kernel.json',
'echo { ' +
'\\\"display_name\\\": \\\"PySpark\\\", ' +
'\\\"language\\\": \\\"python\\\", ' +
'\\\"argv\\\": [ ' +
'\\\"/usr/bin/python3\\\", ' +
'\\\"-m\\\", ' +
'\\\"ipykernel\\\", ' +
'\\\"-f\\\", ' +
'\\\"{connection_file}\\\" ' +
'], ' +
'\\\"env\\\": { ' +
'\\\"SPARK_HOME\\\": \\\"/dsvm/tools/spark/current\\\", ' +
'\\\"PYSPARK_PYTHON\\\": \\\"/usr/bin/python3\\\", ' +
'\\\"PYSPARK_SUBMIT_ARGS\\\": ' +
'\\\"--master spark://${MASTER_NODE%:*}:7077 ' +
# '--executor-memory 6400M ' +
# '--driver-memory 6400M ' +
'pyspark-shell\\\" ' +
'}' +
'} >> /usr/local/share/jupyter/kernels/pyspark/kernel.json',
# start jupyter notebook
'(PYSPARK_DRIVER_PYTHON=/anaconda/envs/py35/bin/jupyter ' +
'PYSPARK_DRIVER_PYTHON_OPTS="notebook --no-browser --port=' + str(jupyter_port) + '" ' +
'pyspark &)' # +
# '--master spark://${MASTER_NODE%:*}:7077 ' +
# '--executor-memory 6400M ' +
# '--driver-memory 6400M &)'
]

37
node_scripts/main.py Normal file
Просмотреть файл

@ -0,0 +1,37 @@
import sys
from install import pick_master
from core import config
def setup_as_master():
print("Setting up as master.")
def setup_as_worker():
print("Setting up as worker.")
def run():
if len(sys.argv) < 2:
print("Error: Expected at least one argument")
exit(1)
action = sys.argv[1]
if action == "install":
client = config.batch_client
is_master = pick_master.find_master(client)
if is_master:
setup_as_master()
else:
setup_as_worker()
else:
print("Action not supported")
if __name__ == "__main__":
run()

Просмотреть файл

@ -4,6 +4,6 @@ set -ev
export PATH=/anaconda/envs/py35/bin:$PATH
echo "Starting setup"
pip install -r requirements.txt
pip install -r $(dirname $0)/requirements.txt
echo "Installed dependencies, picking master"
python pick_master.py
python $(dirname $0)/main.py install

Просмотреть файл