Merge pull request #64 from mistercrunch/flasklogin

Flasklogin
This commit is contained in:
Maxime Beauchemin 2014-12-22 23:57:27 -08:00
Родитель a0967ed06b edd6cf5119
Коммит 76fc9f1734
23 изменённых файлов: 183 добавлений и 74 удалений

Просмотреть файл

@ -1,5 +1,6 @@
[core]
AIRFLOW_HOME: TO_REPLACE_FROM_OS_ENVIRON
AUTHENTICATE: false
BASE_LOG_FOLDER: %(AIRFLOW_HOME)s/logs
DAGS_FOLDER: %(AIRFLOW_HOME)s/dags
BASE_FOLDER: %(AIRFLOW_HOME)s/airflow

Просмотреть файл

@ -1,6 +1,6 @@
#!/usr/bin/env python
from airflow.configuration import getconf
from airflow.configuration import conf
from airflow import settings
from airflow import jobs
from airflow.models import DagBag, TaskInstance, DagPickle
@ -45,7 +45,7 @@ def run(args):
settings.pessimistic_connection_handling()
# Setting up logging
directory = getconf().get('core', 'BASE_LOG_FOLDER') + \
directory = conf.get('core', 'BASE_LOG_FOLDER') + \
"/{args.dag_id}/{args.task_id}".format(args=args)
if not os.path.exists(directory):
os.makedirs(directory)
@ -226,7 +226,7 @@ if __name__ == '__main__':
action="store_true")
parser_backfill.add_argument(
"-sd", "--subdir", help=subdir_help,
default=getconf().get('core', 'DAGS_FOLDER'))
default=conf.get('core', 'DAGS_FOLDER'))
parser_backfill.set_defaults(func=backfill)
ht = "Clear a set of task instance, as if they never ran"
@ -250,7 +250,7 @@ if __name__ == '__main__':
"-d", "--downstream", help=ht, action="store_true")
parser_clear.add_argument(
"-sd", "--subdir", help=subdir_help,
default=getconf().get('core', 'DAGS_FOLDER'))
default=conf.get('core', 'DAGS_FOLDER'))
parser_clear.set_defaults(func=clear)
ht = "Run a single task instance"
@ -261,7 +261,7 @@ if __name__ == '__main__':
"execution_date", help="The execution date to run")
parser_run.add_argument(
"-sd", "--subdir", help=subdir_help,
default=getconf().get('core', 'DAGS_FOLDER'))
default=conf.get('core', 'DAGS_FOLDER'))
parser_run.add_argument(
"-m", "--mark_success", help=mark_success_help, action="store_true")
parser_run.add_argument(
@ -281,12 +281,12 @@ if __name__ == '__main__':
parser_webserver = subparsers.add_parser('webserver', help=ht)
parser_webserver.add_argument(
"-p", "--port",
default=getconf().get('server', 'WEB_SERVER_PORT'),
default=conf.get('server', 'WEB_SERVER_PORT'),
type=int,
help="Set the port on which to run the web server")
parser_webserver.add_argument(
"-hn", "--hostname",
default=getconf().get('server', 'WEB_SERVER_HOST'),
default=conf.get('server', 'WEB_SERVER_HOST'),
help="Set the hostname on which to run the web server")
ht = "Use the server that ships with Flask in debug mode"
parser_webserver.add_argument(
@ -299,7 +299,7 @@ if __name__ == '__main__':
"-d", "--dag_id", help="The id of the dag to run")
parser_master.add_argument(
"-sd", "--subdir", help=subdir_help,
default=getconf().get('core', 'DAGS_FOLDER'))
default=conf.get('core', 'DAGS_FOLDER'))
parser_master.set_defaults(func=master)
ht = "Initialize and reset the metadata database"

Просмотреть файл

@ -54,3 +54,5 @@ class AirflowConfigParser(ConfigParser):
# Return the configuration object produced by AirflowConfigParser.instance().
def getconf():
return AirflowConfigParser.instance()
conf = getconf()

Просмотреть файл

@ -3,7 +3,7 @@ import multiprocessing
import time
from airflow.executors.base_executor import BaseExecutor
from airflow.configuration import getconf
from airflow.configuration import conf
from airflow.utils import State
from celery_worker import execute_command
@ -54,7 +54,7 @@ class CelerySubmitter(multiprocessing.Process):
self.task_queue.task_done()
break
key, command = work
BASE_FOLDER = getconf().get('core', 'BASE_FOLDER')
BASE_FOLDER = conf.get('core', 'BASE_FOLDER')
command = (
"exec bash -c '"
"cd $AIRFLOW_HOME;\n" +

Просмотреть файл

@ -1,7 +1,7 @@
import subprocess
import logging
from airflow.configuration import getconf
from airflow.configuration import conf
from celery import Celery
# to start the celery worker, run the command:
@ -10,9 +10,9 @@ from celery import Celery
# app = Celery('airflow.executors.celery_worker', backend='amqp', broker='amqp://')
app = Celery(
getconf().get('celery', 'CELERY_APP_NAME'),
backend=getconf().get('celery', 'CELERY_BROKER'),
broker=getconf().get('celery', 'CELERY_RESULTS_BACKEND'))
conf.get('celery', 'CELERY_APP_NAME'),
backend=conf.get('celery', 'CELERY_BROKER'),
broker=conf.get('celery', 'CELERY_RESULTS_BACKEND'))
@app.task(name='airflow.executors.celery_worker.execute_command')
def execute_command(command):

Просмотреть файл

@ -5,11 +5,11 @@ import sys
from tempfile import NamedTemporaryFile
from airflow.models import DatabaseConnection
from airflow.configuration import getconf
from airflow.configuration import conf
from airflow import settings
# Adding the Hive python libs to python path
sys.path.insert(0, getconf().get('hooks', 'HIVE_HOME_PY'))
sys.path.insert(0, conf.get('hooks', 'HIVE_HOME_PY'))
from thrift.transport import TSocket
from thrift.transport import TTransport
@ -21,7 +21,7 @@ from airflow.hooks.base_hook import BaseHook
class HiveHook(BaseHook):
def __init__(self,
hive_dbid=getconf().get('hooks', 'HIVE_DEFAULT_DBID')):
hive_dbid=conf.get('hooks', 'HIVE_DEFAULT_DBID')):
session = settings.Session()
db = session.query(
DatabaseConnection).filter(

Просмотреть файл

@ -1,7 +1,7 @@
import subprocess
from airflow import settings
from airflow.configuration import getconf
from airflow.configuration import conf
from airflow.models import DatabaseConnection
from airflow.hooks.base_hook import BaseHook
from airflow.hooks.presto.presto_client import PrestoClient
@ -13,7 +13,7 @@ class PrestoHook(BaseHook):
"""
Interact with Presto!
"""
def __init__(self, presto_dbid=getconf().get('hooks', 'PRESTO_DEFAULT_DBID')):
def __init__(self, presto_dbid=conf.get('hooks', 'PRESTO_DEFAULT_DBID')):
session = settings.Session()
db = session.query(
DatabaseConnection).filter(

Просмотреть файл

@ -11,7 +11,7 @@ from sqlalchemy import func
from sqlalchemy.orm.session import make_transient
from airflow.executors import DEFAULT_EXECUTOR
from airflow.configuration import getconf
from airflow.configuration import conf
from airflow import models
from airflow import settings
from airflow import utils
@ -20,7 +20,7 @@ from airflow.utils import State
Base = models.Base
ID_LEN = getconf().getint('misc', 'ID_LEN')
ID_LEN = conf.getint('misc', 'ID_LEN')
class BaseJob(Base):
"""
@ -51,7 +51,7 @@ class BaseJob(Base):
def __init__(
self,
executor=DEFAULT_EXECUTOR,
heartrate=getconf().getint('misc', 'JOB_HEARTBEAT_SEC'),
heartrate=conf.getint('misc', 'JOB_HEARTBEAT_SEC'),
*args, **kwargs):
self.hostname = socket.gethostname()
self.executor = executor
@ -65,7 +65,7 @@ class BaseJob(Base):
def is_alive(self):
return (
(datetime.now() - self.latest_heartbeat).seconds <
(getconf().getint('misc', 'JOB_HEARTBEAT_SEC') * 2.1)
(conf.getint('misc', 'JOB_HEARTBEAT_SEC') * 2.1)
)
def heartbeat(self):

Просмотреть файл

@ -1,8 +1,8 @@
from airflow.configuration import getconf
from airflow.configuration import conf
def max_partition(
table, schema="default",
hive_dbid=getconf().get('hooks', 'HIVE_DEFAULT_DBID')):
hive_dbid=conf.get('hooks', 'HIVE_DEFAULT_DBID')):
from airflow.hooks.hive_hook import HiveHook
if '.' in table:
schema, table = table.split('.')

Просмотреть файл

@ -18,14 +18,14 @@ from sqlalchemy.dialects.mysql import LONGTEXT
from sqlalchemy.orm import relationship
from airflow.executors import DEFAULT_EXECUTOR
from airflow.configuration import getconf
from airflow.configuration import conf
from airflow import settings
from airflow import utils
from airflow.utils import State
from airflow.utils import apply_defaults
Base = declarative_base()
ID_LEN = getconf().getint('misc', 'ID_LEN')
ID_LEN = conf.getint('misc', 'ID_LEN')
class DagBag(object):
"""
@ -42,7 +42,7 @@ class DagBag(object):
dag_folder=None,
executor=DEFAULT_EXECUTOR):
if not dag_folder:
dag_folder = getconf().get('core', 'DAGS_FOLDER')
dag_folder = conf.get('core', 'DAGS_FOLDER')
logging.info("Filling up the DagBag from " + dag_folder)
self.dag_folder = dag_folder
self.dags = {}
@ -113,6 +113,19 @@ class User(Base):
self.username = username
self.email = email
# Return this user's id converted to a unicode string.
# NOTE(review): presumably the identifier Flask-Login stores in the session
# and later hands back to the user loader -- confirm.
def get_id(self):
return unicode(self.id)
# Every user is reported as active; there is no per-user check here.
def is_active(self):
return True
# Always True: any User instance is reported as authenticated.
def is_authenticated(self):
return True
# Always False: a User instance is never reported as anonymous.
def is_anonymous(self):
return False
class DatabaseConnection(Base):
"""
@ -250,13 +263,13 @@ class TaskInstance(Base):
@property
def log_filepath(self):
iso = self.execution_date.isoformat()
return getconf().get('core', 'BASE_LOG_FOLDER') + \
return conf.get('core', 'BASE_LOG_FOLDER') + \
"/{self.dag_id}/{self.task_id}/{iso}.log".format(**locals())
@property
def log_url(self):
iso = self.execution_date.isoformat()
BASE_URL = getconf().get('core', 'BASE_URL')
BASE_URL = conf.get('core', 'BASE_URL')
return BASE_URL + (
"/admin/airflow/log"
"?dag_id={self.dag_id}"
@ -930,7 +943,7 @@ class DAG(Base):
@property
def filepath(self):
base = getconf().get('core', 'DAGS_FOLDER')
base = conf.get('core', 'DAGS_FOLDER')
return self.full_filepath.replace(base + '/', '')
@property

Просмотреть файл

@ -1,6 +1,6 @@
import logging
from airflow.configuration import getconf
from airflow.configuration import conf
from airflow.hooks import HiveHook
from airflow.models import BaseOperator
from airflow.utils import apply_defaults
@ -25,7 +25,7 @@ class HiveOperator(BaseOperator):
@apply_defaults
def __init__(
self, hql,
hive_dbid=getconf().get('hooks', 'HIVE_DEFAULT_DBID'),
hive_dbid=conf.get('hooks', 'HIVE_DEFAULT_DBID'),
*args, **kwargs):
super(HiveOperator, self).__init__(*args, **kwargs)

Просмотреть файл

@ -3,7 +3,7 @@ import logging
from time import sleep
from airflow import settings
from airflow.configuration import getconf
from airflow.configuration import conf
from airflow.hooks import HiveHook
from airflow.models import BaseOperator
from airflow.models import DatabaseConnection as DB
@ -132,7 +132,7 @@ class HivePartitionSensor(BaseSensorOperator):
def __init__(
self,
table, partition="ds='{{ ds }}'",
hive_dbid=getconf().get('hooks', 'HIVE_DEFAULT_DBID'),
hive_dbid=conf.get('hooks', 'HIVE_DEFAULT_DBID'),
schema='default',
*args, **kwargs):
super(HivePartitionSensor, self).__init__(*args, **kwargs)

Просмотреть файл

@ -6,7 +6,7 @@ from sqlalchemy import event
from sqlalchemy import exc
from sqlalchemy.pool import Pool
from airflow.configuration import getconf
from airflow.configuration import conf
HEADER = """\
.__ _____.__
@ -31,9 +31,9 @@ def pessimistic_connection_handling():
cursor.close()
BASE_FOLDER = getconf().get('core', 'BASE_FOLDER')
BASE_FOLDER = conf.get('core', 'BASE_FOLDER')
BASE_LOG_URL = "/admin/airflow/log"
SQL_ALCHEMY_CONN = getconf().get('core', 'SQL_ALCHEMY_CONN')
SQL_ALCHEMY_CONN = conf.get('core', 'SQL_ALCHEMY_CONN')
if BASE_FOLDER not in sys.path:
sys.path.append(BASE_FOLDER)

Просмотреть файл

@ -3,7 +3,7 @@ from functools import wraps
import inspect
import logging
import re
from airflow.configuration import getconf
from airflow.configuration import conf
class State(object):
@ -140,11 +140,11 @@ def send_email(to, subject, html_content):
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
SMTP_HOST = getconf().get('smtp', 'SMTP_HOST')
SMTP_MAIL_FROM = getconf().get('smtp', 'SMTP_MAIL_FROM')
SMTP_PORT = getconf().get('smtp', 'SMTP_PORT')
SMTP_USER = getconf().get('smtp', 'SMTP_USER')
SMTP_PASSWORD = getconf().get('smtp', 'SMTP_PASSWORD')
SMTP_HOST = conf.get('smtp', 'SMTP_HOST')
SMTP_MAIL_FROM = conf.get('smtp', 'SMTP_MAIL_FROM')
SMTP_PORT = conf.get('smtp', 'SMTP_PORT')
SMTP_USER = conf.get('smtp', 'SMTP_USER')
SMTP_PASSWORD = conf.get('smtp', 'SMTP_PASSWORD')
if type(to) is type(list()):
to = ','.join(to)

Просмотреть файл

@ -20,7 +20,6 @@ from pygments.formatters import HtmlFormatter
import jinja2
import markdown
import chartkick
@ -29,13 +28,22 @@ from airflow import jobs
from airflow import models
from airflow.models import State
from airflow import settings
from airflow.configuration import getconf
from airflow.configuration import conf
from airflow import utils
dagbag = models.DagBag(getconf().get('core', 'DAGS_FOLDER'))
from airflow.www.login import login_manager
import flask_login
from flask_login import login_required
AUTHENTICATE = conf.getboolean('core', 'AUTHENTICATE')
if AUTHENTICATE is False:
login_required = lambda x: x
dagbag = models.DagBag(conf.get('core', 'DAGS_FOLDER'))
session = Session()
app = Flask(__name__)
login_manager.init_app(app)
app.secret_key = 'airflowified'
# Init for chartkick, the python wrapper for highcharts
@ -129,6 +137,7 @@ class Airflow(BaseView):
return self.render('airflow/dags.html')
@expose('/query')
@login_required
def query(self):
session = settings.Session()
dbs = session.query(models.DatabaseConnection).order_by(
@ -175,6 +184,7 @@ class Airflow(BaseView):
has_data=has_data)
@expose('/chart')
@login_required
def chart(self):
from pandas.tslib import Timestamp
session = settings.Session()
@ -311,23 +321,61 @@ class Airflow(BaseView):
dag_id = request.args.get('dag_id')
dag = dagbag.dags[dag_id]
code = "".join(open(dag.full_filepath, 'r').readlines())
title = dag.filepath.replace(getconf().get('core', 'BASE_FOLDER') + '/dags/', '')
title = dag.filepath.replace(conf.get('core', 'BASE_FOLDER') + '/dags/', '')
html_code = highlight(
code, PythonLexer(), HtmlFormatter(noclasses=True))
return self.render(
'airflow/code.html', html_code=html_code, dag=dag, title=title)
# Render the 'airflow/noaccess.html' template. The login view redirects here
# when the request carries no X-Internalauth-Username header.
@expose('/noaccess')
def noaccess(self):
return self.render('airflow/noaccess.html')
@expose('/login')
def login(self):
    """
    Log a user in from the headers set by the internal auth proxy.

    Flow:
      * no ``X-Internalauth-Username`` header -> redirect to the
        ``airflow.noaccess`` endpoint;
      * the required role is not listed in ``X-Internalauth-Groups``
        (or that header is absent) -> redirect to ``/``;
      * otherwise the matching ``models.User`` row is created or updated
        with the e-mail found in the cookie, the user is logged in through
        Flask-Login, and the client is redirected to the ``next`` query
        argument or to the ``index`` endpoint.

    The cookie parsing is unchanged from the previous implementation and
    still raises if the ``Cookie`` header is missing or not in the expected
    format.
    """
    role = 'airpal_topsecret.engineering.airbnb.com'
    if 'X-Internalauth-Username' not in request.headers:
        return redirect(url_for('airflow.noaccess'))
    username = request.headers.get('X-Internalauth-Username')

    # A missing groups header means "member of no group"; previously it made
    # ``role in None`` raise a TypeError.
    groups = request.headers.get('X-Internalauth-Groups') or ''
    if role not in groups:
        # Decide before touching the cookie or the database, so refused
        # users neither hit the fragile cookie parsing nor leak a session.
        return redirect('/')

    import urllib2
    # The e-mail lives in a JSON document embedded in the Cookie header:
    # everything after the 'j:' marker and before the '; _ga=' cookie.
    cookie = urllib2.unquote(request.headers.get('Cookie'))
    cookie = ''.join(cookie.split('j:')[1:]).split('; _ga=')[0]
    cookie = json.loads(cookie)
    email = str(cookie['data']['userData']['mail'][0])

    session = settings.Session()
    try:
        user = session.query(models.User).filter(
            models.User.username == username).first()
        if not user:
            user = models.User(username=username)
        user.email = email
        # merge() returns the session-attached instance; the object passed in
        # is NOT added to the session. Keep the returned one and commit
        # first, so a newly created user has its primary key by the time
        # Flask-Login asks for get_id().
        user = session.merge(user)
        session.commit()
        flask_login.login_user(user)
    finally:
        # Always release the session, including when an exception is raised.
        session.close()
    # NOTE(review): ``next`` comes straight from the query string and is not
    # validated here -- confirm it cannot be used as an open redirect.
    return redirect(request.args.get("next") or url_for("index"))
# Log the current user out through flask_login.logout_user() and send the
# client to '/admin'.
@expose('/logout')
def logout(self):
flask_login.logout_user()
return redirect('/admin')
@expose('/log')
def log(self):
dag_id = request.args.get('dag_id')
task_id = request.args.get('task_id')
execution_date = request.args.get('execution_date')
dag = dagbag.dags[dag_id]
loc = getconf().get('core', 'BASE_LOG_FOLDER') + "/{dag_id}/{task_id}/{execution_date}"
loc = conf.get('core', 'BASE_LOG_FOLDER') + "/{dag_id}/{task_id}/{execution_date}"
loc = loc.format(**locals())
try:
f = open(loc)
log = "".join(f.readlines())
f.close()
except:
log = "The log file '{loc}' is missing.".format(**locals())
TI = models.TaskInstance
@ -341,7 +389,6 @@ class Airflow(BaseView):
session.commit()
session.close()
log = "<pre><code>" + log + "</code></pre>"
title = "Logs for {task_id} on {execution_date}".format(**locals())
html_code = log
@ -729,6 +776,10 @@ admin.add_view(Airflow(name='DAGs'))
# Leveraging the admin for CRUD and browse on models
# ------------------------------------------------
# Mixin that gates a view behind login when authentication is switched on.
class LoginMixin(object):
# Accessible when AUTHENTICATE is False (the [core] AUTHENTICATE setting read
# at import time), otherwise only when the current Flask-Login user reports
# itself as authenticated.
# NOTE(review): presumably Flask-Admin consults is_accessible() before
# serving the view -- confirm.
def is_accessible(self):
return AUTHENTICATE is False or flask_login.current_user.is_authenticated()
class ModelViewOnly(ModelView):
"""
@ -789,12 +840,13 @@ class JobModelView(ModelViewOnly):
mv = JobModelView(jobs.BaseJob, session, name="Jobs", category="Admin")
admin.add_view(mv)
mv = ModelView(models.User, session, name="Users", category="Admin")
class UserModelView(LoginMixin, ModelView):
pass
mv = UserModelView(models.User, session, name="Users", category="Admin")
admin.add_view(mv)
class DatabaseConnectionModelView(ModelView):
class DatabaseConnectionModelView(LoginMixin, ModelView):
column_list = ('db_id', 'db_type', 'host', 'port')
form_choices = {
'db_type': [
@ -837,7 +889,7 @@ def label_link(v, c, m, p):
return Markup("<a href='{url}'>{m.label}</a>".format(**locals()))
class ChartModelView(ModelView):
class ChartModelView(LoginMixin, ModelView):
column_list = ('label', 'db_id', 'chart_type', 'show_datatable', )
column_formatters = dict(label=label_link)
create_template = 'airflow/chart/create.html'

27
airflow/www/login.py Normal file
Просмотреть файл

@ -0,0 +1,27 @@
import urllib2
import json
import flask_login
from flask import redirect
from airflow.models import User
from airflow import settings
from airflow.configuration import conf
login_manager = flask_login.LoginManager()
@login_manager.user_loader
def load_user(userid):
    """
    Flask-Login user loader: fetch the ``User`` whose id is ``userid``.

    :param userid: the id previously handed to Flask-Login for this user
    :return: the matching ``User`` detached from its session, or ``None``
        when no such user exists. A user loader is expected to return
        ``None`` for an unknown id rather than raise, so a stale session
        cookie is treated as "not logged in" instead of producing an error.
    """
    session = settings.Session()
    try:
        # .first() yields None when no row matches.
        user = session.query(User).filter(User.id == userid).first()
        # Detach before committing so the returned object stays usable
        # after the session is closed.
        session.expunge_all()
        session.commit()
    finally:
        # Release the session on every path, including query errors.
        session.close()
    return user
login_manager.login_view = 'airflow.login'
login_manager.login_message = None

Просмотреть файл

@ -81,3 +81,6 @@ table.dataTable.dataframe thead .sorting_asc {
.no-wrap {
white-space: nowrap;
}
div.form-inline{
margin-bottom: 5px;
}

Просмотреть файл

@ -11,8 +11,10 @@
{% block brand %}
<a class="navbar-brand" rel="home" href="#">
<img style="float: left; width:35px; margin-top: -7px;"
src="{{ url_for("static", filename="pin_100.png") }}">
<span>AirFlow</span>
</a>
<img
style="float: left; width:35px; margin-top: -7px;"
src="{{ url_for("static", filename="pin_100.png") }}"
title="{{ current_user.username }}">
<span>AirFlow</span>
</a>
{% endblock %}

Просмотреть файл

@ -4,7 +4,5 @@
{% block body %}
{{ super() }}
<h2>{{ title }}</h2>
<div class="panel">
{{ html_code|safe }}
</div>
{{ html_code|safe }}
{% endblock %}

Просмотреть файл

@ -8,10 +8,11 @@
{% block body %}
{{ super() }}
<form method="get">
Run:<input type="hidden" value="{{ dag.dag_id }}" name="dag_id">
{{ form.execution_date | safe }}
<input type="submit" value="Go" class="btn btn-default"
action="" method="get">
<div class="form-inline">
Run:<input type="hidden" value="{{ dag.dag_id }}" name="dag_id">
{{ form.execution_date(class_="form-control") | safe }}
<input type="submit" value="Go" class="btn btn-default" action="" method="get">
</div>
</form>
<div id="container"></div>
{% endblock %}

Просмотреть файл

@ -16,12 +16,14 @@
{% block body %}
{{ super() }}
<form method="get">
Run:<input type="hidden" value="{{ dag.dag_id }}" name="dag_id">
{{ form.execution_date | safe }}
Layout:
{{ form.arrange | safe }}
<input type="submit" value="Go" class="btn btn-default"
action="" method="get">
<div class="form-inline">
Run:<input type="hidden" value="{{ dag.dag_id }}" name="dag_id">
{{ form.execution_date(class_="form-control") | safe }}
Layout:
{{ form.arrange(class_="form-control") | safe }}
<input type="submit" value="Go" class="btn btn-default"
action="" method="get">
</div>
</form>
<div id="svg_container">
<svg height=800 width="100%">

Просмотреть файл

@ -0,0 +1,7 @@
{% extends "airflow/master.html" %}
{% block title %}{{ title }}{% endblock %}
{% block body %}
You don't seem to have access. Please contact your administrator.
{% endblock %}

Просмотреть файл

@ -13,8 +13,10 @@
{% block body %}
<h2>Query</h2>
<form method="get">
{{ form.db_id }}
<input type="submit" class="btn btn-default" value="Run!"><br>
<div class="form-inline">
{{ form.db_id }}
<input type="submit" class="btn btn-default" value="Run!"><br>
</div>
<div id='ace_container'>
{{ form.sql(rows=10) }}
</div>
@ -48,7 +50,6 @@
"iDisplayLength": 25,
});
$('select').addClass("form-control");
$('select#db_id').attr("style", "width: auto; float:left;");
});
</script>
{% endblock %}