Queries
This commit is contained in:
Родитель
d42c438c57
Коммит
f10e90e3e7
|
@ -69,6 +69,17 @@ class HiveHook(BaseHook):
|
|||
self.transport.close()
|
||||
return [row.split("\t") for row in records]
|
||||
|
||||
def get_pandas_df(self, hql, schema=None):
|
||||
import pandas as pd
|
||||
self.transport.open()
|
||||
if schema:
|
||||
self.hive.execute("USE " + schema)
|
||||
self.hive.execute(hql)
|
||||
records = self.hive.fetchAll()
|
||||
self.transport.close()
|
||||
df = pd.DataFrame([row.split("\t") for row in records])
|
||||
return df
|
||||
|
||||
def run(self, hql, schema=None):
|
||||
self.transport.open()
|
||||
if schema:
|
||||
|
|
|
@ -129,6 +129,15 @@ class DatabaseConnection(Base):
|
|||
self.schema = schema
|
||||
self.port = port
|
||||
|
||||
def get_hook(self):
|
||||
from airflow import hooks
|
||||
if self.db_type == 'mysql':
|
||||
return hooks.MySqlHook(mysql_dbid=self.db_id)
|
||||
elif self.db_type == 'hive':
|
||||
return hooks.HiveHook(hive_dbid=self.db_id)
|
||||
elif self.db_type == 'presto':
|
||||
return hooks.PrestoHook(presto_dbid=self.db_id)
|
||||
|
||||
|
||||
class DagPickle(Base):
|
||||
"""
|
||||
|
|
|
@ -7,10 +7,10 @@ from flask import Flask, url_for, Markup, Blueprint, redirect, flash
|
|||
from flask.ext.admin import Admin, BaseView, expose, AdminIndexView
|
||||
from flask.ext.admin.contrib.sqla import ModelView
|
||||
from flask import request
|
||||
from wtforms import Form, DateTimeField, SelectField
|
||||
from wtforms import Form, DateTimeField, SelectField, TextField, TextAreaField
|
||||
|
||||
from pygments import highlight
|
||||
from pygments.lexers import PythonLexer
|
||||
from pygments.lexers import PythonLexer, SqlLexer
|
||||
from pygments.formatters import HtmlFormatter
|
||||
|
||||
|
||||
|
@ -78,6 +78,32 @@ class Airflow(BaseView):
|
|||
def index(self):
|
||||
return self.render('admin/dags.html')
|
||||
|
||||
@expose('/query')
|
||||
def query(self):
|
||||
session = settings.Session()
|
||||
dbs = session.query(models.DatabaseConnection)
|
||||
db_choices = [(db.db_id, db.db_id) for db in dbs]
|
||||
db_id_str = request.args.get('db_id')
|
||||
sql = request.args.get('sql')
|
||||
class QueryForm(Form):
|
||||
db_id = SelectField("Layout", choices=db_choices)
|
||||
sql = TextAreaField("Execution date")
|
||||
data = {
|
||||
'db_id': db_id_str,
|
||||
'sql': sql,
|
||||
}
|
||||
results = None
|
||||
if db_id_str:
|
||||
db = [db for db in dbs if db.db_id == db_id_str][0]
|
||||
hook = db.get_hook()
|
||||
results = hook.get_pandas_df(sql).to_html(
|
||||
classes="table table-striped table-bordered model-list")
|
||||
|
||||
form = QueryForm(request.form, data=data)
|
||||
session.commit()
|
||||
session.close()
|
||||
return self.render('admin/query.html', form=form, results=results)
|
||||
|
||||
@expose('/code')
|
||||
def code(self):
|
||||
dag_id = request.args.get('dag_id')
|
||||
|
@ -118,17 +144,26 @@ class Airflow(BaseView):
|
|||
task = dag.get_task(task_id)
|
||||
|
||||
s = ''
|
||||
sql_attrs = ['hql', 'sql']
|
||||
for attr_name in dir(task):
|
||||
if not attr_name.startswith('_'):
|
||||
attr = getattr(task, attr_name)
|
||||
if type(attr) != type(self.task):
|
||||
if type(attr) != type(self.task) and attr_name not in sql_attrs:
|
||||
s += attr_name + ': ' + str(attr) + '\n'
|
||||
|
||||
s = s.replace('<', '<')
|
||||
s = s.replace('>', '>')
|
||||
html_code = "<pre><code>" + s + "</code></pre>"
|
||||
html_code = "<h3>Atributes:</h3><pre><code>" + s + "</code></pre>"
|
||||
|
||||
title = "Task Details for {task_id}".format(**locals())
|
||||
|
||||
for attr_name in sql_attrs:
|
||||
if hasattr(task, attr_name):
|
||||
html_code = "<h3>"+attr_name+"</h3>" + highlight(
|
||||
getattr(task, attr_name),
|
||||
SqlLexer(),
|
||||
HtmlFormatter(noclasses=True)) + html_code
|
||||
|
||||
return self.render(
|
||||
'admin/code.html', html_code=html_code, dag=dag, title=title)
|
||||
|
||||
|
|
|
@ -15,9 +15,6 @@ g.node.running rect{
|
|||
g.node.failed rect {
|
||||
stroke: red;
|
||||
}
|
||||
input, select {
|
||||
margin-bottom: 0px;
|
||||
}
|
||||
div#svg_container {
|
||||
border: 1px solid black;
|
||||
background-color: #EEE;
|
||||
|
|
|
@ -39,3 +39,10 @@ pre code {
|
|||
overflow-wrap: normal;
|
||||
white-space: pre;
|
||||
}
|
||||
textarea {
|
||||
width: 100%;
|
||||
margin-top:2px;
|
||||
}
|
||||
input, select {
|
||||
margin: 0px;
|
||||
}
|
||||
|
|
|
@ -54,13 +54,12 @@
|
|||
</h4>
|
||||
</div>
|
||||
<div class="modal-body">
|
||||
|
||||
<button id="btn_log" type="button" class="btn btn-primary">
|
||||
View Log
|
||||
</button>
|
||||
<button id="btn_task" type="button" class="btn btn-primary">
|
||||
Task Details
|
||||
</button>
|
||||
<button id="btn_log" type="button" class="btn btn-primary">
|
||||
View Log
|
||||
</button>
|
||||
<hr/>
|
||||
<button id="btn_clear" type="button" class="btn btn-primary">
|
||||
Clear
|
||||
|
|
|
@ -0,0 +1,24 @@
|
|||
{% extends "admin/master.html" %}
|
||||
|
||||
{% block title %}Airflow - DAGs{% endblock %}
|
||||
|
||||
{% block head_css %}
|
||||
{{ super() }}
|
||||
<link rel="stylesheet" type="text/css"
|
||||
href="{{ url_for("static", filename="main.css") }}">
|
||||
{% endblock %}
|
||||
|
||||
{% block body %}
|
||||
<h2>Query</h2>
|
||||
<form method="get">
|
||||
{{ form.db_id }}
|
||||
<input type="submit" class="btn btn-default" value="Run!"><br>
|
||||
{{ form.sql(rows=10) }}
|
||||
</form>
|
||||
{{ results|safe }}
|
||||
{% endblock %}
|
||||
{% block tail %}
|
||||
{{ super() }}
|
||||
<script>
|
||||
</script>
|
||||
{% endblock %}
|
Загрузка…
Ссылка в новой задаче