Timothee Guerin 2018-03-23 09:39:05 -07:00
Parent ee307e0a01
Commit 373c04a218
9 changed files: 37 additions and 30 deletions

View file

@@ -7,8 +7,8 @@ set -e
 # --------------------
 # Setup custom scripts
 # --------------------
-custom_script_dir=$DOCKER_WORKING_DIR/custom-scripts
-aztk_dir=$DOCKER_WORKING_DIR/aztk
+custom_script_dir=$AZTK_WORKING_DIR/custom-scripts
+aztk_dir=$AZTK_WORKING_DIR/aztk
 
 # -----------------------
 # Preload jupyter samples
@@ -28,7 +28,7 @@ done
 echo "Starting setup using Docker"
 $(pyenv root)/versions/$AZTK_PYTHON_VERSION/bin/pip install -r $(dirname $0)/requirements.txt
 
-export PYTHONPATH=$PYTHONPATH:$DOCKER_WORKING_DIR
+export PYTHONPATH=$PYTHONPATH:$AZTK_WORKING_DIR
 
 echo "Running main.py script"
 $(pyenv root)/versions/$AZTK_PYTHON_VERSION/bin/python $(dirname $0)/main.py setup-spark-container
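
Note: the PYTHONPATH export above is what makes the aztk package staged under $AZTK_WORKING_DIR importable by main.py. A minimal sketch of the same effect from inside Python (the fallback path mirrors the export added in setup_node.sh below; the rest is illustrative):

import os
import sys

# Equivalent of `export PYTHONPATH=$PYTHONPATH:$AZTK_WORKING_DIR` for the
# current process: modules staged under the working dir become importable.
sys.path.append(os.environ.get("AZTK_WORKING_DIR", "/mnt/batch/tasks/startup/wd"))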

View file

@@ -6,11 +6,11 @@ from Crypto.Cipher import AES, PKCS1_OAEP
 from datetime import datetime, timezone, timedelta
 import yaml
 
 '''
-Creates a user if the user configuration file at $DOCKER_WORKING_DIR/user.yaml exists
+Creates a user if the user configuration file at $AZTK_WORKING_DIR/user.yaml exists
 '''
 def create_user(batch_client):
-    path = os.path.join(os.environ['DOCKER_WORKING_DIR'], "user.yaml")
+    path = os.path.join(os.environ['AZTK_WORKING_DIR'], "user.yaml")
 
     if not os.path.isfile(path):
         print("No user to create.")
@@ -43,7 +43,7 @@ def decrypt_password(user_conf):
     tag = user_conf['tag']
 
     # Read private key
-    private_key = RSA.import_key(open(os.path.join(os.environ['DOCKER_WORKING_DIR'], 'id_rsa')).read())
+    private_key = RSA.import_key(open(os.path.join(os.environ['AZTK_WORKING_DIR'], 'id_rsa')).read())
     # Decrypt the session key with the public RSA key
     cipher_rsa = PKCS1_OAEP.new(private_key)
     session_key = cipher_rsa.decrypt(encrypted_aes_session_key)
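
For context, decrypt_password implements hybrid decryption with PyCryptodome: RSA (PKCS1_OAEP) unwraps an AES session key, which then decrypts the password. A self-contained sketch of that flow; the 'aes_session_key', 'nonce', and 'password' field names are assumptions, only 'tag' is visible in the diff:

import os
from Crypto.PublicKey import RSA
from Crypto.Cipher import AES, PKCS1_OAEP

def decrypt_password_sketch(user_conf):
    # Unwrap the AES session key with the node's RSA private key.
    private_key = RSA.import_key(
        open(os.path.join(os.environ['AZTK_WORKING_DIR'], 'id_rsa')).read())
    session_key = PKCS1_OAEP.new(private_key).decrypt(user_conf['aes_session_key'])
    # Verify and decrypt the password with AES in EAX mode.
    cipher_aes = AES.new(session_key, AES.MODE_EAX, nonce=user_conf['nonce'])
    return cipher_aes.decrypt_and_verify(user_conf['password'], user_conf['tag'])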

View file

@@ -12,17 +12,16 @@ def setup_node(docker_run_cmd: str):
     client = config.batch_client
 
     create_user.create_user(batch_client=client)
-    if os.environ['AZ_BATCH_NODE_IS_DEDICATED'] == "true" or os.environ['MIXED_MODE'] == "False":
+    if os.environ['AZ_BATCH_NODE_IS_DEDICATED'] == "true" or os.environ['AZTK_MIXED_MODE'] == "False":
         is_master = pick_master.find_master(client)
     else:
         is_master = False
         wait_until_master_selected.main()
 
-    is_worker = not is_master or os.environ["WORKER_ON_MASTER"]
+    is_worker = not is_master or os.environ["AZTK_WORKER_ON_MASTER"]
     master_node_id = pick_master.get_master_node_id(config.batch_client.pool.get(config.pool_id))
     master_node = config.batch_client.compute_node.get(config.pool_id, master_node_id)
     master_node_ip = master_node.ip_address
 
-    os.environ['AZTK_WORKING_DIR'] = "/mnt/batch/tasks/startup/wd"
     env = os.environ.copy()
     env["AZTK_IS_MASTER"] = is_master

View file

@@ -14,7 +14,7 @@ def _read_yaml_file(path=None):
             custom_scripts = yaml.load(stream)
         except yaml.YAMLError as err:
             print("Error in cluster.yaml: {0}".format(err))
 
     return custom_scripts
@@ -25,7 +25,7 @@ def _run_on_this_node(script_obj=None, is_master=False, is_worker=False):
         return True
 
     if script_obj['runOn'] == 'all-nodes':
         return True
 
     return False
@@ -74,7 +74,7 @@ def run_custom_scripts(is_master: bool = False, is_worker: bool = False):
     else:
         os.environ["IS_WORKER"] = "0"
 
-    custom_scripts_dir = os.path.join(os.environ['DOCKER_WORKING_DIR'], 'custom-scripts')
+    custom_scripts_dir = os.path.join(os.environ['AZTK_WORKING_DIR'], 'custom-scripts')
 
     custom_scripts = _read_yaml_file(os.path.join(custom_scripts_dir, 'custom-scripts.yaml'))
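
For context, the runOn field checked in _run_on_this_node selects which nodes execute each custom script. A sketch of that dispatch; only the 'all-nodes' value is visible in this diff, 'master' and 'worker' are assumed companion values:

def run_on_this_node_sketch(script_obj, is_master=False, is_worker=False):
    # Dispatch on the script's runOn field.
    run_on = script_obj['runOn']
    if run_on == 'master' and is_master:
        return True
    if run_on == 'worker' and is_worker:
        return True
    if run_on == 'all-nodes':
        return True
    return False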

View file

@@ -22,7 +22,7 @@ def setup_as_master():
     print("Setting up as master.")
     setup_connection()
     start_spark_master()
-    if os.environ["WORKER_ON_MASTER"] == "True":
+    if os.environ["AZTK_WORKER_ON_MASTER"] == "True":
         start_spark_worker()
@@ -161,8 +161,8 @@ def setup_conf():
 
 def setup_ssh_keys():
-    pub_key_path_src = os.path.join(os.environ['DOCKER_WORKING_DIR'], 'id_rsa.pub')
-    priv_key_path_src = os.path.join(os.environ['DOCKER_WORKING_DIR'], 'id_rsa')
+    pub_key_path_src = os.path.join(os.environ['AZTK_WORKING_DIR'], 'id_rsa.pub')
+    priv_key_path_src = os.path.join(os.environ['AZTK_WORKING_DIR'], 'id_rsa')
     ssh_key_dest = '/root/.ssh'
 
     if not os.path.exists(ssh_key_dest):
@@ -173,26 +173,26 @@ def setup_ssh_keys():
 
 
 def copy_spark_env():
-    spark_env_path_src = os.path.join(os.environ['DOCKER_WORKING_DIR'], 'conf/spark-env.sh')
+    spark_env_path_src = os.path.join(os.environ['AZTK_WORKING_DIR'], 'conf/spark-env.sh')
     spark_env_path_dest = os.path.join(spark_home, 'conf/spark-env.sh')
     copyfile(spark_env_path_src, spark_env_path_dest)
 
 
 def copy_spark_defaults():
-    spark_default_path_src = os.path.join(os.environ['DOCKER_WORKING_DIR'], 'conf/spark-defaults.conf')
+    spark_default_path_src = os.path.join(os.environ['AZTK_WORKING_DIR'], 'conf/spark-defaults.conf')
     spark_default_path_dest = os.path.join(spark_home, 'conf/spark-defaults.conf')
     copyfile(spark_default_path_src, spark_default_path_dest)
 
 
 def copy_core_site():
-    spark_core_site_src = os.path.join(os.environ['DOCKER_WORKING_DIR'], 'conf/core-site.xml')
+    spark_core_site_src = os.path.join(os.environ['AZTK_WORKING_DIR'], 'conf/core-site.xml')
     spark_core_site_dest = os.path.join(spark_home, 'conf/core-site.xml')
     copyfile(spark_core_site_src, spark_core_site_dest)
 
 
 def copy_jars():
     # Copy jars to $SPARK_HOME/jars
-    spark_default_path_src = os.path.join(os.environ['DOCKER_WORKING_DIR'], 'jars')
+    spark_default_path_src = os.path.join(os.environ['AZTK_WORKING_DIR'], 'jars')
     spark_default_path_dest = os.path.join(spark_home, 'jars')
 
     try:
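
The three copy_* functions above share one src/dest/copyfile pattern rooted at $AZTK_WORKING_DIR. A compact equivalent, purely as an illustrative refactor (not part of the commit):

import os
from shutil import copyfile

def copy_staged_conf_sketch(spark_home: str, relative_path: str):
    # Copy a file staged under the AZTK working dir into $SPARK_HOME.
    src = os.path.join(os.environ['AZTK_WORKING_DIR'], relative_path)
    dest = os.path.join(spark_home, relative_path)
    copyfile(src, dest)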

View file

@@ -4,6 +4,7 @@
 # Usage:
 # setup_node.sh [container_name] [gpu_enabled] [docker_repo] [docker_cmd]
 
+export AZTK_WORKING_DIR=/mnt/batch/tasks/startup/wd
 
 container_name=$1
 gpu_enabled=$2
@@ -65,7 +66,7 @@ else
     python3 --version
     # Install python dependencies
     pip3 install -r $(dirname $0)/requirements.txt
-    export PYTHONPATH=$PYTHONPATH:$DOCKER_WORKING_DIR
+    export PYTHONPATH=$PYTHONPATH:$AZTK_WORKING_DIR
 
     echo "Running setup python script"
     python3 $(dirname $0)/main.py setup-node $docker_run_cmd
@@ -78,7 +79,7 @@ else
     # wait until container setup is complete
-    docker exec spark /bin/bash -c 'python $DOCKER_WORKING_DIR/aztk/node_scripts/wait_until_setup_complete.py'
+    docker exec spark /bin/bash -c 'python $AZTK_WORKING_DIR/aztk/node_scripts/wait_until_setup_complete.py'
 
     # Setup symbolic link for the docker logs
     docker_log=$(docker inspect --format='{{.LogPath}}' $container_name)
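
The wait_until_setup_complete.py script invoked above blocks until in-container setup finishes. Its contents are not part of this diff, but a typical marker-file poll looks like this sketch (the marker path and interval are invented for illustration):

import os
import time

def wait_for_setup_sketch(marker: str = "/tmp/setup_complete", interval: int = 5):
    # Poll until the setup process creates its completion marker file.
    while not os.path.exists(marker):
        time.sleep(interval)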

View file

@@ -49,14 +49,10 @@ def __docker_run_cmd(docker_repo: str = None,
     cmd.add_option('-e', 'SP_STORAGE_RESOURCE_ID=$SP_STORAGE_RESOURCE_ID')
     cmd.add_option('-e', 'AZ_BATCH_POOL_ID=$AZ_BATCH_POOL_ID')
     cmd.add_option('-e', 'AZ_BATCH_NODE_ID=$AZ_BATCH_NODE_ID')
+    cmd.add_option('-e', 'AZTK_WORKER_ON_MASTER=$AZTK_WORKER_ON_MASTER')
+    cmd.add_option('-e', 'AZTK_MIXED_MODE=$AZTK_MIXED_MODE')
     cmd.add_option(
         '-e', 'AZ_BATCH_NODE_IS_DEDICATED=$AZ_BATCH_NODE_IS_DEDICATED')
-    if worker_on_master is not None:
-        cmd.add_option('-e', 'WORKER_ON_MASTER={}'.format(worker_on_master))
-    else:
-        # default to True if not specified
-        cmd.add_option('-e', 'WORKER_ON_MASTER={}'.format(True))
-    cmd.add_option('-e', 'MIXED_MODE={}'.format(mixed_mode))
     cmd.add_option('-e', 'SPARK_WEB_UI_PORT=$SPARK_WEB_UI_PORT')
     cmd.add_option('-e', 'SPARK_WORKER_UI_PORT=$SPARK_WORKER_UI_PORT')
     cmd.add_option('-e', 'SPARK_CONTAINER_NAME=$SPARK_CONTAINER_NAME')
@@ -84,6 +80,17 @@ def __docker_run_cmd(docker_repo: str = None,
     return cmd.to_str()
 
+
+def _get_aztk_environment(worker_on_master, mixed_mode):
+    envs = []
+    envs.append(batch_models.EnvironmentSetting(name="AZTK_MIXED_MODE", value=mixed_mode))
+    if worker_on_master is not None:
+        envs.append(batch_models.EnvironmentSetting(
+            name="AZTK_WORKER_ON_MASTER", value=worker_on_master))
+    else:
+        envs.append(batch_models.EnvironmentSetting(
+            name="AZTK_WORKER_ON_MASTER", value=False))
+    return envs
+
 def __get_docker_credentials(spark_client):
     creds = []
 
     docker = spark_client.secrets_config.docker
@@ -214,7 +221,7 @@ def generate_cluster_start_task(
             name="SPARK_CONTAINER_NAME", value=spark_container_name),
         batch_models.EnvironmentSetting(
             name="SPARK_SUBMIT_LOGS_FILE", value=spark_submit_logs_file),
-    ] + __get_docker_credentials(spark_client)
+    ] + __get_docker_credentials(spark_client) + _get_aztk_environment(worker_on_master, mixed_mode)
 
     # start task command
     command = __cluster_install_cmd(zip_resource_file, gpu_enabled, docker_repo, plugins, worker_on_master, file_shares, mixed_mode)
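
Net effect of this file's changes: AZTK_MIXED_MODE and AZTK_WORKER_ON_MASTER are set on the node through the start task's EnvironmentSetting list and forwarded into the container by the -e options above. Batch materializes the values as environment variables, so node scripts read them back as strings, which is why the node code compares against "True"/"False" literals. A node-side usage sketch (the example values are illustrative):

import os

# Values arrive as strings via the start task environment and docker -e.
mixed_mode = os.environ.get("AZTK_MIXED_MODE")              # e.g. "False"
worker_on_master = os.environ.get("AZTK_WORKER_ON_MASTER")  # e.g. "True"
print(mixed_mode == "True", worker_on_master == "True")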

View file

@@ -19,7 +19,7 @@ def __app_cmd():
     docker_exec.add_argument("-i")
     docker_exec.add_option("-e", "AZ_BATCH_TASK_WORKING_DIR=$AZ_BATCH_TASK_WORKING_DIR")
     docker_exec.add_option("-e", "AZ_BATCH_JOB_ID=$AZ_BATCH_JOB_ID")
-    docker_exec.add_argument("spark /bin/bash >> output.log 2>&1 -c \"python \$DOCKER_WORKING_DIR/job_submission.py\"")
+    docker_exec.add_argument("spark /bin/bash >> output.log 2>&1 -c \"python \$AZTK_WORKING_DIR/job_submission.py\"")
     return docker_exec.to_str()
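
The docker_exec object here is aztk's internal command builder; its implementation is not shown in this diff, but the add_argument/add_option/to_str calls suggest roughly this shape (a hypothetical stand-in, not the actual class):

class CommandBuilderSketch:
    def __init__(self, executable: str):
        self.executable = executable
        self.parts = []

    def add_argument(self, arg: str):
        self.parts.append(arg)

    def add_option(self, name: str, value: str):
        # Options render as "name value", e.g. -e KEY=VALUE.
        self.parts.append("{0} {1}".format(name, value))

    def to_str(self) -> str:
        return "{0} {1}".format(self.executable, " ".join(self.parts))

# Usage mirroring __app_cmd above:
docker_exec = CommandBuilderSketch("docker exec")
docker_exec.add_argument("-i")
docker_exec.add_option("-e", "AZ_BATCH_JOB_ID=$AZ_BATCH_JOB_ID")
print(docker_exec.to_str())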

View file

@@ -83,7 +83,7 @@ def generate_task(spark_client, container_id, application):
     task_cmd.add_argument('spark /bin/bash >> output.log 2>&1')
     task_cmd.add_argument('-c "cd $AZ_BATCH_TASK_WORKING_DIR; ' \
         '\$(pyenv root)/versions/\$AZTK_PYTHON_VERSION/bin/python ' \
-        '\$DOCKER_WORKING_DIR/submit.py"')
+        '\$AZTK_WORKING_DIR/submit.py"')
 
     # Create task
     task = batch_models.TaskAddParameter(
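
The hunk cuts off inside the constructor; for reference, a minimal TaskAddParameter in the Azure Batch SDK takes an id and a command line, roughly as below (the field values are illustrative, not the commit's):

from azure.batch import models as batch_models

task = batch_models.TaskAddParameter(
    id="spark-app-sketch",                     # illustrative task id
    command_line="/bin/bash -c 'echo hello'",  # stands in for task_cmd.to_str()
)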