diff --git a/airflow/www/views.py b/airflow/www/views.py index 4ea826ce8e..129c0edcb1 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -38,11 +38,11 @@ from flask import ( ) from flask_appbuilder import BaseView, ModelView, expose, has_access, permission_name from flask_appbuilder.actions import action -from flask_appbuilder.models.sqla.filters import BaseFilter +from flask_appbuilder.models.sqla.filters import BaseFilter # noqa from flask_babel import lazy_gettext from jinja2.utils import htmlsafe_json_dumps, pformat # type: ignore from pygments import highlight, lexers -from pygments.formatters import HtmlFormatter +from pygments.formatters import HtmlFormatter # noqa pylint: disable=no-name-in-module from sqlalchemy import and_, desc, func, or_, union_all from sqlalchemy.orm import joinedload from wtforms import SelectField, validators @@ -60,6 +60,7 @@ from airflow.jobs.scheduler_job import SchedulerJob from airflow.models import Connection, DagModel, DagTag, Log, SlaMiss, TaskFail, XCom, errors from airflow.models.dagcode import DagCode from airflow.models.dagrun import DagRun, DagRunType +from airflow.models.taskinstance import TaskInstance from airflow.ti_deps.dep_context import DepContext from airflow.ti_deps.dependencies_deps import RUNNING_DEPS, SCHEDULER_QUEUED_DEPS from airflow.utils import timezone @@ -94,33 +95,32 @@ def get_safe_url(url): return url_for('Airflow.index') -def get_date_time_num_runs_dag_runs_form_data(request, session, dag): +def get_date_time_num_runs_dag_runs_form_data(www_request, session, dag): """Get Execution Data, Base Date & Number of runs from a Request """ - dttm = request.args.get('execution_date') - if dttm: - dttm = timezone.parse(dttm) + date_time = www_request.args.get('execution_date') + if date_time: + date_time = timezone.parse(date_time) else: - dttm = dag.get_latest_execution_date(session=session) or timezone.utcnow() + date_time = dag.get_latest_execution_date(session=session) or timezone.utcnow() - 
base_date = request.args.get('base_date') + base_date = www_request.args.get('base_date') if base_date: base_date = timezone.parse(base_date) else: # The DateTimeField widget truncates milliseconds and would loose # the first dag run. Round to next second. - base_date = (dttm + timedelta(seconds=1)).replace(microsecond=0) + base_date = (date_time + timedelta(seconds=1)).replace(microsecond=0) default_dag_run = conf.getint('webserver', 'default_dag_run_display_number') - num_runs = request.args.get('num_runs') + num_runs = www_request.args.get('num_runs') num_runs = int(num_runs) if num_runs else default_dag_run - DR = models.DagRun drs = ( - session.query(DR) + session.query(DagRun) .filter( - DR.dag_id == dag.dag_id, - DR.execution_date <= base_date) - .order_by(desc(DR.execution_date)) + DagRun.dag_id == dag.dag_id, + DagRun.execution_date <= base_date) + .order_by(desc(DagRun.execution_date)) .limit(num_runs) .all() ) @@ -128,20 +128,20 @@ def get_date_time_num_runs_dag_runs_form_data(request, session, dag): dr_state = None for dr in drs: dr_choices.append((dr.execution_date.isoformat(), dr.run_id)) - if dttm == dr.execution_date: + if date_time == dr.execution_date: dr_state = dr.state # Happens if base_date was changed and the selected dag run is not in result if not dr_state and drs: dr = drs[0] - dttm = dr.execution_date + date_time = dr.execution_date dr_state = dr.state return { - 'dttm': dttm, + 'dttm': date_time, 'base_date': base_date, 'num_runs': num_runs, - 'execution_date': dttm.isoformat(), + 'execution_date': date_time.isoformat(), 'dr_choices': dr_choices, 'dr_state': dr_state, } @@ -151,20 +151,21 @@ def get_date_time_num_runs_dag_runs_form_data(request, session, dag): # Error handlers ###################################################################################### -def circles(error): +def circles(error): # pylint: disable=unused-argument """Show Circles on screen for any error in the Webserver""" return render_template( - 
'airflow/circles.html', hostname=socket.getfqdn() if conf.getboolean( + 'airflow/circles.html', hostname=socket.getfqdn() if conf.getboolean( # noqa 'webserver', 'EXPOSE_HOSTNAME', fallback=True) else 'redact'), 404 -def show_traceback(error): +def show_traceback(error): # pylint: disable=unused-argument """Show Traceback for a given error""" from airflow.utils import asciiart as ascii_ + return render_template( - 'airflow/traceback.html', + 'airflow/traceback.html', # noqa hostname=socket.getfqdn() if conf.getboolean( 'webserver', 'EXPOSE_HOSTNAME', @@ -199,7 +200,10 @@ class AirflowBaseView(BaseView): # noqa: D101 ) -class Airflow(AirflowBaseView): # noqa: D101 +class Airflow(AirflowBaseView): # noqa: D101 pylint: disable=too-many-public-methods + """ + Main Airflow application. + """ @expose('/health') def health(self): """ @@ -221,7 +225,7 @@ class Airflow(AirflowBaseView): # noqa: D101 latest_scheduler_heartbeat = scheduler_job.latest_heartbeat.isoformat() if scheduler_job.is_alive(): scheduler_status = 'healthy' - except Exception: + except Exception: # noqa pylint: disable=broad-except payload['metadatabase']['status'] = 'unhealthy' payload['scheduler'] = {'status': scheduler_status, @@ -231,7 +235,8 @@ class Airflow(AirflowBaseView): # noqa: D101 @expose('/home') @has_access - def index(self): + def index(self): # pylint: disable=too-many-locals,too-many-statements + """Home view.""" hide_paused_dags_by_default = conf.getboolean('webserver', 'hide_paused_dags_by_default') @@ -287,10 +292,11 @@ class Airflow(AirflowBaseView): # noqa: D101 ~DagModel.is_subdag, DagModel.is_active ) + # pylint: disable=no-member if arg_search_query: dags_query = dags_query.filter( - DagModel.dag_id.ilike('%' + arg_search_query + '%') | - DagModel.owners.ilike('%' + arg_search_query + '%') + DagModel.dag_id.ilike('%' + arg_search_query + '%') | # noqa + DagModel.owners.ilike('%' + arg_search_query + '%') # noqa ) if arg_tags_filter: @@ -298,6 +304,7 @@ class 
Airflow(AirflowBaseView): # noqa: D101 if 'all_dags' not in filter_dag_ids: dags_query = dags_query.filter(DagModel.dag_id.in_(filter_dag_ids)) + # pylint: enable=no-member all_dags = dags_query active_dags = dags_query.filter(~DagModel.is_paused) @@ -331,9 +338,9 @@ class Airflow(AirflowBaseView): # noqa: D101 import_errors = session.query(errors.ImportError).all() - for ie in import_errors: + for import_error in import_errors: flash( - "Broken DAG: [{ie.filename}] {ie.stacktrace}".format(ie=ie), + "Broken DAG: [{ie.filename}] {ie.stacktrace}".format(ie=import_error), "dag_import_error") from airflow.plugins_manager import import_errors as plugin_import_errors @@ -375,6 +382,7 @@ class Airflow(AirflowBaseView): # noqa: D101 @has_access @provide_session def dag_stats(self, session=None): + """Dag statistics.""" dr = models.DagRun allowed_dag_ids = current_app.appbuilder.sm.get_accessible_dag_ids() @@ -398,7 +406,7 @@ class Airflow(AirflowBaseView): # noqa: D101 return wwwutils.json_response({}) payload = {} - dag_state_stats = dag_state_stats.filter(dr.dag_id.in_(filter_dag_ids)) + dag_state_stats = dag_state_stats.filter(dr.dag_id.in_(filter_dag_ids)) # pylint: disable=no-member data = {} for dag_id, state, count in dag_state_stats: @@ -421,10 +429,7 @@ class Airflow(AirflowBaseView): # noqa: D101 @has_access @provide_session def task_stats(self, session=None): - TI = models.TaskInstance - DagRun = models.DagRun - Dag = models.DagModel - + """Task Statistics""" allowed_dag_ids = set(current_app.appbuilder.sm.get_accessible_dag_ids()) if not allowed_dag_ids: @@ -443,59 +448,78 @@ class Airflow(AirflowBaseView): # noqa: D101 else: filter_dag_ids = allowed_dag_ids - RunningDagRun = ( + # pylint: disable=comparison-with-callable + running_dag_run_query_result = ( session.query(DagRun.dag_id, DagRun.execution_date) - .join(Dag, Dag.dag_id == DagRun.dag_id) - .filter(DagRun.state == State.RUNNING, Dag.is_active) + .join(DagModel, DagModel.dag_id == DagRun.dag_id) + 
.filter(DagRun.state == State.RUNNING, DagModel.is_active) ) + # pylint: enable=comparison-with-callable + # pylint: disable=no-member if selected_dag_ids: - RunningDagRun = RunningDagRun.filter(DagRun.dag_id.in_(filter_dag_ids)) - RunningDagRun = RunningDagRun.subquery('running_dag_run') + running_dag_run_query_result = \ + running_dag_run_query_result.filter(DagRun.dag_id.in_(filter_dag_ids)) + # pylint: enable=no-member + running_dag_run_query_result = running_dag_run_query_result.subquery('running_dag_run') + + # pylint: disable=no-member # Select all task_instances from active dag_runs. - RunningTI = ( - session.query(TI.dag_id.label('dag_id'), TI.state.label('state')) - .join(RunningDagRun, - and_(RunningDagRun.c.dag_id == TI.dag_id, - RunningDagRun.c.execution_date == TI.execution_date)) + running_task_instance_query_result = ( + session.query(TaskInstance.dag_id.label('dag_id'), TaskInstance.state.label('state')) + .join(running_dag_run_query_result, + and_(running_dag_run_query_result.c.dag_id == TaskInstance.dag_id, + running_dag_run_query_result.c.execution_date == TaskInstance.execution_date)) ) if selected_dag_ids: - RunningTI = RunningTI.filter(TI.dag_id.in_(filter_dag_ids)) + running_task_instance_query_result = \ + running_task_instance_query_result.filter(TaskInstance.dag_id.in_(filter_dag_ids)) + # pylint: enable=no-member if conf.getboolean('webserver', 'SHOW_RECENT_STATS_FOR_COMPLETED_RUNS', fallback=True): - LastDagRun = ( + # pylint: disable=comparison-with-callable + last_dag_run = ( session.query( DagRun.dag_id, sqla.func.max(DagRun.execution_date).label('execution_date') ) - .join(Dag, Dag.dag_id == DagRun.dag_id) - .filter(DagRun.state != State.RUNNING, Dag.is_active) + .join(DagModel, DagModel.dag_id == DagRun.dag_id) + .filter(DagRun.state != State.RUNNING, DagModel.is_active) .group_by(DagRun.dag_id) ) - + # pylint: enable=comparison-with-callable + # pylint: disable=no-member if selected_dag_ids: - LastDagRun = 
LastDagRun.filter(DagRun.dag_id.in_(filter_dag_ids)) - LastDagRun = LastDagRun.subquery('last_dag_run') + last_dag_run = last_dag_run.filter(DagRun.dag_id.in_(filter_dag_ids)) + last_dag_run = last_dag_run.subquery('last_dag_run') + # pylint: enable=no-member # Select all task_instances from active dag_runs. # If no dag_run is active, return task instances from most recent dag_run. - LastTI = ( - session.query(TI.dag_id.label('dag_id'), TI.state.label('state')) - .join(LastDagRun, - and_(LastDagRun.c.dag_id == TI.dag_id, - LastDagRun.c.execution_date == TI.execution_date)) + last_task_instance_query_result = ( + session.query(TaskInstance.dag_id.label('dag_id'), TaskInstance.state.label('state')) + .join(last_dag_run, + and_(last_dag_run.c.dag_id == TaskInstance.dag_id, + last_dag_run.c.execution_date == TaskInstance.execution_date)) ) + # pylint: disable=no-member if selected_dag_ids: - LastTI = LastTI.filter(TI.dag_id.in_(filter_dag_ids)) + last_task_instance_query_result = \ + last_task_instance_query_result.filter(TaskInstance.dag_id.in_(filter_dag_ids)) + # pylint: enable=no-member - FinalTI = union_all(LastTI, RunningTI).alias('final_ti') + final_task_instance_query_result = union_all( + last_task_instance_query_result, + running_task_instance_query_result).alias('final_ti') else: - FinalTI = RunningTI.subquery('final_ti') + final_task_instance_query_result = running_task_instance_query_result.subquery('final_ti') qry = ( - session.query(FinalTI.c.dag_id, FinalTI.c.state, sqla.func.count()) - .group_by(FinalTI.c.dag_id, FinalTI.c.state) + session.query(final_task_instance_query_result.c.dag_id, + final_task_instance_query_result.c.state, sqla.func.count()) + .group_by(final_task_instance_query_result.c.dag_id, + final_task_instance_query_result.c.state) ) data = {} @@ -519,8 +543,7 @@ class Airflow(AirflowBaseView): # noqa: D101 @has_access @provide_session def last_dagruns(self, session=None): - DagRun = models.DagRun - + """Last DAG runs""" allowed_dag_ids 
= current_app.appbuilder.sm.get_accessible_dag_ids() if 'all_dags' in allowed_dag_ids: @@ -544,7 +567,7 @@ class Airflow(AirflowBaseView): # noqa: D101 ).group_by(DagRun.dag_id) # Filter to only ask for accessible and selected dags - query = query.filter(DagRun.dag_id.in_(filter_dag_ids)) + query = query.filter(DagRun.dag_id.in_(filter_dag_ids)) # pylint: enable=no-member resp = { r.dag_id.replace('.', '__dot__'): { @@ -559,21 +582,24 @@ class Airflow(AirflowBaseView): # noqa: D101 @has_access @provide_session def code(self, session=None): + """Dag Code.""" all_errors = "" + dag_orm = None + dag_id = None try: dag_id = request.args.get('dag_id') dag_orm = DagModel.get_dagmodel(dag_id, session=session) code = DagCode.get_code_by_fileloc(dag_orm.fileloc) html_code = Markup(highlight( - code, lexers.PythonLexer(), HtmlFormatter(linenos=True))) + code, lexers.PythonLexer(), HtmlFormatter(linenos=True))) # pylint: disable=no-member - except Exception as e: + except Exception as e: # pylint: disable=broad-except all_errors += ( "Exception encountered during " + "dag_id retrieval/dag retrieval fallback/code highlighting:\n\n{}\n".format(e) ) - html_code = Markup('
Failed to load file.
Details: {}
').format( + html_code = Markup('Failed to load file.
Details: {}
').format( # noqa escape(all_errors)) return self.render_template( @@ -587,16 +613,16 @@ class Airflow(AirflowBaseView): # noqa: D101 @has_access @provide_session def dag_details(self, session=None): + """Get Dag details.""" dag_id = request.args.get('dag_id') dag = current_app.dag_bag.get_dag(dag_id) title = "DAG details" root = request.args.get('root', '') - TI = models.TaskInstance states = ( - session.query(TI.state, sqla.func.count(TI.dag_id)) - .filter(TI.dag_id == dag_id) - .group_by(TI.state) + session.query(TaskInstance.state, sqla.func.count(TaskInstance.dag_id)) + .filter(TaskInstance.dag_id == dag_id) + .group_by(TaskInstance.state) .all() ) @@ -615,6 +641,7 @@ class Airflow(AirflowBaseView): # noqa: D101 @has_access @action_logging def rendered(self): + """Get rendered Dag.""" dag_id = request.args.get('dag_id') task_id = request.args.get('task_id') execution_date = request.args.get('execution_date') @@ -629,12 +656,12 @@ class Airflow(AirflowBaseView): # noqa: D101 ti = models.TaskInstance(task=task, execution_date=dttm) try: ti.get_rendered_template_fields() - except AirflowException as e: + except AirflowException as e: # pylint: disable=broad-except msg = "Error rendering template: " + escape(e) - if e.__cause__: + if e.__cause__: # pylint: disable=using-constant-test msg += Markup("{}
").format(pformat(content))
+ html_dict[template_field] = \
+ Markup("{}
").format(pformat(content)) # noqa
return self.render_template(
'airflow/ti_code.html',
@@ -661,6 +689,7 @@ class Airflow(AirflowBaseView): # noqa: D101
@action_logging
@provide_session
def get_logs_with_metadata(self, session=None):
+ """Retrieve logs including metadata."""
dag_id = request.args.get('dag_id')
task_id = request.args.get('task_id')
execution_date = request.args.get('execution_date')
@@ -745,6 +774,7 @@ class Airflow(AirflowBaseView): # noqa: D101
@action_logging
@provide_session
def log(self, session=None):
+ """Retrieve log."""
dag_id = request.args.get('dag_id')
task_id = request.args.get('task_id')
execution_date = request.args.get('execution_date')
@@ -778,6 +808,7 @@ class Airflow(AirflowBaseView): # noqa: D101
@action_logging
@provide_session
def redirect_to_external_log(self, session=None):
+ """Redirects to external log."""
dag_id = request.args.get('dag_id')
task_id = request.args.get('task_id')
execution_date = request.args.get('execution_date')
@@ -807,8 +838,7 @@ class Airflow(AirflowBaseView): # noqa: D101
@has_access
@action_logging
def task(self):
- TI = models.TaskInstance
-
+ """Retrieve task."""
dag_id = request.args.get('dag_id')
task_id = request.args.get('task_id')
# Carrying execution_date through, even though it's irrelevant for
@@ -827,23 +857,25 @@ class Airflow(AirflowBaseView): # noqa: D101
return redirect(url_for('Airflow.index'))
task = copy.copy(dag.get_task(task_id))
task.resolve_template_files()
- ti = TI(task=task, execution_date=dttm)
+ ti = TaskInstance(task=task, execution_date=dttm)
ti.refresh_from_db()
ti_attrs = []
for attr_name in dir(ti):
if not attr_name.startswith('_'):
attr = getattr(ti, attr_name)
- if type(attr) != type(self.task): # noqa
+ if type(attr) != type(self.task): # noqa pylint: disable=unidiomatic-typecheck
ti_attrs.append((attr_name, str(attr)))
task_attrs = []
for attr_name in dir(task):
if not attr_name.startswith('_'):
attr = getattr(task, attr_name)
+ # pylint: disable=unidiomatic-typecheck
if type(attr) != type(self.task) and \
attr_name not in wwwutils.get_attr_renderer(): # noqa
task_attrs.append((attr_name, str(attr)))
+ # pylint: enable=unidiomatic-typecheck
# Color coding the special attributes that are code
special_attrs_rendered = {}
@@ -888,6 +920,7 @@ class Airflow(AirflowBaseView): # noqa: D101
@action_logging
@provide_session
def xcom(self, session=None):
+ """Retrieve XCOM."""
dag_id = request.args.get('dag_id')
task_id = request.args.get('task_id')
# Carrying execution_date through, even though it's irrelevant for
@@ -932,6 +965,7 @@ class Airflow(AirflowBaseView): # noqa: D101
@has_access
@action_logging
def run(self):
+ """Retrieves Run."""
dag_id = request.form.get('dag_id')
task_id = request.form.get('task_id')
origin = get_safe_url(request.form.get('origin'))
@@ -949,13 +983,13 @@ class Airflow(AirflowBaseView): # noqa: D101
valid_kubernetes_config = False
try:
- from airflow.executors.celery_executor import CeleryExecutor
+ from airflow.executors.celery_executor import CeleryExecutor # noqa
valid_celery_config = isinstance(executor, CeleryExecutor)
except ImportError:
pass
try:
- from airflow.executors.kubernetes_executor import KubernetesExecutor
+ from airflow.executors.kubernetes_executor import KubernetesExecutor # noqa
valid_kubernetes_config = isinstance(executor, KubernetesExecutor)
except ImportError:
pass
@@ -999,6 +1033,7 @@ class Airflow(AirflowBaseView): # noqa: D101
@has_access
@action_logging
def delete(self):
+ """Deletes DAG."""
from airflow.api.common.experimental import delete_dag
from airflow.exceptions import DagFileExists, DagNotFound
@@ -1028,7 +1063,7 @@ class Airflow(AirflowBaseView): # noqa: D101
@action_logging
@provide_session
def trigger(self, session=None):
-
+ """Triggers DAG Run."""
dag_id = request.values.get('dag_id')
origin = get_safe_url(request.values.get('origin'))
@@ -1049,21 +1084,21 @@ class Airflow(AirflowBaseView): # noqa: D101
dr = DagRun.find(dag_id=dag_id, execution_date=execution_date, run_type=DagRunType.MANUAL)
if dr:
- flash(f"This run_id {dr.run_id} already exists")
+ flash(f"This run_id {dr.run_id} already exists") # noqa
return redirect(origin)
run_conf = {}
- conf = request.values.get('conf')
- if conf:
+ request_conf = request.values.get('conf')
+ if request_conf:
try:
- run_conf = json.loads(conf)
+ run_conf = json.loads(request_conf)
except json.decoder.JSONDecodeError:
flash("Invalid JSON configuration", "error")
return self.render_template(
'airflow/trigger.html',
dag_id=dag_id,
origin=origin,
- conf=conf
+ conf=request_conf
)
dag = current_app.dag_bag.get_dag(dag_id)
@@ -1126,6 +1161,7 @@ class Airflow(AirflowBaseView): # noqa: D101
@has_access
@action_logging
def clear(self):
+ """Clears the Dag."""
dag_id = request.form.get('dag_id')
task_id = request.form.get('task_id')
origin = get_safe_url(request.form.get('origin'))
@@ -1157,6 +1193,7 @@ class Airflow(AirflowBaseView): # noqa: D101
@has_access
@action_logging
def dagrun_clear(self):
+ """Clears the DagRun"""
dag_id = request.form.get('dag_id')
origin = get_safe_url(request.form.get('origin'))
execution_date = request.form.get('execution_date')
@@ -1174,6 +1211,7 @@ class Airflow(AirflowBaseView): # noqa: D101
@has_access
@provide_session
def blocked(self, session=None):
+ """Mark Dag Blocked."""
allowed_dag_ids = current_app.appbuilder.sm.get_accessible_dag_ids()
if 'all_dags' in allowed_dag_ids:
@@ -1192,14 +1230,14 @@ class Airflow(AirflowBaseView): # noqa: D101
if not filter_dag_ids:
return wwwutils.json_response([])
- DR = models.DagRun
-
+ # pylint: disable=comparison-with-callable
dags = (
- session.query(DR.dag_id, sqla.func.count(DR.id))
- .filter(DR.state == State.RUNNING)
- .filter(DR.dag_id.in_(filter_dag_ids))
- .group_by(DR.dag_id)
+ session.query(DagRun.dag_id, sqla.func.count(DagRun.id))
+ .filter(DagRun.state == State.RUNNING)
+ .filter(DagRun.dag_id.in_(filter_dag_ids))
+ .group_by(DagRun.dag_id)
)
+ # pylint: enable=comparison-with-callable
payload = []
for dag_id, active_dag_runs in dags:
@@ -1277,6 +1315,7 @@ class Airflow(AirflowBaseView): # noqa: D101
@has_access
@action_logging
def dagrun_failed(self):
+ """Mark DagRun failed."""
dag_id = request.form.get('dag_id')
execution_date = request.form.get('execution_date')
confirmed = request.form.get('confirmed') == 'true'
@@ -1289,6 +1328,7 @@ class Airflow(AirflowBaseView): # noqa: D101
@has_access
@action_logging
def dagrun_success(self):
+ """Mark DagRun success"""
dag_id = request.form.get('dag_id')
execution_date = request.form.get('execution_date')
confirmed = request.form.get('confirmed') == 'true'
@@ -1296,7 +1336,8 @@ class Airflow(AirflowBaseView): # noqa: D101
return self._mark_dagrun_state_as_success(dag_id, execution_date,
confirmed, origin)
- def _mark_task_instance_state(self, dag_id, task_id, origin, execution_date,
+ def _mark_task_instance_state(self, # pylint: disable=too-many-arguments
+ dag_id, task_id, origin, execution_date,
confirmed, upstream, downstream,
future, past, state):
dag = current_app.dag_bag.get_dag(dag_id)
@@ -1340,6 +1381,7 @@ class Airflow(AirflowBaseView): # noqa: D101
@has_access
@action_logging
def failed(self):
+ """Mark task as failed."""
dag_id = request.form.get('dag_id')
task_id = request.form.get('task_id')
origin = get_safe_url(request.form.get('origin'))
@@ -1360,6 +1402,7 @@ class Airflow(AirflowBaseView): # noqa: D101
@has_access
@action_logging
def success(self):
+ """Mark task as success."""
dag_id = request.form.get('dag_id')
task_id = request.form.get('task_id')
origin = get_safe_url(request.form.get('origin'))
@@ -1380,7 +1423,8 @@ class Airflow(AirflowBaseView): # noqa: D101
@has_access
@gzipped
@action_logging
- def tree(self):
+ def tree(self): # pylint: disable=too-many-locals
+ """Get Dag as tree."""
dag_id = request.args.get('dag_id')
blur = conf.getboolean('webserver', 'demo_mode')
dag = current_app.dag_bag.get_dag(dag_id)
@@ -1437,27 +1481,27 @@ class Airflow(AirflowBaseView): # noqa: D101
node_count = 0
node_limit = 5000 / max(1, len(dag.leaves))
- def encode_ti(ti: Optional[models.TaskInstance]) -> Optional[List]:
- if not ti:
+ def encode_ti(task_instance: Optional[models.TaskInstance]) -> Optional[List]:
+ if not task_instance:
return None
# NOTE: order of entry is important here because client JS relies on it for
# tree node reconstruction. Remember to change JS code in tree.html
# whenever order is altered.
- data = [
- ti.state,
- ti.try_number,
+ task_instance_data = [
+ task_instance.state,
+ task_instance.try_number,
None, # start_ts
None, # duration
]
- if ti.start_date:
+ if task_instance.start_date:
# round to seconds to reduce payload size
- data[2] = int(ti.start_date.timestamp())
- if ti.duration is not None:
- data[3] = int(ti.duration)
+ task_instance_data[2] = int(task_instance.start_date.timestamp())
+ if task_instance.duration is not None:
+ task_instance_data[3] = int(task_instance.duration)
- return data
+ return task_instance_data
def recurse_nodes(task, visited):
nonlocal node_count
@@ -1547,6 +1591,7 @@ class Airflow(AirflowBaseView): # noqa: D101
@action_logging
@provide_session
def graph(self, session=None):
+ """Get DAG as Graph."""
dag_id = request.args.get('dag_id')
blur = conf.getboolean('webserver', 'demo_mode')
dag = current_app.dag_bag.get_dag(dag_id)
@@ -1565,36 +1610,37 @@ class Airflow(AirflowBaseView): # noqa: D101
nodes = []
edges = []
- for task in dag.tasks:
+ for dag_task in dag.tasks:
nodes.append({
- 'id': task.task_id,
+ 'id': dag_task.task_id,
'value': {
- 'label': task.task_id,
- 'labelStyle': "fill:{0};".format(task.ui_fgcolor),
- 'style': "fill:{0};".format(task.ui_color),
+ 'label': dag_task.task_id,
+ 'labelStyle': "fill:{0};".format(dag_task.ui_fgcolor),
+ 'style': "fill:{0};".format(dag_task.ui_color),
'rx': 5,
'ry': 5,
}
})
def get_downstream(task):
- for t in task.downstream_list:
+ for downstream_task in task.downstream_list:
edge = {
- 'source_id': task.task_id,
- 'target_id': t.task_id,
+ 'source_id': downstream_task.task_id,
+ 'target_id': downstream_task.task_id,
}
if edge not in edges:
edges.append(edge)
- get_downstream(t)
+ get_downstream(downstream_task)
- for t in dag.roots:
- get_downstream(t)
+ for dag_task in dag.roots:
+ get_downstream(dag_task)
dt_nr_dr_data = get_date_time_num_runs_dag_runs_form_data(request, session, dag)
dt_nr_dr_data['arrange'] = arrange
dttm = dt_nr_dr_data['dttm']
class GraphForm(DateTimeWithNumRunsWithDagRunsForm):
+ """Graph Form class."""
arrange = SelectField("Layout", choices=(
('LR', "Left->Right"),
('RL', "Right->Left"),
@@ -1651,7 +1697,8 @@ class Airflow(AirflowBaseView): # noqa: D101
@has_access
@action_logging
@provide_session
- def duration(self, session=None):
+ def duration(self, session=None): # pylint: disable=too-many-locals
+ """Get Dag as duration graph."""
default_dag_run = conf.getint('webserver', 'default_dag_run_display_number')
dag_id = request.args.get('dag_id')
dag = current_app.dag_bag.get_dag(dag_id)
@@ -1684,40 +1731,40 @@ class Airflow(AirflowBaseView): # noqa: D101
cum_chart = nvd3.lineChart(
name="cumLineChart", x_is_date=True, height=chart_height, width="1200")
- y = defaultdict(list)
- x = defaultdict(list)
- cum_y = defaultdict(list)
+ y_points = defaultdict(list)
+ x_points = defaultdict(list)
+ cumulative_y = defaultdict(list)
- tis = dag.get_task_instances(start_date=min_date, end_date=base_date)
- TF = TaskFail
+ task_instances = dag.get_task_instances(start_date=min_date, end_date=base_date)
ti_fails = (
- session.query(TF)
- .filter(TF.dag_id == dag.dag_id,
- TF.execution_date >= min_date,
- TF.execution_date <= base_date,
- TF.task_id.in_([t.task_id for t in dag.tasks]))
+ session.query(TaskFail)
+ .filter(TaskFail.dag_id == dag.dag_id,
+ TaskFail.execution_date >= min_date,
+ TaskFail.execution_date <= base_date,
+ TaskFail.task_id.in_([t.task_id for t in dag.tasks]))
.all()
)
fails_totals = defaultdict(int)
- for tf in ti_fails:
- dict_key = (tf.dag_id, tf.task_id, tf.execution_date)
- if tf.duration:
- fails_totals[dict_key] += tf.duration
+ for failed_task_instance in ti_fails:
+ dict_key = (failed_task_instance.dag_id, failed_task_instance.task_id,
+ failed_task_instance.execution_date)
+ if failed_task_instance.duration:
+ fails_totals[dict_key] += failed_task_instance.duration
- for ti in tis:
- if ti.duration:
- dttm = wwwutils.epoch(ti.execution_date)
- x[ti.task_id].append(dttm)
- y[ti.task_id].append(float(ti.duration))
- fails_dict_key = (ti.dag_id, ti.task_id, ti.execution_date)
+ for task_instance in task_instances:
+ if task_instance.duration:
+ date_time = wwwutils.epoch(task_instance.execution_date)
+ x_points[task_instance.task_id].append(date_time)
+ y_points[task_instance.task_id].append(float(task_instance.duration))
+ fails_dict_key = (task_instance.dag_id, task_instance.task_id, task_instance.execution_date)
fails_total = fails_totals[fails_dict_key]
- cum_y[ti.task_id].append(float(ti.duration + fails_total))
+ cumulative_y[task_instance.task_id].append(float(task_instance.duration + fails_total))
# determine the most relevant time unit for the set of task instance
# durations for the DAG
- y_unit = infer_time_unit([d for t in y.values() for d in t])
- cum_y_unit = infer_time_unit([d for t in cum_y.values() for d in t])
+ y_unit = infer_time_unit([d for t in y_points.values() for d in t])
+ cum_y_unit = infer_time_unit([d for t in cumulative_y.values() for d in t])
# update the y Axis on both charts to have the correct time units
chart.create_y_axis('yAxis', format='.02f', custom_format=False,
label='Duration ({})'.format(y_unit))
@@ -1727,15 +1774,15 @@ class Airflow(AirflowBaseView): # noqa: D101
cum_chart.axislist['yAxis']['axisLabelDistance'] = '-15'
for task in dag.tasks:
- if x[task.task_id]:
- chart.add_serie(name=task.task_id, x=x[task.task_id],
- y=scale_time_units(y[task.task_id], y_unit))
- cum_chart.add_serie(name=task.task_id, x=x[task.task_id],
- y=scale_time_units(cum_y[task.task_id],
+ if x_points[task.task_id]:
+ chart.add_serie(name=task.task_id, x=x_points[task.task_id],
+ y=scale_time_units(y_points[task.task_id], y_unit))
+ cum_chart.add_serie(name=task.task_id, x=x_points[task.task_id],
+ y=scale_time_units(cumulative_y[task.task_id],
cum_y_unit))
- dates = sorted(list({ti.execution_date for ti in tis}))
- max_date = max([ti.execution_date for ti in tis]) if dates else None
+ dates = sorted(list({ti.execution_date for ti in task_instances}))
+ max_date = max([ti.execution_date for ti in task_instances]) if dates else None
session.commit()
@@ -1764,6 +1811,7 @@ class Airflow(AirflowBaseView): # noqa: D101
@action_logging
@provide_session
def tries(self, session=None):
+ """Shows all tries."""
default_dag_run = conf.getint('webserver', 'default_dag_run_display_number')
dag_id = request.args.get('dag_id')
dag = current_app.dag_bag.get_dag(dag_id)
@@ -1792,15 +1840,15 @@ class Airflow(AirflowBaseView): # noqa: D101
width="1200")
for task in dag.tasks:
- y = []
- x = []
+ y_points = []
+ x_points = []
for ti in task.get_task_instances(start_date=min_date, end_date=base_date):
dttm = wwwutils.epoch(ti.execution_date)
- x.append(dttm)
+ x_points.append(dttm)
# y value should reflect completed tries to have a 0 baseline.
- y.append(ti.prev_attempted_tries)
- if x:
- chart.add_serie(name=task.task_id, x=x, y=y)
+ y_points.append(ti.prev_attempted_tries)
+ if x_points:
+ chart.add_serie(name=task.task_id, x=x_points, y=y_points)
tis = dag.get_task_instances(start_date=min_date, end_date=base_date)
tries = sorted(list({ti.try_number for ti in tis}))
@@ -1829,6 +1877,7 @@ class Airflow(AirflowBaseView): # noqa: D101
@action_logging
@provide_session
def landing_times(self, session=None):
+ """Shows landing times."""
default_dag_run = conf.getint('webserver', 'default_dag_run_display_number')
dag_id = request.args.get('dag_id')
dag = current_app.dag_bag.get_dag(dag_id)
@@ -1854,12 +1903,12 @@ class Airflow(AirflowBaseView): # noqa: D101
chart_height = wwwutils.get_chart_height(dag)
chart = nvd3.lineChart(
name="lineChart", x_is_date=True, height=chart_height, width="1200")
- y = {}
- x = {}
+ y_points = {}
+ x_points = {}
for task in dag.tasks:
task_id = task.task_id
- y[task_id] = []
- x[task_id] = []
+ y_points[task_id] = []
+ x_points[task_id] = []
for ti in task.get_task_instances(start_date=min_date, end_date=base_date):
ts = ti.execution_date
if dag.schedule_interval and dag.following_schedule(ts):
@@ -1867,20 +1916,20 @@ class Airflow(AirflowBaseView): # noqa: D101
if ti.end_date:
dttm = wwwutils.epoch(ti.execution_date)
secs = (ti.end_date - ts).total_seconds()
- x[task_id].append(dttm)
- y[task_id].append(secs)
+ x_points[task_id].append(dttm)
+ y_points[task_id].append(secs)
# determine the most relevant time unit for the set of landing times
# for the DAG
- y_unit = infer_time_unit([d for t in y.values() for d in t])
+ y_unit = infer_time_unit([d for t in y_points.values() for d in t])
# update the y Axis to have the correct time units
chart.create_y_axis('yAxis', format='.02f', custom_format=False,
label='Landing Time ({})'.format(y_unit))
chart.axislist['yAxis']['axisLabelDistance'] = '-15'
for task in dag.tasks:
- if x[task.task_id]:
- chart.add_serie(name=task.task_id, x=x[task.task_id],
- y=scale_time_units(y[task.task_id], y_unit))
+ if x_points[task.task_id]:
+ chart.add_serie(name=task.task_id, x=x_points[task.task_id],
+ y=scale_time_units(y_points[task.task_id], y_unit))
tis = dag.get_task_instances(start_date=min_date, end_date=base_date)
dates = sorted(list({ti.execution_date for ti in tis}))
@@ -1907,8 +1956,9 @@ class Airflow(AirflowBaseView): # noqa: D101
@has_access
@action_logging
def paused(self):
+ """Toggle paused."""
dag_id = request.args.get('dag_id')
- is_paused = True if request.args.get('is_paused') == 'false' else False
+ is_paused = request.args.get('is_paused') == 'false'
models.DagModel.get_dagmodel(dag_id).set_is_paused(
is_paused=is_paused)
return "OK"
@@ -1919,7 +1969,7 @@ class Airflow(AirflowBaseView): # noqa: D101
@action_logging
@provide_session
def refresh(self, session=None):
- DagModel = models.DagModel
+ """Refresh DAG."""
dag_id = request.values.get('dag_id')
orm_dag = session.query(
DagModel).filter(DagModel.dag_id == dag_id).first()
@@ -1940,6 +1990,7 @@ class Airflow(AirflowBaseView): # noqa: D101
@has_access
@action_logging
def refresh_all(self):
+ """Refresh everything."""
if settings.STORE_SERIALIZED_DAGS:
current_app.dag_bag.collect_dags_from_db()
else:
@@ -1957,6 +2008,7 @@ class Airflow(AirflowBaseView): # noqa: D101
@action_logging
@provide_session
def gantt(self, session=None):
+ """Show Gantt chart."""
dag_id = request.args.get('dag_id')
dag = current_app.dag_bag.get_dag(dag_id)
demo_mode = conf.getboolean('webserver', 'demo_mode')
@@ -1978,13 +2030,12 @@ class Airflow(AirflowBaseView): # noqa: D101
ti for ti in dag.get_task_instances(dttm, dttm)
if ti.start_date and ti.state]
tis = sorted(tis, key=lambda ti: ti.start_date)
- TF = TaskFail
ti_fails = list(itertools.chain(*[(
session
- .query(TF)
- .filter(TF.dag_id == ti.dag_id,
- TF.task_id == ti.task_id,
- TF.execution_date == ti.execution_date)
+ .query(TaskFail)
+ .filter(TaskFail.dag_id == ti.dag_id,
+ TaskFail.task_id == ti.task_id,
+ TaskFail.execution_date == ti.execution_date)
.all()
) for ti in tis]))
@@ -1999,30 +2050,31 @@ class Airflow(AirflowBaseView): # noqa: D101
# https://issues.apache.org/jira/browse/AIRFLOW-2143
try_count = ti.prev_attempted_tries
gantt_bar_items.append((ti.task_id, ti.start_date, end_date, ti.state, try_count))
- d = alchemy_to_dict(ti)
- d['extraLinks'] = dag.get_task(ti.task_id).extra_links
- tasks.append(d)
+ task_dict = alchemy_to_dict(ti)
+ task_dict['extraLinks'] = dag.get_task(ti.task_id).extra_links
+ tasks.append(task_dict)
tf_count = 0
try_count = 1
prev_task_id = ""
- for tf in ti_fails:
- end_date = tf.end_date or timezone.utcnow()
- start_date = tf.start_date or end_date
- if tf_count != 0 and tf.task_id == prev_task_id:
+ for failed_task_instance in ti_fails:
+ end_date = failed_task_instance.end_date or timezone.utcnow()
+ start_date = failed_task_instance.start_date or end_date
+ if tf_count != 0 and failed_task_instance.task_id == prev_task_id:
try_count = try_count + 1
else:
try_count = 1
- prev_task_id = tf.task_id
- gantt_bar_items.append((tf.task_id, start_date, end_date, State.FAILED, try_count))
+ prev_task_id = failed_task_instance.task_id
+ gantt_bar_items.append((failed_task_instance.task_id, start_date, end_date, State.FAILED,
+ try_count))
tf_count = tf_count + 1
- task = dag.get_task(tf.task_id)
- d = alchemy_to_dict(tf)
- d['state'] = State.FAILED
- d['operator'] = task.task_type
- d['try_number'] = try_count
- d['extraLinks'] = task.extra_links
- tasks.append(d)
+ task = dag.get_task(failed_task_instance.task_id)
+ task_dict = alchemy_to_dict(failed_task_instance)
+ task_dict['state'] = State.FAILED
+ task_dict['operator'] = task.task_type
+ task_dict['try_number'] = try_count
+ task_dict['extraLinks'] = task.extra_links
+ tasks.append(task_dict)
data = {
'taskNames': [ti.task_id for ti in tis],
@@ -2107,6 +2159,7 @@ class Airflow(AirflowBaseView): # noqa: D101
@has_access
@action_logging
def task_instances(self):
+ """Shows task instances."""
dag_id = request.args.get('dag_id')
dag = current_app.dag_bag.get_dag(dag_id)
@@ -2130,9 +2183,10 @@ class VersionView(AirflowBaseView):
@expose('/version')
@has_access
def version(self):
+ """Shows Airflow version."""
try:
airflow_version = airflow.__version__
- except Exception as e:
+ except Exception as e: # pylint: disable=broad-except
airflow_version = None
logging.error(e)
@@ -2153,6 +2207,7 @@ class ConfigurationView(AirflowBaseView):
@expose('/configuration')
@has_access
def conf(self):
+ """Shows configuration."""
raw = request.args.get('raw') == "true"
title = "Airflow Configuration"
subtitle = AIRFLOW_CONFIG
@@ -2177,7 +2232,7 @@ class ConfigurationView(AirflowBaseView):
else:
code_html = Markup(highlight(
config,
- lexers.IniLexer(), # Lexer call
+ lexers.IniLexer(), # Lexer call pylint: disable=no-member
HtmlFormatter(noclasses=True))
)
return self.render_template(
@@ -2193,6 +2248,7 @@ class RedocView(AirflowBaseView):
@expose('/redoc')
def redoc(self):
+ """Redoc API documentation."""
openapi_spec_url = url_for("/api/v1./api/v1_openapi_yaml")
return self.render_template('airflow/redoc.html', openapi_spec_url=openapi_spec_url)
@@ -2203,7 +2259,7 @@ class RedocView(AirflowBaseView):
class DagFilter(BaseFilter):
"""Filter using DagIDs"""
- def apply(self, query, func): # noqa
+ def apply(self, query, func): # noqa pylint: disable=redefined-outer-name,unused-argument
if current_app.appbuilder.sm.has_all_dags_access():
return query
filter_dag_ids = current_app.appbuilder.sm.get_accessible_dag_ids()
@@ -2211,6 +2267,7 @@ class DagFilter(BaseFilter):
class AirflowModelView(ModelView): # noqa: D101
+ """Airflow Model View."""
list_widget = AirflowModelListWidget
page_size = PAGE_SIZE
@@ -2221,7 +2278,7 @@ class SlaMissModelView(AirflowModelView):
"""View to show SlaMiss table"""
route_base = '/slamiss'
- datamodel = AirflowModelView.CustomSQLAInterface(SlaMiss)
+ datamodel = AirflowModelView.CustomSQLAInterface(SlaMiss) # noqa # type: ignore
base_permissions = ['can_list']
@@ -2264,15 +2321,18 @@ class XComModelView(AirflowModelView):
@action('muldelete', 'Delete', "Are you sure you want to delete selected records?",
single=False)
def action_muldelete(self, items):
+ """Multiple delete action."""
self.datamodel.delete_all(items)
self.update_redirect()
return redirect(self.get_redirect())
def pre_add(self, item):
+ """Pre add hook."""
item.execution_date = timezone.make_aware(item.execution_date)
item.value = XCom.serialize_value(item.value)
def pre_update(self, item):
+ """Pre update hook."""
item.execution_date = timezone.make_aware(item.execution_date)
item.value = XCom.serialize_value(item.value)
@@ -2281,7 +2341,7 @@ class ConnectionModelView(AirflowModelView):
"""View to show records from Connections table"""
route_base = '/connection'
- datamodel = AirflowModelView.CustomSQLAInterface(Connection)
+ datamodel = AirflowModelView.CustomSQLAInterface(Connection) # noqa # type: ignore
base_permissions = ['can_add', 'can_list', 'can_edit', 'can_delete']
@@ -2316,11 +2376,13 @@ class ConnectionModelView(AirflowModelView):
single=False)
@has_dag_access(can_dag_edit=True)
def action_muldelete(self, items):
+ """Multiple delete."""
self.datamodel.delete_all(items)
self.update_redirect()
return redirect(self.get_redirect())
def process_form(self, form, is_created):
+ """Process form data."""
formdata = form.data
if formdata['conn_type'] in ['jdbc', 'google_cloud_platform', 'grpc', 'yandexcloud', 'kubernetes']:
extra = {
@@ -2329,18 +2391,18 @@ class ConnectionModelView(AirflowModelView):
form.extra.data = json.dumps(extra)
def prefill_form(self, form, pk):
+ """Prefill the form."""
try:
- d = json.loads(form.data.get('extra', '{}'))
+ extra_dictionary = json.loads(form.data.get('extra', '{}'))
except JSONDecodeError:
- d = {}
+ extra_dictionary = {}
- if not hasattr(d, 'get'):
- logging.warning('extra field for {} is not iterable'.format(
- form.data.get('conn_id', '