jiata 2017-04-06 21:02:15 +00:00
Commit ace37b0fea
12 changed files with 827 additions and 0 deletions

3
.gitignore Vendored Normal file
View File

@@ -0,0 +1,3 @@
configuration.cfg
*.pyc
test

2
README.md Normal file
View File

@@ -0,0 +1,2 @@
# redbull
Run Apache Spark on Azure Batch
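
A `configuration.cfg` at the repository root (git-ignored above) supplies the Azure credentials read by `lib/setup.py`. A minimal sketch with placeholder values, assuming only the `Batch` and `Storage` keys that `setup.py` reads:

```ini
[Batch]
batchaccountname = <batch account name>
batchaccountkey = <batch account key>
batchserviceurl = https://<batch account>.<region>.batch.azure.com

[Storage]
storageaccountname = <storage account name>
storageaccountkey = <storage account key>
storageaccountsuffix = core.windows.net
```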

46
jobs/pi.py Normal file
View File

@@ -0,0 +1,46 @@
from __future__ import print_function
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import sys
from random import random
from operator import add
from pyspark.sql import SparkSession
if __name__ == "__main__":
    """
        Usage: pi [partitions]
    """
    spark = SparkSession\
        .builder\
        .appName("PythonPi")\
        .getOrCreate()

    partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
    n = 100000 * partitions

    def f(_):
        x = random() * 2 - 1
        y = random() * 2 - 1
        return 1 if x ** 2 + y ** 2 < 1 else 0

    count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
    print("Pi is roughly %f" % (4.0 * count / n))

    spark.stop()
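
# A sketch of how this job can be submitted from the cluster's master node once Spark is
# running (mirrors the spark-submit example in the spark-start.sh resource files; the
# partition count of 100 is arbitrary):
#
#   $SPARK_HOME/bin/spark-submit --master spark://localhost:7077 jobs/pi.py 100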

271
lib/setup.py Normal file
View File

@@ -0,0 +1,271 @@
import util
try:
    import configparser
except ImportError:
    import ConfigParser as configparser
import os
import datetime

import azure.batch.batch_service_client as batch
import azure.batch.batch_auth as batch_auth
import azure.batch.models as batch_models
import azure.storage.blob as blob

deployment_suffix = '-42-dsvm'

# config file path
_CONFIG_PATH = os.path.join(os.path.dirname(__file__), '../configuration.cfg')

# pool configs
_VM_SIZE = 'Standard_D2_v2'
_VM_COUNT = 5
_POOL_ID = 'spark-test-pool' + deployment_suffix
_JOB_ID = 'spark-test-job' + deployment_suffix
_MULTIINSTANCE_TASK_ID = 'multiinstance-spark-task' + deployment_suffix

# vm image
_VM_IMAGE_OPTIONS = {
    'ubuntu': {
        'publisher': 'Canonical',
        'offer': 'UbuntuServer',
        'sku': '16.04'
    },
    'dsvm': {
        'publisher': 'microsoft-ads',
        'offer': 'linux-data-science-vm',
        'sku': 'linuxdsvm'
    }
}
_VM_IMAGE = _VM_IMAGE_OPTIONS['dsvm']

# tasks variables
_CONTAINER_NAME = 'sparkresourcesfiles'
_START_TASK_NAME = 'spark-install.sh'
_START_TASK_PATH = os.path.join('resource-files/dsvm-image', _START_TASK_NAME)
_APPLICATION_TASK_NAME = 'spark-start.sh'
_APPLICATION_TASK_PATH = os.path.join('resource-files/dsvm-image', _APPLICATION_TASK_NAME)
_COORDINATION_TASK_NAME = 'spark-connect.sh'
_COORDINATION_TASK_PATH = os.path.join('resource-files/dsvm-image', _COORDINATION_TASK_NAME)

# ssh user variables
_USERNAME = 'jiata'
_PASSWORD = 'B1gComput#'


def connect(batch_client):
    """
    Creates a batch user for the master node, retrieves
    the user's login settings and generates the ssh tunnel command
    :param batch_client: the batch client
    :type batch_client: azure.batch.BatchServiceClient
    :return string: the ssh tunnel command to run
    """
    # get master node id from the multi-instance task
    master_node_id = batch_client.task.get(_JOB_ID, _MULTIINSTANCE_TASK_ID).node_info.node_id

    # create a new ssh user for the master node
    batch_client.compute_node.add_user(
        _POOL_ID,
        master_node_id,
        batch_models.ComputeNodeUser(
            _USERNAME,
            is_admin=True,
            password=_PASSWORD))

    print('\nuser {} added to node {} in pool {}'.format(_USERNAME, master_node_id, _POOL_ID))

    # get remote login settings for the user
    remote_login_settings = batch_client.compute_node.get_remote_login_settings(
        _POOL_ID, master_node_id)

    master_node_ip = remote_login_settings.remote_login_ip_address
    master_node_port = remote_login_settings.remote_login_port

    # build ssh tunnel command
    ssh_tunnel_command = "ssh -L 8080:localhost:8080 -L 8888:localhost:8888 " + \
        _USERNAME + "@" + str(master_node_ip) + " -p " + str(master_node_port)

    return ssh_tunnel_command


def submit_task(batch_client, blob_client,
                coordination_command_resource_file, application_command_resource_file):
    """
    Adds the multi-instance Spark task to the job
    :param batch_client: the batch client
    :type batch_client: azure.batch.BatchServiceClient
    :param blob_client: the storage blob client
    :type blob_client: azure.storage.blob.BlockBlobService
    :param coordination_command_resource_file:
        the resource file that the coordination command will use
    :type coordination_command_resource_file: azure.batch.models.ResourceFile
    :param application_command_resource_file:
        the resource file that the application command will use
    :type application_command_resource_file: azure.batch.models.ResourceFile
    """
    application_command = "/bin/sh -c $AZ_BATCH_TASK_WORKING_DIR/" + _APPLICATION_TASK_NAME
    coordination_command = "/bin/sh -c $AZ_BATCH_TASK_DIR/" + _COORDINATION_TASK_NAME

    # create multi-instance task
    task = batch_models.TaskAddParameter(
        id=_MULTIINSTANCE_TASK_ID,
        command_line=application_command,
        resource_files=[application_command_resource_file],
        run_elevated=False,
        multi_instance_settings=batch_models.MultiInstanceSettings(
            number_of_instances=_VM_COUNT,
            coordination_command_line=coordination_command,
            common_resource_files=[coordination_command_resource_file]))

    # add task to job
    batch_client.task.add(job_id=_JOB_ID, task=task)


def submit_job(batch_client):
    """
    Submits a job to the Azure Batch service
    :param batch_client: the batch client
    :type batch_client: azure.batch.BatchServiceClient
    """
    # create job
    job = batch_models.JobAddParameter(
        id=_JOB_ID,
        pool_info=batch_models.PoolInformation(pool_id=_POOL_ID))

    # add job to batch
    batch_client.job.add(job)


def create_pool(batch_client, blob_client, start_task_resource_file):
    """
    Creates an Azure Batch pool
    :param batch_client: the batch client
    :type batch_client: azure.batch.BatchServiceClient
    :param blob_client: the storage blob client
    :type blob_client: azure.storage.blob.BlockBlobService
    :param start_task_resource_file: the resource file that the start task will use
    :type start_task_resource_file: azure.batch.models.ResourceFile
    """
    # get a verified node agent sku
    sku_to_use, image_ref_to_use = \
        util.select_latest_verified_vm_image_with_node_agent_sku(
            batch_client, _VM_IMAGE['publisher'], _VM_IMAGE['offer'], _VM_IMAGE['sku'])

    # configure the pool
    pool = batch_models.PoolAddParameter(
        id=_POOL_ID,
        virtual_machine_configuration=batch_models.VirtualMachineConfiguration(
            image_reference=image_ref_to_use,
            node_agent_sku_id=sku_to_use),
        vm_size=_VM_SIZE,
        target_dedicated=_VM_COUNT,
        start_task=batch_models.StartTask(
            command_line="/bin/sh -c " + _START_TASK_NAME,
            resource_files=[start_task_resource_file],
            run_elevated=True,
            wait_for_success=True),
        enable_inter_node_communication=True,
        max_tasks_per_node=1)

    # create the pool
    util.create_pool_if_not_exist(batch_client, pool)


if __name__ == '__main__':
    """
    Start script
    """
    global_config = configparser.ConfigParser()
    global_config.read(_CONFIG_PATH)
    util.print_configuration(global_config)

    '''
    # Set up the configuration
    batch_account_key = global_config.get('Batch', 'batchaccountkey')
    batch_account_name = global_config.get('Batch', 'batchaccountname')
    batch_service_url = global_config.get('Batch', 'batchserviceurl')

    storage_account_key = global_config.get('Storage', 'storageaccountkey')
    storage_account_name = global_config.get('Storage', 'storageaccountname')
    storage_account_suffix = global_config.get('Storage', 'storageaccountsuffix')

    # Set up SharedKeyCredentials
    credentials = batch_auth.SharedKeyCredentials(
        batch_account_name,
        batch_account_key)

    # Set up Batch Client
    batch_client = batch.BatchServiceClient(
        credentials,
        base_url=batch_service_url)

    # Set retry policy
    batch_client.config.retry_policy.retries = 5

    # Set up BlockBlobStorage
    blob_client = blob.BlockBlobService(
        account_name=storage_account_name,
        account_key=storage_account_key,
        endpoint_suffix=storage_account_suffix)

    """
    Upload resource files to storage container
    """
    # Upload start task resource files to blob storage
    start_task_resource_file = \
        util.upload_file_to_container(
            blob_client, _CONTAINER_NAME, _START_TASK_PATH)

    # Upload coordination command resource files to blob storage
    coordination_command_resource_file = \
        util.upload_file_to_container(
            blob_client, _CONTAINER_NAME, _COORDINATION_TASK_PATH)

    # Upload application command resource files to blob storage
    application_command_resource_file = \
        util.upload_file_to_container(
            blob_client, _CONTAINER_NAME, _APPLICATION_TASK_PATH)

    """
    Start pool, start job, start task!
    """
    # Create a pool if the pool doesn't already exist
    create_pool(
        batch_client,
        blob_client,
        start_task_resource_file)

    # Submit a job
    submit_job(batch_client)

    # Submit a task
    submit_task(
        batch_client,
        blob_client,
        coordination_command_resource_file,
        application_command_resource_file)

    # Wait for the job to finish
    util.wait_for_tasks_to_complete(
        batch_client,
        _JOB_ID,
        datetime.timedelta(minutes=60))

    """
    Return user/credentials to ssh tunnel into the master node
    """
    # Create a user and get the credentials to create the ssh tunnel
    ssh_tunnel_command = connect(batch_client)
    print('\nuse the following command to get started!')
    print(ssh_tunnel_command)
    print()
    '''
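
# A sketch of how this script is typically invoked (assumes the azure-batch and
# azure-storage packages that provide the imports above are installed, and that
# ../configuration.cfg exists relative to this file):
#
#   python lib/setup.py
#
# The ssh command printed by connect() forwards localhost:8080 (the Spark master web UI)
# and localhost:8888 (Jupyter) from the master node.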

191
lib/util.py Normal file
View File

@@ -0,0 +1,191 @@
from __future__ import print_function
import datetime
import io
import os
import time

import azure.storage.blob as blob
import azure.batch.models as batch_models

_STANDARD_OUT_FILE_NAME = 'stdout.txt'
_STANDARD_ERROR_FILE_NAME = 'stderr.txt'
_SAMPLES_CONFIG_FILE_NAME = 'configuration.cfg'


def wait_for_tasks_to_complete(batch_client, job_id, timeout):
    """Waits for all the tasks in a particular job to complete.
    :param batch_client: The batch client to use.
    :type batch_client: `batchserviceclient.BatchServiceClient`
    :param str job_id: The id of the job to monitor.
    :param timeout: The maximum amount of time to wait.
    :type timeout: `datetime.timedelta`
    """
    time_to_timeout_at = datetime.datetime.now() + timeout

    while datetime.datetime.now() < time_to_timeout_at:
        tasks = batch_client.task.list(job_id)

        incomplete_tasks = [task for task in tasks if
                            task.state != batch_models.TaskState.completed]
        if not incomplete_tasks:
            return
        time.sleep(5)

    raise TimeoutError("Timed out waiting for tasks to complete")


def upload_file_to_container(block_blob_client, container_name, file_path):
    """
    Uploads a local file to an Azure Blob storage container.
    :param block_blob_client: A blob service client.
    :type block_blob_client: `azure.storage.blob.BlockBlobService`
    :param str container_name: The name of the Azure Blob storage container.
    :param str file_path: The local path to the file.
    :rtype: `azure.batch.models.ResourceFile`
    :return: A ResourceFile initialized with a SAS URL appropriate for Batch
        tasks.
    """
    blob_name = os.path.basename(file_path)

    '''
    print('\nUploading file {} to container [{}]...'.format(file_path,
                                                            container_name))
    '''

    block_blob_client.create_blob_from_path(container_name,
                                            blob_name,
                                            file_path)

    sas_token = block_blob_client.generate_blob_shared_access_signature(
        container_name,
        blob_name,
        permission=blob.BlobPermissions.READ,
        expiry=datetime.datetime.utcnow() + datetime.timedelta(hours=2))

    sas_url = block_blob_client.make_blob_url(container_name,
                                              blob_name,
                                              sas_token=sas_token)

    return batch_models.ResourceFile(file_path=blob_name,
                                     blob_source=sas_url)


def print_configuration(config):
    """Prints the configuration being used as a dictionary
    :param config: The configuration.
    :type config: `configparser.ConfigParser`
    """
    configuration_dict = {s: dict(config.items(s)) for s in
                          config.sections() + ['DEFAULT']}

    print("\nConfiguration is:")
    print(configuration_dict)


def create_pool_if_not_exist(batch_client, pool):
    """Creates the specified pool if it doesn't already exist
    :param batch_client: The batch client to use.
    :type batch_client: `batchserviceclient.BatchServiceClient`
    :param pool: The pool to create.
    :type pool: `batchserviceclient.models.PoolAddParameter`
    """
    try:
        print("\nAttempting to create pool:", pool.id)
        batch_client.pool.add(pool)
        print("\nCreated pool:", pool.id)
    except batch_models.BatchErrorException as e:
        if e.error.code != "PoolExists":
            raise
        else:
            print("\nPool {!r} already exists".format(pool.id))


def select_latest_verified_vm_image_with_node_agent_sku(
        batch_client, publisher, offer, sku_starts_with):
    """Select the latest verified image that Azure Batch supports given
    a publisher, offer and sku (starts with filter).
    :param batch_client: The batch client to use.
    :type batch_client: `batchserviceclient.BatchServiceClient`
    :param str publisher: vm image publisher
    :param str offer: vm image offer
    :param str sku_starts_with: vm sku starts with filter
    :rtype: tuple
    :return: (node agent sku id to use, vm image ref to use)
    """
    # get verified vm image list and node agent sku ids from service
    node_agent_skus = batch_client.account.list_node_agent_skus()

    # pick the latest supported sku
    skus_to_use = [
        (sku, image_ref) for sku in node_agent_skus for image_ref in sorted(
            sku.verified_image_references, key=lambda item: item.sku)
        if image_ref.publisher.lower() == publisher.lower() and
        image_ref.offer.lower() == offer.lower() and
        image_ref.sku.startswith(sku_starts_with)
    ]

    # skus are listed in reverse order, pick first for latest
    sku_to_use, image_ref_to_use = skus_to_use[0]
    return (sku_to_use.id, image_ref_to_use)


def create_sas_token(
        block_blob_client, container_name, blob_name, permission, expiry=None,
        timeout=None):
    """Create a blob sas token
    :param block_blob_client: The storage block blob client to use.
    :type block_blob_client: `azure.storage.blob.BlockBlobService`
    :param str container_name: The name of the container to upload the blob to.
    :param str blob_name: The name of the blob to upload the local file to.
    :param expiry: The SAS expiry time.
    :type expiry: `datetime.datetime`
    :param int timeout: timeout in minutes from now for expiry,
        will only be used if expiry is not specified
    :return: A SAS token
    :rtype: str
    """
    if expiry is None:
        if timeout is None:
            timeout = 30
        expiry = datetime.datetime.utcnow() + datetime.timedelta(
            minutes=timeout)
    return block_blob_client.generate_blob_shared_access_signature(
        container_name, blob_name, permission=permission, expiry=expiry)


def upload_blob_and_create_sas(
        block_blob_client, container_name, blob_name, file_name, expiry,
        timeout=None):
    """Uploads a file from local disk to Azure Storage and creates
    a SAS for it.
    :param block_blob_client: The storage block blob client to use.
    :type block_blob_client: `azure.storage.blob.BlockBlobService`
    :param str container_name: The name of the container to upload the blob to.
    :param str blob_name: The name of the blob to upload the local file to.
    :param str file_name: The name of the local file to upload.
    :param expiry: The SAS expiry time.
    :type expiry: `datetime.datetime`
    :param int timeout: timeout in minutes from now for expiry,
        will only be used if expiry is not specified
    :return: A SAS URL to the blob with the specified expiry time.
    :rtype: str
    """
    block_blob_client.create_container(
        container_name,
        fail_on_exist=False)

    block_blob_client.create_blob_from_path(
        container_name,
        blob_name,
        file_name)

    sas_token = create_sas_token(
        block_blob_client,
        container_name,
        blob_name,
        permission=blob.BlobPermissions.READ,
        expiry=expiry,
        timeout=timeout)

    sas_url = block_blob_client.make_blob_url(
        container_name,
        blob_name,
        sas_token=sas_token)

    return sas_url
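
# Minimal usage sketch (hypothetical client objects; mirrors lib/setup.py): the returned
# ResourceFile is what gets passed to a task's resource_files list.
#
#   resource_file = upload_file_to_container(
#       blob_client, 'sparkresourcesfiles', 'resource-files/dsvm-image/spark-install.sh')
#   task = batch_models.TaskAddParameter(
#       id='example-task', command_line='...', resource_files=[resource_file])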

46
resource-files/dsvm-image/spark-connect.sh Normal file
View File

@@ -0,0 +1,46 @@
#!/bin/bash
echo "Running Coordination Command"
echo "Setting up networking for spark..."
echo ""
echo "CPP_NODES:"
echo $CCP_NODES
echo "AZ_BATCH_NODE_LIST:"
echo $AZ_BATCH_NODE_LIST
echo "AZ_BATCH_HOST_LIST:"
echo $AZ_BATCH_HOST_LIST
echo "AZ_BATCH_MASTER_NODE:"
echo $AZ_BATCH_MASTER_NODE
echo "AZ_BATCH_TASK_SHARED_DIR:"
echo $AZ_BATCH_TASK_SHARED_DIR
echo "AZ_BATCH_IS_CURRENT_NODE_MASTER:"
echo $AZ_BATCH_IS_CURRENT_NODE_MASTER
echo "AZ_BATCH_NODE_SHARED_DIR:"
echo $AZ_BATCH_NODE_SHARED_DIR
export SPARK_HOME=/dsvm/tools/spark/spark-2.0.2
export PATH=$PATH:$SPARK_HOME/bin
echo "SPARK_HOME"
echo $SPARK_HOME
cp $SPARK_HOME/conf/slaves.template $SPARK_HOME/conf/slaves
echo "" >> $SPARK_HOME/conf/slaves # add newline to slaves file
IFS=',' read -r -a workerips <<< $AZ_BATCH_HOST_LIST
for index in "${!workerips[@]}"
do
echo "${workerips[index]}" >> $SPARK_HOME/conf/slaves
echo "${workerips[index]}"
done
echo ""
echo "-------------------------------"
echo ""

9
resource-files/dsvm-image/spark-install.sh Normal file
View File

@@ -0,0 +1,9 @@
#!/bin/bash
echo "Running Spark Install Script"
export SPARK_HOME=/dsvm/tools/spark/spark-2.0.2
export PATH=$PATH:$SPARK_HOME/bin
chmod -R 777 $SPARK_HOME
exit 0

68
resource-files/dsvm-image/spark-start.sh Normal file
View File

@@ -0,0 +1,68 @@
#!/bin/bash
echo "Running Application Command"
echo ""
echo "CPP_NODES:"
echo $CCP_NODES
echo "AZ_BATCH_NODE_LIST:"
echo $AZ_BATCH_NODE_LIST
echo "AZ_BATCH_HOST_LIST:"
echo $AZ_BATCH_HOST_LIST
echo "AZ_BATCH_MASTER_NODE:"
echo $AZ_BATCH_MASTER_NODE
echo "AZ_BATCH_TASK_SHARED_DIR:"
echo $AZ_BATCH_TASK_SHARED_DIR
echo "AZ_BATCH_IS_CURRENT_NODE_MASTER:"
echo $AZ_BATCH_IS_CURRENT_NODE_MASTER
export SPARK_HOME=/dsvm/tools/spark/spark-2.0.2
export PATH=$PATH:$SPARK_HOME/bin
echo "SPARK_HOME:"
echo $SPARK_HOME
cp $SPARK_HOME/conf/spark-env.sh.template $SPARK_HOME/conf/spark-env.sh
# split $AZ_BATCH_MASTER_NODE (10.0.0.X:PORT) to only IP portion
m=${AZ_BATCH_MASTER_NODE%:*}
# set the master node IP in $SPARK_HOME/conf/spark-env.sh
echo "SPARK_MASTER_HOST=$m" >> $SPARK_HOME/conf/spark-env.sh
echo ""
echo "----------------------------"
echo ""
echo "Running start-all.sh to start the spark cluster:"
# start spark cluster - run in background process
bash $SPARK_HOME/sbin/start-all.sh &
# install and setup jupyter
# TODO this needs to run as the root user (but running the multi-instance task with run_elevated will cause the start-all.sh script to fail)
<< INSTALL_JUPYTER
pip install jupyter
pip install toree
jupyter toree install --spark_home=$SPARK_HOME --interpreters=PySpark
jupyter notebook --no-browser
INSTALL_JUPYTER
# command to start spark with pyspark once ssh'ed into master node:
<< START_PYSPARK
path/to/spark_home/bin/pyspark --master spark://localhost:7077
START_PYSPARK
# command to run spark job with submit-job once ssh'ed into master node:
<< SUBMIT_SPARK_SCRIPT
path/to/spark_home/bin/spark-submit \
--master spark://localhost:7077 \
path/to/spark_home/examples/src/main/python/pi.py \
1000
SUBMIT_SPARK_SCRIPT

View File

@@ -0,0 +1,27 @@
from __future__ import print_function

import sys
from operator import add

from pyspark.sql import SparkSession


if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("Usage: wordcount <file>", file=sys.stderr)
        exit(-1)

    spark = SparkSession\
        .builder\
        .appName("PythonWordCount")\
        .getOrCreate()

    lines = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0])
    counts = lines.flatMap(lambda x: x.split(' ')) \
                  .map(lambda x: (x, 1)) \
                  .reduceByKey(add)
    output = counts.collect()
    for (word, count) in output:
        print("%s: %i" % (word, count))

    spark.stop()
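
# A sketch of submitting this job from the master node once the cluster is up
# (the input path is hypothetical):
#
#   $SPARK_HOME/bin/spark-submit --master spark://localhost:7077 wordcount.py /path/to/input.txt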

View File

@@ -0,0 +1,59 @@
#!/bin/bash
echo "Running Coordination Command"
echo "Setting up networking for spark..."
echo ""
echo "CPP_NODES:"
echo $CCP_NODES
echo "AZ_BATCH_NODE_LIST:"
echo $AZ_BATCH_NODE_LIST
echo "AZ_BATCH_HOST_LIST:"
echo $AZ_BATCH_HOST_LIST
echo "AZ_BATCH_MASTER_NODE:"
echo $AZ_BATCH_MASTER_NODE
echo "AZ_BATCH_TASK_SHARED_DIR:"
echo $AZ_BATCH_TASK_SHARED_DIR
echo "AZ_BATCH_IS_CURRENT_NODE_MASTER:"
echo $AZ_BATCH_IS_CURRENT_NODE_MASTER
echo "AZ_BATCH_NODE_SHARED_DIR:"
echo $AZ_BATCH_NODE_SHARED_DIR
export JAVA_HOME=/usr/lib/jvm/java-8-oracle
export PATH=$PATH:$JAVA_HOME/bin
echo "JAVA_HOME:"
echo $JAVA_HOME
cd $AZ_BATCH_NODE_SHARED_DIR
tar xvf spark-2.1.0-bin-hadoop2.7.tgz
cp spark-2.1.0-bin-hadoop2.7/conf/slaves.template spark-2.1.0-bin-hadoop2.7/conf/slaves
# pushd $AZ_BATCH_NODE_SHARED_DIR
# tar xvf spark-2.1.0-bin-hadoop2.7.tgz
# cp spark-2.1.0-bin-hadoop2.7/conf/slaves.template spark-2.1.0-bin-hadoop2.7/conf/slaves
# popd
export SPARK_HOME=$AZ_BATCH_NODE_SHARED_DIR/spark-2.1.0-bin-hadoop2.7
export PATH=$PATH:$SPARK_HOME/bin
echo "SPARK_HOME"
echo $SPARK_HOME
echo "" >> $SPARK_HOME/conf/slaves # add newline to slaves file
IFS=',' read -r -a workerips <<< $AZ_BATCH_HOST_LIST
for index in "${!workerips[@]}"
do
echo "${workerips[index]}" >> $SPARK_HOME/conf/slaves
echo "${workerips[index]}"
done
echo ""
echo "-------------------------------"
echo ""

View File

@@ -0,0 +1,45 @@
#!/bin/bash
echo "Running Spark Install Script"
echo 'debconf debconf/frontend select Noninteractive' | sudo debconf-set-selections
# installation of Oracle Java JDK.
sudo apt-get -y update
sudo apt-get -y install python-software-properties
sudo add-apt-repository -y ppa:webupd8team/java
sudo apt-get -y update
echo debconf shared/accepted-oracle-license-v1-1 select true | sudo debconf-set-selections
echo debconf shared/accepted-oracle-license-v1-1 seen true | sudo debconf-set-selections
sudo apt-get -y install oracle-java8-installer
# Install pip(3) for package management
sudo apt-get -y install python-pip
pip install --upgrade pip
# Installation of commonly used python scipy tools
sudo apt-get -y install python-numpy python-scipy python-matplotlib ipython ipython-notebook python-pandas python-sympy python-nose
# Installation of scala
wget http://www.scala-lang.org/files/archive/scala-2.11.1.deb
sudo dpkg -i scala-2.11.1.deb
sudo apt-get -y update
sudo apt-get -y install scala
# Installation of sbt
# http://www.scala-sbt.org/0.13/docs/Installing-sbt-on-Linux.html
echo "deb http://dl.bintray.com/sbt/debian /" | sudo tee -a /etc/apt/sources.list.d/sbt.list
sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv 2EE0EA64E40A89B84B2DF73499E82A75642AC823
sudo apt-get update
sudo apt-get install sbt
# Downloading spark
wget http://d3kbcqa49mib13.cloudfront.net/spark-2.1.0-bin-hadoop2.7.tgz
cp spark-2.1.0-bin-hadoop2.7.tgz $AZ_BATCH_NODE_SHARED_DIR
sudo chmod -R 777 $AZ_BATCH_NODE_SHARED_DIR/spark-2.1.0-bin-hadoop2.7
# tar xvf spark-2.1.0-bin-hadoop2.7.tgz
# cp spark-2.1.0-bin-hadoop2.7/conf/slaves.template spark-2.1.0-bin-hadoop2.7/conf/slaves
# cp -r spark-2.1.0-bin-hadoop2.7 $AZ_BATCH_NODE_SHARED_DIR
exit 0

View File

@@ -0,0 +1,60 @@
#!/bin/bash
echo "Running Application Command"
echo ""
echo "CPP_NODES:"
echo $CCP_NODES
echo "AZ_BATCH_NODE_LIST:"
echo $AZ_BATCH_NODE_LIST
echo "AZ_BATCH_HOST_LIST:"
echo $AZ_BATCH_HOST_LIST
echo "AZ_BATCH_MASTER_NODE:"
echo $AZ_BATCH_MASTER_NODE
echo "AZ_BATCH_TASK_SHARED_DIR:"
echo $AZ_BATCH_TASK_SHARED_DIR
echo "AZ_BATCH_IS_CURRENT_NODE_MASTER:"
echo $AZ_BATCH_IS_CURRENT_NODE_MASTER
export SPARK_HOME=$AZ_BATCH_NODE_SHARED_DIR/spark-2.1.0-bin-hadoop2.7
export PATH=$PATH:$SPARK_HOME/bin
echo "SPARK_HOME:"
echo $SPARK_HOME
echo ""
echo "----------------------------"
echo ""
echo "Running start-all.sh to start the spark cluster:"
# start spark cluster - run in background process
bash $SPARK_HOME/sbin/start-all.sh &
# install and setup jupyter
# TODO this needs to run as the root user (but running the multi-instance task with run_elevated will cause the start-all.sh script to fail)
<< INSTALL_JUPYTER
pip install jupyter
pip install toree
jupyter toree install --spark_home=$SPARK_HOME --interpreters=PySpark
jupyter notebook --no-browser
INSTALL_JUPYTER
# command to start spark with pyspark once ssh'ed into master node:
<< START_PYSPARK
path/to/spark_home/bin/pyspark --master spark://localhost:7077
START_PYSPARK
# command to run spark job with submit-job once ssh'ed into master node:
<< SUBMIT_SPARK_SCRIPT
path/to/spark_home/bin/spark-submit \
--master spark://localhost:7077 \
path/to/spark_home/examples/src/main/python/pi.py \
1000
SUBMIT_SPARK_SCRIPT