зеркало из https://github.com/Azure/aztk.git
Feature: Azure Files (#241)
* initial take on installing azure files * fix cluster.yaml parsing of files shares * remove test code * add docs for Azure Files
This commit is contained in:
Родитель
62f3995c2c
Коммит
cabcc29b3c
|
@ -80,6 +80,7 @@ class ClusterConfig:
|
|||
self.username = None
|
||||
self.password = None
|
||||
self.custom_scripts = None
|
||||
self.file_shares = None
|
||||
self.docker_repo = None
|
||||
self.wait = None
|
||||
|
||||
|
@ -126,6 +127,9 @@ class ClusterConfig:
|
|||
if config.get('custom_scripts') not in [[None], None]:
|
||||
self.custom_scripts = config['custom_scripts']
|
||||
|
||||
if config.get('azure_files') not in [[None], None]:
|
||||
self.file_shares = config['azure_files']
|
||||
|
||||
if config.get('docker_repo') is not None:
|
||||
self.docker_repo = config['docker_repo']
|
||||
|
||||
|
|
|
@ -65,6 +65,20 @@ def execute(args: typing.NamedTuple):
|
|||
else:
|
||||
custom_scripts = None
|
||||
|
||||
if cluster_conf.file_shares:
|
||||
file_shares = []
|
||||
for file_share in cluster_conf.file_shares:
|
||||
file_shares.append(
|
||||
aztk_sdk.models.FileShare(
|
||||
storage_account_name=file_share['storage_account_name'],
|
||||
storage_account_key=file_share['storage_account_key'],
|
||||
file_share_path=file_share['file_share_path'],
|
||||
mount_path=file_share['mount_path']
|
||||
)
|
||||
)
|
||||
else:
|
||||
file_shares = None
|
||||
|
||||
jars_src = aztk_sdk.utils.constants.DEFAULT_SPARK_JARS_SOURCE
|
||||
|
||||
# create spark cluster
|
||||
|
@ -75,6 +89,7 @@ def execute(args: typing.NamedTuple):
|
|||
vm_low_pri_count=cluster_conf.size_low_pri,
|
||||
vm_size=cluster_conf.vm_size,
|
||||
custom_scripts=custom_scripts,
|
||||
file_shares=file_shares,
|
||||
docker_repo=cluster_conf.docker_repo,
|
||||
spark_configuration=aztk_sdk.spark.models.SparkConfiguration(
|
||||
spark_defaults_conf=os.path.join(
|
||||
|
@ -119,6 +134,7 @@ def print_cluster_conf(cluster_conf):
|
|||
log.info("> low priority: %s", cluster_conf.size_low_pri)
|
||||
log.info("spark cluster vm size: %s", cluster_conf.vm_size)
|
||||
log.info("custom scripts: %s", cluster_conf.custom_scripts)
|
||||
log.info("file shares: %s", len(cluster_conf.file_shares) if cluster_conf.file_shares is not None else 0)
|
||||
log.info("docker repo name: %s", cluster_conf.docker_repo)
|
||||
log.info("wait for cluster: %s", cluster_conf.wait)
|
||||
log.info("username: %s", cluster_conf.username)
|
||||
|
|
|
@ -2,6 +2,15 @@ from typing import List
|
|||
import aztk_sdk.utils.constants as constants
|
||||
import azure.batch.models as batch_models
|
||||
|
||||
class FileShare:
|
||||
def __init__(self, storage_account_name: str = None,
|
||||
storage_account_key: str = None,
|
||||
file_share_path: str = None,
|
||||
mount_path: str = None):
|
||||
self.storage_account_name = storage_account_name
|
||||
self.storage_account_key = storage_account_key
|
||||
self.file_share_path = file_share_path
|
||||
self.mount_path = mount_path
|
||||
|
||||
class CustomScript:
|
||||
def __init__(self, name: str = None, script: str = None, run_on=None):
|
||||
|
@ -14,6 +23,7 @@ class ClusterConfiguration:
|
|||
def __init__(
|
||||
self,
|
||||
custom_scripts: List[CustomScript] = None,
|
||||
file_shares: List[FileShare] = None,
|
||||
cluster_id: str = None,
|
||||
vm_count=None,
|
||||
vm_low_pri_count=None,
|
||||
|
@ -21,6 +31,7 @@ class ClusterConfiguration:
|
|||
docker_repo: str=None):
|
||||
|
||||
self.custom_scripts = custom_scripts
|
||||
self.file_shares = file_shares
|
||||
self.cluster_id = cluster_id
|
||||
self.vm_count = vm_count
|
||||
self.vm_size = vm_size
|
||||
|
|
|
@ -21,7 +21,10 @@ class Client(BaseClient):
|
|||
def create_cluster(self, cluster_conf: models.ClusterConfiguration, wait: bool = False):
|
||||
try:
|
||||
zip_resource_files = upload_node_scripts.zip_scripts(self.blob_client, cluster_conf.custom_scripts, cluster_conf.spark_configuration)
|
||||
start_task = create_cluster_helper.generate_cluster_start_task(self, zip_resource_files, cluster_conf.docker_repo)
|
||||
start_task = create_cluster_helper.generate_cluster_start_task(self,
|
||||
zip_resource_files,
|
||||
cluster_conf.docker_repo,
|
||||
cluster_conf.file_shares)
|
||||
|
||||
software_metadata_key = "spark"
|
||||
|
||||
|
@ -32,17 +35,17 @@ class Client(BaseClient):
|
|||
|
||||
cluster = self.__create_pool_and_job(
|
||||
cluster_conf, software_metadata_key, start_task, vm_image)
|
||||
|
||||
|
||||
# Wait for the master to be ready
|
||||
if wait:
|
||||
util.wait_for_master_to_be_ready(self, cluster.id)
|
||||
cluster = self.get_cluster(cluster.id)
|
||||
|
||||
|
||||
return cluster
|
||||
|
||||
|
||||
except batch_error.BatchErrorException as e:
|
||||
raise error.AztkError(helpers.format_batch_exception(e))
|
||||
|
||||
|
||||
def create_clusters_in_parallel(self, cluster_confs):
|
||||
for cluster_conf in cluster_confs:
|
||||
self.create_cluster(cluster_conf)
|
||||
|
@ -59,7 +62,7 @@ class Client(BaseClient):
|
|||
return models.Cluster(pool, nodes)
|
||||
except batch_error.BatchErrorException as e:
|
||||
raise error.AztkError(helpers.format_batch_exception(e))
|
||||
|
||||
|
||||
def list_clusters(self):
|
||||
try:
|
||||
return [models.Cluster(pool) for pool in self.__list_clusters(aztk_sdk.models.Software.spark)]
|
||||
|
@ -77,11 +80,11 @@ class Client(BaseClient):
|
|||
submit_helper.submit_application(self, cluster_id, application, wait)
|
||||
except batch_error.BatchErrorException as e:
|
||||
raise error.AztkError(helpers.format_batch_exception(e))
|
||||
|
||||
|
||||
def submit_all_applications(self, cluster_id: str, applications):
|
||||
for application in applications:
|
||||
self.submit(cluster_id, application)
|
||||
|
||||
|
||||
def wait_until_application_done(self, cluster_id: str, task_id: str):
|
||||
try:
|
||||
helpers.wait_for_task_to_complete(job_id=cluster_id, task_id=task_id, batch_client=self.batch_client)
|
||||
|
@ -102,7 +105,7 @@ class Client(BaseClient):
|
|||
return models.Cluster(pool, nodes)
|
||||
except batch_error.BatchErrorException as e:
|
||||
raise error.AztkError(helpers.format_batch_exception(e))
|
||||
|
||||
|
||||
def wait_until_all_clusters_are_ready(self, clusters: List[str]):
|
||||
for cluster_id in clusters:
|
||||
self.wait_until_cluster_is_ready(cluster_id)
|
||||
|
|
|
@ -1,6 +1,8 @@
|
|||
from typing import List
|
||||
from aztk_sdk.utils.command_builder import CommandBuilder
|
||||
from aztk_sdk.utils import helpers
|
||||
from aztk_sdk.utils import constants
|
||||
from aztk_sdk.utils import constants, helpers
|
||||
from aztk_sdk import models as aztk_models
|
||||
|
||||
import azure.batch.models as batch_models
|
||||
POOL_ADMIN_USER_IDENTITY = batch_models.UserIdentity(
|
||||
auto_user=batch_models.AutoUserSpecification(
|
||||
|
@ -10,7 +12,7 @@ POOL_ADMIN_USER_IDENTITY = batch_models.UserIdentity(
|
|||
'''
|
||||
Cluster create helper methods
|
||||
'''
|
||||
def __docker_run_cmd(docker_repo: str = None) -> str:
|
||||
def __docker_run_cmd(docker_repo: str = None, file_mounts = []) -> str:
|
||||
"""
|
||||
Build the docker run command by setting up the environment variables
|
||||
"""
|
||||
|
@ -20,6 +22,10 @@ def __docker_run_cmd(docker_repo: str = None) -> str:
|
|||
cmd.add_option('--name', constants.DOCKER_SPARK_CONTAINER_NAME)
|
||||
cmd.add_option('-v', '/mnt/batch/tasks:/batch')
|
||||
|
||||
if file_mounts:
|
||||
for mount in file_mounts:
|
||||
cmd.add_option('-v', '{0}:{0}'.format(mount.mount_path))
|
||||
|
||||
cmd.add_option('-e', 'DOCKER_WORKING_DIR=/batch/startup/wd')
|
||||
cmd.add_option('-e', 'AZ_BATCH_ACCOUNT_NAME=$AZ_BATCH_ACCOUNT_NAME')
|
||||
cmd.add_option('-e', 'BATCH_ACCOUNT_KEY=$BATCH_ACCOUNT_KEY')
|
||||
|
@ -69,14 +75,30 @@ def __get_docker_credentials(spark_client):
|
|||
return creds
|
||||
|
||||
def __cluster_install_cmd(zip_resource_file: batch_models.ResourceFile,
|
||||
docker_repo: str = None):
|
||||
docker_repo: str = None,
|
||||
file_mounts = []):
|
||||
"""
|
||||
For Docker on ubuntu 16.04 - return the command line
|
||||
to be run on the start task of the pool to setup spark.
|
||||
"""
|
||||
docker_repo = docker_repo or constants.DEFAULT_DOCKER_REPO
|
||||
|
||||
ret = [
|
||||
shares = []
|
||||
|
||||
if file_mounts:
|
||||
for mount in file_mounts:
|
||||
# Create the directory on the node
|
||||
shares.append('mkdir -p {0}'.format(mount.mount_path))
|
||||
|
||||
# Mount the file share
|
||||
shares.append('mount -t cifs //{0}.file.core.windows.net/{2} {3} -o vers=3.0,username={0},password={1},dir_mode=0777,file_mode=0777,sec=ntlmssp'.format(
|
||||
mount.storage_account_name,
|
||||
mount.storage_account_key,
|
||||
mount.file_share_path,
|
||||
mount.mount_path
|
||||
))
|
||||
|
||||
setup = [
|
||||
'apt-get -y clean',
|
||||
'apt-get -y update',
|
||||
'apt-get install --fix-missing',
|
||||
|
@ -87,15 +109,17 @@ def __cluster_install_cmd(zip_resource_file: batch_models.ResourceFile,
|
|||
'/bin/bash $AZ_BATCH_TASK_WORKING_DIR/setup_node.sh {0} {1} "{2}"'.format(
|
||||
constants.DOCKER_SPARK_CONTAINER_NAME,
|
||||
docker_repo,
|
||||
__docker_run_cmd(docker_repo)),
|
||||
__docker_run_cmd(docker_repo, file_mounts)),
|
||||
]
|
||||
|
||||
return ret
|
||||
commands = shares + setup
|
||||
return commands
|
||||
|
||||
def generate_cluster_start_task(
|
||||
spark_client,
|
||||
zip_resource_file: batch_models.ResourceFile,
|
||||
docker_repo: str = None):
|
||||
docker_repo: str = None,
|
||||
file_shares: List[aztk_models.FileShare] = None):
|
||||
"""
|
||||
This will return the start task object for the pool to be created.
|
||||
:param cluster_id str: Id of the cluster(Used for uploading the resource files)
|
||||
|
@ -103,7 +127,6 @@ def generate_cluster_start_task(
|
|||
"""
|
||||
|
||||
resource_files = [zip_resource_file]
|
||||
|
||||
spark_web_ui_port = constants.DOCKER_SPARK_WEB_UI_PORT
|
||||
spark_worker_ui_port = constants.DOCKER_SPARK_WORKER_UI_PORT
|
||||
spark_jupyter_port = constants.DOCKER_SPARK_JUPYTER_PORT
|
||||
|
@ -132,7 +155,7 @@ def generate_cluster_start_task(
|
|||
] + __get_docker_credentials(spark_client)
|
||||
|
||||
# start task command
|
||||
command = __cluster_install_cmd(zip_resource_file, docker_repo)
|
||||
command = __cluster_install_cmd(zip_resource_file, docker_repo, file_shares)
|
||||
|
||||
return batch_models.StartTask(
|
||||
command_line=helpers.wrap_commands_in_shell(command),
|
||||
|
|
|
@ -60,6 +60,7 @@ class ClusterConfiguration(aztk_sdk.models.ClusterConfiguration):
|
|||
def __init__(
|
||||
self,
|
||||
custom_scripts: List[CustomScript] = None,
|
||||
file_shares: List[aztk_sdk.models.FileShare] = None,
|
||||
cluster_id: str = None,
|
||||
vm_count=None,
|
||||
vm_low_pri_count=None,
|
||||
|
@ -71,7 +72,8 @@ class ClusterConfiguration(aztk_sdk.models.ClusterConfiguration):
|
|||
vm_count=vm_count,
|
||||
vm_low_pri_count=vm_low_pri_count,
|
||||
vm_size=vm_size,
|
||||
docker_repo=docker_repo
|
||||
docker_repo=docker_repo,
|
||||
file_shares=file_shares
|
||||
)
|
||||
self.spark_configuration = spark_configuration
|
||||
|
||||
|
|
|
@ -0,0 +1,23 @@
|
|||
# Azure Files
|
||||
|
||||
The ability to load a file share on the cluster is really useful when you want to be able to share data across all the nodes, and/or want that data to be persisted longer than the lifetime of the cluster. [Azure Files](https://docs.microsoft.com/azure/storage/files/storage-files-introduction) provides a very easy way to mount a share into the cluster and have it accessible to all nodes. This is useful in cases where you have small data sets you want to process (less than 1GB) or have notebooks that you want to re-use between clusters.
|
||||
|
||||
Mounting an Azure Files share in the cluster only required updating the cluster.yaml file at `.aztk/cluster.yaml`. For example, the following configuration will load two files shares into the cluster, one with my notebooks and one will a small data set that I have previously uploaded to Azure Files.
|
||||
|
||||
```yaml
|
||||
azure_files:
|
||||
- storage_account_name: STORAGE_ACCOUNT_NAME
|
||||
storage_account_key: STORAGE_ACCOUNT_KEY
|
||||
# Name of the file share in Azure Files
|
||||
file_share_path: data
|
||||
# Mount point on the node in the cluster
|
||||
mount_path: /mnt/data
|
||||
- storage_account_name: STORAGE_ACCOUNT_NAME
|
||||
storage_account_key: STORAGE_ACCOUNT_KEY
|
||||
# Name of the file share in Azure Files
|
||||
file_share_path: notebooks
|
||||
# Mount point on the node in the cluster
|
||||
mount_path: /mnt/notebooks
|
||||
```
|
||||
|
||||
From the cluster I can now access both of these file shares directly simply by navigating to /mnt/data or /mnt/notebooks respectively.
|
|
@ -13,8 +13,7 @@ custom_script_dir=$DOCKER_WORKING_DIR/custom-scripts
|
|||
# Preload jupyter samples
|
||||
# TODO: remove when we support uploading random (non-executable) files as part custom-scripts
|
||||
# -----------------------
|
||||
mkdir /jupyter
|
||||
mkdir /jupyter/samples
|
||||
mkdir -p /jupyter/samples
|
||||
|
||||
# add all files from 'jupyter-samples' to container folder '/pyspark/samples'
|
||||
for file in $(dirname $0)/jupyter-samples/*; do
|
||||
|
|
Загрузка…
Ссылка в новой задаче