Коммит
8f087ff98b
|
@ -23,6 +23,7 @@ SMTP_MAIL_FROM: 'airflow_alerts@mydomain.com'
|
|||
CELERY_APP_NAME: airflow.executors.celery_executor
|
||||
BROKER_URL = sqla+mysql://airflow:airflow@localhost:3306/airflow
|
||||
CELERY_RESULT_BACKEND = db+mysql://airflow:airflow@localhost:3306/airflow
|
||||
WORKER_LOG_SERVER_PORT = 8793
|
||||
|
||||
[hooks]
|
||||
HIVE_HOME_PY: '/usr/lib/hive/lib/py'
|
||||
|
|
|
@ -182,11 +182,29 @@ def master(args):
|
|||
job = jobs.MasterJob(args.dag_id, args.subdir)
|
||||
job.run()
|
||||
|
||||
def serve_logs(args):
|
||||
print("Starting flask")
|
||||
import flask
|
||||
flask_app = flask.Flask(__name__)
|
||||
@flask_app.route('/log/<path:filename>')
|
||||
def serve_logs(filename):
|
||||
conf.get('core', 'BASE_LOG_FOLDER')
|
||||
print filename
|
||||
return flask.send_from_directory(
|
||||
conf.get('core', 'BASE_LOG_FOLDER'),
|
||||
filename,
|
||||
mimetype="application/json",
|
||||
as_attachment=False)
|
||||
print(conf.get('core', 'BASE_LOG_FOLDER'))
|
||||
flask_app.run(host='0.0.0.0', port=8990, debug=True)
|
||||
|
||||
def worker(args):
|
||||
from airflow.executors.celery_executor import app
|
||||
app.start()
|
||||
|
||||
import subprocess
|
||||
sp = subprocess.Popen("airflow serve_logs", shell=True)
|
||||
# Worker to serve static log files through this simple flask app
|
||||
from airflow.executors.celery_executor import app as celery_app
|
||||
celery_app.start()
|
||||
sp.kill()
|
||||
|
||||
def initdb(args):
|
||||
|
||||
|
@ -368,5 +386,9 @@ if __name__ == '__main__':
|
|||
parser_worker = subparsers.add_parser('worker', help=ht)
|
||||
parser_worker.set_defaults(func=worker)
|
||||
|
||||
ht = "Serve logs generate by worker"
|
||||
parser_logs = subparsers.add_parser('serve_logs', help=ht)
|
||||
parser_logs.set_defaults(func=serve_logs)
|
||||
|
||||
args = parser.parse_args()
|
||||
args.func(args)
|
||||
|
|
|
@ -158,7 +158,7 @@ class MasterJob(BaseJob):
|
|||
sys.exit(1)
|
||||
signal.signal(signal.SIGINT, signal_handler)
|
||||
|
||||
settings.pessimistic_connection_handling()
|
||||
utils.pessimistic_connection_handling()
|
||||
|
||||
# Sleep time (seconds) between master runs
|
||||
|
||||
|
|
|
@ -518,14 +518,16 @@ class Airflow(BaseView):
|
|||
task_id = request.args.get('task_id')
|
||||
execution_date = request.args.get('execution_date')
|
||||
dag = dagbag.dags[dag_id]
|
||||
loc = BASE_LOG_FOLDER + "/{dag_id}/{task_id}/{execution_date}"
|
||||
log_relative = "/{dag_id}/{task_id}/{execution_date}".format(**locals())
|
||||
loc = BASE_LOG_FOLDER + log_relative
|
||||
loc = loc.format(**locals())
|
||||
log = ""
|
||||
try:
|
||||
f = open(loc)
|
||||
log = "".join(f.readlines())
|
||||
log += "".join(f.readlines())
|
||||
f.close()
|
||||
except:
|
||||
log = "The log file '{loc}' is missing.".format(**locals())
|
||||
log = "The log file '{0}' doesn't exist locally\n".format(loc)
|
||||
TI = models.TaskInstance
|
||||
session = Session()
|
||||
ti = session.query(TI).filter(
|
||||
|
@ -533,7 +535,21 @@ class Airflow(BaseView):
|
|||
TI.execution_date == execution_date).first()
|
||||
if ti:
|
||||
host = ti.hostname
|
||||
log += "\n\nIt should be on host [{host}]".format(**locals())
|
||||
WORKER_LOG_SERVER_PORT = \
|
||||
conf.get('celery', 'WORKER_LOG_SERVER_PORT')
|
||||
url = (
|
||||
"http://{host}:{WORKER_LOG_SERVER_PORT}/log"
|
||||
"{log_relative}").format(**locals())
|
||||
|
||||
log += "It should be on host [{host}]\n".format(**locals())
|
||||
log += "Fetching here: {url}\n".format(**locals())
|
||||
try:
|
||||
import urllib2
|
||||
w = urllib2.urlopen(url)
|
||||
log += w.read()
|
||||
w.close()
|
||||
except:
|
||||
log += "Failed to fetch.".format(**locals())
|
||||
session.commit()
|
||||
session.close()
|
||||
|
||||
|
|
Загрузка…
Ссылка в новой задаче