Merge pull request #76 from mistercrunch/logs

Improving remote logs
This commit is contained in:
Maxime Beauchemin 2015-01-08 21:06:47 -08:00
Родитель 8f087ff98b b26791674c
Коммит dd1931eb47
2 изменённых файлов: 45 добавлений и 17 удалений

Просмотреть файл

@ -19,6 +19,7 @@ import argparse
mark_success_help = "Mark jobs as succeeded without running them"
subdir_help = "File location or directory from which to look for the dag"
def backfill(args):
logging.basicConfig(level=logging.INFO, format=settings.SIMPLE_LOG_FORMAT)
dagbag = DagBag(args.subdir)
@ -86,10 +87,12 @@ def run(args):
force=args.force,
ignore_dependencies=args.ignore_dependencies)
def list_dags(args):
dagbag = DagBag()
print("\n".join(sorted(dagbag.dags)))
def list_tasks(args):
dagbag = DagBag()
if args.dag_id not in dagbag.dags:
@ -101,6 +104,7 @@ def list_tasks(args):
tasks = sorted([t.task_id for t in dag.tasks])
print("\n".join(sorted(tasks)))
def test(args):
log = logging.getLogger()
log.setLevel(logging.DEBUG)
@ -118,6 +122,7 @@ def test(args):
ti = TaskInstance(task, args.execution_date)
ti.run(force=True, ignore_dependencies=True)
def clear(args):
logging.basicConfig(level=logging.INFO, format=settings.SIMPLE_LOG_FORMAT)
dagbag = DagBag(args.subdir)
@ -198,6 +203,26 @@ def serve_logs(args):
print(conf.get('core', 'BASE_LOG_FOLDER'))
flask_app.run(host='0.0.0.0', port=8990, debug=True)
def serve_logs(args):
print("Starting flask")
import flask
flask_app = flask.Flask(__name__)
@flask_app.route('/log/<path:filename>')
def serve_logs(filename):
conf.get('core', 'BASE_LOG_FOLDER')
print filename
return flask.send_from_directory(
conf.get('core', 'BASE_LOG_FOLDER'),
filename,
mimetype="application/json",
as_attachment=False)
print(conf.get('core', 'BASE_LOG_FOLDER'))
WORKER_LOG_SERVER_PORT = \
int(conf.get('celery', 'WORKER_LOG_SERVER_PORT'))
flask_app.run(
host='0.0.0.0', port=WORKER_LOG_SERVER_PORT, debug=True)
def worker(args):
import subprocess
sp = subprocess.Popen("airflow serve_logs", shell=True)
@ -206,6 +231,7 @@ def worker(args):
celery_app.start()
sp.kill()
def initdb(args):
if raw_input(

Просмотреть файл

@ -3,6 +3,7 @@ import dateutil.parser
import json
import logging
import re
import socket
import sys
import urllib2
@ -522,26 +523,27 @@ class Airflow(BaseView):
loc = BASE_LOG_FOLDER + log_relative
loc = loc.format(**locals())
log = ""
try:
f = open(loc)
log += "".join(f.readlines())
f.close()
except:
log = "The log file '{0}' doesn't exist locally\n".format(loc)
TI = models.TaskInstance
session = Session()
ti = session.query(TI).filter(
TI.dag_id == dag_id, TI.task_id == task_id,
TI.execution_date == execution_date).first()
if ti:
host = ti.hostname
TI = models.TaskInstance
session = Session()
ti = session.query(TI).filter(
TI.dag_id == dag_id, TI.task_id == task_id,
TI.execution_date == execution_date).first()
if ti:
host = ti.hostname
if socket.gethostname() == host:
try:
f = open(loc)
log += "".join(f.readlines())
f.close()
except:
log = "Log file isn't where expected.\n".format(loc)
else:
WORKER_LOG_SERVER_PORT = \
conf.get('celery', 'WORKER_LOG_SERVER_PORT')
url = (
"http://{host}:{WORKER_LOG_SERVER_PORT}/log"
"{log_relative}").format(**locals())
log += "It should be on host [{host}]\n".format(**locals())
log += "Log file isn't local."
log += "Fetching here: {url}\n".format(**locals())
try:
import urllib2
@ -549,11 +551,11 @@ class Airflow(BaseView):
log += w.read()
w.close()
except:
log += "Failed to fetch.".format(**locals())
log += "Failed to fetch log file.".format(**locals())
session.commit()
session.close()
log = "<pre><code>" + log + "</code></pre>"
log = "<pre><code>{0}</code></pre>".format(log)
title = "Logs for {task_id} on {execution_date}".format(**locals())
html_code = log