Merge pull request #67 from mistercrunch/chart

Linting and polishing charts
This commit is contained in:
Maxime Beauchemin 2014-12-30 12:53:24 -08:00
Родитель 6934d955ff 5bfffb2386
Коммит 32be69f53d
3 изменённых файлов: 76 добавлений и 41 удалений

Просмотреть файл

@ -1,4 +1,4 @@
from datetime import datetime, timedelta, date from datetime import datetime, timedelta
import dateutil.parser import dateutil.parser
import json import json
import logging import logging
@ -37,7 +37,7 @@ from flask_login import login_required
AUTHENTICATE = conf.getboolean('core', 'AUTHENTICATE') AUTHENTICATE = conf.getboolean('core', 'AUTHENTICATE')
if AUTHENTICATE is False: if AUTHENTICATE is False:
login_required = lambda x: x login_required = lambda x: x
dagbag = models.DagBag(conf.get('core', 'DAGS_FOLDER')) dagbag = models.DagBag(conf.get('core', 'DAGS_FOLDER'))
session = Session() session = Session()
@ -62,7 +62,9 @@ class AceEditorWidget(wtforms.widgets.TextArea):
kwargs.setdefault('id', field.id) kwargs.setdefault('id', field.id)
html = ''' html = '''
<div id="{el_id}" style="height:100px;">{contents}</div> <div id="{el_id}" style="height:100px;">{contents}</div>
<textarea id="{el_id}_ace" name="{form_name}" style="display:none;visibility:hidden;"> <textarea
id="{el_id}_ace" name="{form_name}"
style="display:none;visibility:hidden;">
</textarea> </textarea>
'''.format( '''.format(
el_id=kwargs.get('id', field.id), el_id=kwargs.get('id', field.id),
@ -71,13 +73,16 @@ class AceEditorWidget(wtforms.widgets.TextArea):
) )
return wtforms.widgets.core.HTMLString(html) return wtforms.widgets.core.HTMLString(html)
# Date filter form needed for gantt and graph view
class DateTimeForm(Form): class DateTimeForm(Form):
execution_date = DateTimeField("Execution date", widget=DateTimePickerWidget()) # Date filter form needed for gantt and graph view
execution_date = DateTimeField(
"Execution date", widget=DateTimePickerWidget())
class GraphForm(Form): class GraphForm(Form):
execution_date = DateTimeField("Execution date", widget=DateTimePickerWidget()) execution_date = DateTimeField(
"Execution date", widget=DateTimePickerWidget())
arrange = SelectField("Layout", choices=( arrange = SelectField("Layout", choices=(
('LR', "Left->Right"), ('LR', "Left->Right"),
('RL', "Right->Left"), ('RL', "Right->Left"),
@ -90,16 +95,19 @@ class GraphForm(Form):
def index(): def index():
return redirect(url_for('admin.index')) return redirect(url_for('admin.index'))
@app.route('/health') @app.route('/health')
def health(): def health():
""" We can add an array of tests here to check the server's health """ """ We can add an array of tests here to check the server's health """
content = Markup(markdown.markdown("The server is healthy!")) content = Markup(markdown.markdown("The server is healthy!"))
return content; return content
@app.teardown_appcontext @app.teardown_appcontext
def shutdown_session(exception=None): def shutdown_session(exception=None):
settings.Session.remove() settings.Session.remove()
class HomeView(AdminIndexView): class HomeView(AdminIndexView):
""" """
Basic home view, just showing the README.md file Basic home view, just showing the README.md file
@ -108,8 +116,13 @@ class HomeView(AdminIndexView):
def index(self): def index(self):
dags = sorted(dagbag.dags.values(), key=lambda dag: dag.dag_id) dags = sorted(dagbag.dags.values(), key=lambda dag: dag.dag_id)
return self.render('airflow/dags.html', dags=dags) return self.render('airflow/dags.html', dags=dags)
admin = Admin( admin = Admin(
app, name="Airflow", index_view=HomeView(name='DAGs'), template_mode='bootstrap3') app,
name="Airflow",
index_view=HomeView(name='DAGs'),
template_mode='bootstrap3')
admin.add_link( admin.add_link(
base.MenuLink( base.MenuLink(
category='Tools', category='Tools',
@ -145,6 +158,7 @@ class Airflow(BaseView):
db_choices = [(db.db_id, db.db_id) for db in dbs] db_choices = [(db.db_id, db.db_id) for db in dbs]
db_id_str = request.args.get('db_id') db_id_str = request.args.get('db_id')
sql = request.args.get('sql') sql = request.args.get('sql')
class QueryForm(Form): class QueryForm(Form):
db_id = SelectField("Layout", choices=db_choices) db_id = SelectField("Layout", choices=db_choices)
sql = TextAreaField("SQL", widget=AceEditorWidget()) sql = TextAreaField("SQL", widget=AceEditorWidget())
@ -186,7 +200,6 @@ class Airflow(BaseView):
@expose('/chart_data') @expose('/chart_data')
@login_required @login_required
def chart_data(self): def chart_data(self):
from pandas.tslib import Timestamp
session = settings.Session() session = settings.Session()
chart_id = request.args.get('chart_id') chart_id = request.args.get('chart_id')
chart = session.query(models.Chart).filter_by(id=chart_id).all()[0] chart = session.query(models.Chart).filter_by(id=chart_id).all()[0]
@ -209,7 +222,7 @@ class Airflow(BaseView):
"Default params is not valid, string has to evaluate as " "Default params is not valid, string has to evaluate as "
"a Python dictionary. ") "a Python dictionary. ")
request_dict = {k:request.args.get(k) for k in request.args} request_dict = {k: request.args.get(k) for k in request.args}
from airflow import macros from airflow import macros
args.update(request_dict) args.update(request_dict)
args['macros'] = macros args['macros'] = macros
@ -217,7 +230,7 @@ class Airflow(BaseView):
label = jinja2.Template(chart.label).render(**args) label = jinja2.Template(chart.label).render(**args)
payload['sql_html'] = Markup(highlight( payload['sql_html'] = Markup(highlight(
sql, sql,
SqlLexer(), # Lexer call SqlLexer(), # Lexer call
HtmlFormatter(noclasses=True)) HtmlFormatter(noclasses=True))
) )
payload['label'] = label payload['label'] = label
@ -256,7 +269,7 @@ class Airflow(BaseView):
raise Exception(str(e)) raise Exception(str(e))
if x_is_dt: if x_is_dt:
df[df.columns[x_col]] = df[df.columns[x_col]].apply( df[df.columns[x_col]] = df[df.columns[x_col]].apply(
lambda x:int(x.strftime("%s")) * 1000) lambda x: int(x.strftime("%s")) * 1000)
if chart.sql_layout == 'series': if chart.sql_layout == 'series':
# User provides columns (series, x, y) # User provides columns (series, x, y)
@ -281,7 +294,9 @@ class Airflow(BaseView):
for col in df.columns: for col in df.columns:
series.append({ series.append({
'name': col, 'name': col,
'data': [(i, v) for i, v in df[col].iteritems() if not np.isnan(v)] 'data': [
(i, v)
for i, v in df[col].iteritems() if not np.isnan(v)]
}) })
series = [serie for serie in sorted( series = [serie for serie in sorted(
series, key=lambda s: s['data'][0][1], reverse=True)] series, key=lambda s: s['data'][0][1], reverse=True)]
@ -296,7 +311,7 @@ class Airflow(BaseView):
else: else:
stacking = None stacking = None
hc = { hc = {
'chart':{ 'chart': {
'type': chart_type 'type': chart_type
}, },
'plotOptions': { 'plotOptions': {
@ -350,12 +365,9 @@ class Airflow(BaseView):
if chart.show_sql: if chart.show_sql:
sql = Markup(highlight( sql = Markup(highlight(
chart.sql, chart.sql,
SqlLexer(), # Lexer call SqlLexer(), # Lexer call
HtmlFormatter(noclasses=True)) HtmlFormatter(noclasses=True))
) )
series = []
#df = df.fillna(0)
response = self.render( response = self.render(
'airflow/highchart.html', 'airflow/highchart.html',
chart=chart, chart=chart,
@ -371,7 +383,8 @@ class Airflow(BaseView):
dag_id = request.args.get('dag_id') dag_id = request.args.get('dag_id')
dag = dagbag.dags[dag_id] dag = dagbag.dags[dag_id]
code = "".join(open(dag.full_filepath, 'r').readlines()) code = "".join(open(dag.full_filepath, 'r').readlines())
title = dag.filepath.replace(conf.get('core', 'BASE_FOLDER') + '/dags/', '') title = dag.filepath.replace(
conf.get('core', 'BASE_FOLDER') + '/dags/', '')
html_code = highlight( html_code = highlight(
code, PythonLexer(), HtmlFormatter(noclasses=True)) code, PythonLexer(), HtmlFormatter(noclasses=True))
return self.render( return self.render(
@ -385,20 +398,20 @@ class Airflow(BaseView):
def login(u): def login(u):
session = settings.Session() session = settings.Session()
role = 'airpal_topsecret.engineering.airbnb.com' role = 'airpal_topsecret.engineering.airbnb.com'
if not 'X-Internalauth-Username' in request.headers: if 'X-Internalauth-Username' not in request.headers:
return redirect(url_for('airflow.noaccess')) return redirect(url_for('airflow.noaccess'))
username = request.headers.get('X-Internalauth-Username') username = request.headers.get('X-Internalauth-Username')
has_access = role in request.headers.get('X-Internalauth-Groups') has_access = role in request.headers.get('X-Internalauth-Groups')
d = {k:v for k, v in request.headers} d = {k: v for k, v in request.headers}
import urllib2 import urllib2
cookie = urllib2.unquote(d.get('Cookie')) cookie = urllib2.unquote(d.get('Cookie'))
cookie = ''.join(cookie.split('j:')[1:]).split('; _ga=')[0] cookie = ''.join(cookie.split('j:')[1:]).split('; _ga=')[0]
cookie = json.loads(cookie) cookie = json.loads(cookie)
#email = [email for email in cookie['mail'] if 'airbnb.com' in email][0]
email = str(cookie['data']['userData']['mail'][0]) email = str(cookie['data']['userData']['mail'][0])
if has_access: if has_access:
user = session.query(models.User).filter(models.User.username==username).first() user = session.query(models.User).filter(
models.User.username == username).first()
if not user: if not user:
user = models.User(username=username) user = models.User(username=username)
user.email = email user.email = email
@ -406,21 +419,22 @@ class Airflow(BaseView):
flask_login.login_user(user) flask_login.login_user(user)
session.commit() session.commit()
session.close() session.close()
return redirect(request.args.get("next") or url_for("index")) return redirect(request.args.get("next") or url_for("index"))
return redirect('/') return redirect('/')
@expose('/logout') @expose('/logout')
def logout(self): def logout(self):
flask_login.logout_user() flask_login.logout_user()
return redirect('/admin') return redirect('/admin')
@expose('/log') @expose('/log')
def log(self): def log(self):
BASE_LOG_FOLDER = conf.get('core', 'BASE_LOG_FOLDER')
dag_id = request.args.get('dag_id') dag_id = request.args.get('dag_id')
task_id = request.args.get('task_id') task_id = request.args.get('task_id')
execution_date = request.args.get('execution_date') execution_date = request.args.get('execution_date')
dag = dagbag.dags[dag_id] dag = dagbag.dags[dag_id]
loc = conf.get('core', 'BASE_LOG_FOLDER') + "/{dag_id}/{task_id}/{execution_date}" loc = BASE_LOG_FOLDER + "/{dag_id}/{task_id}/{execution_date}"
loc = loc.format(**locals()) loc = loc.format(**locals())
try: try:
f = open(loc) f = open(loc)
@ -431,8 +445,8 @@ class Airflow(BaseView):
TI = models.TaskInstance TI = models.TaskInstance
session = Session() session = Session()
ti = session.query(TI).filter( ti = session.query(TI).filter(
TI.dag_id==dag_id, TI.task_id==task_id, TI.dag_id == dag_id, TI.task_id == task_id,
TI.execution_date==execution_date).first() TI.execution_date == execution_date).first()
if ti: if ti:
host = ti.hostname host = ti.hostname
log += "\n\nIt should be on host [{host}]".format(**locals()) log += "\n\nIt should be on host [{host}]".format(**locals())
@ -463,7 +477,8 @@ class Airflow(BaseView):
for attr_name in dir(task): for attr_name in dir(task):
if not attr_name.startswith('_'): if not attr_name.startswith('_'):
attr = getattr(task, attr_name) attr = getattr(task, attr_name)
if type(attr) != type(self.task) and attr_name not in special_attrs: if type(attr) != type(self.task) and \
attr_name not in special_attrs:
attributes.append((attr_name, str(attr))) attributes.append((attr_name, str(attr)))
title = "Task Details for {task_id}".format(**locals()) title = "Task Details for {task_id}".format(**locals())
@ -484,7 +499,7 @@ class Airflow(BaseView):
source = getattr(task, attr_name) source = getattr(task, attr_name)
special_attrs_rendered[attr_name] = highlight( special_attrs_rendered[attr_name] = highlight(
source, source,
special_attrs[attr_name](), # Lexer call special_attrs[attr_name](), # Lexer call
HtmlFormatter(noclasses=True) HtmlFormatter(noclasses=True)
) )
@ -535,11 +550,13 @@ class Airflow(BaseView):
tasks = [task_id] tasks = [task_id]
if upstream: if upstream:
tasks += \ tasks += [
[t.task_id for t in task.get_flat_relatives(upstream=True)] t.task_id
for t in task.get_flat_relatives(upstream=True)]
if downstream: if downstream:
tasks += \ tasks += [
[t.task_id for t in task.get_flat_relatives(upstream=False)] t.task_id
for t in task.get_flat_relatives(upstream=False)]
qry = qry.filter(TI.task_id.in_(tasks)) qry = qry.filter(TI.task_id.in_(tasks))
if not qry.count(): if not qry.count():
@ -559,7 +576,6 @@ class Airflow(BaseView):
session.close() session.close()
return response return response
@expose('/tree') @expose('/tree')
def tree(self): def tree(self):
dag_id = request.args.get('dag_id') dag_id = request.args.get('dag_id')
@ -738,7 +754,8 @@ class Airflow(BaseView):
if ti.end_date: if ti.end_date:
data.append([ data.append([
ti.execution_date.isoformat(), ( ti.execution_date.isoformat(), (
ti.end_date - (ti.execution_date + task.schedule_interval) ti.end_date - (
ti.execution_date + task.schedule_interval)
).total_seconds()/(60*60) ).total_seconds()/(60*60)
]) ])
all_data.append({'data': data, 'name': task.task_id}) all_data.append({'data': data, 'name': task.task_id})
@ -826,9 +843,11 @@ admin.add_view(Airflow(name='DAGs'))
# Leveraging the admin for CRUD and browse on models # Leveraging the admin for CRUD and browse on models
# ------------------------------------------------ # ------------------------------------------------
class LoginMixin(object): class LoginMixin(object):
def is_accessible(self): def is_accessible(self):
return AUTHENTICATE is False or flask_login.current_user.is_authenticated() return AUTHENTICATE is False or \
flask_login.current_user.is_authenticated()
class ModelViewOnly(ModelView): class ModelViewOnly(ModelView):
@ -860,6 +879,7 @@ def task_link(v, c, m, p):
return Markup( return Markup(
'<a href="{url}">{m.task_id}</a>'.format(**locals())) '<a href="{url}">{m.task_id}</a>'.format(**locals()))
def dag_link(v, c, m, p): def dag_link(v, c, m, p):
url = url_for( url = url_for(
'airflow.tree', 'airflow.tree',
@ -867,10 +887,12 @@ def dag_link(v, c, m, p):
return Markup( return Markup(
'<a href="{url}">{m.dag_id}</a>'.format(**locals())) '<a href="{url}">{m.dag_id}</a>'.format(**locals()))
def duration_f(v, c, m, p): def duration_f(v, c, m, p):
if m.end_date: if m.end_date:
return timedelta(seconds=m.duration) return timedelta(seconds=m.duration)
class TaskInstanceModelView(ModelView): class TaskInstanceModelView(ModelView):
column_filters = ('dag_id', 'task_id', 'state', 'execution_date') column_filters = ('dag_id', 'task_id', 'state', 'execution_date')
column_formatters = dict( column_formatters = dict(
@ -890,6 +912,7 @@ class JobModelView(ModelViewOnly):
mv = JobModelView(jobs.BaseJob, session, name="Jobs", category="Admin") mv = JobModelView(jobs.BaseJob, session, name="Jobs", category="Admin")
admin.add_view(mv) admin.add_view(mv)
class UserModelView(LoginMixin, ModelView): class UserModelView(LoginMixin, ModelView):
pass pass
mv = UserModelView(models.User, session, name="Users", category="Admin") mv = UserModelView(models.User, session, name="Users", category="Admin")
@ -920,14 +943,15 @@ mv = LogModelView(
models.Log, session, name="Logs", category="Admin") models.Log, session, name="Logs", category="Admin")
admin.add_view(mv) admin.add_view(mv)
class ReloadTaskView(BaseView): class ReloadTaskView(BaseView):
@expose('/') @expose('/')
def index(self): def index(self):
logging.info("Reloading the dags") logging.info("Reloading the dags")
dagbag.collect_dags() dagbag.collect_dags()
return redirect(url_for('index')) return redirect(url_for('index'))
admin.add_view(ReloadTaskView(name='Reload DAGs', category="Admin"))
admin.add_view(ReloadTaskView(name='Reload DAGs', category="Admin"))
def label_link(v, c, m, p): def label_link(v, c, m, p):
@ -939,11 +963,19 @@ def label_link(v, c, m, p):
return Markup("<a href='{url}'>{m.label}</a>".format(**locals())) return Markup("<a href='{url}'>{m.label}</a>".format(**locals()))
class ChartModelView(LoginMixin, ModelView): class ChartModelView(LoginMixin, ModelView):
form_columns = ( form_columns = (
'label', 'db', 'chart_type', 'owner', 'show_datatable', 'y_log_scale', 'label',
'sql_layout', 'show_sql', 'height', 'sql', 'default_params',) 'owner',
'db',
'chart_type',
'show_datatable',
'y_log_scale',
'show_sql',
'height',
'sql_layout',
'sql',
'default_params',)
column_list = ( column_list = (
'label', 'db_id', 'chart_type', 'owner', 'label', 'db_id', 'chart_type', 'owner',
'show_datatable', 'show_sql',) 'show_datatable', 'show_sql',)
@ -968,6 +1000,7 @@ class ChartModelView(LoginMixin, ModelView):
('columns', 'SELECT x, y (series 1), y (series 2), ... FROM ...'), ('columns', 'SELECT x, y (series 1), y (series 2), ... FROM ...'),
], ],
} }
def on_model_change(self, form, model, is_created): def on_model_change(self, form, model, is_created):
if not model.user_id and flask_login.current_user: if not model.user_id and flask_login.current_user:
model.user_id = flask_login.current_user.id model.user_id = flask_login.current_user.id

Просмотреть файл

@ -19,6 +19,7 @@
textarea.val(editor.getSession().getValue()); textarea.val(editor.getSession().getValue());
}); });
editor.focus(); editor.focus();
$(":checkbox").removeClass("form-control");
}); });
</script> </script>
{% endblock %} {% endblock %}

Просмотреть файл

@ -19,6 +19,7 @@
textarea.val(editor.getSession().getValue()); textarea.val(editor.getSession().getValue());
}); });
editor.focus(); editor.focus();
$(":checkbox").removeClass("form-control");
}); });
</script> </script>
{% endblock %} {% endblock %}