This commit is contained in:
Maxime 2014-12-24 19:21:14 +00:00
Родитель 76fc9f1734
Коммит 0f14481ed4
3 изменённых файлов: 88 добавлений и 30 удалений

Просмотреть файл

@ -31,5 +31,4 @@ HIVE_DEFAULT_DBID: hive_default
[misc]
RUN_AS_MASTER: True
JOB_HEARTBEAT_SEC: 5
# Used for dag_id and task_id VARCHAR length
ID_LEN: 250

Просмотреть файл

@ -23,6 +23,7 @@ from airflow import settings
from airflow import utils
from airflow.utils import State
from airflow.utils import apply_defaults
from flask_login import current_user
Base = declarative_base()
ID_LEN = conf.getint('misc', 'ID_LEN')
@ -113,9 +114,12 @@ class User(Base):
self.username = username
self.email = email
def __repr__(self):
    """Use the username as the printable representation of the user."""
    name = self.username
    return name
def get_id(self):
    """
    Return this user's primary key as a text string.

    The value is ``self.id`` converted to the interpreter's text type:
    ``unicode`` on Python 2 and ``str`` on Python 3.
    """
    try:
        text_type = unicode  # Python 2 text type
    except NameError:
        # Python 3 has no ``unicode`` builtin; ``str`` is already text.
        text_type = str
    return text_type(self.id)
def is_active(self):
    """
    Report the user as active.

    Always returns ``True``; nothing in this method consults per-user
    state. Presumably part of the Flask-Login user interface -- confirm
    against the login manager setup.
    """
    return True
@ -126,7 +130,6 @@ class User(Base):
return False
class DatabaseConnection(Base):
"""
Placeholder to store information about different database instances
@ -1145,13 +1148,16 @@ class Chart(Base):
__tablename__ = "chart"
id = Column(Integer, primary_key=True)
label = Column(String(200))
db_id = Column(String(ID_LEN), ForeignKey('db_connection.db_id'))
chart_type = Column(String(100), default="line_chart")
user_id = Column(Integer(), ForeignKey('user.id'),)
chart_type = Column(String(100), default="line")
sql_layout = Column(String(50), default="series")
sql = Column(Text, default="SELECT series, x, y FROM table")
y_log_scale = Column(Boolean)
show_datatable = Column(Boolean)
show_sql = Column(Boolean, default=True)
height = Column(Integer, default=600)
default_params = Column(String(5000), default="{}")
db = relationship("DatabaseConnection", order_by="DatabaseConnection.db_id")
owner = relationship("User", cascade=False, cascade_backrefs=False)
db = relationship("DatabaseConnection")

Просмотреть файл

@ -183,6 +183,7 @@ class Airflow(BaseView):
results=results or '',
has_data=has_data)
@expose('/chart')
@login_required
def chart(self):
@ -248,24 +249,52 @@ class Airflow(BaseView):
"SQL needs to return at least 3 columns (series, x, y)",
'error')
else:
# Preparing the data in a format that chartkick likes
for i, t in df.iterrows():
series, x, y = t[:3]
series = str(series)
if series not in all_data:
all_data[series] = []
if type(x) in (datetime, Timestamp, date) :
x = int(x.strftime("%s")) * 1000
else:
x = int(dateutil.parser.parse(x).strftime("%s")) * 1000
all_data[series].append([x, float(y)])
all_data = [{
'name': series,
'data': sorted(all_data[series], key=lambda r: r[0])
}
for series in sorted(
all_data, key=lambda s: all_data[s][0][1], reverse=True)
]
import numpy as np
# Trying to convert time to something Highcharts likes
x_col = 1 if chart.sql_layout == 'series' else 0
x_is_dt = True
print df.dtypes
df[df.columns[x_col]] = pd.to_datetime(df[df.columns[x_col]])
print df.dtypes
try:
# From string to datetime
df[df.columns[x_col]] = pd.to_datetime(df[df.columns[x_col]])
except Exception as e:
x_is_dt = False
raise Exception(str(e))
if x_is_dt:
df[df.columns[x_col]] = df[df.columns[x_col]].apply(
lambda x:int(x.strftime("%s")) * 1000)
if chart.sql_layout == 'series':
xaxis_label = df.columns[1]
yaxis_label = df.columns[2]
df[df.columns[2]] = df[df.columns[2]].astype(np.float)
df = df.pivot_table(
index=df.columns[1],
columns=df.columns[0],
values=df.columns[2], aggfunc=np.sum)
else:
xaxis_label = df.columns[0]
yaxis_label = 'y'
df.index = df[df.columns[0]]
df = df.sort('ds')
del df[df.columns[0]]
for col in df.columns:
df[col] = df[col].astype(np.float)
series = []
#df = df.fillna(0)
for col in df.columns:
series.append({
'name': col,
'data': [(i, v) for i, v in df[col].iteritems() if not np.isnan(v)]
})
series = [serie for serie in sorted(
series, key=lambda s: s['data'][0][1], reverse=True)]
chart_type = chart.chart_type
if chart.chart_type == "stacked_area":
stacking = "normal"
@ -289,20 +318,27 @@ class Airflow(BaseView):
},
'title': {'text': ''},
'xAxis': {
'title': {'text': df.columns[1]},
'title': {'text': xaxis_label},
'type': 'datetime',
},
'yAxis': {
'min': 0,
'title': {'text': df.columns[2]},
'title': {'text': yaxis_label},
},
'series': all_data,
'series': series,
}
if chart.y_log_scale:
hc['yAxis']['type'] = 'logarithmic'
hc['yAxis']['minorTickInterval'] = 0.1
del hc['yAxis']['min']
if chart.show_datatable:
table = df.to_html(
classes='table table-striped table-bordered')
sql += Markup("<code><pre>"+json.dumps(hc, indent=4)+ "</pre></code>")
response = self.render(
'airflow/highchart.html',
chart=chart,
@ -311,7 +347,8 @@ class Airflow(BaseView):
hc=json.dumps(hc) if hc else None,
show_chart=show_chart,
show_sql=show_sql,
sql=sql, label=label)
sql=sql,
label=label)
session.commit()
session.close()
return response
@ -889,11 +926,19 @@ def label_link(v, c, m, p):
return Markup("<a href='{url}'>{m.label}</a>".format(**locals()))
class ChartModelView(LoginMixin, ModelView):
column_list = ('label', 'db_id', 'chart_type', 'show_datatable', )
form_columns = (
'label', 'db', 'chart_type', 'owner', 'show_datatable', 'y_log_scale',
'sql_layout', 'show_sql', 'height', 'sql', 'default_params',)
column_list = (
'label', 'db_id', 'chart_type', 'owner',
'show_datatable', 'show_sql',)
column_formatters = dict(label=label_link)
create_template = 'airflow/chart/create.html'
edit_template = 'airflow/chart/edit.html'
column_filters = ('owner.username', 'db_id',)
column_searchable_list = ('owner.username', 'label', 'sql')
form_choices = {
'chart_type': [
('line', 'Line Chart'),
@ -903,8 +948,16 @@ class ChartModelView(LoginMixin, ModelView):
('area', 'Overlapping Area Chart'),
('stacked_area', 'Stacked Area Chart'),
('percent_area', 'Percent Area Chart'),
]
],
'sql_layout': [
('series', 'SELECT series, x, y FROM ...'),
('columns', 'SELECT x, y (series 1), y (series 2), ... FROM ...'),
],
}
def on_model_change(self, form, model, is_created):
    """
    Assign the current user as owner of a chart that has none yet.

    A model whose ``user_id`` is already set is left untouched. Otherwise,
    if ``flask_login.current_user`` is truthy, its ``id`` is stored on
    ``model.user_id``. ``form`` and ``is_created`` are accepted but unused.
    Presumably called by Flask-Admin when a chart is created or edited --
    confirm against the ModelView base class.
    """
    if model.user_id:
        # An owner is already recorded; never overwrite it.
        return
    user = flask_login.current_user
    if user:
        model.user_id = user.id
mv = ChartModelView(
models.Chart, session,
name="Charts", category="Tools")