This commit is contained in:
Maxime 2014-12-30 20:47:45 +00:00
Родитель e3a61c255b
Коммит 5bfffb2386
1 изменённых файлов: 63 добавлений и 39 удалений

Просмотреть файл

@ -1,4 +1,4 @@
from datetime import datetime, timedelta, date
from datetime import datetime, timedelta
import dateutil.parser
import json
import logging
@ -37,7 +37,7 @@ from flask_login import login_required
AUTHENTICATE = conf.getboolean('core', 'AUTHENTICATE')
if AUTHENTICATE is False:
login_required = lambda x: x
login_required = lambda x: x
dagbag = models.DagBag(conf.get('core', 'DAGS_FOLDER'))
session = Session()
@ -62,7 +62,9 @@ class AceEditorWidget(wtforms.widgets.TextArea):
kwargs.setdefault('id', field.id)
html = '''
<div id="{el_id}" style="height:100px;">{contents}</div>
<textarea id="{el_id}_ace" name="{form_name}" style="display:none;visibility:hidden;">
<textarea
id="{el_id}_ace" name="{form_name}"
style="display:none;visibility:hidden;">
</textarea>
'''.format(
el_id=kwargs.get('id', field.id),
@ -71,13 +73,16 @@ class AceEditorWidget(wtforms.widgets.TextArea):
)
return wtforms.widgets.core.HTMLString(html)
# Date filter form needed for gantt and graph view
class DateTimeForm(Form):
execution_date = DateTimeField("Execution date", widget=DateTimePickerWidget())
# Date filter form needed for gantt and graph view
execution_date = DateTimeField(
"Execution date", widget=DateTimePickerWidget())
class GraphForm(Form):
execution_date = DateTimeField("Execution date", widget=DateTimePickerWidget())
execution_date = DateTimeField(
"Execution date", widget=DateTimePickerWidget())
arrange = SelectField("Layout", choices=(
('LR', "Left->Right"),
('RL', "Right->Left"),
@ -90,16 +95,19 @@ class GraphForm(Form):
def index():
return redirect(url_for('admin.index'))
@app.route('/health')
def health():
""" We can add an array of tests here to check the server's health """
content = Markup(markdown.markdown("The server is healthy!"))
return content;
return content
@app.teardown_appcontext
def shutdown_session(exception=None):
settings.Session.remove()
class HomeView(AdminIndexView):
"""
Basic home view, just showing the README.md file
@ -108,8 +116,13 @@ class HomeView(AdminIndexView):
def index(self):
dags = sorted(dagbag.dags.values(), key=lambda dag: dag.dag_id)
return self.render('airflow/dags.html', dags=dags)
admin = Admin(
app, name="Airflow", index_view=HomeView(name='DAGs'), template_mode='bootstrap3')
app,
name="Airflow",
index_view=HomeView(name='DAGs'),
template_mode='bootstrap3')
admin.add_link(
base.MenuLink(
category='Tools',
@ -145,6 +158,7 @@ class Airflow(BaseView):
db_choices = [(db.db_id, db.db_id) for db in dbs]
db_id_str = request.args.get('db_id')
sql = request.args.get('sql')
class QueryForm(Form):
db_id = SelectField("Layout", choices=db_choices)
sql = TextAreaField("SQL", widget=AceEditorWidget())
@ -186,7 +200,6 @@ class Airflow(BaseView):
@expose('/chart_data')
@login_required
def chart_data(self):
from pandas.tslib import Timestamp
session = settings.Session()
chart_id = request.args.get('chart_id')
chart = session.query(models.Chart).filter_by(id=chart_id).all()[0]
@ -209,7 +222,7 @@ class Airflow(BaseView):
"Default params is not valid, string has to evaluate as "
"a Python dictionary. ")
request_dict = {k:request.args.get(k) for k in request.args}
request_dict = {k: request.args.get(k) for k in request.args}
from airflow import macros
args.update(request_dict)
args['macros'] = macros
@ -217,7 +230,7 @@ class Airflow(BaseView):
label = jinja2.Template(chart.label).render(**args)
payload['sql_html'] = Markup(highlight(
sql,
SqlLexer(), # Lexer call
SqlLexer(), # Lexer call
HtmlFormatter(noclasses=True))
)
payload['label'] = label
@ -256,7 +269,7 @@ class Airflow(BaseView):
raise Exception(str(e))
if x_is_dt:
df[df.columns[x_col]] = df[df.columns[x_col]].apply(
lambda x:int(x.strftime("%s")) * 1000)
lambda x: int(x.strftime("%s")) * 1000)
if chart.sql_layout == 'series':
# User provides columns (series, x, y)
@ -281,7 +294,9 @@ class Airflow(BaseView):
for col in df.columns:
series.append({
'name': col,
'data': [(i, v) for i, v in df[col].iteritems() if not np.isnan(v)]
'data': [
(i, v)
for i, v in df[col].iteritems() if not np.isnan(v)]
})
series = [serie for serie in sorted(
series, key=lambda s: s['data'][0][1], reverse=True)]
@ -296,7 +311,7 @@ class Airflow(BaseView):
else:
stacking = None
hc = {
'chart':{
'chart': {
'type': chart_type
},
'plotOptions': {
@ -350,12 +365,9 @@ class Airflow(BaseView):
if chart.show_sql:
sql = Markup(highlight(
chart.sql,
SqlLexer(), # Lexer call
SqlLexer(), # Lexer call
HtmlFormatter(noclasses=True))
)
series = []
#df = df.fillna(0)
response = self.render(
'airflow/highchart.html',
chart=chart,
@ -371,7 +383,8 @@ class Airflow(BaseView):
dag_id = request.args.get('dag_id')
dag = dagbag.dags[dag_id]
code = "".join(open(dag.full_filepath, 'r').readlines())
title = dag.filepath.replace(conf.get('core', 'BASE_FOLDER') + '/dags/', '')
title = dag.filepath.replace(
conf.get('core', 'BASE_FOLDER') + '/dags/', '')
html_code = highlight(
code, PythonLexer(), HtmlFormatter(noclasses=True))
return self.render(
@ -385,20 +398,20 @@ class Airflow(BaseView):
def login(u):
session = settings.Session()
role = 'airpal_topsecret.engineering.airbnb.com'
if not 'X-Internalauth-Username' in request.headers:
if 'X-Internalauth-Username' not in request.headers:
return redirect(url_for('airflow.noaccess'))
username = request.headers.get('X-Internalauth-Username')
has_access = role in request.headers.get('X-Internalauth-Groups')
d = {k:v for k, v in request.headers}
d = {k: v for k, v in request.headers}
import urllib2
cookie = urllib2.unquote(d.get('Cookie'))
cookie = ''.join(cookie.split('j:')[1:]).split('; _ga=')[0]
cookie = json.loads(cookie)
#email = [email for email in cookie['mail'] if 'airbnb.com' in email][0]
email = str(cookie['data']['userData']['mail'][0])
if has_access:
user = session.query(models.User).filter(models.User.username==username).first()
user = session.query(models.User).filter(
models.User.username == username).first()
if not user:
user = models.User(username=username)
user.email = email
@ -406,21 +419,22 @@ class Airflow(BaseView):
flask_login.login_user(user)
session.commit()
session.close()
return redirect(request.args.get("next") or url_for("index"))
return redirect(request.args.get("next") or url_for("index"))
return redirect('/')
@expose('/logout')
def logout(self):
flask_login.logout_user()
return redirect('/admin')
return redirect('/admin')
@expose('/log')
def log(self):
BASE_LOG_FOLDER = conf.get('core', 'BASE_LOG_FOLDER')
dag_id = request.args.get('dag_id')
task_id = request.args.get('task_id')
execution_date = request.args.get('execution_date')
dag = dagbag.dags[dag_id]
loc = conf.get('core', 'BASE_LOG_FOLDER') + "/{dag_id}/{task_id}/{execution_date}"
loc = BASE_LOG_FOLDER + "/{dag_id}/{task_id}/{execution_date}"
loc = loc.format(**locals())
try:
f = open(loc)
@ -431,8 +445,8 @@ class Airflow(BaseView):
TI = models.TaskInstance
session = Session()
ti = session.query(TI).filter(
TI.dag_id==dag_id, TI.task_id==task_id,
TI.execution_date==execution_date).first()
TI.dag_id == dag_id, TI.task_id == task_id,
TI.execution_date == execution_date).first()
if ti:
host = ti.hostname
log += "\n\nIt should be on host [{host}]".format(**locals())
@ -463,7 +477,8 @@ class Airflow(BaseView):
for attr_name in dir(task):
if not attr_name.startswith('_'):
attr = getattr(task, attr_name)
if type(attr) != type(self.task) and attr_name not in special_attrs:
if type(attr) != type(self.task) and \
attr_name not in special_attrs:
attributes.append((attr_name, str(attr)))
title = "Task Details for {task_id}".format(**locals())
@ -484,7 +499,7 @@ class Airflow(BaseView):
source = getattr(task, attr_name)
special_attrs_rendered[attr_name] = highlight(
source,
special_attrs[attr_name](), # Lexer call
special_attrs[attr_name](), # Lexer call
HtmlFormatter(noclasses=True)
)
@ -535,11 +550,13 @@ class Airflow(BaseView):
tasks = [task_id]
if upstream:
tasks += \
[t.task_id for t in task.get_flat_relatives(upstream=True)]
tasks += [
t.task_id
for t in task.get_flat_relatives(upstream=True)]
if downstream:
tasks += \
[t.task_id for t in task.get_flat_relatives(upstream=False)]
tasks += [
t.task_id
for t in task.get_flat_relatives(upstream=False)]
qry = qry.filter(TI.task_id.in_(tasks))
if not qry.count():
@ -559,7 +576,6 @@ class Airflow(BaseView):
session.close()
return response
@expose('/tree')
def tree(self):
dag_id = request.args.get('dag_id')
@ -738,7 +754,8 @@ class Airflow(BaseView):
if ti.end_date:
data.append([
ti.execution_date.isoformat(), (
ti.end_date - (ti.execution_date + task.schedule_interval)
ti.end_date - (
ti.execution_date + task.schedule_interval)
).total_seconds()/(60*60)
])
all_data.append({'data': data, 'name': task.task_id})
@ -826,9 +843,11 @@ admin.add_view(Airflow(name='DAGs'))
# Leveraging the admin for CRUD and browse on models
# ------------------------------------------------
class LoginMixin(object):
def is_accessible(self):
return AUTHENTICATE is False or flask_login.current_user.is_authenticated()
return AUTHENTICATE is False or \
flask_login.current_user.is_authenticated()
class ModelViewOnly(ModelView):
@ -860,6 +879,7 @@ def task_link(v, c, m, p):
return Markup(
'<a href="{url}">{m.task_id}</a>'.format(**locals()))
def dag_link(v, c, m, p):
url = url_for(
'airflow.tree',
@ -867,10 +887,12 @@ def dag_link(v, c, m, p):
return Markup(
'<a href="{url}">{m.dag_id}</a>'.format(**locals()))
def duration_f(v, c, m, p):
if m.end_date:
return timedelta(seconds=m.duration)
class TaskInstanceModelView(ModelView):
column_filters = ('dag_id', 'task_id', 'state', 'execution_date')
column_formatters = dict(
@ -890,6 +912,7 @@ class JobModelView(ModelViewOnly):
mv = JobModelView(jobs.BaseJob, session, name="Jobs", category="Admin")
admin.add_view(mv)
class UserModelView(LoginMixin, ModelView):
pass
mv = UserModelView(models.User, session, name="Users", category="Admin")
@ -920,14 +943,15 @@ mv = LogModelView(
models.Log, session, name="Logs", category="Admin")
admin.add_view(mv)
class ReloadTaskView(BaseView):
@expose('/')
def index(self):
logging.info("Reloading the dags")
dagbag.collect_dags()
return redirect(url_for('index'))
admin.add_view(ReloadTaskView(name='Reload DAGs', category="Admin"))
admin.add_view(ReloadTaskView(name='Reload DAGs', category="Admin"))
def label_link(v, c, m, p):
@ -939,7 +963,6 @@ def label_link(v, c, m, p):
return Markup("<a href='{url}'>{m.label}</a>".format(**locals()))
class ChartModelView(LoginMixin, ModelView):
form_columns = (
'label',
@ -977,6 +1000,7 @@ class ChartModelView(LoginMixin, ModelView):
('columns', 'SELECT x, y (series 1), y (series 2), ... FROM ...'),
],
}
def on_model_change(self, form, model, is_created):
if not model.user_id and flask_login.current_user:
model.user_id = flask_login.current_user.id