[AIRFLOW-2429] Make Airflow flake8 compliant

Closes #3342 from feng-tao/airflow-2429
This commit is contained in:
Tao feng 2018-05-19 00:29:59 +02:00 коммит произвёл Fokko Driesprong
Родитель 76b68b82d7
Коммит 06aec8ea6b
3 изменённых файлов: 88 добавлений и 87 удалений

Просмотреть файл

@ -22,31 +22,27 @@ from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
from builtins import str
from collections import OrderedDict
import copy
import errno
import os
import subprocess
import warnings
import shlex
import sys
from future import standard_library
import os
import shlex
import six
from six import iteritems
import subprocess
import sys
import warnings
from backports.configparser import ConfigParser
from zope.deprecation import deprecated as _deprecated
from airflow.exceptions import AirflowConfigException
from airflow.utils.log.logging_mixin import LoggingMixin
standard_library.install_aliases()
from builtins import str
from collections import OrderedDict
from airflow.exceptions import AirflowConfigException
log = LoggingMixin().log
# show Airflow's deprecation warnings
@ -323,8 +319,8 @@ class AirflowConfigParser(ConfigParser):
opt = None
if opt:
if (
not display_sensitive
and ev != 'AIRFLOW__CORE__UNIT_TEST_MODE'):
not display_sensitive and
ev != 'AIRFLOW__CORE__UNIT_TEST_MODE'):
opt = '< hidden >'
if display_source:
opt = (opt, 'env var')

Просмотреть файл

@ -28,6 +28,7 @@ from builtins import str
from builtins import object, bytes
import copy
from collections import namedtuple, defaultdict
import cryptography
from datetime import timedelta
import dill
@ -59,7 +60,6 @@ from sqlalchemy import (
Index, Float, LargeBinary)
from sqlalchemy import func, or_, and_, true as sqltrue
from sqlalchemy.ext.declarative import declarative_base, declared_attr
from sqlalchemy.dialects.mysql import LONGTEXT
from sqlalchemy.orm import reconstructor, relationship, synonym
from sqlalchemy_utc import UtcDateTime
@ -77,7 +77,6 @@ from airflow.lineage import apply_lineage, prepare_lineage
from airflow.ti_deps.deps.not_in_retry_period_dep import NotInRetryPeriodDep
from airflow.ti_deps.deps.prev_dagrun_dep import PrevDagrunDep
from airflow.ti_deps.deps.trigger_rule_dep import TriggerRuleDep
from airflow.ti_deps.deps.task_concurrency_dep import TaskConcurrencyDep
from airflow.ti_deps.dep_context import DepContext, QUEUE_DEPS, RUN_DEPS
from airflow.utils import timezone
@ -86,7 +85,7 @@ from airflow.utils.db import provide_session
from airflow.utils.decorators import apply_defaults
from airflow.utils.email import send_email
from airflow.utils.helpers import (
as_tuple, is_container, is_in, validate_key, pprinttable)
as_tuple, is_container, validate_key, pprinttable)
from airflow.utils.operator_resources import Resources
from airflow.utils.state import State
from airflow.utils.timeout import timeout
@ -103,6 +102,7 @@ XCOM_RETURN_KEY = 'return_value'
Stats = settings.Stats
def get_fernet():
"""
Deferred load of Fernet key.
@ -115,7 +115,7 @@ def get_fernet():
"""
try:
from cryptography.fernet import Fernet
except:
except ImportError:
raise AirflowException('Failed to import Fernet, it may not be installed')
try:
return Fernet(configuration.conf.get('core', 'FERNET_KEY').encode('utf-8'))
@ -126,6 +126,7 @@ def get_fernet():
# Used by DAG context_managers
_CONTEXT_MANAGER_DAG = None
def clear_task_instances(tis, session, activate_dag_runs=True, dag=None):
"""
Clears a set of task instances, but makes sure the running ones
@ -247,7 +248,7 @@ class DagBag(BaseDagBag, LoggingMixin):
filepath=orm_dag.fileloc, only_if_updated=False)
# If the source file no longer exports `dag_id`, delete it from self.dags
if found_dags and dag_id in [dag.dag_id for dag in found_dags]:
if found_dags and dag_id in [found_dag.dag_id for found_dag in found_dags]:
return self.dags[dag_id]
elif dag_id in self.dags:
del self.dags[dag_id]
@ -354,7 +355,6 @@ class DagBag(BaseDagBag, LoggingMixin):
self.file_last_changed[dag.full_filepath] = \
file_last_changed_on_disk
self.file_last_changed[filepath] = file_last_changed_on_disk
return found_dags
@ -429,7 +429,6 @@ class DagBag(BaseDagBag, LoggingMixin):
del self.dags[subdag.dag_id]
raise cycle_exception
def collect_dags(
self,
dag_folder=None,
@ -644,7 +643,7 @@ class Connection(Base, LoggingMixin):
if self._password and self.is_encrypted:
try:
fernet = get_fernet()
except:
except AirflowException:
raise AirflowException(
"Can't decrypt encrypted password for login={}, \
FERNET_KEY configuration is missing".format(self.login))
@ -660,7 +659,7 @@ class Connection(Base, LoggingMixin):
self.is_encrypted = True
except AirflowException:
self.log.exception("Failed to load fernet while encrypting value, "
"using non-encrypted value.")
"using non-encrypted value.")
self._password = value
self.is_encrypted = False
@ -673,7 +672,7 @@ class Connection(Base, LoggingMixin):
if self._extra and self.is_extra_encrypted:
try:
fernet = get_fernet()
except:
except AirflowException:
raise AirflowException(
"Can't decrypt `extra` params for login={},\
FERNET_KEY configuration is missing".format(self.login))
@ -689,7 +688,7 @@ class Connection(Base, LoggingMixin):
self.is_extra_encrypted = True
except AirflowException:
self.log.exception("Failed to load fernet while encrypting value, "
"using non-encrypted value.")
"using non-encrypted value.")
self._extra = value
self.is_extra_encrypted = False
else:
@ -757,7 +756,7 @@ class Connection(Base, LoggingMixin):
elif self.conn_type == 'cassandra':
from airflow.contrib.hooks.cassandra_hook import CassandraHook
return CassandraHook(cassandra_conn_id=self.conn_id)
except:
except Exception:
pass
def __repr__(self):
@ -1330,8 +1329,11 @@ class TaskInstance(Base, LoggingMixin):
if self.task.retry_exponential_backoff:
min_backoff = int(delay.total_seconds() * (2 ** (self.try_number - 2)))
# deterministic per task instance
hash = int(hashlib.sha1("{}#{}#{}#{}".format(self.dag_id, self.task_id,
self.execution_date, self.try_number).encode('utf-8')).hexdigest(), 16)
hash = int(hashlib.sha1("{}#{}#{}#{}".format(self.dag_id,
self.task_id,
self.execution_date,
self.try_number)
.encode('utf-8')).hexdigest(), 16)
# between 0.5 * delay * (2^retry_number) and 1.0 * delay * (2^retry_number)
modded_hash = min_backoff + hash % min_backoff
# timedelta has a maximum representable value. The exponentiation
@ -1453,7 +1455,7 @@ class TaskInstance(Base, LoggingMixin):
session.commit()
return False
#TODO: Logging needs cleanup, not clear what is being printed
# TODO: Logging needs cleanup, not clear what is being printed
hr = "\n" + ("-" * 80) + "\n" # Line break
# For reporting purposes, we report based on 1-indexed,
@ -1518,7 +1520,8 @@ class TaskInstance(Base, LoggingMixin):
settings.engine.dispose()
if verbose:
if mark_success:
msg = "Marking success for {} on {}".format(self.task, self.execution_date)
msg = "Marking success for {} on {}".format(self.task,
self.execution_date)
self.log.info(msg)
else:
msg = "Executing {} on {}".format(self.task, self.execution_date)
@ -1661,23 +1664,23 @@ class TaskInstance(Base, LoggingMixin):
pool=None,
session=None):
res = self._check_and_change_state_before_execution(
verbose=verbose,
ignore_all_deps=ignore_all_deps,
ignore_depends_on_past=ignore_depends_on_past,
ignore_task_deps=ignore_task_deps,
ignore_ti_state=ignore_ti_state,
verbose=verbose,
ignore_all_deps=ignore_all_deps,
ignore_depends_on_past=ignore_depends_on_past,
ignore_task_deps=ignore_task_deps,
ignore_ti_state=ignore_ti_state,
mark_success=mark_success,
test_mode=test_mode,
job_id=job_id,
pool=pool,
session=session)
if res:
self._run_raw_task(
mark_success=mark_success,
test_mode=test_mode,
job_id=job_id,
pool=pool,
session=session)
if res:
self._run_raw_task(
mark_success=mark_success,
test_mode=test_mode,
job_id=job_id,
pool=pool,
session=session)
def dry_run(self):
task = self.task
@ -2074,7 +2077,7 @@ class SkipMixin(LoggingMixin):
TaskInstance.dag_id == dag_run.dag_id,
TaskInstance.execution_date == dag_run.execution_date,
TaskInstance.task_id.in_(task_ids)
).update({TaskInstance.state : State.SKIPPED,
).update({TaskInstance.state: State.SKIPPED,
TaskInstance.start_date: now,
TaskInstance.end_date: now},
synchronize_session=False)
@ -2732,8 +2735,7 @@ class BaseOperator(LoggingMixin):
return self._downstream_task_ids
@provide_session
def clear(
self,
def clear(self,
start_date=None,
end_date=None,
upstream=False,
@ -2810,7 +2812,6 @@ class BaseOperator(LoggingMixin):
return list(map(lambda task_id: self._dag.task_dict[task_id],
self.get_flat_relative_ids(upstream)))
def run(
self,
start_date=None,
@ -2970,8 +2971,9 @@ class DagModel(Base):
dag_id = Column(String(ID_LEN), primary_key=True)
# A DAG can be paused from the UI / DB
# Set this default value of is_paused based on a configuration value!
is_paused_at_creation = configuration.conf.getboolean('core',
'dags_are_paused_at_creation')
is_paused_at_creation = configuration.conf\
.getboolean('core',
'dags_are_paused_at_creation')
is_paused = Column(Boolean, default=is_paused_at_creation)
# Whether the DAG is a subdag
is_subdag = Column(Boolean, default=False)
@ -3072,7 +3074,8 @@ class DAG(BaseDag, LoggingMixin):
:param sla_miss_callback: specify a function to call when reporting SLA
timeouts.
:type sla_miss_callback: types.FunctionType
:param default_view: Specify DAG default view (tree, graph, duration, gantt, landing_times)
:param default_view: Specify DAG default view (tree, graph, duration,
gantt, landing_times)
:type default_view: string
:param orientation: Specify DAG orientation in graph view (LR, TB, RL, BT)
:type orientation: string
@ -3539,14 +3542,14 @@ class DAG(BaseDag, LoggingMixin):
# Check SubDag for class but don't check class directly, see
# https://github.com/airbnb/airflow/issues/1168
from airflow.operators.subdag_operator import SubDagOperator
l = []
subdag_lst = []
for task in self.tasks:
if (isinstance(task, SubDagOperator) or
#TODO remove in Airflow 2.0
type(task).__name__ == 'SubDagOperator'):
l.append(task.subdag)
l += task.subdag.subdags
return l
# TODO remove in Airflow 2.0
type(task).__name__ == 'SubDagOperator'):
subdag_lst.append(task.subdag)
subdag_lst += task.subdag.subdags
return subdag_lst
def resolve_template_files(self):
for t in self.tasks:
@ -4276,13 +4279,13 @@ class Variable(Base, LoggingMixin):
if self._val and self.is_encrypted:
try:
fernet = get_fernet()
except:
except Exception:
log.error("Can't decrypt _val for key={}, FERNET_KEY "
"configuration missing".format(self.key))
return None
try:
return fernet.decrypt(bytes(self._val, 'utf-8')).decode()
except:
except cryptography.fernet.InvalidToken:
log.error("Can't decrypt _val for key={}, invalid token "
"or value".format(self.key))
return None
@ -4297,7 +4300,8 @@ class Variable(Base, LoggingMixin):
self.is_encrypted = True
except AirflowException:
self.log.exception(
"Failed to load fernet while encrypting value, using non-encrypted value."
"Failed to load fernet while encrypting value, "
"using non-encrypted value."
)
self._val = value
self.is_encrypted = False
@ -4323,7 +4327,8 @@ class Variable(Base, LoggingMixin):
:return: Mixed
"""
default_sentinel = object()
obj = Variable.get(key, default_var=default_sentinel, deserialize_json=deserialize_json)
obj = Variable.get(key, default_var=default_sentinel,
deserialize_json=deserialize_json)
if obj is default_sentinel:
if default is not None:
Variable.set(key, default, serialize_json=deserialize_json)
@ -4449,8 +4454,7 @@ class XCom(Base, LoggingMixin):
@classmethod
@provide_session
def get_one(
cls,
def get_one(cls,
execution_date,
key=None,
task_id=None,
@ -4460,9 +4464,11 @@ class XCom(Base, LoggingMixin):
session=None):
"""
Retrieve an XCom value, optionally meeting certain criteria.
TODO: "pickling" has been deprecated and JSON is preferred. "pickling" will be removed in Airflow 2.0.
TODO: "pickling" has been deprecated and JSON is preferred.
"pickling" will be removed in Airflow 2.0.
:param enable_pickling: If pickling is not enabled, the XCOM value will be parsed to JSON instead.
:param enable_pickling: If pickling is not enabled,
the XCOM value will be parsed to JSON instead.
:return: XCom value
"""
filters = []
@ -4478,9 +4484,8 @@ class XCom(Base, LoggingMixin):
filters.append(cls.execution_date == execution_date)
query = (
session.query(cls.value)
.filter(and_(*filters))
.order_by(cls.execution_date.desc(), cls.timestamp.desc()))
session.query(cls.value).filter(and_(*filters))
.order_by(cls.execution_date.desc(), cls.timestamp.desc()))
result = query.first()
if result:
@ -4504,19 +4509,19 @@ class XCom(Base, LoggingMixin):
@classmethod
@provide_session
def get_many(
cls,
execution_date,
key=None,
task_ids=None,
dag_ids=None,
include_prior_dates=False,
limit=100,
enable_pickling=None,
session=None):
def get_many(cls,
execution_date,
key=None,
task_ids=None,
dag_ids=None,
include_prior_dates=False,
limit=100,
enable_pickling=None,
session=None):
"""
Retrieve an XCom value, optionally meeting certain criteria
TODO: "pickling" has been deprecated and JSON is preferred. "pickling" will be removed in Airflow 2.0.
TODO: "pickling" has been deprecated and JSON is preferred.
"pickling" will be removed in Airflow 2.0.
"""
filters = []
if key:
@ -4531,10 +4536,9 @@ class XCom(Base, LoggingMixin):
filters.append(cls.execution_date == execution_date)
query = (
session.query(cls)
.filter(and_(*filters))
.order_by(cls.execution_date.desc(), cls.timestamp.desc())
.limit(limit))
session.query(cls).filter(and_(*filters))
.order_by(cls.execution_date.desc(), cls.timestamp.desc())
.limit(limit))
results = query.all()
if enable_pickling is None:
enable_pickling = configuration.conf.getboolean(
@ -4625,7 +4629,7 @@ class DagStat(Base):
if dag_ids:
qry = qry.filter(DagStat.dag_id.in_(set(dag_ids)))
if dirty_only:
qry = qry.filter(DagStat.dirty == True)
qry = qry.filter(DagStat.dirty == True) # noqa
qry = qry.with_for_update().all()
@ -4919,7 +4923,8 @@ class DagRun(Base, LoggingMixin):
session=session
)
none_depends_on_past = all(not t.task.depends_on_past for t in unfinished_tasks)
none_task_concurrency = all(t.task.task_concurrency is None for t in unfinished_tasks)
none_task_concurrency = all(t.task.task_concurrency is None
for t in unfinished_tasks)
# small speed up
if unfinished_tasks and none_depends_on_past and none_task_concurrency:
# todo: this can actually get pretty slow: one task costs between 0.01-015s
@ -5036,7 +5041,7 @@ class DagRun(Base, LoggingMixin):
"""
qry = session.query(DagRun).filter(
DagRun.dag_id == dag_id,
DagRun.external_trigger == False,
DagRun.external_trigger == False, # noqa
DagRun.execution_date == execution_date,
)
return qry.first()

Просмотреть файл

@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#
# http://www.apache.org/licenses/LICENSE-2.0
#
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY