From 3e1a3352d5becf37b0ec43e3368a38fa777f4bc2 Mon Sep 17 00:00:00 2001 From: Maxime Date: Thu, 8 Jan 2015 22:20:05 +0000 Subject: [PATCH] More charts improvements --- .gitignore | 1 + airflow/hooks/presto/presto_hook.py | 7 ++- airflow/www/app.py | 47 +++++++++++++------- airflow/www/login.py | 4 +- airflow/www/templates/airflow/highchart.html | 6 +++ airflow/www/utils.py | 9 ++++ 6 files changed, 54 insertions(+), 20 deletions(-) diff --git a/.gitignore b/.gitignore index 0016ebcfdb..7a56b617b1 100644 --- a/.gitignore +++ b/.gitignore @@ -12,3 +12,4 @@ logs MANIFEST secrets.py *.egg-info +*.bkp diff --git a/airflow/hooks/presto/presto_hook.py b/airflow/hooks/presto/presto_hook.py index 4adc2a38de..9ac6bb281a 100644 --- a/airflow/hooks/presto/presto_hook.py +++ b/airflow/hooks/presto/presto_hook.py @@ -43,8 +43,11 @@ class PrestoHook(BaseHook): client = self.client if client.runquery(hql, schema): data = client.getdata() - df = pandas.DataFrame(data) - df.columns = [c['name'] for c in client.getcolumns()] + if data: + df = pandas.DataFrame(data) + df.columns = [c['name'] for c in client.getcolumns()] + else: + df = pandas.DataFrame() return df else: raise PrestoException(self.client.getlasterrormessage()) diff --git a/airflow/www/app.py b/airflow/www/app.py index 5c642843af..871b7be05b 100644 --- a/airflow/www/app.py +++ b/airflow/www/app.py @@ -2,7 +2,9 @@ from datetime import datetime, timedelta import dateutil.parser import json import logging +import re import sys +import urllib2 from flask import Flask, url_for, Markup, Blueprint, redirect, flash, Response from flask.ext.admin import Admin, BaseView, expose, AdminIndexView @@ -35,6 +37,8 @@ from airflow.www.login import login_manager import flask_login from flask_login import login_required +QUERY_LIMIT = 100000 +CHART_LIMIT = 200000 AUTHENTICATE = conf.getboolean('core', 'AUTHENTICATE') if AUTHENTICATE is False: @@ -128,6 +132,7 @@ class Airflow(BaseView): @expose('/query') @login_required + @wwwutils.gzipped def query(self): session = settings.Session() dbs = session.query(models.DatabaseConnection).order_by( @@ -150,7 +155,8 @@ class Airflow(BaseView): db = [db for db in dbs if db.db_id == db_id_str][0] hook = db.get_hook() try: - df = hook.get_pandas_df(sql) + # df = hook.get_pandas_df(wwwutils.limit_sql(sql, QUERY_LIMIT)) + df = hook.get_pandas_df(sql, QUERY_LIMIT) has_data = len(df) > 0 df = df.fillna('') results = df.to_html( @@ -162,6 +168,11 @@ class Airflow(BaseView): flash(str(e), 'error') error = True + if has_data and len(df) == QUERY_LIMIT: + flash( + "Query output truncated at " + str(QUERY_LIMIT) + + " rows", 'info') + if not has_data and error: flash('No data', 'error') @@ -218,10 +229,15 @@ class Airflow(BaseView): pd.set_option('display.max_colwidth', 100) hook = db.get_hook() try: - df = hook.get_pandas_df(sql) + df = hook.get_pandas_df(wwwutils.limit_sql(sql, CHART_LIMIT)) except Exception as e: payload['error'] += "SQL execution failed. Details: " + str(e) + if not payload['error'] and len(df) == CHART_LIMIT: + payload['warning'] = ( + "Data has been truncated to {0}" + " rows. Expect incomplete results.").format(CHART_LIMIT) + if not payload['error'] and len(df) == 0: payload['error'] += "Empty result set. " elif not payload['error'] and len(df.columns) < 3: @@ -470,14 +486,12 @@ class Airflow(BaseView): has_access = role in request.headers.get('X-Internalauth-Groups') d = {k: v for k, v in request.headers} - try: - import urllib2 - cookie = urllib2.unquote(d.get('Cookie')) - cookie = ''.join(cookie.split('j:')[1:]).split('; _ga=')[0] - cookie = json.loads(cookie) - email = str(cookie['data']['userData']['mail'][0]) - except: - email = "" + cookie = urllib2.unquote(d.get('Cookie', '')) + mailsrch = re.compile( + r'[\w\-][\w\-\.]+@[\w\-][\w\-\.]+[a-zA-Z]{1,4}') + res = mailsrch.findall(cookie) + email = res[0] if res else '' + if has_access: user = session.query(models.User).filter( models.User.username == username).first() @@ -647,6 +661,7 @@ class Airflow(BaseView): return response @expose('/tree') + @wwwutils.gzipped def tree(self): dag_id = request.args.get('dag_id') dag = dagbag.dags[dag_id] @@ -973,19 +988,19 @@ class TaskInstanceModelView(ModelView): 'start_date', 'end_date', 'duration', 'state', 'log') can_delete = True mv = TaskInstanceModelView( - models.TaskInstance, session, name="Task Instances", category="Admin") + models.TaskInstance, Session(), name="Task Instances", category="Admin") admin.add_view(mv) class JobModelView(ModelViewOnly): column_default_sort = ('start_date', True) -mv = JobModelView(jobs.BaseJob, session, name="Jobs", category="Admin") +mv = JobModelView(jobs.BaseJob, Session(), name="Jobs", category="Admin") admin.add_view(mv) class UserModelView(LoginMixin, ModelView): pass -mv = UserModelView(models.User, session, name="Users", category="Admin") +mv = UserModelView(models.User, Session(), name="Users", category="Admin") admin.add_view(mv) @@ -1002,7 +1017,7 @@ class DatabaseConnectionModelView(LoginMixin, ModelView): ] } mv = DatabaseConnectionModelView( - models.DatabaseConnection, session, + models.DatabaseConnection, Session(), name="Database Connections", category="Admin") admin.add_view(mv) @@ -1012,7 +1027,7 @@ class LogModelView(ModelViewOnly): column_filters = ('dag_id', 'task_id', 'execution_date') mv = LogModelView( - models.Log, session, name="Logs", category="Admin") + models.Log, Session(), name="Logs", category="Admin") admin.add_view(mv) @@ -1122,7 +1137,7 @@ class ChartModelView(LoginMixin, ModelView): model.user_id = flask_login.current_user.id mv = ChartModelView( - models.Chart, session, + models.Chart, Session(), name="Charts", category="Tools") admin.add_view(mv) diff --git a/airflow/www/login.py b/airflow/www/login.py index 0c600b3843..f967cd6cc9 100644 --- a/airflow/www/login.py +++ b/airflow/www/login.py @@ -14,8 +14,8 @@ login_manager = flask_login.LoginManager() def load_user(userid): session = settings.Session() user = session.query(User).filter(User.id == userid).first() - if not user: - raise Exception(userid) + #if not user: + # raise Exception(userid) session.expunge_all() session.commit() session.close() diff --git a/airflow/www/templates/airflow/highchart.html b/airflow/www/templates/airflow/highchart.html index fbc5c097d5..a7eff58eb1 100644 --- a/airflow/www/templates/airflow/highchart.html +++ b/airflow/www/templates/airflow/highchart.html @@ -123,6 +123,10 @@ $('#chart_section').hide(1000); $('#datatable_section').hide(1000); } + function warn(msg){ + $('#error_msg').html(msg); + $('#error').show(); + } $( document ).ready(function() { Highcharts.setOptions({ colors: [ @@ -154,6 +158,8 @@ else { error(payload.error); } + if ('warning' in payload) + warn(payload.warning); }).fail(function() { error( "Ooops. Unhandled error." ); }); diff --git a/airflow/www/utils.py b/airflow/www/utils.py index 26db093334..a0aba919a4 100644 --- a/airflow/www/utils.py +++ b/airflow/www/utils.py @@ -7,6 +7,15 @@ from flask import after_this_request, request import wtforms from wtforms.compat import text_type +def limit_sql(sql, limit): + sql = sql.strip() + sql = sql.rstrip(';') + return """\ + SELECT * FROM ( + {sql} + ) qry + LIMIT {limit}; + """.format(**locals()) def gzipped(f): '''