Enable history server on batch nodes (#209)

* enable the history server on the cluster by default

* detect spark event log settings and set up the node accordingly (see the example settings below)

* fix spacing between methods
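
For reference, the node setup only starts the history server when Spark's event log is enabled. A minimal spark-defaults.conf fragment that would trigger the new logic looks like this (values illustrative; only file:/ paths are created locally by the node script):

    spark.eventLog.enabled    true
    spark.eventLog.dir        file:/tmp/spark-events

With these two settings present, the node creates /tmp/spark-events on disk and launches $SPARK_HOME/sbin/start-history-server.sh.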
Pablo Selem 2017-11-08 19:55:13 -08:00, committed by GitHub
Parent: e0a42b8abf
Commit: d44f0ebf08
No key found matching this signature
GPG key ID: 4AEE18F83AFDEB23
7 changed files: 77 additions and 11 deletions

View file

@@ -181,6 +181,7 @@ class SshConfig:
         self.username = None
         self.cluster_id = None
         self.job_ui_port = None
+        self.job_history_ui_port = None
         self.web_ui_port = None
         self.jupyter_port = None
         self.host = False
@@ -215,6 +216,9 @@ class SshConfig:
         if config.get('job_ui_port') is not None:
             self.job_ui_port = config['job_ui_port']
+        if config.get('job_history_ui_port') is not None:
+            self.job_history_ui_port = config['job_history_ui_port']
         if config.get('web_ui_port') is not None:
             self.web_ui_port = config['web_ui_port']
@@ -227,7 +231,7 @@ class SshConfig:
         if config.get('connect') is not None:
             self.connect = config['connect']
 
-    def merge(self, cluster_id, username, job_ui_port, web_ui_port, jupyter_port, host, connect):
+    def merge(self, cluster_id, username, job_ui_port, job_history_ui_port, web_ui_port, jupyter_port, host, connect):
         """
         Merges fields with args object
         """
@@ -238,6 +242,7 @@ class SshConfig:
             cluster_id=cluster_id,
             username=username,
             job_ui_port=job_ui_port,
+            job_history_ui_port=job_history_ui_port,
             web_ui_port=web_ui_port,
             jupyter_port=jupyter_port,
             host=host,
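
Because SshConfig reads these values from a plain config dict, the new port can presumably also be pinned in the SSH defaults config alongside the existing keys (key names come from the code above; the YAML layout is an assumption):

    # ssh config defaults (illustrative)
    job_ui_port: 4040
    job_history_ui_port: 18080
    web_ui_port: 8080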

View file

@@ -15,6 +15,8 @@ def setup_parser(parser: argparse.ArgumentParser):
                         help='Local port to port spark\'s master UI to')
     parser.add_argument('--jobui',
                         help='Local port to port spark\'s job UI to')
+    parser.add_argument('--jobhistoryui',
+                        help='Local port to port spark\'s job history UI to')
     parser.add_argument('--jupyter',
                         help='Local port to port jupyter to')
     parser.add_argument('-u', '--username',
@@ -38,6 +40,7 @@ def execute(args: typing.NamedTuple):
         cluster_id=args.cluster_id,
         username=args.username,
         job_ui_port=args.jobui,
+        job_history_ui_port=args.jobhistoryui,
         web_ui_port=args.webui,
         jupyter_port=args.jupyter,
         host=args.host,
@@ -48,6 +51,7 @@ def execute(args: typing.NamedTuple):
     log.info("spark cluster id: %s", ssh_conf.cluster_id)
     log.info("open webui: %s", ssh_conf.web_ui_port)
     log.info("open jobui: %s", ssh_conf.job_ui_port)
+    log.info("open jobhistoryui: %s", ssh_conf.job_history_ui_port)
     log.info("open jupyter: %s", ssh_conf.jupyter_port)
     log.info("ssh username: %s", ssh_conf.username)
     log.info("connect: %s", ssh_conf.connect)
@@ -60,6 +64,7 @@ def execute(args: typing.NamedTuple):
         cluster_id=ssh_conf.cluster_id,
         webui=ssh_conf.web_ui_port,
         jobui=ssh_conf.job_ui_port,
+        jobhistoryui=ssh_conf.job_history_ui_port,
         jupyter=ssh_conf.jupyter_port,
         username=ssh_conf.username,
         host=ssh_conf.host,

View file

@@ -119,6 +119,7 @@ def ssh_in_master(
         username: str = None,
         webui: str = None,
         jobui: str = None,
+        jobhistoryui: str = None,
         jupyter: str = None,
         ports=None,
         host: bool = False,
@@ -150,6 +151,8 @@ def ssh_in_master(
     spark_worker_ui_port = aztk_sdk.utils.constants.DOCKER_SPARK_WORKER_UI_PORT
     spark_jupyter_port = aztk_sdk.utils.constants.DOCKER_SPARK_JUPYTER_PORT
     spark_job_ui_port = aztk_sdk.utils.constants.DOCKER_SPARK_JOB_UI_PORT
+    spark_job_history_ui_port = aztk_sdk.utils.constants.DOCKER_SPARK_JOB_UI_HISTORY_PORT
 
     ssh_command = aztk_sdk.utils.command_builder.CommandBuilder('ssh')
@@ -163,6 +166,8 @@ def ssh_in_master(
         webui, spark_web_ui_port), enable=bool(webui))
     ssh_command.add_option("-L", "{0}:localhost:{1}".format(
         jobui, spark_job_ui_port), enable=bool(jobui))
+    ssh_command.add_option("-L", "{0}:localhost:{1}".format(
+        jobhistoryui, spark_job_history_ui_port), enable=bool(jobhistoryui))
     ssh_command.add_option("-L", "{0}:localhost:{1}".format(
         jupyter, spark_jupyter_port), enable=bool(jupyter))
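
The net effect is one extra -L forward on the generated ssh command. With all of the UI options set, the command would look roughly like this (host and username illustrative):

    ssh -L 8080:localhost:8080 -L 4040:localhost:4040 -L 18080:localhost:18080 <username>@<master-ip>

making the history server reachable at http://localhost:18080 on the client machine.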

View file

@@ -35,10 +35,12 @@ def __docker_run_cmd(docker_repo: str = None) -> str:
     cmd.add_option('-e', 'SPARK_WORKER_UI_PORT=$SPARK_WORKER_UI_PORT')
     cmd.add_option('-e', 'SPARK_JUPYTER_PORT=$SPARK_JUPYTER_PORT')
     cmd.add_option('-e', 'SPARK_JOB_UI_PORT=$SPARK_JOB_UI_PORT')
-    cmd.add_option('-p', '8080:8080')
-    cmd.add_option('-p', '7077:7077')
-    cmd.add_option('-p', '4040:4040')
-    cmd.add_option('-p', '8888:8888')
+    cmd.add_option('-p', '8080:8080')    # Spark Master UI
+    cmd.add_option('-p', '7077:7077')    # Spark Master
+    cmd.add_option('-p', '4040:4040')    # Job UI
+    cmd.add_option('-p', '8888:8888')    # Jupyter UI
+    cmd.add_option('-p', '18080:18080')  # Spark History Server UI
     cmd.add_option('-d', docker_repo)
     cmd.add_argument('/bin/bash /batch/startup/wd/docker_main.sh')
     return cmd.to_str()
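
For reference, the rendered docker run line now publishes the history server port as well, roughly as follows (image name illustrative, other options omitted):

    docker run ... -p 8080:8080 -p 7077:7077 -p 4040:4040 -p 8888:8888 -p 18080:18080 -d <docker_repo> /bin/bash /batch/startup/wd/docker_main.sh

18080 is the Spark history server's default web UI port, matching the new constant added below.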

View file

@@ -11,6 +11,7 @@ DOCKER_SPARK_WEB_UI_PORT = 8080
 DOCKER_SPARK_WORKER_UI_PORT = 8081
 DOCKER_SPARK_JUPYTER_PORT = 8888
 DOCKER_SPARK_JOB_UI_PORT = 4040
+DOCKER_SPARK_JOB_UI_HISTORY_PORT = 18080
 DOCKER_SPARK_HOME = "/home/spark-current"
 """

View file

@@ -159,6 +159,25 @@ def start_spark_master():
     setup_jupyter()
     start_jupyter()
+    start_history_server()
+
+
+def start_history_server():
+    # configure the history server
+    spark_event_log_enabled_key = 'spark.eventLog.enabled'
+    spark_event_log_directory_key = 'spark.eventLog.dir'
+    path_to_spark_defaults_conf = os.path.join(spark_home, 'conf/spark-defaults.conf')
+    properties = parse_configuration_file(path_to_spark_defaults_conf)
+
+    # only start the history server if the event log was enabled in the configuration file
+    if spark_event_log_enabled_key in properties:
+        if spark_event_log_directory_key in properties:
+            configure_history_server_log_path(properties[spark_event_log_directory_key])
+        exe = os.path.join(spark_home, "sbin", "start-history-server.sh")
+        cmd = [exe]
+        print("Starting history server")
+        call(cmd)
+
+
 def start_spark_worker():
@@ -185,6 +204,7 @@ def setup_conf():
     copy_core_site()
     copy_jars()
+
 def copy_spark_env():
     spark_env_path_src = os.path.join(os.environ['DOCKER_WORKING_DIR'], 'conf/spark-env.sh')
     spark_env_path_dest = os.path.join(spark_home, 'conf/spark-env.sh')
@@ -197,6 +217,7 @@ def copy_spark_env():
         print("Failed to copy spark-env.sh file")
         print(e)
+
 def copy_core_site():
     spark_default_path_src = os.path.join(os.environ['DOCKER_WORKING_DIR'], 'conf/spark-defaults.conf')
     spark_default_path_dest = os.path.join(spark_home, 'conf/spark-defaults.conf')
@@ -214,9 +235,10 @@ def copy_core_site():
     try:
         shutil.copyfile(spark_default_path_src, spark_default_path_dest)
     except Exception as e:
-        print("Failed to copy spark-defaults.conf file")
+        print("Failed to copy core-site.xml file")
         print(e)
+
 def copy_jars():
     # Copy jars to $SPARK_HOME/jars
     spark_default_path_src = os.path.join(os.environ['DOCKER_WORKING_DIR'], 'jars')
@@ -232,3 +254,29 @@ def copy_jars():
     except Exception as e:
         print("Failed to copy jar files")
         print(e)
+
+
+def parse_configuration_file(path_to_file: str):
+    file = open(path_to_file, 'r')
+    properties = {}
+    for line in file:
+        if (not line.startswith('#') and len(line) > 1):
+            split = line.split()
+            properties[split[0]] = split[1]
+    return properties
+
+
+def configure_history_server_log_path(path_to_log_file):
+    # Check if the path uses the local file:/ scheme.
+    # If so, create the path on disk, otherwise ignore it.
+    print('Configuring spark history server log directory {}.'.format(path_to_log_file))
+    if path_to_log_file.startswith('file:/'):
+        # create the local path on disk
+        directory = path_to_log_file.replace('file:', '')
+        if os.path.exists(directory):
+            print('Skipping. Directory {} already exists.'.format(directory))
+        else:
+            print('Creating directory {}.'.format(directory))
+            os.makedirs(directory)
+    else:
+        print('Skipping. The eventLog directory is not local.')
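
End to end, a user opts in by enabling the event log in spark-defaults.conf and then tunnels to the history server at connect time. Assuming the CLI entry point is aztk and that the cluster id flag (--id) is defined elsewhere in the parser, usage would look roughly like:

    aztk spark cluster ssh --id my_cluster --jobhistoryui 18080

after which completed applications are browsable at http://localhost:18080.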