This commit is contained in:
Maxime 2015-01-08 22:20:05 +00:00
Родитель 1789dd088c
Коммит 3e1a3352d5
6 изменённых файлов: 54 добавлений и 20 удалений

1
.gitignore поставляемый
Просмотреть файл

@ -12,3 +12,4 @@ logs
MANIFEST
secrets.py
*.egg-info
*.bkp

Просмотреть файл

@ -43,8 +43,11 @@ class PrestoHook(BaseHook):
client = self.client
if client.runquery(hql, schema):
data = client.getdata()
df = pandas.DataFrame(data)
df.columns = [c['name'] for c in client.getcolumns()]
if data:
df = pandas.DataFrame(data)
df.columns = [c['name'] for c in client.getcolumns()]
else:
df = pandas.DataFrame()
return df
else:
raise PrestoException(self.client.getlasterrormessage())

Просмотреть файл

@ -2,7 +2,9 @@ from datetime import datetime, timedelta
import dateutil.parser
import json
import logging
import re
import sys
import urllib2
from flask import Flask, url_for, Markup, Blueprint, redirect, flash, Response
from flask.ext.admin import Admin, BaseView, expose, AdminIndexView
@ -35,6 +37,8 @@ from airflow.www.login import login_manager
import flask_login
from flask_login import login_required
QUERY_LIMIT = 100000
CHART_LIMIT = 200000
AUTHENTICATE = conf.getboolean('core', 'AUTHENTICATE')
if AUTHENTICATE is False:
@ -128,6 +132,7 @@ class Airflow(BaseView):
@expose('/query')
@login_required
@wwwutils.gzipped
def query(self):
session = settings.Session()
dbs = session.query(models.DatabaseConnection).order_by(
@ -150,7 +155,8 @@ class Airflow(BaseView):
db = [db for db in dbs if db.db_id == db_id_str][0]
hook = db.get_hook()
try:
df = hook.get_pandas_df(sql)
# df = hook.get_pandas_df(wwwutils.limit_sql(sql, QUERY_LIMIT))
df = hook.get_pandas_df(sql, QUERY_LIMIT)
has_data = len(df) > 0
df = df.fillna('')
results = df.to_html(
@ -162,6 +168,11 @@ class Airflow(BaseView):
flash(str(e), 'error')
error = True
if has_data and len(df) == QUERY_LIMIT:
flash(
"Query output truncated at " + str(QUERY_LIMIT) +
" rows", 'info')
if not has_data and error:
flash('No data', 'error')
@ -218,10 +229,15 @@ class Airflow(BaseView):
pd.set_option('display.max_colwidth', 100)
hook = db.get_hook()
try:
df = hook.get_pandas_df(sql)
df = hook.get_pandas_df(wwwutils.limit_sql(sql, CHART_LIMIT))
except Exception as e:
payload['error'] += "SQL execution failed. Details: " + str(e)
if not payload['error'] and len(df) == CHART_LIMIT:
payload['warning'] = (
"Data has been truncated to {0}"
" rows. Expect incomplete results.").format(CHART_LIMIT)
if not payload['error'] and len(df) == 0:
payload['error'] += "Empty result set. "
elif not payload['error'] and len(df.columns) < 3:
@ -470,14 +486,12 @@ class Airflow(BaseView):
has_access = role in request.headers.get('X-Internalauth-Groups')
d = {k: v for k, v in request.headers}
try:
import urllib2
cookie = urllib2.unquote(d.get('Cookie'))
cookie = ''.join(cookie.split('j:')[1:]).split('; _ga=')[0]
cookie = json.loads(cookie)
email = str(cookie['data']['userData']['mail'][0])
except:
email = ""
cookie = urllib2.unquote(d.get('Cookie', ''))
mailsrch = re.compile(
r'[\w\-][\w\-\.]+@[\w\-][\w\-\.]+[a-zA-Z]{1,4}')
res = mailsrch.findall(cookie)
email = res[0] if res else ''
if has_access:
user = session.query(models.User).filter(
models.User.username == username).first()
@ -647,6 +661,7 @@ class Airflow(BaseView):
return response
@expose('/tree')
@wwwutils.gzipped
def tree(self):
dag_id = request.args.get('dag_id')
dag = dagbag.dags[dag_id]
@ -973,19 +988,19 @@ class TaskInstanceModelView(ModelView):
'start_date', 'end_date', 'duration', 'state', 'log')
can_delete = True
mv = TaskInstanceModelView(
models.TaskInstance, session, name="Task Instances", category="Admin")
models.TaskInstance, Session(), name="Task Instances", category="Admin")
admin.add_view(mv)
class JobModelView(ModelViewOnly):
column_default_sort = ('start_date', True)
mv = JobModelView(jobs.BaseJob, session, name="Jobs", category="Admin")
mv = JobModelView(jobs.BaseJob, Session(), name="Jobs", category="Admin")
admin.add_view(mv)
class UserModelView(LoginMixin, ModelView):
pass
mv = UserModelView(models.User, session, name="Users", category="Admin")
mv = UserModelView(models.User, Session(), name="Users", category="Admin")
admin.add_view(mv)
@ -1002,7 +1017,7 @@ class DatabaseConnectionModelView(LoginMixin, ModelView):
]
}
mv = DatabaseConnectionModelView(
models.DatabaseConnection, session,
models.DatabaseConnection, Session(),
name="Database Connections", category="Admin")
admin.add_view(mv)
@ -1012,7 +1027,7 @@ class LogModelView(ModelViewOnly):
column_filters = ('dag_id', 'task_id', 'execution_date')
mv = LogModelView(
models.Log, session, name="Logs", category="Admin")
models.Log, Session(), name="Logs", category="Admin")
admin.add_view(mv)
@ -1122,7 +1137,7 @@ class ChartModelView(LoginMixin, ModelView):
model.user_id = flask_login.current_user.id
mv = ChartModelView(
models.Chart, session,
models.Chart, Session(),
name="Charts", category="Tools")
admin.add_view(mv)

Просмотреть файл

@ -14,8 +14,8 @@ login_manager = flask_login.LoginManager()
def load_user(userid):
session = settings.Session()
user = session.query(User).filter(User.id == userid).first()
if not user:
raise Exception(userid)
#if not user:
# raise Exception(userid)
session.expunge_all()
session.commit()
session.close()

Просмотреть файл

@ -123,6 +123,10 @@
$('#chart_section').hide(1000);
$('#datatable_section').hide(1000);
}
function warn(msg){
$('#error_msg').html(msg);
$('#error').show();
}
$( document ).ready(function() {
Highcharts.setOptions({
colors: [
@ -154,6 +158,8 @@
else {
error(payload.error);
}
if ('warning' in payload)
warn(payload.warning);
}).fail(function() {
error( "Ooops. Unhandled error." );
});

Просмотреть файл

@ -7,6 +7,15 @@ from flask import after_this_request, request
import wtforms
from wtforms.compat import text_type
def limit_sql(sql, limit):
sql = sql.strip()
sql = sql.rstrip(';')
return """\
SELECT * FROM (
{sql}
) qry
LIMIT {limit};
""".format(**locals())
def gzipped(f):
'''