Feature: read/tail spark submit logs (#40)

* Tail log command

* tail log is working

* Cleanup code

* fix cr

* Fix cr

* Submit

* Comment

* fix tail

* Sleep
Timothee Guerin 2017-08-18 12:35:04 -07:00 committed by GitHub
Parent d94c43ed82
Commit d552624705
7 changed files with 198 additions and 13 deletions

View file

@@ -89,7 +89,7 @@ NOTE: The cluster id (--id) can only contain alphanumeric characters including h
Now you can submit jobs to run against the cluster:
```
azb spark submit \
azb spark app submit \
--id <my-cluster-id> \
--name <my-job-name> \
[options] \
@@ -98,6 +98,19 @@ azb spark submit \
```
NOTE: The job name (--name) must be at least 3 characters long, can only contain alphanumeric characters including hyphens but excluding underscores, and cannot contain uppercase letters.
The output of spark-submit will be streamed to the console. Use the `--no-wait` option to return immediately.
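For example, to queue the job and get your prompt back right away instead of streaming the output (same placeholders as above; the application file and its arguments are omitted here):
```bash
azb spark app submit \
--id <my-cluster-id> \
--name <my-job-name> \
--no-wait \
[options]
```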
### Read the output of your Spark job
If you decided not to tail the log when submitting the job, or want to read it again, you can use this command:
```bash
azb spark app logs \
--id <my-cluster-id> \
--name <my-job-name> \
[--tail] # If you want to tail the log while the task is still running
```
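For example, to re-read the output of an app and keep following it while it runs (`spark-test` and `my-pi-job` are hypothetical cluster and app names):
```bash
azb spark app logs --id spark-test --name my-pi-job --tail
```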
### Interact with your Spark cluster
To view the spark UI, open up an ssh tunnel with the "masterui" option and a local port to map to:

View file

@@ -46,3 +46,6 @@ WAIT_FOR_MASTER_TIMEOUT = 60 * 20
AZB_SOFTWARE_METADATA_KEY = "_azb_software"
TASK_WORKING_DIR = "wd"
SPARK_SUBMIT_LOGS_FILE = "output.log"

View file

@@ -1,8 +1,13 @@
from datetime import timedelta
import time
import io
from typing import List
from dtde.core import CommandBuilder
import azure.batch.models as batch_models
from . import azure_api, util, constants
import azure.batch.models.batch_error as batch_error
from . import azure_api, util, log, constants

output_file = constants.TASK_WORKING_DIR + \
    "/" + constants.SPARK_SUBMIT_LOGS_FILE

def get_node(node_id: str, cluster_id: str) -> batch_models.ComputeNode:
@@ -26,7 +31,6 @@ def app_submit_cmd(
        executor_memory: str,
        driver_cores: str,
        executor_cores: str):
    master_id = util.get_master_node_id(cluster_id)
    master_ip = get_node(master_id, cluster_id).ip_address
@@ -36,8 +40,9 @@
    spark_home = constants.DOCKER_SPARK_HOME
    # 2>&1 redirect stdout and stderr to be in the same file
    spark_submit_cmd = CommandBuilder(
        '{0}/bin/spark-submit'.format(spark_home))
        '{0}/bin/spark-submit >> {1} 2>&1'.format(spark_home, constants.SPARK_SUBMIT_LOGS_FILE))
    spark_submit_cmd.add_option(
        '--master', 'spark://{0}:7077'.format(master_ip))
    spark_submit_cmd.add_option('--name', name)
@@ -67,7 +72,6 @@ def app_submit_cmd(
    ]


def submit_app(
        cluster_id: str,
        name: str,
@@ -155,6 +159,89 @@ def submit_app(
    # Wait for the app to finish
    if wait:
        util.wait_for_tasks_to_complete(
            job_id,
            timedelta(minutes=60))
        read_log(cluster_id, name, tail=True)

def wait_for_app_to_be_running(cluster_id: str, app_name: str) -> batch_models.CloudTask:
    """
    Wait for the batch task to leave the waiting state and start running (or complete, if it finished quickly)
    """
    batch_client = azure_api.get_batch_client()

    while True:
        task = batch_client.task.get(cluster_id, app_name)

        if task.state is batch_models.TaskState.active or task.state is batch_models.TaskState.preparing:
            log.info("Task is waiting to be scheduled.")
            time.sleep(5)
        else:
            return task

def check_task_node_exist(cluster_id: str, task: batch_models.CloudTask) -> bool:
    """
    Check that the node the task ran on still exists in the pool
    """
    batch_client = azure_api.get_batch_client()

    try:
        batch_client.compute_node.get(
            cluster_id, task.node_info.node_id)
        return True
    except batch_error.BatchErrorException:
        return False

def get_output_file_properties(cluster_id: str, app_name: str):
    while True:
        try:
            file = util.get_file_properties(
                cluster_id, app_name, output_file)
            return file
        except batch_error.BatchErrorException as e:
            if e.response.status_code == 404:
                # spark-submit hasn't produced any output yet; retry until the file exists
                log.info("Output file hasn't been created yet")
                time.sleep(5)
                continue
            else:
                raise e

def read_log(cluster_id: str, app_name: str, tail=False):
    job_id = cluster_id
    task_id = app_name
    batch_client = azure_api.get_batch_client()
    current_bytes = 0

    task = wait_for_app_to_be_running(cluster_id, app_name)

    if not check_task_node_exist(cluster_id, task):
        log.error("The node the app ran on doesn't exist anymore (Node id: %s)!",
                  task.node_info.node_id)
        return

    while True:
        file = get_output_file_properties(cluster_id, app_name)
        target_bytes = file.content_length

        if target_bytes != current_bytes:
            ocp_range = None

            if tail:
                # Only request the bytes written since the last poll
                ocp_range = "bytes={0}-{1}".format(
                    current_bytes, target_bytes - 1)

            stream = batch_client.file.get_from_task(
                job_id, task_id, output_file, batch_models.FileGetFromTaskOptions(ocp_range=ocp_range))

            content = util.read_stream_as_string(stream)
            print(content, end="")
            current_bytes = target_bytes

        if not tail:
            return

        if task.state is batch_models.TaskState.completed:
            log.info("Spark application is completed!")
            return

        task = batch_client.task.get(cluster_id, app_name)
        time.sleep(5)
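The `azb spark app logs` command is a thin wrapper around this helper, so the same tailing behaviour can also be reused from Python directly. A minimal sketch (cluster and app names are placeholders):
```python
from dtde import joblib

# Stream the spark-submit output of a previously submitted app,
# polling for newly written bytes until the task completes.
joblib.read_log("<my-cluster-id>", "<my-job-name>", tail=True)
```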

dtde/spark/cli/app.py Normal file
View file

@@ -0,0 +1,26 @@
import argparse
import typing
from . import submit
from . import app_logs


def setup_parser(parser: argparse.ArgumentParser):
    subparsers = parser.add_subparsers(
        title="Actions", dest="app_action", metavar="<app_action>")

    submit_parser = subparsers.add_parser(
        "submit", help="Submit a new spark job")
    logs_parser = subparsers.add_parser(
        "logs", help="Read the logs of an app")

    submit.setup_parser(submit_parser)
    app_logs.setup_parser(logs_parser)


def execute(args: typing.NamedTuple):
    actions = dict(
        submit=submit.execute,
        logs=app_logs.execute,
    )
    func = actions[args.app_action]
    func(args)

View file

@@ -0,0 +1,23 @@
import argparse
import typing
from dtde import joblib


def setup_parser(parser: argparse.ArgumentParser):
    parser.add_argument('--id',
                        dest='cluster_id',
                        required=True,
                        help='The unique id of your spark cluster')
    parser.add_argument('--name',
                        dest='app_name',
                        required=True,
                        help='The unique name of your spark app')
    parser.add_argument('--tail', dest='tail', action='store_true')


def execute(args: typing.NamedTuple):
    cluster_id = args.cluster_id
    app_name = args.app_name
    tail = args.tail

    joblib.read_log(cluster_id, app_name, tail=tail)

View file

@@ -3,6 +3,7 @@ import typing
from . import cluster
from . import submit
from . import app


def setup_parser(parser: argparse.ArgumentParser):
@@ -12,17 +13,17 @@ def setup_parser(parser: argparse.ArgumentParser):
    cluster_parser = subparsers.add_parser(
        "cluster", help="Commands to manage a cluster")
    submit_parser = subparsers.add_parser(
        "submit", help="Submit a new spark job")
    app_parser = subparsers.add_parser(
        "app", help="Action on an app")

    cluster.setup_parser(cluster_parser)
    submit.setup_parser(submit_parser)
    app.setup_parser(app_parser)


def execute(args: typing.NamedTuple):
    actions = dict(
        cluster=cluster.execute,
        submit=submit.execute,
        app=app.execute,
    )
    func = actions[args.action]
    func(args)

View file

@@ -1,5 +1,6 @@
from __future__ import print_function
import datetime
import io
import os
import time
import azure.batch.batch_service_client as batch
@@ -373,3 +374,34 @@ def get_cluster_total_current_nodes(pool):
    """
    return pool.current_dedicated_nodes + pool.current_low_priority_nodes

def get_file_properties(job_id: str, task_id: str, file_path: str):
    batch_client = azure_api.get_batch_client()

    raw = batch_client.file.get_properties_from_task(
        job_id, task_id, file_path, raw=True)

    return batch_models.FileProperties(
        # HTTP headers are strings; cast so callers can do byte arithmetic on the length
        content_length=int(raw.headers["Content-Length"]),
        last_modified=raw.headers["Last-Modified"],
        creation_time=raw.headers["ocp-creation-time"],
        file_mode=raw.headers["ocp-batch-file-mode"],
    )

def read_stream_as_string(stream, encoding="utf-8"):
    """
    Read stream as string
    :param stream: input stream generator
    :param str encoding: The encoding of the file. The default is utf-8.
    :return: The file content.
    :rtype: str
    """
    output = io.BytesIO()
    try:
        for data in stream:
            output.write(data)
        return output.getvalue().decode(encoding)
    finally:
        output.close()
    raise RuntimeError('could not write data to stream or decode bytes')