[AIRFLOW-4026] Add filter by DAG tags (#6489)

This commit is contained in:
Zacharya 2020-01-09 16:43:27 +02:00 коммит произвёл Kaxil Naik
Родитель 5b6772cb83
Коммит a7cacf593f
33 изменённых файлов: 229 добавлений и 32 удалений

Просмотреть файл

@ -880,6 +880,13 @@
type: string
example: ~
default: "0"
- name: session_lifetime_days
description: |
The UI cookie lifetime in days
version_added: ~
type: string
example: ~
default: "30"
- name: email
description: ~

Просмотреть файл

@ -425,6 +425,9 @@ update_fab_perms = True
# 0 means never get forcibly logged out
force_log_out_after = 0
# The UI cookie lifetime in days
session_lifetime_days = 30
[email]
email_backend = airflow.utils.email.send_email_smtp

Просмотреть файл

@ -36,6 +36,7 @@ dag = DAG(
default_args=args,
schedule_interval='0 0 * * *',
dagrun_timeout=timedelta(minutes=60),
tags=['example']
)
run_this_last = DummyOperator(

Просмотреть файл

@ -35,6 +35,7 @@ dag = DAG(
dag_id='example_branch_operator',
default_args=args,
schedule_interval="@daily",
tags=['example']
)
run_this_first = DummyOperator(

Просмотреть файл

@ -37,6 +37,7 @@ dag = DAG(
dag_id='example_branch_dop_operator_v3',
schedule_interval='*/1 * * * *',
default_args=args,
tags=['example']
)

Просмотреть файл

@ -31,7 +31,7 @@ args = {
dag = models.DAG(
dag_id='example_gcs_to_bq_operator', default_args=args,
schedule_interval=None)
schedule_interval=None, tags=['example'])
create_test_dataset = bash_operator.BashOperator(
task_id='create_airflow_test_dataset',

Просмотреть файл

@ -39,7 +39,7 @@ BUCKET_3_DST = os.environ.get("GCP_GCS_BUCKET_3_DST", "test-gcs-sync-3-dst")
with models.DAG(
"example_gcs_to_gcs", default_args=default_args, schedule_interval=None
"example_gcs_to_gcs", default_args=default_args, schedule_interval=None, tags=['example']
) as dag:
sync_full_bucket = GoogleCloudStorageSynchronizeBuckets(
task_id="sync-full-bucket",

Просмотреть файл

@ -37,7 +37,7 @@ DESTINATION_PATH_2 = "/tmp/dirs/"
with models.DAG(
"example_gcs_to_sftp", default_args=default_args, schedule_interval=None
"example_gcs_to_sftp", default_args=default_args, schedule_interval=None, tags=['example']
) as dag:
# [START howto_operator_gcs_to_sftp_copy_single_file]
copy_file_from_gcs_to_sftp = GoogleCloudStorageToSFTPOperator(

Просмотреть файл

@ -38,7 +38,7 @@ default_args = {
'retry_delay': timedelta(minutes=5),
}
dag = DAG('example_http_operator', default_args=default_args)
dag = DAG('example_http_operator', default_args=default_args, tags=['example'])
dag.doc_md = __doc__

Просмотреть файл

@ -30,6 +30,7 @@ dag = DAG(
dag_id='latest_only',
schedule_interval=dt.timedelta(hours=4),
start_date=days_ago(2),
tags=['example']
)
latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag)

Просмотреть файл

@ -31,6 +31,7 @@ dag = DAG(
dag_id='latest_only_with_trigger',
schedule_interval=dt.timedelta(hours=4),
start_date=days_ago(2),
tags=['example']
)
latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag)

Просмотреть файл

@ -34,6 +34,7 @@ dag = DAG(
},
schedule_interval='*/1 * * * *',
dagrun_timeout=timedelta(minutes=4),
tags=['example']
)

Просмотреть файл

@ -32,6 +32,7 @@ dag = DAG(
dag_id='example_pig_operator',
default_args=args,
schedule_interval=None,
tags=['example']
)
run_this = PigOperator(

Просмотреть файл

@ -35,6 +35,7 @@ dag = DAG(
dag_id='example_python_operator',
default_args=args,
schedule_interval=None,
tags=['example']
)

Просмотреть файл

@ -29,7 +29,7 @@ args = {
'start_date': dates.days_ago(2),
}
dag = DAG(dag_id='example_short_circuit_operator', default_args=args)
dag = DAG(dag_id='example_short_circuit_operator', default_args=args, tags=['example'])
cond_true = ShortCircuitOperator(
task_id='condition_is_True',

Просмотреть файл

@ -58,6 +58,6 @@ def create_test_pipeline(suffix, trigger_rule, dag_):
join >> final
dag = DAG(dag_id='example_skip_dag', default_args=args)
dag = DAG(dag_id='example_skip_dag', default_args=args, tags=['example'])
create_test_pipeline('1', 'all_success', dag)
create_test_pipeline('2', 'one_success', dag)

Просмотреть файл

@ -36,6 +36,7 @@ dag = DAG(
dag_id=DAG_NAME,
default_args=args,
schedule_interval="@once",
tags=['example']
)
start = DummyOperator(

Просмотреть файл

@ -31,6 +31,7 @@ dag = DAG(
dag_id="example_trigger_controller_dag",
default_args={"owner": "airflow", "start_date": days_ago(2)},
schedule_interval="@once",
tags=['example']
)
trigger = TriggerDagRunOperator(

Просмотреть файл

@ -32,6 +32,7 @@ dag = DAG(
dag_id="example_trigger_target_dag",
default_args={"start_date": days_ago(2), "owner": "airflow"},
schedule_interval=None,
tags=['example']
)

Просмотреть файл

@ -28,7 +28,7 @@ args = {
'start_date': days_ago(2),
}
dag = DAG('example_xcom', schedule_interval="@once", default_args=args)
dag = DAG('example_xcom', schedule_interval="@once", default_args=args, tags=['example'])
value_1 = [1, 2, 3]
value_2 = {'a': 'b'}

Просмотреть файл

@ -21,7 +21,7 @@ from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
dag = DAG(dag_id='test_utils', schedule_interval=None)
dag = DAG(dag_id='test_utils', schedule_interval=None, tags=['example'])
task = BashOperator(
task_id='sleeps_forever',

Просмотреть файл

@ -0,0 +1,50 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Add DagTags table
Revision ID: 7939bcff74ba
Revises: fe461863935f
Create Date: 2020-01-07 19:39:01.247442
"""
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
# ``revision`` is this migration's own ID and ``down_revision`` is the
# migration it revises; both mirror the "Revision ID" / "Revises" lines of
# the module docstring.
revision = '7939bcff74ba'
down_revision = 'fe461863935f'
# No branch labels or cross-migration dependencies are set for this revision.
branch_labels = None
depends_on = None
def upgrade():
    """Create the ``dag_tag`` table.

    Each row pairs a tag ``name`` with a ``dag_id``; the two columns together
    form the primary key, and ``dag_id`` references ``dag.dag_id``.
    """
    tag_name = sa.Column('name', sa.String(length=100), nullable=False)
    owning_dag_id = sa.Column('dag_id', sa.String(length=250), nullable=False)
    op.create_table(
        'dag_tag',
        tag_name,
        owning_dag_id,
        sa.ForeignKeyConstraint(['dag_id'], ['dag.dag_id']),
        sa.PrimaryKeyConstraint('name', 'dag_id'),
    )
def downgrade():
    """Drop the ``dag_tag`` table created by :func:`upgrade`."""
    table_name = 'dag_tag'
    op.drop_table(table_name)

Просмотреть файл

@ -20,7 +20,7 @@
from airflow.models.base import ID_LEN, Base # noqa: F401
from airflow.models.baseoperator import BaseOperator, BaseOperatorLink # noqa: F401
from airflow.models.connection import Connection # noqa: F401
from airflow.models.dag import DAG, DagModel # noqa: F401
from airflow.models.dag import DAG, DagModel, DagTag # noqa: F401
from airflow.models.dagbag import DagBag # noqa: F401
from airflow.models.dagpickle import DagPickle # noqa: F401
from airflow.models.dagrun import DagRun # noqa: F401

Просмотреть файл

@ -32,7 +32,8 @@ import jinja2
import pendulum
from croniter import croniter
from dateutil.relativedelta import relativedelta
from sqlalchemy import Boolean, Column, Index, Integer, String, Text, func, or_
from sqlalchemy import Boolean, Column, ForeignKey, Index, Integer, String, Text, func, or_
from sqlalchemy.orm import backref, relationship
from airflow import settings, utils
from airflow.configuration import conf
@ -181,6 +182,8 @@ class DAG(BaseDag, LoggingMixin):
<https://jinja.palletsprojects.com/en/master/api/#jinja2.Environment>`_
:type jinja_environment_kwargs: dict
:param tags: List of tags to help filter DAGs in the UI.
:type tags: List[str]
"""
_comps = {
'dag_id',
@ -221,7 +224,8 @@ class DAG(BaseDag, LoggingMixin):
params: Optional[Dict] = None,
access_control: Optional[Dict] = None,
is_paused_upon_creation: Optional[bool] = None,
jinja_environment_kwargs: Optional[Dict] = None
jinja_environment_kwargs: Optional[Dict] = None,
tags: Optional[List[str]] = None
):
self.user_defined_macros = user_defined_macros
self.user_defined_filters = user_defined_filters
@ -310,6 +314,7 @@ class DAG(BaseDag, LoggingMixin):
self.is_paused_upon_creation = is_paused_upon_creation
self.jinja_environment_kwargs = jinja_environment_kwargs
self.tags = tags
def __repr__(self):
return "<DAG: {self.dag_id}>".format(self=self)
@ -1366,6 +1371,7 @@ class DAG(BaseDag, LoggingMixin):
if self.is_paused_upon_creation is not None:
orm_dag.is_paused = self.is_paused_upon_creation
self.log.info("Creating ORM DAG for %s", self.dag_id)
session.add(orm_dag)
if self.is_subdag:
orm_dag.is_subdag = True
orm_dag.fileloc = self.parent_dag.fileloc
@ -1379,7 +1385,8 @@ class DAG(BaseDag, LoggingMixin):
orm_dag.default_view = self._default_view
orm_dag.description = self.description
orm_dag.schedule_interval = self.schedule_interval
session.merge(orm_dag)
orm_dag.tags = self.get_dagtags(session=session)
session.commit()
for subdag in self.subdags:
@ -1395,6 +1402,28 @@ class DAG(BaseDag, LoggingMixin):
session=session
)
@provide_session
def get_dagtags(self, session=None):
    """
    Return the ``DagTag`` rows corresponding to this DAG's ``tags``.

    Duplicate names in ``self.tags`` are collapsed. For every distinct name
    the existing row for this ``dag_id`` is looked up; when there is none, a
    new ``DagTag`` is added to the session. The session is committed once
    after all names have been processed.

    :param session: session used to query, add and commit the tags.
    :return: The DagTag list (empty when the DAG defines no tags, in which
        case nothing is queried or committed here).
    :rtype: list
    """
    if not self.tags:
        return []

    dag_tags = []
    for tag_name in set(self.tags):
        # Look the tag up by its composite key (name, dag_id).
        lookup = session.query(DagTag).filter(DagTag.name == tag_name)
        lookup = lookup.filter(DagTag.dag_id == self.dag_id)
        dag_tag = lookup.first()
        if not dag_tag:
            dag_tag = DagTag(name=tag_name, dag_id=self.dag_id)
            session.add(dag_tag)
        dag_tags.append(dag_tag)
    session.commit()
    return dag_tags
@staticmethod
@provide_session
def deactivate_unknown_dags(active_dag_ids, session=None):
@ -1529,6 +1558,15 @@ class DAG(BaseDag, LoggingMixin):
return cls.__serialized_fields
class DagTag(Base):
    """
    One tag attached to one DAG, stored so the DAG view can filter by tag.

    A row is identified by the pair (``name``, ``dag_id``).
    """

    __tablename__ = "dag_tag"

    # Tag text; first half of the composite primary key.
    name = Column(String(length=100), primary_key=True)
    # DAG the tag belongs to; references dag.dag_id and completes the primary key.
    dag_id = Column(String(length=ID_LEN), ForeignKey('dag.dag_id'), primary_key=True)
class DagModel(Base):
__tablename__ = "dag"
@ -1571,6 +1609,8 @@ class DagModel(Base):
default_view = Column(String(25))
# Schedule interval
schedule_interval = Column(Interval)
# Tags for view filter
tags = relationship('DagTag', cascade='all,delete-orphan', backref=backref('dag'))
__table_args__ = (
Index('idx_root_dag_id', root_dag_id, unique=False),

Просмотреть файл

@ -95,7 +95,8 @@
"doc_md": { "type" : "string"},
"_default_view": { "type" : "string"},
"_access_control": {"$ref": "#/definitions/dict" },
"is_paused_upon_creation": { "type": "boolean" }
"is_paused_upon_creation": { "type": "boolean" },
"tags": { "type": "array" }
},
"required": [
"_dag_id",

Просмотреть файл

@ -20,12 +20,13 @@
import datetime
import logging
import socket
from datetime import timedelta
from typing import Any, Optional
from urllib.parse import urlparse
import flask
import flask_login
from flask import Flask
from flask import Flask, session as flask_session
from flask_appbuilder import SQLA, AppBuilder
from flask_caching import Cache
from flask_wtf.csrf import CSRFProtect
@ -60,6 +61,9 @@ def create_app(config=None, session=None, testing=False, app_name="Airflow"):
)
app.secret_key = conf.get('webserver', 'SECRET_KEY')
session_lifetime_days = conf.getint('webserver', 'SESSION_LIFETIME_DAYS', fallback=30)
app.config['PERMANENT_SESSION_LIFETIME'] = timedelta(days=session_lifetime_days)
app.config.from_pyfile(settings.WEBSERVER_CONFIG, silent=True)
app.config['APP_NAME'] = app_name
app.config['TESTING'] = testing
@ -257,6 +261,10 @@ def create_app(config=None, session=None, testing=False, app_name="Airflow"):
def shutdown_session(exception=None): # pylint: disable=unused-variable
settings.Session.remove()
@app.before_request
def make_session_permanent():
    """Flag the Flask session as permanent before each request is handled."""
    # NOTE(review): presumably this is what makes the session cookie honour the
    # PERMANENT_SESSION_LIFETIME value configured in create_app -- confirm
    # against the Flask session documentation.
    flask_session.permanent = True
return app, appbuilder

Просмотреть файл

@ -32,10 +32,21 @@
<div id="main_content">
<div class="row">
<div class="col-sm-2">
<div class="col-sm-4">
<form id="filter_form" class="form-inline" style="width: 100%; text-align: left;">
<div id="dags_filter" class="form-group" style="width: 100%;">
<select multiple name="tags" id="tags_filter" class="select2-drop-mask" style="width: 60%;">
{% for tag in tags %}
<option value="{{ tag.name }}" {% if tag.selected %}selected{% endif %}>{{ tag.name }}</option>
{% endfor %}
</select>
<input type="submit" value="Filter tags" class="btn btn-default">
<input type="submit" value="Reset" class="btn btn-default" name="reset_tags">
</div>
</form>
</div>
<div class="col-sm-10">
<form id="search_form" class="form-inline" style="width: 100%; text-align: right;">
<div class="col-sm-8">
<form id="search_form" class="form-inline" style="width: 100%; text-align: right; float: right;">
<div id="dags_filter" class="form-group" style="width: 100%;">
<label for="dag_query" style="width:20%; text-align: right;">Search:</label>
<input id="dag_query" type="text" class="typeahead form-control" data-provide="typeahead" style="width:50%;" value="{{search_query}}" autocomplete="off">
@ -81,9 +92,17 @@
<!-- Column 3: Name -->
<td>
<a href="{{ url_for('Airflow.'+ dag.get_default_view(), dag_id=dag.dag_id) }}" title="{{ dag.description }}">
{{ dag.dag_id }}
</a>
<span>
<a href="{{ url_for('Airflow.'+ dag.get_default_view(), dag_id=dag.dag_id) }}" title="{{ dag.description }}">
{{ dag.dag_id }}
</a>
</span>
<div style="float: right; max-width: 70%; text-align: right; line-height: 160%;">
{% for tag in dag.tags %}
<span class="label label-success" style="margin: 3px;">{{ tag.name }}</span>
{% endfor %}
</div>
</td>
<!-- Column 4: Dag Schedule -->
@ -217,6 +236,11 @@
const DAGS_INDEX = "{{ url_for('Airflow.index') }}";
const ENTER_KEY_CODE = 13;
$('#tags_filter').select2({
placeholder: "Filter dags",
allowClear: true
});
$('#dag_query').on('keypress', function (e) {
// check for key press on ENTER (key code 13) to trigger the search
if (e.which === ENTER_KEY_CODE) {

Просмотреть файл

@ -471,9 +471,12 @@ class CustomSQLAInterface(SQLAInterface):
def is_utcdatetime(self, col_name):
from airflow.utils.sqlalchemy import UtcDateTime
obj = self.list_columns[col_name].type
return isinstance(obj, UtcDateTime) or \
isinstance(obj, sqla.types.TypeDecorator) and \
isinstance(obj.impl, UtcDateTime)
if col_name in self.list_columns:
obj = self.list_columns[col_name].type
return isinstance(obj, UtcDateTime) or \
isinstance(obj, sqla.types.TypeDecorator) and \
isinstance(obj.impl, UtcDateTime)
return False
filter_converter_class = UtcAwareFilterConverter

Просмотреть файл

@ -34,7 +34,8 @@ import lazy_object_proxy
import markdown
import sqlalchemy as sqla
from flask import (
Markup, Response, escape, flash, jsonify, make_response, redirect, render_template, request, url_for,
Markup, Response, escape, flash, jsonify, make_response, redirect, render_template, request,
session as flask_session, url_for,
)
from flask_appbuilder import BaseView, ModelView, expose, has_access
from flask_appbuilder.actions import action
@ -43,6 +44,7 @@ from flask_babel import lazy_gettext
from pygments import highlight, lexers
from pygments.formatters import HtmlFormatter
from sqlalchemy import and_, desc, or_, union_all
from sqlalchemy.orm import joinedload
from wtforms import SelectField, validators
import airflow
@ -53,7 +55,7 @@ from airflow.api.common.experimental.mark_tasks import (
)
from airflow.configuration import AIRFLOW_CONFIG, conf
from airflow.executors.executor_loader import ExecutorLoader
from airflow.models import Connection, DagModel, DagRun, Log, SlaMiss, TaskFail, XCom, errors
from airflow.models import Connection, DagModel, DagRun, DagTag, Log, SlaMiss, TaskFail, XCom, errors
from airflow.settings import STORE_SERIALIZED_DAGS
from airflow.ti_deps.dep_context import RUNNING_DEPS, SCHEDULER_QUEUED_DEPS, DepContext
from airflow.utils import timezone
@ -70,6 +72,7 @@ from airflow.www.forms import (
from airflow.www.widgets import AirflowModelListWidget
PAGE_SIZE = conf.getint('webserver', 'page_size')
FILTER_TAGS_COOKIE = 'tags_filter'
if os.environ.get('SKIP_DAGS_PARSING') != 'True':
dagbag = models.DagBag(settings.DAGS_FOLDER, store_serialized_dags=STORE_SERIALIZED_DAGS)
@ -225,6 +228,17 @@ class Airflow(AirflowBaseView):
arg_current_page = request.args.get('page', '0')
arg_search_query = request.args.get('search', None)
arg_tags_filter = request.args.getlist('tags', None)
if request.args.get('reset_tags') is not None:
flask_session[FILTER_TAGS_COOKIE] = None
arg_tags_filter = None
else:
cookie_val = flask_session.get(FILTER_TAGS_COOKIE)
if arg_tags_filter:
flask_session[FILTER_TAGS_COOKIE] = ','.join(arg_tags_filter)
elif cookie_val:
arg_tags_filter = cookie_val.split(',')
dags_per_page = PAGE_SIZE
current_page = get_int_arg(arg_current_page, default=0)
@ -258,10 +272,21 @@ class Airflow(AirflowBaseView):
DagModel.owners.ilike('%' + arg_search_query + '%')
)
if arg_tags_filter:
dags_query = dags_query.filter(DagModel.tags.any(DagTag.name.in_(arg_tags_filter)))
if 'all_dags' not in filter_dag_ids:
dags_query = dags_query.filter(DagModel.dag_id.in_(filter_dag_ids))
dags = dags_query.order_by(DagModel.dag_id).offset(start).limit(dags_per_page).all()
dags = dags_query.order_by(DagModel.dag_id).options(
joinedload(DagModel.tags)).offset(start).limit(dags_per_page).all()
tags = []
dagtags = session.query(DagTag.name).distinct(DagTag.name).all()
tags = [
{"name": name, "selected": bool(arg_tags_filter and name in arg_tags_filter)}
for name, in dagtags
]
import_errors = session.query(errors.ImportError).all()
@ -301,7 +326,8 @@ class Airflow(AirflowBaseView):
search=escape(arg_search_query) if arg_search_query else None,
showPaused=not hide_paused),
auto_complete_data=auto_complete_data,
num_runs=num_runs)
num_runs=num_runs,
tags=tags)
@expose('/dag_stats')
@has_access

Двоичные данные
docs/img/dags.png

Двоичный файл не отображается.

До

Ширина:  |  Высота:  |  Размер: 212 KiB

После

Ширина:  |  Высота:  |  Размер: 117 KiB

Просмотреть файл

@ -30,6 +30,15 @@ List of the DAGs in your environment, and a set of shortcuts to useful pages.
You can see exactly how many tasks succeeded, failed, or are currently
running at a glance. To hide completed tasks set show_recent_stats_for_completed_runs = False
In order to filter DAGs (e.g. by team), you can add tags to each DAG.
The filter is saved in a cookie and can be reset with the Reset button.
For example:
.. code:: python
dag = DAG('dag', tags=['team1', 'sql'])
------------
.. image:: img/dags.png

Просмотреть файл

@ -16,7 +16,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from airflow.models import Connection, DagModel, DagRun, Pool, SlaMiss, TaskInstance, errors
from airflow.models import Connection, DagModel, DagRun, DagTag, Pool, SlaMiss, TaskInstance, errors
from airflow.utils.db import add_default_pool_if_not_exists, create_default_connections
from airflow.utils.session import create_session
@ -29,6 +29,7 @@ def clear_db_runs():
def clear_db_dags():
    """Delete every ``DagTag`` row and then every ``DagModel`` row."""
    with create_session() as session:
        # Order matters: dag_tag.dag_id is a foreign key to dag.dag_id, so the
        # tag rows are removed before the DAG rows they reference.
        for model in (DagTag, DagModel):
            session.query(model).delete()

Просмотреть файл

@ -33,7 +33,7 @@ from unittest import mock
from urllib.parse import quote_plus
import jinja2
from flask import Markup, url_for
from flask import Markup, session as flask_session, url_for
from parameterized import parameterized
from werkzeug.test import Client
from werkzeug.wrappers import BaseResponse
@ -446,6 +446,15 @@ class TestAirflowBaseViews(TestBase):
resp = self.client.get('home', follow_redirects=True)
self.check_content_in_response('DAGs', resp)
def test_home_filter_tags(self):
    """The tag filter is stored in the session and cleared by ``reset_tags``."""
    from airflow.www.views import FILTER_TAGS_COOKIE

    filter_url = 'home?tags=example&tags=data'
    reset_url = 'home?reset_tags'

    # Keep the request context open so flask_session can be inspected
    # after each request.
    with self.client:
        # Selected tags are persisted as a comma-joined string.
        self.client.get(filter_url, follow_redirects=True)
        stored_filter = flask_session[FILTER_TAGS_COOKIE]
        self.assertEqual('example,data', stored_filter)

        # Resetting overwrites the stored filter with None.
        self.client.get(reset_url, follow_redirects=True)
        stored_filter = flask_session[FILTER_TAGS_COOKIE]
        self.assertEqual(None, stored_filter)
def test_task(self):
url = ('task?task_id=runme_0&dag_id=example_bash_operator&execution_date={}'
.format(self.percent_encode(self.EXAMPLE_DAG_DEFAULT_DATE)))
@ -687,17 +696,22 @@ class TestAirflowBaseViews(TestBase):
# The delete-dag URL should be generated correctly for DAGs
# that exist on the scheduler (DB) but not the webserver DagBag
dag_id = 'example_bash_operator'
test_dag_id = "non_existent_dag"
DM = models.DagModel
self.session.query(DM).filter(DM.dag_id == 'example_bash_operator').update({'dag_id': test_dag_id})
dag_query = self.session.query(DM).filter(DM.dag_id == dag_id)
dag_query.first().tags = [] # To avoid "FOREIGN KEY constraint" error
self.session.commit()
dag_query.update({'dag_id': test_dag_id})
self.session.commit()
resp = self.client.get('/', follow_redirects=True)
self.check_content_in_response('/delete?dag_id={}'.format(test_dag_id), resp)
self.check_content_in_response("return confirmDeleteDag(this, '{}')".format(test_dag_id), resp)
self.session.query(DM).filter(DM.dag_id == test_dag_id).update({'dag_id': 'example_bash_operator'})
self.session.query(DM).filter(DM.dag_id == test_dag_id).update({'dag_id': dag_id})
self.session.commit()