Feature: docker
JS authored on 2017-08-17 12:21:00 -04:00; committed by Timothee Guerin
Parent fbfdaf489d
Commit d94c43ed82
23 changed files with 397 additions and 107 deletions

View file

@ -22,6 +22,8 @@ A suite of distributed tools to help engineers scale their work into Azure.
## Getting Started
The entire experience of this package is centered around a few commands.
### Create and setup your cluster
First, create your cluster:
@ -42,6 +44,18 @@ azb spark cluster create \
    --vm-size <vm-size>
```
By default, this package runs Spark in Docker from an Ubuntu 16.04 base image on an Ubuntu 16.04 VM. More info on this image can be found in the **docker-images** folder in this repo.
You can opt out of using this image and use the Azure CentOS DSVM instead, which has Spark 2.0.2 pre-installed (*as of 07/24/17*). To do this, pass the --no-docker flag and the cluster will default to the Azure DSVM.
```
azb spark cluster create \
--id <my-cluster-id> \
--size-low-pri <number of low-pri nodes> \
--vm-size <vm-size> \
--no-docker
```
You can also add a user directly in this command using the same inputs as the `add-user` command described below.
#### Add a user to your cluster to connect
@ -70,6 +84,7 @@ azb spark cluster add-user \
NOTE: The cluster id (--id) can only contain alphanumeric characters including hyphens and underscores, and cannot contain more than 64 characters.
### Submit a Spark job
Now you can submit jobs to run against the cluster:
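The exact submit syntax is not shown in this diff. The sketch below assumes a submit subcommand whose flags mirror the `submit_app` options added in this commit (such as `--name`, `--class`, and `--jars`); treat the subcommand path and flags as illustrative rather than confirmed CLI syntax.
```
azb spark app submit \
    --id <my-cluster-id> \
    --name <my-job-name> \
    <app> [app args]
```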

custom-scripts/simple.sh Normal file
View file

@ -0,0 +1,3 @@
#!/bin/bash
echo "This is a custom script!"

View file

@ -1,5 +1,7 @@
#!/bin/bash #!/bin/bash
# Custom script to set up a WASB connection on the CentOS DSVM
export SPARK_HOME=/dsvm/tools/spark/current export SPARK_HOME=/dsvm/tools/spark/current
cd $SPARK_HOME/conf cd $SPARK_HOME/conf

docker-images/README.md Normal file
View file

View file

@ -0,0 +1 @@
# TODO

View file

@ -0,0 +1,47 @@
FROM ubuntu:16.04
# set up apt-get
RUN apt-get -y update
# install base
RUN apt-get -y install software-properties-common
# install java
RUN apt-add-repository ppa:webupd8team/java && \
apt-get -y update && \
apt-get -y install default-jdk
# install common tools
RUN apt-get -y install git
# install curl
RUN apt-get -y install curl
# install pip
RUN apt-get -y install python3-pip && \
pip3 install --upgrade pip
# install jupyter
RUN pip3 install jupyter && \
pip3 install --upgrade jupyter
# install py4j
RUN curl https://pypi.python.org/packages/1f/b0/882c144fe70cc3f1e55d62b8611069ff07c2d611d99f228503606dd1aee4/py4j-0.10.0.tar.gz | tar xvz -C /home && \
cd /home/py4j-0.10.0 && \
python3 setup.py install
# install spark
RUN curl https://d3kbcqa49mib13.cloudfront.net/spark-2.2.0-bin-hadoop2.7.tgz | tar xvz -C /home
# set env vars
RUN export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-amd64 && \
export SPARK_HOME=/home/spark-2.2.0-bin-hadoop2.7 && \
export PYSPARK_PYTHON=python3 && \
export PATH=$PATH:$SPARK_HOME/bin
# set env vars in .bashrc
RUN echo 'export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-amd64' >> ~/.bashrc && \
echo 'export SPARK_HOME=/home/spark-2.2.0-bin-hadoop2.7' >> ~/.bashrc && \
echo 'export PYSPARK_PYTHON=python3' >> ~/.bashrc && \
echo 'export PATH=$PATH:$SPARK_HOME/bin' >> ~/.bashrc
CMD ["/bin/bash"]
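For local experimentation, this image can be built and started with the standard Docker CLI; the tag below is illustrative (the published image referenced elsewhere in this commit is jiata/spark:latest):
```
# Build from the directory containing this Dockerfile (tag is illustrative).
docker build -t spark-base:2.2.0-ubuntu16.04 .
# Open a shell in the container; spark-submit and pyspark live under
# /home/spark-2.2.0-bin-hadoop2.7/bin.
docker run -it --rm spark-base:2.2.0-ubuntu16.04 /bin/bash
```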

View file

@ -0,0 +1 @@
Spark-2.2.0 image on ubuntu16.04 for Spark on Azure Batch

View file

@ -0,0 +1,33 @@
FROM jiata/spark:latest
ARG STORAGE_ACCOUNT_NAME
ARG STORAGE_ACCOUNT_KEY
ENV STORAGE_ACCOUNT_NAME $STORAGE_ACCOUNT_NAME
ENV STORAGE_ACCOUNT_KEY $STORAGE_ACCOUNT_KEY
ENV SPARK_HOME /home/spark-2.2.0-bin-hadoop2.7
RUN cp $SPARK_HOME/conf/spark-defaults.conf.template $SPARK_HOME/conf/spark-defaults.conf
RUN echo 'spark.jars /home/spark-2.2.0-bin-hadoop2.7/jars/azure-storage-2.0.0.jar,/home/spark-2.2.0-bin-hadoop2.7/jars/hadoop-azure-2.7.3.jar' >> $SPARK_HOME/conf/spark-defaults.conf
RUN echo '<?xml version="1.0" encoding="UTF-8"?>' >> $SPARK_HOME/conf/core-site.xml && \
echo '<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>' >> $SPARK_HOME/conf/core-site.xml && \
echo '<configuration>' >> $SPARK_HOME/conf/core-site.xml && \
echo '<property>' >> $SPARK_HOME/conf/core-site.xml && \
echo '<name>fs.AbstractFileSystem.wasb.Impl</name>' >> $SPARK_HOME/conf/core-site.xml && \
echo '<value>org.apache.hadoop.fs.azure.Wasb</value>' >> $SPARK_HOME/conf/core-site.xml && \
echo '</property>' >> $SPARK_HOME/conf/core-site.xml && \
echo '<property>' >> $SPARK_HOME/conf/core-site.xml && \
echo '<name>fs.azure.account.key.'$STORAGE_ACCOUNT_NAME'.blob.core.windows.net</name>' >> $SPARK_HOME/conf/core-site.xml && \
echo '<value>'$STORAGE_ACCOUNT_KEY'</value>' >> $SPARK_HOME/conf/core-site.xml && \
echo '</property>' >> $SPARK_HOME/conf/core-site.xml && \
echo '</configuration>' >> $SPARK_HOME/conf/core-site.xml
RUN apt-get -y install wget
RUN cd $SPARK_HOME/jars && \
wget http://repo1.maven.org/maven2/com/microsoft/azure/azure-storage/2.0.0/azure-storage-2.0.0.jar && \
wget http://central.maven.org/maven2/org/apache/hadoop/hadoop-azure/2.7.3/hadoop-azure-2.7.3.jar
CMD ["/bin/bash"]

View file

@ -0,0 +1,9 @@
# Spark 2.2.0 with a built-in WASB connector
This Docker image downloads and configures the azure-storage and hadoop-azure JARs so that Spark can interact directly with Azure Storage.
## Build Instructions
docker build -t <image-tag> \
--build-arg STORAGE_ACCOUNT_NAME=<your-storage-account-name> \
--build-arg STORAGE_ACCOUNT_KEY=<your-storage-account-key> \
.
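As a rough smoke test, and assuming the image tag from the build step above, you can start a Spark shell in the container and read a wasb:// path against the storage account baked in at build time (the path below is illustrative):
```
docker run -it --rm <image-tag> /home/spark-2.2.0-bin-hadoop2.7/bin/spark-shell
# Inside the shell, a wasb:// URL on the configured account should resolve, e.g.:
#   spark.read.textFile("wasb://<container>@<your-storage-account-name>.blob.core.windows.net/<path>").count()
```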

View file

@ -1,3 +1,4 @@
import os
from datetime import datetime, timedelta from datetime import datetime, timedelta
from dtde.core import CommandBuilder, ssh from dtde.core import CommandBuilder, ssh
from subprocess import call from subprocess import call
@ -14,36 +15,56 @@ POOL_ADMIN_USER_IDENTITY = batch_models.UserIdentity(
elevation_level=batch_models.ElevationLevel.admin)) elevation_level=batch_models.ElevationLevel.admin))
def cluster_install_cmd(zip_resource_file: batch_models.ResourceFile, custom_script_file): def cluster_install_cmd(zip_resource_file: batch_models.ResourceFile):
""" """
This will return the command line to be run on the start task of the pool to setup spark. For Docker on ubuntu 16.04 - return the command line to be run on the start task of the pool to setup spark.
""" """
ret = [ ret = [
# setup spark home and permissions for spark folder 'apt-get -y clean',
'export SPARK_HOME=/dsvm/tools/spark/current', 'apt-get -y update',
'export PATH=$PATH:$SPARK_HOME/bin', 'apt-get install --fix-missing',
'chmod -R 777 $SPARK_HOME', 'apt-get -y install unzip',
'chmod -R 777 /usr/local/share/jupyter/kernels',
# To avoid error: "sudo: sorry, you must have a tty to run sudo"
'sed -i -e "s/Defaults requiretty.*/ #Defaults requiretty/g" /etc/sudoers',
'unzip $AZ_BATCH_TASK_WORKING_DIR/{0}'.format( 'unzip $AZ_BATCH_TASK_WORKING_DIR/{0}'.format(
zip_resource_file.file_path), zip_resource_file.file_path),
'chmod 777 $AZ_BATCH_TASK_WORKING_DIR/main.sh', 'chmod 777 $AZ_BATCH_TASK_WORKING_DIR/setup_node.sh',
# Convert windows line ending to unix if applicable '/bin/bash $AZ_BATCH_TASK_WORKING_DIR/setup_node.sh {0} {1} "{2}"'.format(
'dos2unix $AZ_BATCH_TASK_WORKING_DIR/main.sh', constants.DOCKER_SPARK_CONTAINER_NAME, constants.DOCKER_REPO_NAME, docker_run_cmd()),
'$AZ_BATCH_TASK_WORKING_DIR/main.sh'
] ]
if custom_script_file is not None:
ret.extend([
'/bin/sh -c {}'.format(custom_script_file),
])
ret.extend(['exit 0'])
return ret return ret
def docker_run_cmd() -> str:
"""
Build the docker run command by setting up the environment variables
"""
cmd = CommandBuilder('docker run')
cmd.add_option('--net', 'host')
cmd.add_option('--name', constants.DOCKER_SPARK_CONTAINER_NAME)
cmd.add_option('-v', '/mnt/batch/tasks:/batch')
cmd.add_option('-e', 'DOCKER_WORKING_DIR=/batch/startup/wd')
cmd.add_option('-e', 'AZ_BATCH_ACCOUNT_NAME=$AZ_BATCH_ACCOUNT_NAME')
cmd.add_option('-e', 'BATCH_ACCOUNT_KEY=$BATCH_ACCOUNT_KEY')
cmd.add_option('-e', 'BATCH_ACCOUNT_URL=$BATCH_ACCOUNT_URL')
cmd.add_option('-e', 'AZ_BATCH_POOL_ID=$AZ_BATCH_POOL_ID')
cmd.add_option('-e', 'AZ_BATCH_NODE_ID=$AZ_BATCH_NODE_ID')
cmd.add_option(
'-e', 'AZ_BATCH_NODE_IS_DEDICATED=$AZ_BATCH_NODE_IS_DEDICATED')
cmd.add_option('-e', 'SPARK_MASTER_UI_PORT=$SPARK_MASTER_UI_PORT')
cmd.add_option('-e', 'SPARK_WORKER_UI_PORT=$SPARK_WORKER_UI_PORT')
cmd.add_option('-e', 'SPARK_JUPYTER_PORT=$SPARK_JUPYTER_PORT')
cmd.add_option('-e', 'SPARK_WEB_UI_PORT=$SPARK_WEB_UI_PORT')
cmd.add_option('-p', '8080:8080')
cmd.add_option('-p', '7077:7077')
cmd.add_option('-p', '4040:4040')
cmd.add_option('-p', '8888:8888')
cmd.add_option('-d', constants.DOCKER_REPO_NAME)
cmd.add_argument('/bin/bash /batch/startup/wd/docker_main.sh')
return cmd.to_str()
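# For reference, docker_run_cmd() assembles a command along these lines (container
# name and repo come from the constants added in this commit, "spark" and
# "jiata/spark-2.2.0:latest"; the -e values are resolved by Batch at runtime):
#   docker run --net host --name spark -v /mnt/batch/tasks:/batch \
#       -e DOCKER_WORKING_DIR=/batch/startup/wd -e AZ_BATCH_ACCOUNT_NAME=... \
#       -p 8080:8080 -p 7077:7077 -p 4040:4040 -p 8888:8888 \
#       -d jiata/spark-2.2.0:latest /bin/bash /batch/startup/wd/docker_main.sh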
def generate_cluster_start_task( def generate_cluster_start_task(
cluster_id: str, cluster_id: str,
zip_resource_file: batch_models.ResourceFile, zip_resource_file: batch_models.ResourceFile,
@ -63,7 +84,12 @@ def generate_cluster_start_task(
util.upload_file_to_container( util.upload_file_to_container(
container_name=cluster_id, container_name=cluster_id,
file_path=custom_script, file_path=custom_script,
use_full_path=True)) node_path="custom-scripts/{0}".format(os.path.basename(custom_script))))
spark_master_ui_port = constants.DOCKER_SPARK_MASTER_UI_PORT
spark_worker_ui_port = constants.DOCKER_SPARK_WORKER_UI_PORT
spark_jupyter_port = constants.DOCKER_SPARK_JUPYTER_PORT
spark_web_ui_port = constants.DOCKER_SPARK_WEB_UI_PORT
# TODO use certificate # TODO use certificate
batch_config = azure_api.get_batch_config() batch_config = azure_api.get_batch_config()
@ -72,10 +98,18 @@ def generate_cluster_start_task(
name="BATCH_ACCOUNT_KEY", value=batch_config.account_key), name="BATCH_ACCOUNT_KEY", value=batch_config.account_key),
batch_models.EnvironmentSetting( batch_models.EnvironmentSetting(
name="BATCH_ACCOUNT_URL", value=batch_config.account_url), name="BATCH_ACCOUNT_URL", value=batch_config.account_url),
batch_models.EnvironmentSetting(
name="SPARK_MASTER_UI_PORT", value=spark_master_ui_port),
batch_models.EnvironmentSetting(
name="SPARK_WORKER_UI_PORT", value=spark_worker_ui_port),
batch_models.EnvironmentSetting(
name="SPARK_JUPYTER_PORT", value=spark_jupyter_port),
batch_models.EnvironmentSetting(
name="SPARK_WEB_UI_PORT", value=spark_web_ui_port),
] ]
# start task command # start task command
command = cluster_install_cmd(zip_resource_file, custom_script) command = cluster_install_cmd(zip_resource_file)
return batch_models.StartTask( return batch_models.StartTask(
command_line=util.wrap_commands_in_shell(command), command_line=util.wrap_commands_in_shell(command),
@ -113,9 +147,9 @@ def create_cluster(
batch_client = azure_api.get_batch_client() batch_client = azure_api.get_batch_client()
# vm image # vm image
publisher = 'microsoft-ads' publisher = 'Canonical'
offer = 'linux-data-science-vm' offer = 'UbuntuServer'
sku = 'linuxdsvm' sku = '16.04'
# reuse pool_id as job_id # reuse pool_id as job_id
pool_id = cluster_id pool_id = cluster_id
@ -221,12 +255,12 @@ class Cluster:
def pretty_node_count(cluster: Cluster)-> str: def pretty_node_count(cluster: Cluster)-> str:
if cluster.pool.allocation_state.value is batch_models.AllocationState.resizing: if cluster.pool.allocation_state is batch_models.AllocationState.resizing:
return '{} -> {}'.format( return '{} -> {}'.format(
cluster.total_current_nodes, cluster.total_current_nodes,
cluster.total_target_nodes) cluster.total_target_nodes)
else: else:
return '{}'.format(cluster.dedicated_nodes) return '{}'.format(cluster.total_current_nodes)
def print_cluster(cluster: Cluster): def print_cluster(cluster: Cluster):
@ -371,14 +405,21 @@ def ssh_in_master(
master_node_ip = remote_login_settings.remote_login_ip_address master_node_ip = remote_login_settings.remote_login_ip_address
master_node_port = remote_login_settings.remote_login_port master_node_port = remote_login_settings.remote_login_port
pool = batch_client.pool.get(cluster_id)
spark_master_ui_port = constants.DOCKER_SPARK_MASTER_UI_PORT
spark_worker_ui_port = constants.DOCKER_SPARK_WORKER_UI_PORT
spark_jupyter_port = constants.DOCKER_SPARK_JUPYTER_PORT
spark_web_ui_port = constants.DOCKER_SPARK_WEB_UI_PORT
ssh_command = CommandBuilder('ssh') ssh_command = CommandBuilder('ssh')
ssh_command.add_option("-L", "{0}:localhost:{1}".format( ssh_command.add_option("-L", "{0}:localhost:{1}".format(
masterui, constants.SPARK_MASTER_UI_PORT), enable=bool(masterui)) masterui, spark_master_ui_port), enable=bool(masterui))
ssh_command.add_option(
"-L", "{0}:localhost:{1}".format(webui, constants.SPARK_WEBUI_PORT), enable=bool(webui))
ssh_command.add_option("-L", "{0}:localhost:{1}".format( ssh_command.add_option("-L", "{0}:localhost:{1}".format(
jupyter, constants.SPARK_JUPYTER_PORT), enable=bool(jupyter)) webui, spark_web_ui_port), enable=bool(webui))
ssh_command.add_option("-L", "{0}:localhost:{1}".format(
jupyter, spark_jupyter_port), enable=bool(jupyter))
if ports is not None: if ports is not None:
for port in ports: for port in ports:

View file

@ -5,9 +5,23 @@ import os
""" """
CLI_EXE = 'azb' CLI_EXE = 'azb'
SPARK_MASTER_UI_PORT = 8082 # DOCKER_REPO_NAME = "jiata/spark:latest"
SPARK_WEBUI_PORT = 4040 DOCKER_REPO_NAME = "jiata/spark-2.2.0:latest"
SPARK_JUPYTER_PORT = 7777 DOCKER_SPARK_CONTAINER_NAME = "spark"
# DOCKER
DOCKER_SPARK_MASTER_UI_PORT = 8080
DOCKER_SPARK_WORKER_UI_PORT = 8081
DOCKER_SPARK_JUPYTER_PORT = 8888
DOCKER_SPARK_WEB_UI_PORT = 4040
DOCKER_SPARK_HOME = "/home/spark-2.2.0-bin-hadoop2.7"
# DSVM
DSVM_SPARK_MASTER_UI_PORT = 8082
DSVM_SPARK_WORKER_UI_PORT = 8083
DSVM_SPARK_JUPYTER_PORT = 7777
DSVM_SPARK_WEB_UI_PORT = 4040
DSVM_SPARK_HOME = "/dsvm/tools/spark/current"
""" """
Root path of this repository Root path of this repository

View file

@ -2,9 +2,16 @@ from datetime import timedelta
from typing import List from typing import List
from dtde.core import CommandBuilder from dtde.core import CommandBuilder
import azure.batch.models as batch_models import azure.batch.models as batch_models
from . import azure_api, util from . import azure_api, util, constants
def get_node(node_id: str, cluster_id: str) -> batch_models.ComputeNode:
batch_client = azure_api.get_batch_client()
return batch_client.compute_node.get(cluster_id, node_id)
def app_submit_cmd( def app_submit_cmd(
cluster_id: str,
name: str, name: str,
app: str, app: str,
app_args: str, app_args: str,
@ -19,11 +26,21 @@ def app_submit_cmd(
executor_memory: str, executor_memory: str,
driver_cores: str, driver_cores: str,
executor_cores: str): executor_cores: str):
spark_submit_cmd = CommandBuilder('$SPARK_HOME/bin/spark-submit')
master_id = util.get_master_node_id(cluster_id)
master_ip = get_node(master_id, cluster_id).ip_address
# get pool data from pool meta key/value store
batch_client = azure_api.get_batch_client()
pool = batch_client.pool.get(cluster_id)
spark_home = constants.DOCKER_SPARK_HOME
spark_submit_cmd = CommandBuilder(
'{0}/bin/spark-submit'.format(spark_home))
spark_submit_cmd.add_option(
'--master', 'spark://{0}:7077'.format(master_ip))
spark_submit_cmd.add_option('--name', name) spark_submit_cmd.add_option('--name', name)
spark_submit_cmd.add_option('--master', 'spark://${MASTER_NODE%:*}:7077')
spark_submit_cmd.add_option('--class', main_class)
spark_submit_cmd.add_option('--class', main_class) spark_submit_cmd.add_option('--class', main_class)
spark_submit_cmd.add_option('--jars', jars and ','.join(jars)) spark_submit_cmd.add_option('--jars', jars and ','.join(jars))
spark_submit_cmd.add_option('--py-files', py_files and ','.join(py_files)) spark_submit_cmd.add_option('--py-files', py_files and ','.join(py_files))
@ -35,42 +52,39 @@ def app_submit_cmd(
spark_submit_cmd.add_option('--executor-memory', executor_memory) spark_submit_cmd.add_option('--executor-memory', executor_memory)
spark_submit_cmd.add_option('--driver-cores', driver_cores) spark_submit_cmd.add_option('--driver-cores', driver_cores)
spark_submit_cmd.add_option('--executor-cores', executor_cores) spark_submit_cmd.add_option('--executor-cores', executor_cores)
spark_submit_cmd.add_argument( spark_submit_cmd.add_argument(
'$AZ_BATCH_TASK_WORKING_DIR/' + app + ' ' + ' '.join(app_args)) '/batch/workitems/{0}/{1}/{2}/wd/'.format(cluster_id, "job-1", name) +
app + ' ' + ' '.join(app_args))
docker_exec_cmd = CommandBuilder('sudo docker exec')
docker_exec_cmd.add_option('-e', 'PYSPARK_PYTHON=/usr/bin/python3')
docker_exec_cmd.add_option('-i', constants.DOCKER_SPARK_CONTAINER_NAME)
docker_exec_cmd.add_argument(spark_submit_cmd.to_str())
return [ return [
# set SPARK_HOME environment vars docker_exec_cmd.to_str()
'export SPARK_HOME=/dsvm/tools/spark/current',
'export PATH=$PATH:$SPARK_HOME/bin',
# set the runtime to python 3
'export PYSPARK_PYTHON=/usr/bin/python3',
'export PYSPARK_DRIVER_PYTHON=python3',
# get master node ip
'export MASTER_NODE=$(cat $SPARK_HOME/conf/master)',
'echo "Master node ip is $MASTER_NODE"',
spark_submit_cmd.to_str(),
] ]
def submit_app( def submit_app(
cluster_id:str, cluster_id: str,
name:str, name: str,
app:str, app: str,
app_args:List[str], app_args: List[str],
wait:bool, wait: bool,
main_class:str, main_class: str,
jars:List[str], jars: List[str],
py_files:List[str], py_files: List[str],
files:List[str], files: List[str],
driver_java_options:str, driver_java_options: str,
driver_library_path:str, driver_library_path: str,
driver_class_path:str, driver_class_path: str,
driver_memory:str, driver_memory: str,
executor_memory:str, executor_memory: str,
driver_cores:str, driver_cores: str,
executor_cores:str): executor_cores: str):
""" """
Submit a spark app Submit a spark app
""" """
@ -99,6 +113,7 @@ def submit_app(
# create command to submit task # create command to submit task
cmd = app_submit_cmd( cmd = app_submit_cmd(
cluster_id=cluster_id,
name=name, name=name,
app=app, app=app,
app_args=app_args, app_args=app_args,
@ -134,7 +149,7 @@ def submit_app(
elevation_level=batch_models.ElevationLevel.admin)) elevation_level=batch_models.ElevationLevel.admin))
) )
# Add task to batch job (which has the same name as pool_id) # Add task to batch job (which has the same name as cluster_id)
job_id = cluster_id job_id = cluster_id
batch_client.task.add(job_id=job_id, task=task) batch_client.task.add(job_id=job_id, task=task)

View file

@ -1,3 +1,5 @@
import fnmatch
import io
import os import os
import logging import logging
import zipfile import zipfile
@ -21,9 +23,15 @@ def zipdir(path, ziph):
for base, _, files in os.walk(path): for base, _, files in os.walk(path):
relative_folder = os.path.relpath(base, path) relative_folder = os.path.relpath(base, path)
for file in files: for file in files:
ziph.write(os.path.join(base, file), if __includeFile(file):
os.path.join(relative_folder, file)) with io.open(os.path.join(base, file), 'r') as f:
ziph.writestr(os.path.join(relative_folder, file), f.read().replace('\r\n', '\n'))
def __includeFile(filename: str) -> bool:
if fnmatch.fnmatch(filename, '*.pyc'):
return False
return True
def __create_zip(): def __create_zip():
ensure_dir(local_tmp_zipfile) ensure_dir(local_tmp_zipfile)

View file

@ -75,13 +75,14 @@ def wait_for_master_to_be_ready(cluster_id: str):
time.sleep(5) time.sleep(5)
def upload_file_to_container(container_name, file_path, use_full_path) -> batch_models.ResourceFile: def upload_file_to_container(container_name, file_path, use_full_path = False, node_path = None) -> batch_models.ResourceFile:
""" """
Uploads a local file to an Azure Blob storage container. Uploads a local file to an Azure Blob storage container.
:param block_blob_client: A blob service client. :param block_blob_client: A blob service client.
:type block_blob_client: `azure.storage.blob.BlockBlobService` :type block_blob_client: `azure.storage.blob.BlockBlobService`
:param str container_name: The name of the Azure Blob storage container. :param str container_name: The name of the Azure Blob storage container.
:param str file_path: The local path to the file. :param str file_path: The local path to the file.
:param str node_path: Path on the local node. By default will be the same as file_path
:rtype: `azure.batch.models.ResourceFile` :rtype: `azure.batch.models.ResourceFile`
:return: A ResourceFile initialized with a SAS URL appropriate for Batch :return: A ResourceFile initialized with a SAS URL appropriate for Batch
tasks. tasks.
@ -89,11 +90,14 @@ def upload_file_to_container(container_name, file_path, use_full_path) -> batch_
block_blob_client = azure_api.get_blob_client() block_blob_client = azure_api.get_blob_client()
blob_name = None blob_name = None
if (use_full_path): if use_full_path:
blob_name = file_path blob_name = file_path
else: else:
blob_name = os.path.basename(file_path) blob_name = os.path.basename(file_path)
if not node_path:
node_path = blob_name
block_blob_client.create_container(container_name, block_blob_client.create_container(container_name,
fail_on_exist=False) fail_on_exist=False)
@ -111,7 +115,7 @@ def upload_file_to_container(container_name, file_path, use_full_path) -> batch_
blob_name, blob_name,
sas_token=sas_token) sas_token=sas_token)
return batch_models.ResourceFile(file_path=blob_name, return batch_models.ResourceFile(file_path=node_path,
blob_source=sas_url) blob_source=sas_url)

View file

@ -6,16 +6,15 @@ import azure.batch.batch_auth as batchauth
account_name = os.environ["AZ_BATCH_ACCOUNT_NAME"] account_name = os.environ["AZ_BATCH_ACCOUNT_NAME"]
account_key = os.environ["BATCH_ACCOUNT_KEY"] account_key = os.environ["BATCH_ACCOUNT_KEY"]
account_url = os.environ["BATCH_ACCOUNT_URL"] account_url = os.environ["BATCH_ACCOUNT_URL"]
pool_id = os.environ["AZ_BATCH_POOL_ID"] pool_id = os.environ["AZ_BATCH_POOL_ID"]
node_id = os.environ["AZ_BATCH_NODE_ID"] node_id = os.environ["AZ_BATCH_NODE_ID"]
is_dedicated = os.environ["AZ_BATCH_NODE_IS_DEDICATED"] is_dedicated = os.environ["AZ_BATCH_NODE_IS_DEDICATED"]
# TODO save this as env/metadata spark_master_ui_port = os.environ["SPARK_MASTER_UI_PORT"]
SPARK_MASTER_UI_PORT = 8082 spark_worker_ui_port = os.environ["SPARK_WORKER_UI_PORT"]
SPARK_WORKER_UI_PORT = 8083 spark_jupyter_port = os.environ["SPARK_JUPYTER_PORT"]
SPARK_WEB_UI_PORT = 4040 spark_web_ui_port = os.environ["SPARK_WEB_UI_PORT"]
SPARK_JUPYTER_PORT = 7777
def get_client() -> batch.BatchServiceClient: def get_client() -> batch.BatchServiceClient:
credentials = batchauth.SharedKeyCredentials( credentials = batchauth.SharedKeyCredentials(
@ -23,7 +22,6 @@ def get_client() -> batch.BatchServiceClient:
account_key) account_key)
return batch.BatchServiceClient(credentials, base_url=account_url) return batch.BatchServiceClient(credentials, base_url=account_url)
batch_client = get_client() batch_client = get_client()
logging.info("Pool id is %s", pool_id) logging.info("Pool id is %s", pool_id)

View file

@ -0,0 +1,28 @@
#!/bin/bash
# This file is the entry point of the docker container.
# It will run the custom scripts if present and start spark.
set -e
export PATH=/usr/bin/python3:$PATH
custom_script_dir=$DOCKER_WORKING_DIR/custom-scripts
if [ -d "$custom_script_dir" ]; then
echo "Custom script dir '$custom_script_dir' exists. Running all scripts there."
for script in $custom_script_dir/*.sh; do
echo "Running custom script $script"
bash $script
done
else
echo "Custom script dir '$custom_script_dir' doesn't exist. Will not run any custom scripts."
fi
echo "Starting setup using Docker"
pip3 install -r $(dirname $0)/requirements.txt
echo "Running main.py script"
python3 $(dirname $0)/main.py install
# sleep to keep container running
while true; do sleep 1; done

node_scripts/dsvm_main.sh Normal file
View file

@ -0,0 +1,11 @@
#!/bin/bash
set -ev
export PATH=/anaconda/envs/py35/bin:$PATH
echo "Starting setup with the Azure DSVM"
pip install -r $(dirname $0)/requirements.txt
echo "Running main.py script"
python $(dirname $0)/main.py install

View file

@ -11,6 +11,7 @@ def setup_node():
else: else:
setup_as_worker() setup_as_worker()
def setup_as_master(): def setup_as_master():
print("Setting up as master.") print("Setting up as master.")
spark.setup_connection() spark.setup_connection()

View file

@ -7,14 +7,11 @@ import azure.batch.models as batchmodels
import azure.batch.models.batch_error as batcherror import azure.batch.models.batch_error as batcherror
from core import config from core import config
MASTER_NODE_METADATA_KEY = "_spark_master_node" MASTER_NODE_METADATA_KEY = "_spark_master_node"
class CannotAllocateMasterError(Exception): class CannotAllocateMasterError(Exception):
pass pass
def get_master_node_id(pool: batchmodels.CloudPool): def get_master_node_id(pool: batchmodels.CloudPool):
""" """
:returns: the id of the node that is the assigned master of this pool :returns: the id of the node that is the assigned master of this pool
@ -28,7 +25,6 @@ def get_master_node_id(pool: batchmodels.CloudPool):
return None return None
def try_assign_self_as_master(client: batch.BatchServiceClient, pool: batchmodels.CloudPool): def try_assign_self_as_master(client: batch.BatchServiceClient, pool: batchmodels.CloudPool):
current_metadata = pool.metadata or [] current_metadata = pool.metadata or []
new_metadata = current_metadata + [{"name": MASTER_NODE_METADATA_KEY, "value": config.node_id}] new_metadata = current_metadata + [{"name": MASTER_NODE_METADATA_KEY, "value": config.node_id}]

View file

@ -6,14 +6,17 @@ import time
import os import os
import json import json
import shutil import shutil
from subprocess import call, Popen from subprocess import call, Popen, check_output
from typing import List from typing import List
import azure.batch.models as batchmodels import azure.batch.models as batchmodels
from core import config from core import config
from install import pick_master from install import pick_master
batch_client = config.batch_client batch_client = config.batch_client
spark_home = "/dsvm/tools/spark/current"
spark_home = "/home/spark-2.2.0-bin-hadoop2.7"
pyspark_driver_python = "/usr/local/bin/jupyter"
spark_conf_folder = os.path.join(spark_home, "conf") spark_conf_folder = os.path.join(spark_home, "conf")
@ -33,6 +36,7 @@ def list_nodes() -> List[batchmodels.ComputeNode]:
# pool # pool
return batch_client.compute_node.list(config.pool_id) return batch_client.compute_node.list(config.pool_id)
def setup_connection(): def setup_connection():
""" """
This setup spark config with which nodes are slaves and which are master This setup spark config with which nodes are slaves and which are master
@ -44,7 +48,8 @@ def setup_connection():
master_config_file = os.path.join(spark_conf_folder, "master") master_config_file = os.path.join(spark_conf_folder, "master")
master_file = open(master_config_file, 'w') master_file = open(master_config_file, 'w')
print("Adding master node ip {0} to config file '{1}'".format(master_node.ip_address, master_config_file)) print("Adding master node ip {0} to config file '{1}'".format(
master_node.ip_address, master_config_file))
master_file.write("{0}\n".format(master_node.ip_address)) master_file.write("{0}\n".format(master_node.ip_address))
master_file.close() master_file.close()
@ -65,19 +70,27 @@ def generate_jupyter_config():
"{connection_file}", "{connection_file}",
], ],
env=dict( env=dict(
SPARK_HOME="/dsvm/tools/spark/current", SPARK_HOME=spark_home,
PYSPARK_PYTHON="/usr/bin/python3", PYSPARK_PYTHON="/usr/bin/python3",
PYSPARK_SUBMIT_ARGS="--master spark://{0}:7077 pyspark-shell".format(master_node_ip), PYSPARK_SUBMIT_ARGS="--master spark://{0}:7077 pyspark-shell".format(
master_node_ip),
) )
) )
def setup_jupyter(): def setup_jupyter():
print("Setting up jupyter.") print("Setting up jupyter.")
call(["/anaconda/envs/py35/bin/jupyter", "notebook", "--generate-config"])
jupyter_config_file = os.path.join(os.path.expanduser( jupyter_config_file = os.path.join(os.path.expanduser(
"~"), ".jupyter/jupyter_notebook_config.py") "~"), ".jupyter/jupyter_notebook_config.py")
if os.path.isfile(jupyter_config_file):
print("Jupyter config is already set. Skipping setup. (Start task is probably rerunning after reboot)")
return
generate_jupyter_config_cmd = ["jupyter", "notebook", "--generate-config"]
generate_jupyter_config_cmd.append("--allow-root")
call(generate_jupyter_config_cmd)
with open(jupyter_config_file, "a") as config_file: with open(jupyter_config_file, "a") as config_file:
config_file.write('\n') config_file.write('\n')
@ -92,16 +105,24 @@ def setup_jupyter():
def start_jupyter(): def start_jupyter():
jupyter_port = config.SPARK_JUPYTER_PORT jupyter_port = config.spark_jupyter_port
pyspark_driver_python_opts = "notebook --no-browser --port='{0}'".format(
jupyter_port)
pyspark_driver_python_opts += " --allow-root"
my_env = os.environ.copy() my_env = os.environ.copy()
my_env["PYSPARK_DRIVER_PYTHON"] = "/anaconda/envs/py35/bin/jupyter" my_env["PYSPARK_DRIVER_PYTHON"] = pyspark_driver_python
my_env["PYSPARK_DRIVER_PYTHON_OPTS"] = "notebook --no-browser --port='{0}'".format(jupyter_port) my_env["PYSPARK_DRIVER_PYTHON_OPTS"] = pyspark_driver_python_opts
pyspark_wd = os.path.join(os.getcwd(), "pyspark") pyspark_wd = os.path.join(os.getcwd(), "pyspark")
os.mkdir(pyspark_wd) if not os.path.exists(pyspark_wd):
os.mkdir(pyspark_wd)
print("Starting pyspark") print("Starting pyspark")
process = Popen(["pyspark"], env=my_env, cwd=pyspark_wd) process = Popen([
os.path.join(spark_home, "bin/pyspark")
], env=my_env, cwd=pyspark_wd)
print("Started pyspark with pid {0}".format(process.pid)) print("Started pyspark with pid {0}".format(process.pid))
@ -126,7 +147,8 @@ def wait_for_master():
def start_spark_master(): def start_spark_master():
master_ip = get_node(config.node_id).ip_address master_ip = get_node(config.node_id).ip_address
exe = os.path.join(spark_home, "sbin", "start-master.sh") exe = os.path.join(spark_home, "sbin", "start-master.sh")
cmd = [exe, "-h", master_ip, "--webui-port", str(config.SPARK_MASTER_UI_PORT)] cmd = [exe, "-h", master_ip, "--webui-port",
str(config.spark_master_ui_port)]
print("Starting master with '{0}'".format(" ".join(cmd))) print("Starting master with '{0}'".format(" ".join(cmd)))
call(cmd) call(cmd)
@ -144,6 +166,7 @@ def start_spark_worker():
my_env = os.environ.copy() my_env = os.environ.copy()
my_env["SPARK_MASTER_IP"] = master_node.ip_address my_env["SPARK_MASTER_IP"] = master_node.ip_address
cmd = [exe, "spark://{0}:7077".format(master_node.ip_address), "--webui-port", str(config.SPARK_WORKER_UI_PORT)] cmd = [exe, "spark://{0}:7077".format(master_node.ip_address),
"--webui-port", str(config.spark_worker_ui_port)]
print("Connecting to master with '{0}'".format(" ".join(cmd))) print("Connecting to master with '{0}'".format(" ".join(cmd)))
call(cmd) call(cmd)

View file

@ -1,7 +1,6 @@
import sys import sys
from install import install from install import install
def run(): def run():
if len(sys.argv) < 2: if len(sys.argv) < 2:
print("Error: Expected at least one argument") print("Error: Expected at least one argument")

View file

@ -1,9 +0,0 @@
#!/bin/bash
set -ev
export PATH=/anaconda/envs/py35/bin:$PATH
echo "Starting setup"
pip install -r $(dirname $0)/requirements.txt
echo "Installed dependencies, picking master"
python $(dirname $0)/main.py install

View file

@ -0,0 +1,50 @@
#!/bin/bash
# Entry point for the start task. It will install all dependencies and start docker.
# Usage:
# setup_node.sh [container_name] [docker_repo] [docker_cmd]
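# Example invocation (illustrative values, mirroring cluster_install_cmd in this commit):
#   /bin/bash setup_node.sh spark jiata/spark-2.2.0:latest \
#       "docker run --net host --name spark ... -d jiata/spark-2.2.0:latest /bin/bash /batch/startup/wd/docker_main.sh"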
container_name=$1
repo_name=$2
docker_run_cmd=$3
apt-get -y install linux-image-extra-$(uname -r) linux-image-extra-virtual
apt-get -y install apt-transport-https
apt-get -y install curl
apt-get -y install ca-certificates
apt-get -y install software-properties-common
# Install docker
curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo apt-key add -
add-apt-repository "deb [arch=amd64] https://download.docker.com/linux/ubuntu $(lsb_release -cs) stable"
apt-get -y update
apt-get -y install docker-ce
docker pull $repo_name
# Unzip resource files and set permissions
apt-get -y install unzip
chmod 777 $AZ_BATCH_TASK_WORKING_DIR/docker_main.sh
chmod -R 777 $AZ_BATCH_TASK_WORKING_DIR/custom-scripts
# Check docker is running
docker info > /dev/null 2>&1
if [ $? -ne 0 ]; then
echo "UNKNOWN - Unable to talk to the docker daemon"
exit 3
fi
# If the container already exists just restart. Otherwise create it
if [ "$(docker ps -a -q -f name=$container_name)" ]; then
echo "Docker container is already setup. Restarting it."
docker restart $container_name
else
echo "Creating docker container."
# Start docker
eval $docker_run_cmd
# Setup symbolic link for the docker logs
docker_log=$(docker inspect --format='{{.LogPath}}' $container_name)
mkdir -p $AZ_BATCH_TASK_WORKING_DIR/logs
ln -s $docker_log $AZ_BATCH_TASK_WORKING_DIR/logs/docker.log
fi