This commit is contained in:
Timothee Guerin 2018-02-26 16:36:31 -08:00 committed by GitHub
Parent b833561c7e
Commit c724d9403f
No key found matching this signature
GPG key ID: 4AEE18F83AFDEB23
48 changed files: 1157 additions and 305 deletions

View File

@ -4,10 +4,7 @@ indent_size = 4
insert_final_newline = true
trim_trailing_whitespace = true
[*.json]
indent_size = 2
[*.yml]
[*.{json,yml,yaml}]
indent_size = 2
[*.xml]

4  .vscode/launch.json vendored
View File

@ -9,10 +9,10 @@
"type": "python",
"request": "launch",
"stopOnEntry": false,
"internalConsoleOptions": "openOnSessionStart",
"pythonPath": "${config:python.pythonPath}",
"program": "${workspaceFolder}/cli/entrypoint.py",
"cwd": "${workspaceFolder}",
"console": "integratedTerminal",
"args": [
"spark", "cluster", "list"
],
@ -27,9 +27,9 @@
"type": "python",
"request": "launch",
"stopOnEntry": false,
"internalConsoleOptions": "openOnSessionStart",
"pythonPath": "${config:python.pythonPath}",
"program": "${workspaceFolder}/cli/entrypoint.py",
"console": "integratedTerminal",
"cwd": "${workspaceFolder}",
"args": [
"spark", "cluster", "create", "--id", "spark-debug"

View File

@ -1,6 +1,7 @@
import asyncio
import concurrent.futures
import sys
import yaml
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta, timezone
@ -25,6 +26,7 @@ class Client:
self.batch_client = azure_api.make_batch_client(secrets_config)
self.blob_client = azure_api.make_blob_client(secrets_config)
'''
General Batch Operations
'''
@ -54,7 +56,7 @@ class Client:
return job_exists or pool_exists
def __create_pool_and_job(self, cluster_conf, software_metadata_key: str, start_task, VmImageModel):
def __create_pool_and_job(self, cluster_conf: models.ClusterConfiguration, software_metadata_key: str, start_task, VmImageModel):
"""
Create a pool and job
:param cluster_conf: the configuration object used to create the cluster
@ -64,6 +66,7 @@ class Client:
:param VmImageModel: the type of image to provision for the cluster
:param wait: wait until the cluster is ready
"""
helpers.save_cluster_config(cluster_conf, self.blob_client)
# reuse pool_id as job_id
pool_id = cluster_conf.cluster_id
job_id = cluster_conf.cluster_id

View File

@ -1,15 +1,31 @@
"""
Contains all errors used in Aztk.
All errors should inherit from `AztkError`
"""
class AztkError(Exception):
def __init__(self, message: str = None):
super().__init__()
self.message = message
super().__init__(message)
class ClusterNotReadyError(AztkError):
pass
class AzureApiInitError(AztkError):
def __init__(self, message: str = None):
super().__init__()
self.message = message
pass
class InvalidPluginConfigurationError(AztkError):
pass
class InvalidModelError(AztkError):
pass
class MissingRequiredAttributeError(InvalidModelError):
pass
class InvalidCustomScriptError(InvalidModelError):
pass
class InvalidPluginReferenceError(InvalidModelError):
pass
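
Because AztkError now forwards its message to Exception.__init__ (instead of only storing self.message), callers can rely on str(e); the CLI change later in this commit (log.error(str(e))) depends on this. A minimal sketch, not part of the commit:

from aztk.error import AztkError, MissingRequiredAttributeError

try:
    raise MissingRequiredAttributeError("ClusterConfiguration is missing cluster_id")
except AztkError as e:
    # str(e) carries the message because it is forwarded to Exception.__init__
    print(str(e))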

View File

@ -0,0 +1,5 @@
"""
Module containing classes used internally by the library and not intended for SDK users
"""
from .configuration_base import *

View File

@ -0,0 +1,42 @@
import yaml
from aztk.error import AztkError
class ConfigurationBase:
"""
Base class for any configuration.
Include methods to help with validation
"""
@classmethod
def from_dict(cls, args: dict):
"""
Create a new model from dict values.
The dict is cleaned of null values and expanded into the constructor as keyword arguments.
"""
try:
clean = dict((k, v) for k, v in args.items() if v)
return cls(**clean)
except TypeError as e:
pretty_args = yaml.dump(args, default_flow_style=False)
raise AztkError("{0} {1}\n{2}".format(cls.__name__, str(e), pretty_args))
def validate(self):
raise NotImplementedError("Validate not implemented")
def valid(self):
try:
self.validate()
return True
except AztkError:
return False
def _validate_required(self, attrs):
for attr in attrs:
if not getattr(self, attr):
raise AztkError("{0} missing {1}.".format(self.__class__.__name__, attr))
def _merge_attributes(self, other, attrs):
for attr in attrs:
val = getattr(other, attr)
if val is not None:
setattr(self, attr, val)
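
A minimal sketch of how ConfigurationBase is intended to be used; the subclass below is hypothetical and only illustrates from_dict, validate and valid:

from aztk.internal import ConfigurationBase

class StorageConfig(ConfigurationBase):  # hypothetical subclass for illustration only
    def __init__(self, account_name: str = None, account_key: str = None):
        self.account_name = account_name
        self.account_key = account_key

    def validate(self):
        self._validate_required(["account_name", "account_key"])

# from_dict drops None values before expanding the dict into the constructor
conf = StorageConfig.from_dict(dict(account_name="mystorage", account_key=None))
print(conf.valid())  # False: account_key was stripped and is still missing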

1  aztk/models/__init__.py Normal file
View File

@ -0,0 +1 @@
from .models import *

View File

@ -1,37 +1,14 @@
import io
from typing import List
import azure.batch.models as batch_models
import aztk.utils.constants as constants
from aztk import error
from aztk.utils import constants
import azure.batch.models as batch_models
from aztk.models.plugins import PluginConfiguration
from aztk.internal import ConfigurationBase
import yaml
import logging
class ConfigurationBase:
"""
Base class for any configuration.
Include methods to help with validation
"""
def validate(self):
raise NotImplementedError("Validate not implemented")
def valid(self):
try:
self.validate()
return True
except error.AztkError:
return False
def _validate_required(self, attrs):
for attr in attrs:
if not getattr(self, attr):
raise error.AztkError(
"{0} missing {1}.".format(self.__class__.__name__, attr))
def _merge_attributes(self, other, attrs):
for attr in attrs:
val = getattr(other, attr)
if val is not None:
setattr(self, attr, val)
class FileShare:
def __init__(self,
storage_account_name: str = None,
@ -57,7 +34,10 @@ class CustomScript:
class UserConfiguration(ConfigurationBase):
def __init__(self, username: str, ssh_key: str = None, password: str = None):
def __init__(self,
username: str,
ssh_key: str = None,
password: str = None):
self.username = username
self.ssh_key = ssh_key
self.password = password
@ -69,21 +49,24 @@ class UserConfiguration(ConfigurationBase):
"password",
])
class ClusterConfiguration(ConfigurationBase):
"""
Cluster configuration model
"""
def __init__(
self,
custom_scripts: List[CustomScript] = None,
file_shares: List[FileShare] = None,
cluster_id: str = None,
vm_count=0,
vm_low_pri_count=0,
vm_size=None,
subnet_id=None,
docker_repo: str=None,
user_configuration: UserConfiguration=None):
def __init__(self,
custom_scripts: List[CustomScript] = None,
file_shares: List[FileShare] = None,
cluster_id: str = None,
vm_count=0,
vm_low_pri_count=0,
vm_size=None,
subnet_id=None,
docker_repo: str = None,
plugins: List[PluginConfiguration] = None,
user_configuration: UserConfiguration = None):
super().__init__()
self.custom_scripts = custom_scripts
self.file_shares = file_shares
@ -94,6 +77,7 @@ class ClusterConfiguration(ConfigurationBase):
self.subnet_id = subnet_id
self.docker_repo = docker_repo
self.user_configuration = user_configuration
self.plugins = plugins
def merge(self, other):
"""
@ -110,6 +94,7 @@ class ClusterConfiguration(ConfigurationBase):
"docker_repo",
"vm_count",
"vm_low_pri_count",
"plugins",
])
if other.user_configuration:
@ -118,6 +103,10 @@ class ClusterConfiguration(ConfigurationBase):
else:
self.user_configuration = other.user_configuration
if self.plugins:
for plugin in self.plugins:
plugin.validate()
def mixed_mode(self) -> bool:
return self.vm_count > 0 and self.vm_low_pri_count > 0
@ -133,15 +122,18 @@ class ClusterConfiguration(ConfigurationBase):
if self.vm_count == 0 and self.vm_low_pri_count == 0:
raise error.AztkError(
"Please supply a valid (greater than 0) size or size_low_pri value either in the cluster.yaml configuration file or with a parameter (--size or --size-low-pri)")
"Please supply a valid (greater than 0) size or size_low_pri value either in the cluster.yaml configuration file or with a parameter (--size or --size-low-pri)"
)
if self.vm_size is None:
raise error.AztkError(
"Please supply a vm_size in either the cluster.yaml configuration file or with a parameter (--vm-size)")
"Please supply a vm_size in either the cluster.yaml configuration file or with a parameter (--vm-size)"
)
if self.mixed_mode() and not self.subnet_id:
raise error.AztkError(
"You must configure a VNET to use AZTK in mixed mode (dedicated and low priority nodes). Set the VNET's subnet_id in your cluster.yaml.")
"You must configure a VNET to use AZTK in mixed mode (dedicated and low priority nodes). Set the VNET's subnet_id in your cluster.yaml."
)
class RemoteLogin:
@ -217,11 +209,7 @@ class SharedKeyConfiguration(ConfigurationBase):
class DockerConfiguration(ConfigurationBase):
def __init__(
self,
endpoint=None,
username=None,
password=None):
def __init__(self, endpoint=None, username=None, password=None):
self.endpoint = endpoint
self.username = username
@ -232,13 +220,12 @@ class DockerConfiguration(ConfigurationBase):
class SecretsConfiguration(ConfigurationBase):
def __init__(
self,
service_principal=None,
shared_key=None,
docker=None,
ssh_pub_key=None,
ssh_priv_key=None):
def __init__(self,
service_principal=None,
shared_key=None,
docker=None,
ssh_pub_key=None,
ssh_priv_key=None):
self.service_principal = service_principal
self.shared_key = shared_key
self.docker = docker
@ -249,14 +236,16 @@ class SecretsConfiguration(ConfigurationBase):
def validate(self):
if self.service_principal and self.shared_key:
raise error.AztkError(
"Both service_principal and shared_key auth are configured, must use only one")
"Both service_principal and shared_key auth are configured, must use only one"
)
elif self.service_principal:
self.service_principal.validate()
elif self.shared_key:
self.shared_key.validate()
else:
raise error.AztkError(
"Neither service_principal and shared_key auth are configured, must use only one")
"Neither service_principal and shared_key auth are configured, must use only one"
)
def is_aad(self):
return self.service_principal is not None
@ -270,7 +259,9 @@ class VmImage:
class Cluster:
def __init__(self, pool: batch_models.CloudPool, nodes: batch_models.ComputeNodePaged = None):
def __init__(self,
pool: batch_models.CloudPool,
nodes: batch_models.ComputeNodePaged = None):
self.id = pool.id
self.pool = pool
self.nodes = nodes
@ -288,11 +279,13 @@ class Cluster:
self.target_dedicated_nodes = pool.target_dedicated_nodes
self.target_low_pri_nodes = pool.target_low_priority_nodes
class SSHLog():
def __init__(self, output, node_id):
self.output = output
self.node_id = node_id
class Software:
"""
Enum listing the available software

View File

@ -0,0 +1,2 @@
from .plugin_file import *
from .plugin_configuration import *

View File

@ -0,0 +1,2 @@
from .plugin_manager import *
from .plugin_reference import *

View File

@ -0,0 +1,73 @@
import os
import inspect
import importlib.util
from aztk.utils import constants
from aztk.error import InvalidPluginReferenceError
from aztk.spark.models import plugins
class PluginArgument:
def __init__(self, name: str, required: bool, default=None):
self.name = name
self.required = required
self.default = default
class PluginManager:
# Indexing of all the predefined plugins
plugins = dict(
jupyter=plugins.JupyterPlugin,
rstudio_server=plugins.RStudioServerPlugin,
hdfs=plugins.HDFSPlugin,
)
def __init__(self):
self.loaded = False
def has_plugin(self, name: str):
return name in self.plugins
def get_plugin(self, name: str, args: dict = None):
args = args or dict()
if not self.has_plugin(name):
raise InvalidPluginReferenceError("Cannot find a plugin with name '{0}'".format(name))
plugin_cls = self.plugins[name]
self._validate_args(plugin_cls, args)
return plugin_cls(**args)
def get_args_for(self, cls):
signature = inspect.signature(cls)
args = dict()
for k, v in signature.parameters.items():
args[k] = PluginArgument(k, default=v.default, required=v.default is inspect.Parameter.empty)
return args
def _validate_args(self, plugin_cls, args: dict):
"""
Validate that the given args match the plugin's expected arguments
"""
plugin_args = self.get_args_for(plugin_cls)
self._validate_no_extra_args(plugin_cls, plugin_args, args)
for arg in plugin_args.values():
if args.get(arg.name) is None:
if arg.required:
message = "Missing a required argument {0} for plugin {1}".format(
arg.name, plugin_cls.__name__)
raise InvalidPluginReferenceError(message)
args[arg.name] = arg.default
def _validate_no_extra_args(self, plugin_cls, plugin_args: dict, args: dict):
for name in args:
if name not in plugin_args:
message = "Plugin {0} doesn't have an argument called '{1}'".format(
plugin_cls.__name__, name)
raise InvalidPluginReferenceError(message)
plugin_manager = PluginManager()
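
A short usage sketch for the module-level plugin_manager above, assuming the predefined plugins registered in this commit:

from aztk.error import InvalidPluginReferenceError
from aztk.models.plugins.internal import plugin_manager

# Predefined plugins are resolved by name; args are checked against the
# plugin class's constructor signature.
rstudio = plugin_manager.get_plugin("rstudio_server", args=dict(version="1.1.383"))
print(rstudio.name, rstudio.env)

try:
    plugin_manager.get_plugin("jupyter", args=dict(port=9999))  # unknown argument
except InvalidPluginReferenceError as e:
    print(str(e))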

View File

@ -0,0 +1,21 @@
from aztk.error import InvalidPluginConfigurationError, InvalidModelError
from aztk.internal import ConfigurationBase
from aztk.models import PluginConfiguration
from .plugin_manager import plugin_manager
class PluginReference(ConfigurationBase):
"""
Contains the configuration to use a plugin
"""
def __init__(self, name, args: dict = None):
super().__init__()
self.name = name
self.args = args or dict()
def get_plugin(self) -> PluginConfiguration:
return plugin_manager.get_plugin(self.name, self.args)
def validate(self) -> bool:
if not self.name:
raise InvalidModelError("Plugin is missing a name")
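
This is roughly how an entry under plugins: in cluster.yaml gets resolved by the CLI configuration code later in this commit; a sketch, not part of the commit:

from aztk.models.plugins.internal import PluginReference

ref = PluginReference.from_dict(dict(name="jupyter"))  # shape of one cluster.yaml entry
plugin = ref.get_plugin()                              # -> a PluginConfiguration
print(plugin.name, [port.internal for port in plugin.ports])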

View File

@ -0,0 +1,74 @@
import inspect
from typing import List, Union
from enum import Enum
from .plugin_file import PluginFile
from aztk.internal import ConfigurationBase
class PluginPort:
"""
Definition for a port that should be opened on the node
:param internal: Port on the node
:param public: [Optional] Port exposed to the user. If False, no port is opened to the user
:param name: [Optional] Name to differentiate ports when there are multiple
"""
def __init__(self, internal: int, public: Union[int, bool] = False, name=None):
self.internal = internal
self.expose_publicly = bool(public)
self.public_port = None
if self.expose_publicly:
if public is True:
self.public_port = internal
else:
self.public_port = public
self.name = name
class PluginRunTarget(Enum):
Master = "master"
Worker = "worker"
All = "all-nodes"
class PluginConfiguration(ConfigurationBase):
"""
Plugin manifest that should be returned in the main.py of your plugin
:param name: Name of the plugin. Used to reference the plugin
:param run_on: Which nodes the plugin should run on
:param files: List of files to upload
:param args: [Optional] Arguments passed to the plugin when it is run
:param env: [Optional] Environment variables to set for the plugin
"""
def __init__(self,
name: str,
ports: List[PluginPort] = None,
files: List[PluginFile] = None,
execute: str = None,
args=None,
env=None,
run_on: PluginRunTarget = PluginRunTarget.Master):
self.name = name
# self.docker_image = docker_image
self.run_on = run_on
self.ports = ports or []
self.files = files or []
self.args = args or []
self.env = env or dict()
self.execute = execute
def has_arg(self, name: str):
for x in self.args:
if x.name == name:
return True
else:
return False
def validate(self):
self._validate_required([
"name",
"execute",
])
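
For illustration, a hypothetical plugin manifest built with the classes above (the plugin name, script and env values are made up):

import os
from aztk.models.plugins.plugin_configuration import PluginConfiguration, PluginPort, PluginRunTarget
from aztk.models.plugins.plugin_file import PluginFile

dir_path = os.path.dirname(os.path.realpath(__file__))

my_plugin = PluginConfiguration(
    name="my_dashboard",                             # hypothetical plugin
    ports=[PluginPort(internal=9090, public=True)],  # exposed to the user
    run_on=PluginRunTarget.Master,
    execute="my_dashboard.sh",
    files=[PluginFile("my_dashboard.sh", os.path.join(dir_path, "my_dashboard.sh"))],
    env=dict(DASHBOARD_VERSION="1.0"),
)
my_plugin.validate()  # requires at least a name and an execute script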

View File

@ -0,0 +1,28 @@
import io
from typing import Union
class PluginFile:
"""
Reference to a file for a plugin.
"""
def __init__(self, target: str, local_path: str):
self.target = target
self.local_path = local_path
# TODO handle folders?
def content(self):
with open(self.local_path, "r") as f:
return f.read()
class TextPluginFile:
def __init__(self, target: str, content: Union[str,io.StringIO]):
if isinstance(content, str):
self._content = content
else:
self._content = content.getvalue()
def content(self):
return self._content

View File

@ -1,2 +1,2 @@
from . import models
from .client import Client
from .models import *
from .client import Client

View File

@ -28,13 +28,15 @@ class Client(BaseClient):
cluster_conf.cluster_id,
cluster_conf.custom_scripts,
cluster_conf.spark_configuration,
cluster_conf.user_configuration)
cluster_conf.user_configuration,
cluster_conf.plugins)
start_task = create_cluster_helper.generate_cluster_start_task(self,
zip_resource_files,
cluster_conf.gpu_enabled(),
cluster_conf.docker_repo,
cluster_conf.file_shares,
cluster_conf.plugins,
cluster_conf.mixed_mode(),
cluster_conf.worker_on_master)

View File

@ -18,6 +18,7 @@ def __docker_run_cmd(docker_repo: str = None,
gpu_enabled: bool = False,
worker_on_master: bool = True,
file_mounts = None,
plugins = None,
mixed_mode = False) -> str:
"""
Build the docker run command by setting up the environment variables
@ -60,23 +61,18 @@ def __docker_run_cmd(docker_repo: str = None,
cmd.add_option('-e', 'SPARK_WORKER_UI_PORT=$SPARK_WORKER_UI_PORT')
cmd.add_option('-e', 'SPARK_CONTAINER_NAME=$SPARK_CONTAINER_NAME')
cmd.add_option('-e', 'SPARK_SUBMIT_LOGS_FILE=$SPARK_SUBMIT_LOGS_FILE')
cmd.add_option('-e', 'SPARK_JUPYTER_PORT=$SPARK_JUPYTER_PORT')
cmd.add_option('-e', 'SPARK_JOB_UI_PORT=$SPARK_JOB_UI_PORT')
cmd.add_option('-p', '8080:8080') # Spark Master UI
cmd.add_option('-p', '7077:7077') # Spark Master
cmd.add_option('-p', '7337:7337') # Spark Shuffle Service
cmd.add_option('-p', '4040:4040') # Job UI
cmd.add_option('-p', '8888:8888') # Jupyter UI
cmd.add_option('-p', '8787:8787') # Rstudio Server
cmd.add_option('-p', '18080:18080') # Spark History Server UI
cmd.add_option('-p', '3022:3022') # Docker SSH
cmd.add_option('-p', '8020:8020') # Namenode IPC: ClientProtocol
cmd.add_option('-p', '9000:9000') # Namenode IPC: ClientProtocol
cmd.add_option('-p', '50010:50010') # Datanode http data transfer
cmd.add_option('-p', '50020:50020') # Datanode IPC metadata operations
cmd.add_option('-p', '50070:50070') # Namenode WebUI
cmd.add_option('-p', '50075:50075') # DataNode WebUI
cmd.add_option('-p', '50090:50090') # Secondary NameNode http address
if plugins:
for plugin in plugins:
for port in plugin.ports:
cmd.add_option('-p', '{0}:{1}'.format(port.internal, port.internal)) # Plugin port
cmd.add_option('-d', docker_repo)
cmd.add_argument('/bin/bash /mnt/batch/tasks/startup/wd/docker_main.sh')
@ -133,6 +129,7 @@ def __get_secrets_env(spark_client):
def __cluster_install_cmd(zip_resource_file: batch_models.ResourceFile,
gpu_enabled: bool,
docker_repo: str = None,
plugins = None,
worker_on_master: bool = True,
file_mounts = None,
mixed_mode: bool = False):
@ -170,7 +167,7 @@ def __cluster_install_cmd(zip_resource_file: batch_models.ResourceFile,
constants.DOCKER_SPARK_CONTAINER_NAME,
gpu_enabled,
docker_repo,
__docker_run_cmd(docker_repo, gpu_enabled, worker_on_master, file_mounts, mixed_mode)),
__docker_run_cmd(docker_repo, gpu_enabled, worker_on_master, file_mounts, plugins, mixed_mode)),
]
commands = shares + setup
@ -182,6 +179,7 @@ def generate_cluster_start_task(
gpu_enabled: bool,
docker_repo: str = None,
file_shares: List[aztk_models.FileShare] = None,
plugins: List[aztk_models.PluginConfiguration] = None,
mixed_mode: bool = False,
worker_on_master: bool = True):
"""
@ -193,9 +191,7 @@ def generate_cluster_start_task(
resource_files = [zip_resource_file]
spark_web_ui_port = constants.DOCKER_SPARK_WEB_UI_PORT
spark_worker_ui_port = constants.DOCKER_SPARK_WORKER_UI_PORT
spark_jupyter_port = constants.DOCKER_SPARK_JUPYTER_PORT
spark_job_ui_port = constants.DOCKER_SPARK_JOB_UI_PORT
spark_rstudio_server_port = constants.DOCKER_SPARK_RSTUDIO_SERVER_PORT
spark_container_name = constants.DOCKER_SPARK_CONTAINER_NAME
spark_submit_logs_file = constants.SPARK_SUBMIT_LOGS_FILE
@ -206,16 +202,12 @@ def generate_cluster_start_task(
name="SPARK_WEB_UI_PORT", value=spark_web_ui_port),
batch_models.EnvironmentSetting(
name="SPARK_WORKER_UI_PORT", value=spark_worker_ui_port),
batch_models.EnvironmentSetting(
name="SPARK_JUPYTER_PORT", value=spark_jupyter_port),
batch_models.EnvironmentSetting(
name="SPARK_JOB_UI_PORT", value=spark_job_ui_port),
batch_models.EnvironmentSetting(
name="SPARK_CONTAINER_NAME", value=spark_container_name),
batch_models.EnvironmentSetting(
name="SPARK_SUBMIT_LOGS_FILE", value=spark_submit_logs_file),
batch_models.EnvironmentSetting(
name="SPARK_RSTUDIO_SERVER_PORT", value=spark_rstudio_server_port),
] + __get_docker_credentials(spark_client)
# start task command
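
A self-contained sketch of the plugin port loop in __docker_run_cmd above, showing the docker -p options that replace the previously hard-coded ones (stub classes stand in for the real plugin models):

class StubPort:  # stands in for PluginPort
    def __init__(self, internal):
        self.internal = internal

class StubPlugin:  # stands in for PluginConfiguration
    def __init__(self, name, ports):
        self.name = name
        self.ports = ports

plugins = [StubPlugin("jupyter", [StubPort(8888)]),
           StubPlugin("rstudio_server", [StubPort(8787)])]

docker_options = []
for plugin in plugins:
    for port in plugin.ports:
        # same mapping as cmd.add_option('-p', ...) above
        docker_options.append('-p {0}:{1}'.format(port.internal, port.internal))

print(docker_options)  # ['-p 8888:8888', '-p 8787:8787']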

View File

@ -0,0 +1 @@
from .models import *

View File

@ -82,6 +82,9 @@ class SharedKeyConfiguration(aztk.models.SharedKeyConfiguration):
class DockerConfiguration(aztk.models.DockerConfiguration):
pass
class PluginConfiguration(aztk.models.PluginConfiguration):
pass
class ClusterConfiguration(aztk.models.ClusterConfiguration):
def __init__(

View File

@ -0,0 +1,3 @@
from .hdfs import *
from .jupyter import *
from .rstudio_server import *

View File

@ -0,0 +1 @@
from .configuration import *

View File

@ -0,0 +1,46 @@
import os
from aztk.models.plugins.plugin_configuration import PluginConfiguration, PluginPort, PluginRunTarget
from aztk.models.plugins.plugin_file import PluginFile
from aztk.utils import constants
dir_path = os.path.dirname(os.path.realpath(__file__))
class HDFSPlugin(PluginConfiguration):
def __init__(self):
super().__init__(
name="hdfs",
ports=[
PluginPort(
name="File system metadata operations",
internal=8020,
),
PluginPort(
name="File system metadata operations(Backup)",
internal=9000,
),
PluginPort(
name="Datanode data transfer",
internal=50010,
),
PluginPort(
name="Datanode IPC metadata operations",
internal=50020,
),
PluginPort(
name="Namenode",
internal=50070,
public=True,
),
PluginPort(
name="Datanodes",
internal=50075,
public=True,
),
],
run_on=PluginRunTarget.All,
execute="hdfs.sh",
files=[
PluginFile("hdfs.sh", os.path.join(dir_path, "hdfs.sh")),
],
)

View File

@ -0,0 +1,70 @@
#! /bin/bash
# install dependencies
apt update
apt install -y ssh rsync
# configure and run ssh
cat /root/.ssh/id_rsa.pub >> /root/.ssh/authorized_keys
chmod 600 /root/.ssh/id_rsa
chmod 700 /root/.ssh/authorized_keys
mkdir /var/run/sshd
echo 'root:screencast' | chpasswd
sed -i 's/PermitRootLogin prohibit-password/PermitRootLogin yes/' /etc/ssh/sshd_config
sed -i 's/Port 22/Port 3022/' /etc/ssh/sshd_config
sed -i 's/# Port 22/ Port 3022/' /etc/ssh/ssh_config
sed -i 's/# StrictHostKeyChecking ask/StrictHostKeyChecking no/' /etc/ssh/ssh_config
# SSH login fix. Otherwise user is kicked off after login
sed 's@session\s*required\s*pam_loginuid.so@session optional pam_loginuid.so@g' -i /etc/pam.d/sshd
/usr/sbin/sshd
# install and configure hadoop
mkdir /home/hadoop-2.8.2
curl http://apache.claz.org/hadoop/common/hadoop-2.8.2/hadoop-2.8.2.tar.gz | tar -xz -C /home
export HADOOP_HOME=/home/hadoop-2.8.2
echo 'export HADOOP_HOME=/home/hadoop-2.8.2' >> ~/.bashrc
export HADOOP_CONF_DIR=/home/hadoop-2.8.2/etc/hadoop
echo 'export HADOOP_CONF_DIR=/home/hadoop-2.8.2/etc/hadoop' >> ~/.bashrc
export PATH=$PATH:$HADOOP_HOME/bin
echo 'export PATH=$PATH:$HADOOP_HOME/bin' >> ~/.bashrc
# Create a directory for the hadoop file system
mkdir -p /batch/hadoop
echo '<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://'$MASTER_IP':8020</value>
</property>
</configuration>' > $HADOOP_HOME/etc/hadoop/core-site.xml
echo '<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>dfs.namenode.datanode.registration.ip-hostname-check</name>
<value>false</value>
</property>
<property>
<name>dfs.datanode.data.dir</name>
<value>file:///batch/hadoop</value>
</property>
</configuration>' > $HADOOP_HOME/etc/hadoop/hdfs-site.xml
# run HDFS
if [ $IS_MASTER -eq "1" ]; then
echo 'starting namenode and datanode'
hdfs namenode -format
$HADOOP_HOME/sbin/hadoop-daemon.sh --config $HADOOP_CONF_DIR --script hdfs start namenode
$HADOOP_HOME/sbin/hadoop-daemon.sh --config $HADOOP_CONF_DIR --script hdfs start datanode
else
echo 'starting datanode - namenode at ' $MASTER_IP ':8020'
$HADOOP_HOME/sbin/hadoop-daemon.sh --config $HADOOP_CONF_DIR --script hdfs start datanode
fi

View File

@ -0,0 +1 @@
from .configuration import *

View File

@ -0,0 +1,23 @@
import os
from aztk.models.plugins.plugin_configuration import PluginConfiguration, PluginPort, PluginRunTarget
from aztk.models.plugins.plugin_file import PluginFile
from aztk.utils import constants
dir_path = os.path.dirname(os.path.realpath(__file__))
class JupyterPlugin(PluginConfiguration):
def __init__(self):
super().__init__(
name="jupyter",
ports=[
PluginPort(
internal=8888,
public=True,
),
],
run_on=PluginRunTarget.All,
execute="jupyter.sh",
files=[
PluginFile("jupyter.sh", os.path.join(dir_path, "jupyter.sh")),
],
)

View File

@ -0,0 +1,58 @@
#!/bin/bash
# This custom script only works on images where jupyter is pre-installed on the Docker image
#
# This custom script has been tested to work on the following docker images:
# - aztk/python:spark2.2.0-python3.6.2-base
# - aztk/python:spark2.2.0-python3.6.2-gpu
# - aztk/python:spark2.1.0-python3.6.2-base
# - aztk/python:spark2.1.0-python3.6.2-gpu
if [ "$IS_MASTER" = "1" ]; then
pip install jupyter --upgrade
pip install notebook --upgrade
PYSPARK_DRIVER_PYTHON="/.pyenv/versions/${USER_PYTHON_VERSION}/bin/jupyter"
JUPYTER_KERNELS="/.pyenv/versions/${USER_PYTHON_VERSION}/share/jupyter/kernels"
# disable password/token on jupyter notebook
jupyter notebook --generate-config --allow-root
JUPYTER_CONFIG='/.jupyter/jupyter_notebook_config.py'
echo >> $JUPYTER_CONFIG
echo -e 'c.NotebookApp.token=""' >> $JUPYTER_CONFIG
echo -e 'c.NotebookApp.password=""' >> $JUPYTER_CONFIG
# get master ip
MASTER_IP=$(hostname -i)
# remove existing kernels
rm -rf $JUPYTER_KERNELS/*
# set up jupyter to use pyspark
mkdir $JUPYTER_KERNELS/pyspark
touch $JUPYTER_KERNELS/pyspark/kernel.json
cat << EOF > $JUPYTER_KERNELS/pyspark/kernel.json
{
"display_name": "PySpark",
"language": "python",
"argv": [
"python",
"-m",
"ipykernel",
"-f",
"{connection_file}"
],
"env": {
"SPARK_HOME": "$SPARK_HOME",
"PYSPARK_PYTHON": "python",
"PYSPARK_SUBMIT_ARGS": "--master spark://$MASTER_IP:7077 pyspark-shell"
}
}
EOF
# start jupyter notebook from /mnt - this is where we recommend you put your azure files mount point as well
cd /mnt
(PYSPARK_DRIVER_PYTHON=$PYSPARK_DRIVER_PYTHON PYSPARK_DRIVER_PYTHON_OPTS="notebook --no-browser --port=8888 --allow-root" pyspark &)
fi

View File

@ -0,0 +1 @@
from .configuration import *

View File

@ -0,0 +1,24 @@
import os
from aztk.models.plugins.plugin_configuration import PluginConfiguration, PluginPort, PluginRunTarget
from aztk.models.plugins.plugin_file import PluginFile
from aztk.utils import constants
dir_path = os.path.dirname(os.path.realpath(__file__))
class RStudioServerPlugin(PluginConfiguration):
def __init__(self, version="1.1.383"):
super().__init__(
name="rstudio_server",
ports=[
PluginPort(
internal=8787,
public=True,
),
],
run_on=PluginRunTarget.Master,
execute="rstudio_server.sh",
files=[
PluginFile("rstudio_server.sh", os.path.join(dir_path, "rstudio_server.sh")),
],
env=dict(RSTUDIO_SERVER_VERSION=version),
)

View File

@ -0,0 +1,25 @@
#!/bin/bash
# This custom script only works on images where rstudio server is pre-installed on the Docker image
#
# This custom script has been tested to work on the following docker images:
# - jiata/aztk-r:0.1.0-spark2.2.0-r3.4.1
# - jiata/aztk-r:0.1.0-spark2.1.0-r3.4.1
# - jiata/aztk-r:0.1.0-spark1.6.3-r3.4.1
if [ "$IS_MASTER" = "1" ]; then
## Download and install Rstudio Server
wget https://download2.rstudio.org/rstudio-server-$RSTUDIO_SERVER_VERSION-amd64.deb
gdebi rstudio-server-$RSTUDIO_SERVER_VERSION-amd64.deb --non-interactive
echo "server-app-armor-enabled=0" | tee -a /etc/rstudio/rserver.conf
rm rstudio-server-$RSTUDIO_SERVER_VERSION-amd64.deb
## Preparing default user for Rstudio Server
set -e
useradd -m -d /home/rstudio rstudio -g staff
echo rstudio:rstudio | chpasswd
rstudio-server start
fi

View File

@ -5,13 +5,17 @@ import datetime
import logging
import zipfile
import yaml
import json
from pathlib import Path
from aztk.utils import constants
from aztk.utils import helpers
from aztk.error import InvalidCustomScriptError
import aztk.spark.models
from Crypto.PublicKey import RSA
from Crypto.Random import get_random_bytes
from Crypto.Cipher import AES, PKCS1_OAEP
from aztk.spark.models import ClusterConfiguration, PluginConfiguration
from typing import List
root = constants.ROOT_PATH
@ -88,13 +92,17 @@ def __add_custom_scripts(zipf, custom_scripts):
for index, custom_script in enumerate(custom_scripts):
if isinstance(custom_script.script, (str, bytes)):
new_file_name = str(index) + '_' + os.path.basename(custom_script.script)
with io.open(custom_script.script, 'r') as f:
zipf.writestr(os.path.join('custom-scripts', new_file_name), f.read().replace('\r\n', '\n'))
try:
with io.open(custom_script.script, 'r') as f:
zipf.writestr(os.path.join('custom-scripts', new_file_name), f.read().replace('\r\n', '\n'))
except FileNotFoundError:
raise InvalidCustomScriptError("Custom script '{0}' doesn't exist.".format(custom_script.script))
elif isinstance(custom_script.script, aztk.spark.models.File):
new_file_name = str(index) + '_' + custom_script.script.name
zipf.writestr(os.path.join('custom-scripts', new_file_name), custom_script.script.payload.getvalue())
data.append(dict(script=new_file_name, runOn=str(custom_script.run_on)))
zipf.writestr(os.path.join('custom-scripts', 'custom-scripts.yaml'), yaml.dump(data, default_flow_style=False))
return zipf
@ -106,13 +114,15 @@ def __add_file_to_zip(zipf, file_path, zip_file_path, binary):
zipf = zip_file_to_dir(file_path, zip_file_path, zipf, binary)
return zipf
def __add_str_to_zip(zipf, payload, zipf_file_path=None):
if not payload:
return zipf
zipf.writestr(zipf_file_path, payload)
return zipf
def zip_scripts(blob_client, container_id, custom_scripts, spark_configuration, user_conf=None):
def zip_scripts(blob_client, container_id, custom_scripts, spark_configuration, user_conf=None, plugins=None):
zipf = __create_zip()
if custom_scripts:
zipf = __add_custom_scripts(zipf, custom_scripts)
@ -128,6 +138,9 @@ def zip_scripts(blob_client, container_id, custom_scripts, spark_configuration,
for jar in spark_configuration.jars:
zipf = __add_file_to_zip(zipf, jar, 'jars', binary=True)
if plugins:
zipf = __add_plugins(zipf, plugins)
if user_conf:
encrypted_aes_session_key, cipher_aes_nonce, tag, ciphertext = encrypt_password(spark_configuration.ssh_key_pair['pub_key'], user_conf.password)
user_conf = yaml.dump({'username': user_conf.username,
@ -161,3 +174,20 @@ def encrypt_password(ssh_pub_key, password):
cipher_aes = AES.new(session_key, AES.MODE_EAX)
ciphertext, tag = cipher_aes.encrypt_and_digest(password.encode())
return [encrypted_aes_session_key, cipher_aes.nonce, tag, ciphertext]
def __add_plugins(zipf, plugins: List[PluginConfiguration]):
data = []
for plugin in plugins:
for file in plugin.files:
zipf = __add_str_to_zip(zipf, file.content(), 'plugins/{0}/{1}'.format(plugin.name, file.target))
if plugin.execute:
data.append(dict(
name=plugin.name,
execute='{0}/{1}'.format(plugin.name, plugin.execute),
args=plugin.args,
env=plugin.env,
runOn=plugin.run_on.value,
))
zipf.writestr(os.path.join('plugins', 'plugins-manifest.yaml'), yaml.dump(data))
return zipf
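
For reference, each entry that __add_plugins writes to plugins/plugins-manifest.yaml has this shape, shown here as the Python dict handed to yaml.dump (values correspond to the predefined jupyter plugin):

manifest_entry = dict(
    name="jupyter",
    execute="jupyter/jupyter.sh",  # "<plugin name>/<execute script>"
    args=[],
    env={},
    runOn="all-nodes",             # PluginRunTarget.All.value
)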

View File

@ -1,5 +1,4 @@
import os
"""
DOCKER
"""
@ -12,30 +11,21 @@ DOCKER_SPARK_CONTAINER_NAME = "spark"
# DOCKER SPARK
DOCKER_SPARK_WEB_UI_PORT = 8080
DOCKER_SPARK_WORKER_UI_PORT = 8081
DOCKER_SPARK_RSTUDIO_SERVER_PORT = 8787
DOCKER_SPARK_JUPYTER_PORT = 8888
DOCKER_SPARK_JOB_UI_PORT = 4040
DOCKER_SPARK_JOB_UI_HISTORY_PORT = 18080
DOCKER_SPARK_HOME = "/home/spark-current"
# DOCKER HDFS
DOCKER_SPARK_NAMENODE_UI_PORT = 50070
"""
Root path of this repository
"""
ROOT_PATH = os.path.normpath(os.path.join(os.path.dirname(__file__), '..', '..'))
"""
User home directory path
"""
HOME_DIRECTORY_PATH = os.path.expanduser('~')
"""
Path to the secrets file
"""
DEFAULT_SECRETS_PATH = os.path.join(os.getcwd(), '.aztk/secrets.yaml')
"""
Paths to the cluster configuration files
"""
@ -50,27 +40,25 @@ DEFAULT_SPARK_JOB_CONFIG = os.path.join(os.getcwd(), '.aztk', 'job.yaml')
GLOBAL_SPARK_JOB_CONFIG = os.path.join(HOME_DIRECTORY_PATH, '.aztk', 'job.yaml')
CUSTOM_SCRIPTS_DEST = os.path.join(ROOT_PATH, 'node_scripts', 'custom-scripts')
"""
Source and destination paths for spark init
"""
INIT_DIRECTORY_SOURCE = os.path.join(ROOT_PATH, 'config')
LOCAL_INIT_DIRECTORY_DEST = os.path.join(os.getcwd(), '.aztk')
GLOBAL_INIT_DIRECTORY_DEST = os.path.join(HOME_DIRECTORY_PATH, '.aztk')
"""
Key of the metadata entry for the pool that is used to store the master node id
"""
MASTER_NODE_METADATA_KEY = "_spark_master_node"
"""
Timeout in seconds to wait for the master to be ready
Value: 20 minutes
"""
WAIT_FOR_MASTER_TIMEOUT = 60 * 20
AZTK_SOFTWARE_METADATA_KEY = "_aztk_software"
AZTK_CLUSTER_CONFIG_METADATA_KEY = "_aztk_cluster_config"
TASK_WORKING_DIR = "wd"
SPARK_SUBMIT_LOGS_FILE = "output.log"

View File

@ -4,6 +4,7 @@ import io
import os
import time
import re
import azure.common
import azure.batch.batch_service_client as batch
import azure.batch.batch_auth as batch_auth
import azure.batch.models as batch_models
@ -12,6 +13,8 @@ from aztk.version import __version__
from aztk.utils import constants
from aztk import error
import aztk.models
import yaml
import logging
_STANDARD_OUT_FILE_NAME = 'stdout.txt'
_STANDARD_ERROR_FILE_NAME = 'stderr.txt'
@ -38,8 +41,10 @@ def wait_for_tasks_to_complete(job_id, batch_client):
while True:
tasks = batch_client.task.list(job_id)
incomplete_tasks = [task for task in tasks if
task.state != batch_models.TaskState.completed]
incomplete_tasks = [
task for task in tasks
if task.state != batch_models.TaskState.completed
]
if not incomplete_tasks:
return
time.sleep(5)
@ -61,9 +66,13 @@ def wait_for_task_to_complete(job_id: str, task_id: str, batch_client):
return
def upload_text_to_container(container_name: str, application_name: str, content: str, file_path: str, blob_client=None) -> batch_models.ResourceFile:
def upload_text_to_container(container_name: str,
application_name: str,
content: str,
file_path: str,
blob_client=None) -> batch_models.ResourceFile:
blob_name = file_path
blob_path = application_name + '/' + blob_name # + '/' + time_stamp + '/' + blob_name
blob_path = application_name + '/' + blob_name # + '/' + time_stamp + '/' + blob_name
blob_client.create_container(container_name, fail_on_exist=False)
blob_client.create_blob_from_text(container_name, blob_path, content)
@ -73,12 +82,10 @@ def upload_text_to_container(container_name: str, application_name: str, content
permission=blob.BlobPermissions.READ,
expiry=datetime.datetime.utcnow() + datetime.timedelta(days=365))
sas_url = blob_client.make_blob_url(container_name,
blob_path,
sas_token=sas_token)
sas_url = blob_client.make_blob_url(
container_name, blob_path, sas_token=sas_token)
return batch_models.ResourceFile(file_path=blob_name,
blob_source=sas_url)
return batch_models.ResourceFile(file_path=blob_name, blob_source=sas_url)
def upload_file_to_container(container_name,
@ -109,12 +116,9 @@ def upload_file_to_container(container_name,
if not node_path:
node_path = blob_name
blob_client.create_container(container_name,
fail_on_exist=False)
blob_client.create_container(container_name, fail_on_exist=False)
blob_client.create_blob_from_path(container_name,
blob_path,
file_path)
blob_client.create_blob_from_path(container_name, blob_path, file_path)
sas_token = blob_client.generate_blob_shared_access_signature(
container_name,
@ -122,12 +126,10 @@ def upload_file_to_container(container_name,
permission=blob.BlobPermissions.READ,
expiry=datetime.datetime.utcnow() + datetime.timedelta(days=7))
sas_url = blob_client.make_blob_url(container_name,
blob_path,
sas_token=sas_token)
sas_url = blob_client.make_blob_url(
container_name, blob_path, sas_token=sas_token)
return batch_models.ResourceFile(file_path=node_path,
blob_source=sas_url)
return batch_models.ResourceFile(file_path=node_path, blob_source=sas_url)
def create_pool_if_not_exist(pool, batch_client):
@ -142,7 +144,9 @@ def create_pool_if_not_exist(pool, batch_client):
batch_client.pool.add(pool)
except batch_models.BatchErrorException as e:
if e.error.code == "PoolExists":
raise error.AztkError("A cluster with the same id already exists. Use a different id or delete the existing cluster")
raise error.AztkError(
"A cluster with the same id already exists. Use a different id or delete the existing cluster"
)
else:
raise
return True
@ -169,8 +173,8 @@ def wait_for_all_nodes_state(pool, node_state, batch_client):
nodes = list(batch_client.compute_node.list(pool.id))
totalNodes = pool.target_dedicated_nodes + pool.target_low_priority_nodes
if (len(nodes) >= totalNodes and
all(node.state in node_state for node in nodes)):
if (len(nodes) >= totalNodes
and all(node.state in node_state for node in nodes)):
return nodes
time.sleep(1)
@ -195,9 +199,9 @@ def select_latest_verified_vm_image_with_node_agent_sku(
skus_to_use = [
(sku, image_ref) for sku in node_agent_skus for image_ref in sorted(
sku.verified_image_references, key=lambda item: item.sku)
if image_ref.publisher.lower() == publisher.lower() and
image_ref.offer.lower() == offer.lower() and
image_ref.sku.startswith(sku_starts_with)
if image_ref.publisher.lower() == publisher.lower()
and image_ref.offer.lower() == offer.lower()
and image_ref.sku.startswith(sku_starts_with)
]
# skus are listed in reverse order, pick first for latest
@ -205,9 +209,12 @@ def select_latest_verified_vm_image_with_node_agent_sku(
return (sku_to_use.id, image_ref_to_use)
def create_sas_token(
container_name, blob_name, permission, blob_client, expiry=None,
timeout=None):
def create_sas_token(container_name,
blob_name,
permission,
blob_client,
expiry=None,
timeout=None):
"""
Create a blob sas token
:param blob_client: The storage block blob client to use.
@ -230,9 +237,12 @@ def create_sas_token(
container_name, blob_name, permission=permission, expiry=expiry)
def upload_blob_and_create_sas(
container_name, blob_name, file_name, expiry, blob_client,
timeout=None):
def upload_blob_and_create_sas(container_name,
blob_name,
file_name,
expiry,
blob_client,
timeout=None):
"""
Uploads a file from local disk to Azure Storage and creates a SAS for it.
:param blob_client: The storage block blob client to use.
@ -247,14 +257,9 @@ def upload_blob_and_create_sas(
:return: A SAS URL to the blob with the specified expiry time.
:rtype: str
"""
blob_client.create_container(
container_name,
fail_on_exist=False)
blob_client.create_container(container_name, fail_on_exist=False)
blob_client.create_blob_from_path(
container_name,
blob_name,
file_name)
blob_client.create_blob_from_path(container_name, blob_name, file_name)
sas_token = create_sas_token(
container_name,
@ -265,9 +270,7 @@ def upload_blob_and_create_sas(
timeout=timeout)
sas_url = blob_client.make_blob_url(
container_name,
blob_name,
sas_token=sas_token)
container_name, blob_name, sas_token=sas_token)
return sas_url
@ -292,8 +295,7 @@ def get_connection_info(pool_id, node_id, batch_client):
:param str pool_id: The pool id to look up
:param str node_id: The node id to look up
"""
rls = batch_client.compute_node.get_remote_login_settings(
pool_id, node_id)
rls = batch_client.compute_node.get_remote_login_settings(pool_id, node_id)
remote_ip = rls.remote_login_ip_address
ssh_port = str(rls.remote_login_port)
return (remote_ip, ssh_port)
@ -313,7 +315,7 @@ def get_cluster_total_current_nodes(pool):
return pool.current_dedicated_nodes + pool.current_low_priority_nodes
def normalize_path(path: str)-> str:
def normalize_path(path: str) -> str:
"""
Convert a path into a path that will work well with blob storage and unix.
It will replace \ with / and remove relative '.' segments.
@ -326,7 +328,8 @@ def normalize_path(path: str)-> str:
return path
def get_file_properties(job_id: str, task_id: str, file_path: str, batch_client):
def get_file_properties(job_id: str, task_id: str, file_path: str,
batch_client):
raw = batch_client.file.get_properties_from_task(
job_id, task_id, file_path, raw=True)
@ -374,3 +377,26 @@ def format_batch_exception(batch_exception):
l.append("-------------------------------------------")
return '\n'.join(l)
def save_cluster_config(cluster_config, blob_client):
blob_path = "config.yaml"
content = yaml.dump(cluster_config)
container_name = cluster_config.cluster_id
blob_client.create_container(container_name, fail_on_exist=False)
blob_client.create_blob_from_text(container_name, blob_path, content)
def read_cluster_config(cluster_id: str, blob_client: blob.BlockBlobService):
blob_path = "config.yaml"
try:
result = blob_client.get_blob_to_text(cluster_id, blob_path)
return yaml.load(result.content)
except azure.common.AzureMissingResourceHttpError:
logging.warn(
"Cluster %s doesn't have cluster configuration in storage",
cluster_id)
except yaml.YAMLError:
logging.warn(
"Cluster %s contains invalid cluster configuration in blob",
cluster_id)
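
A sketch of reading the stored configuration back with the helper above; the BlockBlobService below uses placeholder credentials, whereas in practice this is the blob_client held by aztk.spark.Client:

import azure.storage.blob as blob
from aztk.utils import helpers

blob_client = blob.BlockBlobService(account_name="<storage-account>",
                                    account_key="<storage-key>")  # placeholders

config = helpers.read_cluster_config("spark-debug", blob_client)  # None if missing or invalid
if config:
    print(config.cluster_id, [plugin.name for plugin in (config.plugins or [])])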

View File

@ -1,10 +1,17 @@
import os
import yaml
import typing
from cli import log
import aztk.spark
from aztk.spark.models import SecretsConfiguration, ServicePrincipalConfiguration, SharedKeyConfiguration, DockerConfiguration, ClusterConfiguration, UserConfiguration
from aztk.spark.models import (
SecretsConfiguration,
ServicePrincipalConfiguration,
SharedKeyConfiguration,
DockerConfiguration,
ClusterConfiguration,
UserConfiguration,
PluginConfiguration,
)
from aztk.models.plugins.internal import PluginReference
def load_aztk_screts() -> SecretsConfiguration:
"""
@ -12,8 +19,9 @@ def load_aztk_screts() -> SecretsConfiguration:
"""
secrets = SecretsConfiguration()
# read global ~/secrets.yaml
global_config = _load_secrets_config(os.path.join(
aztk.utils.constants.HOME_DIRECTORY_PATH, '.aztk', 'secrets.yaml'))
global_config = _load_secrets_config(
os.path.join(aztk.utils.constants.HOME_DIRECTORY_PATH, '.aztk',
'secrets.yaml'))
# read current working directory secrets.yaml
local_config = _load_secrets_config()
@ -30,7 +38,8 @@ def load_aztk_screts() -> SecretsConfiguration:
return secrets
def _load_secrets_config(path: str = aztk.utils.constants.DEFAULT_SECRETS_PATH):
def _load_secrets_config(
path: str = aztk.utils.constants.DEFAULT_SECRETS_PATH):
"""
Loads the secrets.yaml file in the .aztk directory
"""
@ -64,17 +73,16 @@ def _merge_secrets_dict(secrets: SecretsConfiguration, secrets_config):
if shared_key_config and (batch or storage):
raise aztk.error.AztkError(
"Shared keys must be configured either under 'sharedKey:' or under 'batch:' and 'storage:', not both.")
"Shared keys must be configured either under 'sharedKey:' or under 'batch:' and 'storage:', not both."
)
if shared_key_config:
secrets.shared_key = SharedKeyConfiguration(
batch_account_name=shared_key_config.get('batch_account_name'),
batch_account_key=shared_key_config.get('batch_account_key'),
batch_service_url=shared_key_config.get('batch_service_url'),
storage_account_name=shared_key_config.get(
'storage_account_name'),
storage_account_key=shared_key_config.get(
'storage_account_key'),
storage_account_name=shared_key_config.get('storage_account_name'),
storage_account_key=shared_key_config.get('storage_account_key'),
storage_account_suffix=shared_key_config.get(
'storage_account_suffix'),
)
@ -82,13 +90,12 @@ def _merge_secrets_dict(secrets: SecretsConfiguration, secrets_config):
secrets.shared_key = SharedKeyConfiguration()
if batch:
log.warning(
"Your secrets.yaml format is deprecated. To use shared key authentication use the shared_key key. See config/secrets.yaml.template")
"Your secrets.yaml format is deprecated. To use shared key authentication use the shared_key key. See config/secrets.yaml.template"
)
secrets.shared_key.batch_account_name = batch.get(
'batchaccountname')
secrets.shared_key.batch_account_key = batch.get(
'batchaccountkey')
secrets.shared_key.batch_service_url = batch.get(
'batchserviceurl')
secrets.shared_key.batch_account_key = batch.get('batchaccountkey')
secrets.shared_key.batch_service_url = batch.get('batchserviceurl')
if storage:
secrets.shared_key.storage_account_name = storage.get(
@ -113,7 +120,9 @@ def _merge_secrets_dict(secrets: SecretsConfiguration, secrets_config):
secrets.ssh_pub_key = default_config.get('ssh_pub_key')
def read_cluster_config(path: str = aztk.utils.constants.DEFAULT_CLUSTER_CONFIG_PATH) -> ClusterConfiguration:
def read_cluster_config(
path: str = aztk.utils.constants.DEFAULT_CLUSTER_CONFIG_PATH
) -> ClusterConfiguration:
"""
Reads the config file in the .aztk/ directory (.aztk/cluster.yaml)
"""
@ -155,8 +164,7 @@ def cluster_config_from_dict(config: dict):
if config.get('username') is not None:
output.user_configuration = UserConfiguration(
username=config['username']
)
username=config['username'])
if config.get('password') is not None:
output.user_configuration.password = config['password']
@ -167,9 +175,7 @@ def cluster_config_from_dict(config: dict):
output.custom_scripts.append(
aztk.spark.models.CustomScript(
script=custom_script['script'],
run_on=custom_script['runOn']
)
)
run_on=custom_script['runOn']))
if config.get('azure_files') not in [[None], None]:
output.file_shares = []
@ -180,12 +186,17 @@ def cluster_config_from_dict(config: dict):
storage_account_key=file_share['storage_account_key'],
file_share_path=file_share['file_share_path'],
mount_path=file_share['mount_path'],
)
)
))
if config.get('docker_repo') is not None:
output.docker_repo = config['docker_repo']
if config.get('plugins') not in [[None], None]:
output.plugins = []
for plugin in config['plugins']:
ref = PluginReference.from_dict(plugin)
output.plugins.append(ref.get_plugin())
if config.get('worker_on_master') is not None:
output.worker_on_master = config['worker_on_master']
@ -196,7 +207,6 @@ def cluster_config_from_dict(config: dict):
class SshConfig:
def __init__(self):
self.username = None
self.cluster_id = None
@ -207,11 +217,9 @@ class SshConfig:
self.job_ui_port = '4040'
self.job_history_ui_port = '18080'
self.web_ui_port = '8080'
self.jupyter_port = '8888'
self.name_node_ui_port = '50070'
self.rstudio_server_port = '8787'
def _read_config_file(self, path: str = aztk.utils.constants.DEFAULT_SSH_CONFIG_PATH):
def _read_config_file(
self, path: str = aztk.utils.constants.DEFAULT_SSH_CONFIG_PATH):
"""
Reads the config file in the .aztk/ directory (.aztk/ssh.yaml)
"""
@ -261,11 +269,15 @@ class SshConfig:
if config.get('connect') is not None:
self.connect = config['connect']
def merge(self, cluster_id, username, job_ui_port, job_history_ui_port, web_ui_port, jupyter_port, name_node_ui_port, rstudio_server_port, host, connect):
def merge(self, cluster_id, username, job_ui_port, job_history_ui_port,
web_ui_port, jupyter_port, name_node_ui_port,
rstudio_server_port, host, connect):
"""
Merges fields with args object
"""
self._read_config_file(os.path.join(aztk.utils.constants.HOME_DIRECTORY_PATH, '.aztk', 'ssh.yaml'))
self._read_config_file(
os.path.join(aztk.utils.constants.HOME_DIRECTORY_PATH, '.aztk',
'ssh.yaml'))
self._read_config_file()
self._merge_dict(
dict(
@ -278,17 +290,17 @@ class SshConfig:
name_node_ui_port=name_node_ui_port,
rstudio_server_port=rstudio_server_port,
host=host,
connect=connect
)
)
connect=connect))
if self.cluster_id is None:
raise aztk.error.AztkError(
"Please supply an id for the cluster either in the ssh.yaml configuration file or with a parameter (--id)")
"Please supply an id for the cluster either in the ssh.yaml configuration file or with a parameter (--id)"
)
if self.username is None:
raise aztk.error.AztkError(
"Please supply a username either in the ssh.yaml configuration file or with a parameter (--username)")
"Please supply a username either in the ssh.yaml configuration file or with a parameter (--username)"
)
class JobConfig():
@ -336,10 +348,13 @@ class JobConfig():
if str_path:
abs_path = os.path.abspath(os.path.expanduser(str_path))
if not os.path.exists(abs_path):
raise aztk.error.AztkError("Could not find file: {0}\nCheck your configuration file".format(str_path))
raise aztk.error.AztkError(
"Could not find file: {0}\nCheck your configuration file".
format(str_path))
return abs_path
def _read_config_file(self, path: str = aztk.utils.constants.DEFAULT_SPARK_JOB_CONFIG):
def _read_config_file(
self, path: str = aztk.utils.constants.DEFAULT_SPARK_JOB_CONFIG):
"""
Reads the Job config file in the .aztk/ directory (.aztk/job.yaml)
"""
@ -368,15 +383,19 @@ class JobConfig():
for entry in self.applications:
if entry['name'] is None:
raise aztk.error.AztkError(
"Application specified with no name. Please verify your configuration in job.yaml")
"Application specified with no name. Please verify your configuration in job.yaml"
)
if entry['application'] is None:
raise aztk.error.AztkError(
"No path to application specified for {} in job.yaml".format(entry['name']))
"No path to application specified for {} in job.yaml".
format(entry['name']))
def get_file_if_exists(file):
local_conf_file = os.path.join(aztk.utils.constants.DEFAULT_SPARK_CONF_SOURCE, file)
global_conf_file = os.path.join(aztk.utils.constants.GLOBAL_CONFIG_PATH, file)
local_conf_file = os.path.join(
aztk.utils.constants.DEFAULT_SPARK_CONF_SOURCE, file)
global_conf_file = os.path.join(aztk.utils.constants.GLOBAL_CONFIG_PATH,
file)
if os.path.exists(local_conf_file):
return local_conf_file
@ -399,16 +418,16 @@ def load_jars():
# try load global
try:
jars_src = os.path.join(
aztk.utils.constants.GLOBAL_CONFIG_PATH, 'jars')
jars_src = os.path.join(aztk.utils.constants.GLOBAL_CONFIG_PATH,
'jars')
jars = [os.path.join(jars_src, jar) for jar in os.listdir(jars_src)]
except FileNotFoundError:
pass
# try load local, overwrite if found
try:
jars_src = os.path.join(
aztk.utils.constants.DEFAULT_SPARK_CONF_SOURCE, 'jars')
jars_src = os.path.join(aztk.utils.constants.DEFAULT_SPARK_CONF_SOURCE,
'jars')
jars = [os.path.join(jars_src, jar) for jar in os.listdir(jars_src)]
except FileNotFoundError:
pass

View File

@ -10,7 +10,7 @@ import azure.batch.models.batch_error as batch_error
import aztk
from cli import logger, log, utils, constants
from cli.spark.endpoints import spark
from . import plugins
def main():
parser = argparse.ArgumentParser(prog=constants.CLI_EXE)
@ -22,8 +22,11 @@ def main():
subparsers.required = True
spark_parser = subparsers.add_parser(
"spark", help="Commands to run spark jobs")
plugins_parser = subparsers.add_parser(
"plugins", help="Commands to list and view plugins")
spark.setup_parser(spark_parser)
plugins.setup_parser(plugins_parser)
args = parser.parse_args()
parse_common_args(args)
@ -33,7 +36,7 @@ def main():
except batch_error.BatchErrorException as e:
utils.print_batch_exception(e)
except aztk.error.AztkError as e:
log.error(e.message)
log.error(str(e))
def setup_common_args(parser: argparse.ArgumentParser):
@ -54,6 +57,7 @@ def parse_common_args(args: NamedTuple):
def run_software(args: NamedTuple):
softwares = {}
softwares[aztk.models.Software.spark] = spark.execute
softwares["plugins"] = plugins.execute
func = softwares[args.software]
func(args)

30  cli/plugins.py Normal file
View File

@ -0,0 +1,30 @@
import argparse
import typing
from cli import log
from aztk.models.plugins.internal import plugin_manager
def setup_parser(parser: argparse.ArgumentParser):
pass
def execute(args: typing.NamedTuple):
plugins = plugin_manager.plugins
log.info("------------------------------------------------------")
log.info(" Plugins (%i available)",len(plugins))
log.info("------------------------------------------------------")
for name, plugin in plugins.items():
log.info("- %s", name)
args = plugin_manager.get_args_for(plugin)
if args:
log.info(" Arguments:")
for arg in args.values():
log.info(" - %s", arg_str(arg))
else:
log.info(" Arguments: None")
log.info("")
def arg_str(arg):
required = "Required" if arg.required else "Optional(Default: {0})".format(arg.default)
return "{0}: {1}".format(arg.name, required)

View File

@ -69,7 +69,7 @@ def execute(args: typing.NamedTuple):
else:
cluster_conf.user_configuration = None
print_cluster_conf(cluster_conf, wait)
utils.print_cluster_conf(cluster_conf, wait)
spinner = utils.Spinner()
spinner.start()
@ -86,23 +86,3 @@ def execute(args: typing.NamedTuple):
else:
log.info("Cluster %s is being provisioned.", cluster.id)
def print_cluster_conf(cluster_conf: ClusterConfiguration, wait: bool):
user_configuration = cluster_conf.user_configuration
log.info("-------------------------------------------")
log.info("spark cluster id: %s", cluster_conf.cluster_id)
log.info("spark cluster size: %s",
cluster_conf.vm_count + cluster_conf.vm_low_pri_count)
log.info("> dedicated: %s", cluster_conf.vm_count)
log.info("> low priority: %s", cluster_conf.vm_low_pri_count)
log.info("spark cluster vm size: %s", cluster_conf.vm_size)
log.info("custom scripts: %s", len(cluster_conf.custom_scripts) if cluster_conf.custom_scripts else 0)
log.info("subnet ID: %s", cluster_conf.subnet_id)
log.info("file shares: %s", len(cluster_conf.file_shares) if cluster_conf.file_shares is not None else 0)
log.info("docker repo name: %s", cluster_conf.docker_repo)
log.info("wait for cluster: %s", wait)
log.info("username: %s", user_configuration.username)
if user_configuration.password:
log.info("Password: %s", '*' * len(user_configuration.password))
log.info("-------------------------------------------")

View File

@ -1,6 +1,7 @@
import argparse
import typing
import aztk
from cli import log
from cli import utils, config
@ -9,6 +10,10 @@ def setup_parser(parser: argparse.ArgumentParser):
dest='cluster_id',
required=True,
help='The unique id of your spark cluster')
parser.add_argument('--show-config',
dest='show_config',
action='store_true',
help='Show the cluster configuration')
def execute(args: typing.NamedTuple):
@ -16,3 +21,9 @@ def execute(args: typing.NamedTuple):
cluster_id = args.cluster_id
cluster = spark_client.get_cluster(cluster_id)
utils.print_cluster(spark_client, cluster)
configuration = utils.helpers.read_cluster_config(cluster_id, spark_client.blob_client)
if configuration and args.show_config:
log.info("-------------------------------------------")
log.info("Cluster configuration:")
utils.print_cluster_conf(configuration, False)

View File

@ -5,38 +5,36 @@ from cli import utils, config
from cli.config import SshConfig
import aztk
import azure.batch.models.batch_error as batch_error
from aztk.models import ClusterConfiguration
def setup_parser(parser: argparse.ArgumentParser):
parser.add_argument('--id', dest="cluster_id",
help='The unique id of your spark cluster')
parser.add_argument('--webui',
help='Local port to port spark\'s master UI to')
parser.add_argument('--jobui',
help='Local port to port spark\'s job UI to')
parser.add_argument('--jobhistoryui',
help='Local port to port spark\'s job history UI to')
parser.add_argument('--jupyter',
help='Local port to port jupyter to')
parser.add_argument('--namenodeui',
help='Local port to port HDFS NameNode UI to')
parser.add_argument('--rstudioserver',
help='Local port to port rstudio server to')
parser.add_argument('-u', '--username',
help='Username to spark cluster')
parser.add_argument('--host', dest="host",
action='store_true',
help='Connect to the host of the Spark container')
parser.add_argument('--no-connect', dest="connect",
action='store_false',
help='Do not create the ssh session. Only print out \
parser.add_argument('--id', dest="cluster_id", help='The unique id of your spark cluster')
parser.add_argument('--webui', help='Local port to port spark\'s master UI to')
parser.add_argument('--jobui', help='Local port to port spark\'s job UI to')
parser.add_argument('--jobhistoryui', help='Local port to port spark\'s job history UI to')
parser.add_argument('--jupyter', help='Local port to port jupyter to')
parser.add_argument('--namenodeui', help='Local port to port HDFS NameNode UI to')
parser.add_argument('--rstudioserver', help='Local port to port rstudio server to')
parser.add_argument('-u', '--username', help='Username to spark cluster')
parser.add_argument('--host', dest="host", action='store_true', help='Connect to the host of the Spark container')
parser.add_argument(
'--no-connect',
dest="connect",
action='store_false',
help='Do not create the ssh session. Only print out \
the command to run.')
parser.set_defaults(connect=True)
http_prefix = 'http://localhost:'
def execute(args: typing.NamedTuple):
spark_client = aztk.spark.Client(config.load_aztk_screts())
cluster = spark_client.get_cluster(args.cluster_id)
cluster_config = utils.helpers.read_cluster_config(args.cluster_id, spark_client.blob_client)
ssh_conf = SshConfig()
ssh_conf.merge(
@ -49,20 +47,16 @@ def execute(args: typing.NamedTuple):
name_node_ui_port=args.namenodeui,
rstudio_server_port=args.rstudioserver,
host=args.host,
connect=args.connect
)
connect=args.connect)
http_prefix = 'http://localhost:'
log.info("-------------------------------------------")
log.info("spark cluster id: %s", ssh_conf.cluster_id)
log.info("open webui: %s%s", http_prefix, ssh_conf.web_ui_port)
log.info("open jobui: %s%s", http_prefix, ssh_conf.job_ui_port)
log.info("open jobhistoryui: %s%s", http_prefix, ssh_conf.job_history_ui_port)
log.info("open jupyter: %s%s", http_prefix, ssh_conf.jupyter_port)
log.info("open namenodeui: %s%s", http_prefix, ssh_conf.name_node_ui_port)
log.info("open rstudio server: %s%s", http_prefix, ssh_conf.rstudio_server_port)
log.info("ssh username: %s", ssh_conf.username)
log.info("connect: %s", ssh_conf.connect)
utils.log_property("spark cluster id", ssh_conf.cluster_id)
utils.log_property("open webui", "{0}{1}".format(http_prefix, ssh_conf.web_ui_port))
utils.log_property("open jobui", "{0}{1}".format(http_prefix, ssh_conf.job_ui_port))
utils.log_property("open jobhistoryui", "{0}{1}".format(http_prefix, ssh_conf.job_history_ui_port))
print_plugin_ports(cluster_config)
utils.log_property("ssh username", ssh_conf.username)
utils.log_property("connect", ssh_conf.connect)
log.info("-------------------------------------------")
# get ssh command
@@ -73,9 +67,6 @@ def execute(args: typing.NamedTuple):
webui=ssh_conf.web_ui_port,
jobui=ssh_conf.job_ui_port,
jobhistoryui=ssh_conf.job_history_ui_port,
namenodeui=ssh_conf.name_node_ui_port,
jupyter=ssh_conf.jupyter_port,
rstudioserver=ssh_conf.rstudio_server_port,
username=ssh_conf.username,
host=ssh_conf.host,
connect=ssh_conf.connect)
@@ -90,3 +81,29 @@ def execute(args: typing.NamedTuple):
raise aztk.error.AztkError("The cluster you are trying to connect to does not exist.")
else:
raise
def print_plugin_ports(cluster_config: ClusterConfiguration):
if cluster_config and cluster_config.plugins:
plugins = cluster_config.plugins
has_ports = False
for plugin in plugins:
for port in plugin.ports:
if port.expose_publicly:
has_ports = True
break
if has_ports:
log.info("plugins:")
for plugin in plugins:
for port in plugin.ports:
if port.expose_publicly:
label = " - open {}".format(plugin.name)
if port.name:
label += " {}".format(port.name)
url = "{0}{1}".format(http_prefix, port.public_port)
utils.log_property(label, url)

View file

@@ -6,9 +6,10 @@ import time
from subprocess import call
from typing import List
import azure.batch.models as batch_models
import aztk.spark
from aztk import error
from aztk.utils import get_ssh_key
from aztk import error, utils
from aztk.utils import get_ssh_key, helpers
from aztk.models import ClusterConfiguration
from aztk.spark import models
from . import log
@@ -32,7 +33,7 @@ def get_ssh_key_or_prompt(ssh_key, username, password, secrets_config):
raise error.AztkError("Failed to get valid password, cannot add user to cluster. It is recommended that you provide an ssh public key in .aztk/secrets.yaml, or provide an ssh-key or password with command line parameters (--ssh-key or --password). You may also run the 'aztk spark cluster add-user' command to add a user to this cluster.")
return ssh_key, password
def print_cluster(client, cluster: aztk.spark.models.Cluster):
def print_cluster(client, cluster: models.Cluster):
node_count = __pretty_node_count(cluster)
log.info("")
@@ -64,7 +65,7 @@ def print_cluster(client, cluster: aztk.spark.models.Cluster):
)
log.info('')
def __pretty_node_count(cluster: aztk.spark.models.Cluster) -> str:
def __pretty_node_count(cluster: models.Cluster) -> str:
if cluster.pool.allocation_state is batch_models.AllocationState.resizing:
return '{} -> {}'.format(
cluster.total_current_nodes,
@@ -72,7 +73,7 @@ def __pretty_node_count(cluster: aztk.spark.models.Cluster) -> str:
else:
return '{}'.format(cluster.total_current_nodes)
def __pretty_dedicated_node_count(cluster: aztk.spark.models.Cluster)-> str:
def __pretty_dedicated_node_count(cluster: models.Cluster)-> str:
if (cluster.pool.allocation_state is batch_models.AllocationState.resizing
or cluster.pool.state is batch_models.PoolState.deleting)\
and cluster.current_dedicated_nodes != cluster.target_dedicated_nodes:
@@ -82,7 +83,7 @@ def __pretty_dedicated_node_count(cluster: aztk.spark.models.Cluster)-> str:
else:
return '{}'.format(cluster.current_dedicated_nodes)
def __pretty_low_pri_node_count(cluster: aztk.spark.models.Cluster)-> str:
def __pretty_low_pri_node_count(cluster: models.Cluster)-> str:
if (cluster.pool.allocation_state is batch_models.AllocationState.resizing
or cluster.pool.state is batch_models.PoolState.deleting)\
and cluster.current_low_pri_nodes != cluster.target_low_pri_nodes:
@@ -92,7 +93,7 @@ def __pretty_low_pri_node_count(cluster: aztk.spark.models.Cluster)-> str:
else:
return '{}'.format(cluster.current_low_pri_nodes)
def print_clusters(clusters: List[aztk.spark.models.Cluster]):
def print_clusters(clusters: List[models.Cluster]):
print_format = '{:<34}| {:<10}| {:<20}| {:<7}'
print_format_underline = '{:-<34}|{:-<11}|{:-<21}|{:-<7}'
@@ -131,9 +132,6 @@ def ssh_in_master(
webui: str = None,
jobui: str = None,
jobhistoryui: str = None,
jupyter: str = None,
namenodeui: str = None,
rstudioserver: str = None,
ports=None,
host: bool = False,
connect: bool = True):
@@ -143,33 +141,30 @@ def ssh_in_master(
:param username: Username to use to ssh
:param webui: Port for the spark master web ui (Local port)
:param jobui: Port for the job web ui (Local port)
:param jupyter: Port for jupyter (Local port)
:param rstudioserver: Port for rstudio server (Local port)
:param ports: an list of local and remote ports
:type ports: [[<local-port>, <remote-port>]]
"""
# Get master node id from task (job and task are both named pool_id)
cluster = client.get_cluster(cluster_id)
configuration = helpers.read_cluster_config(cluster_id, client.blob_client)
master_node_id = cluster.master_node_id
if master_node_id is None:
raise aztk.error.ClusterNotReadyError("Master node has not yet been picked!")
raise error.ClusterNotReadyError("Master node has not yet been picked!")
# get remote login settings for the user
remote_login_settings = client.get_remote_login_settings(cluster.id, master_node_id)
master_node_ip = remote_login_settings.ip_address
master_node_port = remote_login_settings.port
spark_web_ui_port = aztk.utils.constants.DOCKER_SPARK_WEB_UI_PORT
spark_worker_ui_port = aztk.utils.constants.DOCKER_SPARK_WORKER_UI_PORT
spark_rstudio_server_port = aztk.utils.constants.DOCKER_SPARK_RSTUDIO_SERVER_PORT
spark_jupyter_port = aztk.utils.constants.DOCKER_SPARK_JUPYTER_PORT
spark_job_ui_port = aztk.utils.constants.DOCKER_SPARK_JOB_UI_PORT
spark_job_history_ui_port = aztk.utils.constants.DOCKER_SPARK_JOB_UI_HISTORY_PORT
spark_namenode_ui_port = aztk.utils.constants.DOCKER_SPARK_NAMENODE_UI_PORT
spark_web_ui_port = utils.constants.DOCKER_SPARK_WEB_UI_PORT
spark_worker_ui_port = utils.constants.DOCKER_SPARK_WORKER_UI_PORT
spark_job_ui_port = utils.constants.DOCKER_SPARK_JOB_UI_PORT
spark_job_history_ui_port = utils.constants.DOCKER_SPARK_JOB_UI_HISTORY_PORT
ssh_command = aztk.utils.command_builder.CommandBuilder('ssh')
ssh_command = utils.command_builder.CommandBuilder('ssh')
# get ssh private key path if specified
ssh_priv_key = client.secrets_config.ssh_priv_key
@@ -183,17 +178,16 @@ def ssh_in_master(
jobui, spark_job_ui_port), enable=bool(jobui))
ssh_command.add_option("-L", "{0}:localhost:{1}".format(
jobhistoryui, spark_job_history_ui_port), enable=bool(jobui))
ssh_command.add_option("-L", "{0}:localhost:{1}".format(
jupyter, spark_jupyter_port), enable=bool(jupyter))
ssh_command.add_option("-L", "{0}:localhost:{1}".format(
namenodeui, spark_namenode_ui_port), enable=bool(namenodeui))
ssh_command.add_option("-L", "{0}:localhost:{1}".format(
rstudioserver, spark_rstudio_server_port), enable=bool(rstudioserver))
if ports is not None:
for port in ports:
ssh_command.add_option(
"-L", "{0}:localhost:{1}".format(port[0], port[1]))
if configuration and configuration.plugins:
for plugin in configuration.plugins:
for port in plugin.ports:
if port.expose_publicly:
ssh_command.add_option("-L", "{0}:localhost:{1}".format(port.public_port, port.internal))
user = username if username is not None else '<username>'
ssh_command.add_argument(
@@ -230,7 +224,7 @@ def print_batch_exception(batch_exception):
Job submission
'''
def print_jobs(jobs: List[aztk.spark.models.Job]):
def print_jobs(jobs: List[models.Job]):
print_format = '{:<34}| {:<10}| {:<20}'
print_format_underline = '{:-<34}|{:-<11}|{:-<21}'
@@ -247,7 +241,7 @@ def print_jobs(jobs: List[aztk.spark.models.Job]):
)
def print_job(client, job: aztk.spark.models.Job):
def print_job(client, job: models.Job):
print_format = '{:<36}| {:<15}'
log.info("")
@@ -274,7 +268,7 @@ def print_job(client, job: aztk.spark.models.Job):
log.info("")
def node_state_count(cluster: aztk.spark.models.Cluster):
def node_state_count(cluster: models.Cluster):
states = {}
for state in batch_models.ComputeNodeState:
states[state] = 0
@@ -283,7 +277,7 @@ def node_state_count(cluster: aztk.spark.models.Cluster):
return states
def print_cluster_summary(cluster: aztk.spark.models.Cluster):
def print_cluster_summary(cluster: models.Cluster):
print_format = '{:<4} {:<23} {:<15}'
log.info("Cluster %s", cluster.id)
@@ -351,7 +345,7 @@ def print_applications(applications):
if warn_scheduling:
log.warning("\nNo Spark applications will be scheduled until the master is selected.")
def print_application(application: aztk.spark.models.Application):
def print_application(application: models.Application):
print_format = '{:<30}| {:<15}'
log.info("")
@@ -404,3 +398,35 @@ class Spinner:
def utc_to_local(utc_dt):
return utc_dt.replace(tzinfo=datetime.timezone.utc).astimezone(tz=None).strftime("%H:%M%p %d/%m/%y")
def print_cluster_conf(cluster_conf: ClusterConfiguration, wait: bool):
user_configuration = cluster_conf.user_configuration
log.info("-------------------------------------------")
log.info("spark cluster id: %s", cluster_conf.cluster_id)
log.info("spark cluster size: %s",
cluster_conf.vm_count + cluster_conf.vm_low_pri_count)
log.info("> dedicated: %s", cluster_conf.vm_count)
log.info("> low priority: %s", cluster_conf.vm_low_pri_count)
log.info("spark cluster vm size: %s", cluster_conf.vm_size)
log.info("custom scripts: %s", len(cluster_conf.custom_scripts) if cluster_conf.custom_scripts else 0)
log.info("subnet ID: %s", cluster_conf.subnet_id)
log.info("file shares: %s", len(cluster_conf.file_shares) if cluster_conf.file_shares is not None else 0)
log.info("docker repo name: %s", cluster_conf.docker_repo)
log.info("wait for cluster: %s", wait)
log.info("username: %s", user_configuration.username)
if user_configuration.password:
log.info("Password: %s", '*' * len(user_configuration.password))
log.info("Plugins:")
if not cluster_conf.plugins:
log.info(" None Configured")
else:
for plugin in cluster_conf.plugins:
log.info(" - %s", plugin.name)
log.info("-------------------------------------------")
def log_property(label: str, value: str):
label += ":"
log.info("{0:30} {1}".format(label, value))

View file

@@ -15,7 +15,7 @@ size: 2
username: spark
# docker_repo: <name of docker image repo (for more information, see https://github.com/Azure/aztk/blob/master/docs/12-docker-image.md)>
docker_repo:
docker_repo:
# # optional custom scripts to run on the Spark master, Spark worker or all nodes in the cluster
# custom_scripts:
@@ -27,5 +27,11 @@ docker_repo:
# To add your cluster to a virtual network provide the full arm resource id below
# subnet_id: /subscriptions/********-****-****-****-************/resourceGroups/********/providers/Microsoft.Network/virtualNetworks/*******/subnets/******
# To define plugins
# plugins:
# - name: rstudio_server
# args:
# version: 1.2.3
# wait: <true/false>
wait: false

View file

@@ -14,14 +14,5 @@ job_history_ui_port: 18080
# web_ui_port: <local port where the spark master web ui is forwarded to>
web_ui_port: 8080
# jupyter_port: <local port which where jupyter is forwarded to>
jupyter_port: 8888
# name_node_ui_port: <local port which where Name Node UI is forwarded to>
name_node_ui_port: 50070
# rstudio_server_port: <local port which where rstudio server is forwarded to>
rstudio_server_port: 8787
# connect: <true/false, connect to spark master or print connection string (--no-connect)>
connect: true

View file

@@ -31,7 +31,6 @@ is_dedicated = os.environ["AZ_BATCH_NODE_IS_DEDICATED"]
spark_web_ui_port = os.environ["SPARK_WEB_UI_PORT"]
spark_worker_ui_port = os.environ["SPARK_WORKER_UI_PORT"]
spark_jupyter_port = os.environ["SPARK_JUPYTER_PORT"]
spark_job_ui_port = os.environ["SPARK_JOB_UI_PORT"]
storage_account_name = os.environ["STORAGE_ACCOUNT_NAME"]

View file

@@ -1,6 +1,6 @@
import os
from core import config
from install import pick_master, spark, scripts, create_user
from install import pick_master, spark, scripts, create_user, plugins
import wait_until_master_selected
@@ -24,10 +24,12 @@ def setup_node():
if is_master:
setup_as_master()
plugins.setup_plugins(is_master=True, is_worker=True)
scripts.run_custom_scripts(is_master=True, is_worker=True)
else:
setup_as_worker()
plugins.setup_plugins(is_master=False, is_worker=True)
scripts.run_custom_scripts(is_master=False, is_worker=True)
open("/tmp/setup_complete", 'a').close()

View file

@@ -0,0 +1,99 @@
import os
import json
import yaml
import subprocess
from pathlib import Path
log_folder = os.path.join(os.environ['DOCKER_WORKING_DIR'], 'logs','plugins')
def _read_manifest_file(path=None):
custom_scripts = None
if not os.path.isfile(path):
print("Plugins manifest file doesn't exist at {0}".format(path))
else:
with open(path, 'r') as stream:
try:
custom_scripts = yaml.load(stream)
except yaml.YAMLError as err:
print("Error in plugins manifest: {0}".format(err))
return custom_scripts
def setup_plugins(is_master: bool = False, is_worker: bool = False):
plugins_dir = _plugins_dir()
plugins_manifest = _read_manifest_file(
os.path.join(plugins_dir, 'plugins-manifest.yaml'))
if not os.path.exists(log_folder):
os.makedirs(log_folder)
if plugins_manifest is not None:
_setup_plugins(plugins_manifest, is_master, is_worker)
def _plugins_dir():
return os.path.join(os.environ['DOCKER_WORKING_DIR'], 'plugins')
def _run_on_this_node(plugin_obj=None, is_master=False, is_worker=False):
if plugin_obj['runOn'] == 'master' and is_master is True:
return True
if plugin_obj['runOn'] == 'worker' and is_worker is True:
return True
if plugin_obj['runOn'] == 'all-nodes':
return True
return False
def _setup_plugins(plugins_manifest, is_master=False, is_worker=False):
plugins_dir = _plugins_dir()
if is_master:
os.environ["IS_MASTER"] = "1"
else:
os.environ["IS_MASTER"] = "0"
if is_worker:
os.environ["IS_WORKER"] = "1"
else:
os.environ["IS_WORKER"] = "0"
for plugin in plugins_manifest:
if _run_on_this_node(plugin, is_master, is_worker):
path = os.path.join(plugins_dir, plugin['execute'])
_run_script(plugin.get("name"), path, plugin.get('args'), plugin.get('env'))
def _run_script(name: str, script_path: str = None, args: dict = None, env: dict = None):
if not os.path.isfile(script_path):
print("Cannot run plugin script: {0} file does not exist".format(
script_path))
return
file_stat = os.stat(script_path)
os.chmod(script_path, file_stat.st_mode | 0o777)
print("------------------------------------------------------------------")
print("Running plugin script:", script_path)
my_env = os.environ.copy()
if env:
for [key, value] in env.items():
my_env[key] = value
if args is None:
args = []
out_file = open(os.path.join(log_folder, '{0}.txt'.format(name)), 'w')
try:
subprocess.call(
[script_path] + args,
env=my_env,
stdout=out_file,
stderr=out_file)
print("Finished running")
print("------------------------------------------------------------------")
except Exception as e:
print(e)
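setup_plugins above reads a plugins-manifest.yaml that is expected to sit next to the plugin files, then runs each entry through _run_on_this_node and _run_script. A hypothetical entry, shown as the structure yaml.load would return, with illustrative names and values that are assumptions rather than part of this commit:
# Hypothetical manifest contents (field values assumed, shaped to match the reads above).
example_manifest = [
    {
        "name": "my_plugin",               # names the log file written under logs/plugins
        "runOn": "master",                 # 'master', 'worker' or 'all-nodes', checked by _run_on_this_node
        "execute": "my_plugin/setup.sh",   # path relative to $DOCKER_WORKING_DIR/plugins
        "args": ["--port", "7777"],        # appended to the script invocation
        "env": {"MY_PLUGIN_PORT": "7777"}, # merged into the subprocess environment
    },
]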

View file

@@ -9,6 +9,6 @@ paramiko==2.4.0
# Development
yapf==0.20.1
pylint==1.7.2
pylint==1.8.2
pytest==3.1.3
pytest-xdist==1.22.0

View file

@@ -0,0 +1,35 @@
import os
import pytest
from aztk.models.plugins import PluginConfiguration
from aztk.models.plugins.internal import PluginManager
from aztk.error import InvalidPluginReferenceError
dir_path = os.path.dirname(os.path.realpath(__file__))
fake_plugin_dir = os.path.join(dir_path, "fake_plugins")
class RequiredArgPlugin(PluginConfiguration):
def __init__(self, req_arg):
super().__init__(name="required-arg")
def test_missing_plugin():
plugin_manager = PluginManager()
message = "Cannot find a plugin with name .*"
with pytest.raises(InvalidPluginReferenceError, match=message):
plugin_manager.get_plugin("non-existing-plugin")
def test_extra_args_plugin():
plugin_manager = PluginManager()
message = "Plugin JupyterPlugin doesn't have an argument called 'invalid'"
with pytest.raises(InvalidPluginReferenceError, match=message):
plugin_manager.get_plugin("jupyter", args=dict(invalid="foo"))
def test_missing_required_arg():
plugin_manager = PluginManager()
plugin_manager.plugins["required-arg"] = RequiredArgPlugin
message = "Missing a required argument req_arg for plugin RequiredArgPlugin"
with pytest.raises(InvalidPluginReferenceError, match=message):
plugin_manager.get_plugin("required-arg")
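Together these tests imply that PluginManager.get_plugin validates three things: the plugin name is registered, no unexpected keyword argument is passed, and every required constructor argument is supplied. A rough sketch of argument validation along those lines, assumed rather than copied from the aztk implementation:
import inspect

from aztk.error import InvalidPluginReferenceError

def validate_plugin_args(plugin_cls, args: dict):
    # Compare the supplied args against the plugin constructor signature, ignoring 'self'.
    parameters = {
        name: param
        for name, param in inspect.signature(plugin_cls.__init__).parameters.items()
        if name != "self"
    }
    for name in args:
        if name not in parameters:
            raise InvalidPluginReferenceError(
                "Plugin {0} doesn't have an argument called '{1}'".format(plugin_cls.__name__, name))
    for name, param in parameters.items():
        if (param.default is inspect.Parameter.empty
                and param.kind in (param.POSITIONAL_OR_KEYWORD, param.KEYWORD_ONLY)
                and name not in args):
            raise InvalidPluginReferenceError(
                "Missing a required argument {0} for plugin {1}".format(name, plugin_cls.__name__))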

View file

@@ -0,0 +1,47 @@
from aztk.models.plugins import PluginConfiguration, PluginPort, PluginRunTarget
def test_create_basic_plugin():
plugin = PluginConfiguration(
name="abc", files=["file.sh"], execute="file.sh")
assert plugin.name == "abc"
assert plugin.files == ["file.sh"]
assert plugin.execute == "file.sh"
assert plugin.args == []
assert plugin.run_on == PluginRunTarget.Master
def test_create_with_args():
plugin = PluginConfiguration(
name="abc", args=["arg1", "arg2"])
assert plugin.name == "abc"
assert len(plugin.args) == 2
assert plugin.args == ["arg1", "arg2"]
def test_plugin_with_internal_port():
plugin = PluginConfiguration(name="abc", ports=[PluginPort(internal=1234)])
assert plugin.name == "abc"
assert len(plugin.ports) == 1
port = plugin.ports[0]
assert port.internal == 1234
assert port.expose_publicly == False
assert port.public_port == None
def test_plugin_with_auto_public_port():
plugin = PluginConfiguration(name="abc", ports=[PluginPort(internal=1234, public=True)])
assert plugin.name == "abc"
assert len(plugin.ports) == 1
port = plugin.ports[0]
assert port.internal == 1234
assert port.expose_publicly == True
assert port.public_port == 1234
def test_plugin_with_specified_public_port():
plugin = PluginConfiguration(name="abc", ports=[PluginPort(internal=1234, public=4321)])
assert plugin.name == "abc"
assert len(plugin.ports) == 1
port = plugin.ports[0]
assert port.internal == 1234
assert port.expose_publicly == True
assert port.public_port == 4321
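The three port tests pin down how PluginPort is expected to behave: leaving public unset keeps the port internal, public=True exposes it on the same number, and an explicit integer exposes it on that number. A minimal sketch consistent with those assertions, assumed rather than taken from aztk.models.plugins:
class PluginPort:
    def __init__(self, internal: int, public=False):
        self.internal = internal
        self.expose_publicly = bool(public)
        self.public_port = None
        if self.expose_publicly:
            # public=True reuses the internal port number; an explicit int overrides it
            self.public_port = internal if public is True else public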

View file

@@ -0,0 +1,35 @@
from aztk.utils.command_builder import CommandBuilder
def test_only_command():
cmd = CommandBuilder("ssh")
assert cmd.to_str() == "ssh"
def test_with_option():
cmd = CommandBuilder("ssh")
cmd.add_option("-L", "8080:localhost:8080")
assert cmd.to_str() == "ssh -L 8080:localhost:8080"
def test_with_multiple_options():
cmd = CommandBuilder("ssh")
cmd.add_option("-L", "8080:localhost:8080")
cmd.add_option("-p", "2020")
assert cmd.to_str() == "ssh -L 8080:localhost:8080 -p 2020"
def test_with_arg_and_option():
cmd = CommandBuilder("ssh")
cmd.add_argument("admin@1.2.3.4")
cmd.add_option("-p", "2020")
assert cmd.to_str() == "ssh -p 2020 admin@1.2.3.4"
def test_with_disabled_options():
cmd = CommandBuilder("ssh")
cmd.add_option("--verbose", enable=True)
cmd.add_option("-p", None)
cmd.add_option("-L", "8080:localhost:8080", enable=False)
assert cmd.to_str() == "ssh --verbose"
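Read together, these assertions describe a builder that keeps options in insertion order, renders positional arguments after all options, and drops any option whose value is None unless enable is passed explicitly. A minimal sketch that satisfies these tests, offered as an assumption about the shape of aztk.utils.command_builder.CommandBuilder rather than a copy of it:
class CommandBuilder:
    def __init__(self, executable: str):
        self.executable = executable
        self.options = []    # (name, value) pairs, kept in insertion order
        self.arguments = []  # positional arguments, rendered after the options

    def add_option(self, name: str, value: str = None, enable: bool = None):
        if enable is None:
            enable = value is not None  # options without a value are skipped by default
        if enable:
            self.options.append((name, value))

    def add_argument(self, argument: str):
        self.arguments.append(argument)

    def to_str(self) -> str:
        parts = [self.executable]
        for name, value in self.options:
            parts.append("{0} {1}".format(name, value) if value else name)
        parts.extend(self.arguments)
        return " ".join(parts)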