diff --git a/airflow/configuration.py b/airflow/configuration.py index 20ef0674be..e19a8b156d 100644 --- a/airflow/configuration.py +++ b/airflow/configuration.py @@ -22,31 +22,27 @@ from __future__ import division from __future__ import print_function from __future__ import unicode_literals +from builtins import str +from collections import OrderedDict import copy import errno -import os -import subprocess -import warnings -import shlex -import sys - from future import standard_library - +import os +import shlex import six from six import iteritems +import subprocess +import sys +import warnings + from backports.configparser import ConfigParser from zope.deprecation import deprecated as _deprecated +from airflow.exceptions import AirflowConfigException from airflow.utils.log.logging_mixin import LoggingMixin standard_library.install_aliases() -from builtins import str -from collections import OrderedDict - -from airflow.exceptions import AirflowConfigException - - log = LoggingMixin().log # show Airflow's deprecation warnings @@ -323,8 +319,8 @@ class AirflowConfigParser(ConfigParser): opt = None if opt: if ( - not display_sensitive - and ev != 'AIRFLOW__CORE__UNIT_TEST_MODE'): + not display_sensitive and + ev != 'AIRFLOW__CORE__UNIT_TEST_MODE'): opt = '< hidden >' if display_source: opt = (opt, 'env var') diff --git a/airflow/models.py b/airflow/models.py index 7aab4b59ff..4c1be8eaa8 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -28,6 +28,7 @@ from builtins import str from builtins import object, bytes import copy from collections import namedtuple, defaultdict +import cryptography from datetime import timedelta import dill @@ -59,7 +60,6 @@ from sqlalchemy import ( Index, Float, LargeBinary) from sqlalchemy import func, or_, and_, true as sqltrue from sqlalchemy.ext.declarative import declarative_base, declared_attr -from sqlalchemy.dialects.mysql import LONGTEXT from sqlalchemy.orm import reconstructor, relationship, synonym from sqlalchemy_utc import UtcDateTime @@ -77,7 +77,6 @@ from airflow.lineage import apply_lineage, prepare_lineage from airflow.ti_deps.deps.not_in_retry_period_dep import NotInRetryPeriodDep from airflow.ti_deps.deps.prev_dagrun_dep import PrevDagrunDep from airflow.ti_deps.deps.trigger_rule_dep import TriggerRuleDep -from airflow.ti_deps.deps.task_concurrency_dep import TaskConcurrencyDep from airflow.ti_deps.dep_context import DepContext, QUEUE_DEPS, RUN_DEPS from airflow.utils import timezone @@ -86,7 +85,7 @@ from airflow.utils.db import provide_session from airflow.utils.decorators import apply_defaults from airflow.utils.email import send_email from airflow.utils.helpers import ( - as_tuple, is_container, is_in, validate_key, pprinttable) + as_tuple, is_container, validate_key, pprinttable) from airflow.utils.operator_resources import Resources from airflow.utils.state import State from airflow.utils.timeout import timeout @@ -103,6 +102,7 @@ XCOM_RETURN_KEY = 'return_value' Stats = settings.Stats + def get_fernet(): """ Deferred load of Fernet key. @@ -115,7 +115,7 @@ def get_fernet(): """ try: from cryptography.fernet import Fernet - except: + except ImportError: raise AirflowException('Failed to import Fernet, it may not be installed') try: return Fernet(configuration.conf.get('core', 'FERNET_KEY').encode('utf-8')) @@ -126,6 +126,7 @@ def get_fernet(): # Used by DAG context_managers _CONTEXT_MANAGER_DAG = None + def clear_task_instances(tis, session, activate_dag_runs=True, dag=None): """ Clears a set of task instances, but makes sure the running ones @@ -247,7 +248,7 @@ class DagBag(BaseDagBag, LoggingMixin): filepath=orm_dag.fileloc, only_if_updated=False) # If the source file no longer exports `dag_id`, delete it from self.dags - if found_dags and dag_id in [dag.dag_id for dag in found_dags]: + if found_dags and dag_id in [found_dag.dag_id for found_dag in found_dags]: return self.dags[dag_id] elif dag_id in self.dags: del self.dags[dag_id] @@ -354,7 +355,6 @@ class DagBag(BaseDagBag, LoggingMixin): self.file_last_changed[dag.full_filepath] = \ file_last_changed_on_disk - self.file_last_changed[filepath] = file_last_changed_on_disk return found_dags @@ -429,7 +429,6 @@ class DagBag(BaseDagBag, LoggingMixin): del self.dags[subdag.dag_id] raise cycle_exception - def collect_dags( self, dag_folder=None, @@ -644,7 +643,7 @@ class Connection(Base, LoggingMixin): if self._password and self.is_encrypted: try: fernet = get_fernet() - except: + except AirflowException: raise AirflowException( "Can't decrypt encrypted password for login={}, \ FERNET_KEY configuration is missing".format(self.login)) @@ -660,7 +659,7 @@ class Connection(Base, LoggingMixin): self.is_encrypted = True except AirflowException: self.log.exception("Failed to load fernet while encrypting value, " - "using non-encrypted value.") + "using non-encrypted value.") self._password = value self.is_encrypted = False @@ -673,7 +672,7 @@ class Connection(Base, LoggingMixin): if self._extra and self.is_extra_encrypted: try: fernet = get_fernet() - except: + except AirflowException: raise AirflowException( "Can't decrypt `extra` params for login={},\ FERNET_KEY configuration is missing".format(self.login)) @@ -689,7 +688,7 @@ class Connection(Base, LoggingMixin): self.is_extra_encrypted = True except AirflowException: self.log.exception("Failed to load fernet while encrypting value, " - "using non-encrypted value.") + "using non-encrypted value.") self._extra = value self.is_extra_encrypted = False else: @@ -757,7 +756,7 @@ class Connection(Base, LoggingMixin): elif self.conn_type == 'cassandra': from airflow.contrib.hooks.cassandra_hook import CassandraHook return CassandraHook(cassandra_conn_id=self.conn_id) - except: + except Exception: pass def __repr__(self): @@ -1330,8 +1329,11 @@ class TaskInstance(Base, LoggingMixin): if self.task.retry_exponential_backoff: min_backoff = int(delay.total_seconds() * (2 ** (self.try_number - 2))) # deterministic per task instance - hash = int(hashlib.sha1("{}#{}#{}#{}".format(self.dag_id, self.task_id, - self.execution_date, self.try_number).encode('utf-8')).hexdigest(), 16) + hash = int(hashlib.sha1("{}#{}#{}#{}".format(self.dag_id, + self.task_id, + self.execution_date, + self.try_number) + .encode('utf-8')).hexdigest(), 16) # between 0.5 * delay * (2^retry_number) and 1.0 * delay * (2^retry_number) modded_hash = min_backoff + hash % min_backoff # timedelta has a maximum representable value. The exponentiation @@ -1453,7 +1455,7 @@ class TaskInstance(Base, LoggingMixin): session.commit() return False - #TODO: Logging needs cleanup, not clear what is being printed + # TODO: Logging needs cleanup, not clear what is being printed hr = "\n" + ("-" * 80) + "\n" # Line break # For reporting purposes, we report based on 1-indexed, @@ -1518,7 +1520,8 @@ class TaskInstance(Base, LoggingMixin): settings.engine.dispose() if verbose: if mark_success: - msg = "Marking success for {} on {}".format(self.task, self.execution_date) + msg = "Marking success for {} on {}".format(self.task, + self.execution_date) self.log.info(msg) else: msg = "Executing {} on {}".format(self.task, self.execution_date) @@ -1661,23 +1664,23 @@ class TaskInstance(Base, LoggingMixin): pool=None, session=None): res = self._check_and_change_state_before_execution( - verbose=verbose, - ignore_all_deps=ignore_all_deps, - ignore_depends_on_past=ignore_depends_on_past, - ignore_task_deps=ignore_task_deps, - ignore_ti_state=ignore_ti_state, + verbose=verbose, + ignore_all_deps=ignore_all_deps, + ignore_depends_on_past=ignore_depends_on_past, + ignore_task_deps=ignore_task_deps, + ignore_ti_state=ignore_ti_state, + mark_success=mark_success, + test_mode=test_mode, + job_id=job_id, + pool=pool, + session=session) + if res: + self._run_raw_task( mark_success=mark_success, test_mode=test_mode, job_id=job_id, pool=pool, session=session) - if res: - self._run_raw_task( - mark_success=mark_success, - test_mode=test_mode, - job_id=job_id, - pool=pool, - session=session) def dry_run(self): task = self.task @@ -2074,7 +2077,7 @@ class SkipMixin(LoggingMixin): TaskInstance.dag_id == dag_run.dag_id, TaskInstance.execution_date == dag_run.execution_date, TaskInstance.task_id.in_(task_ids) - ).update({TaskInstance.state : State.SKIPPED, + ).update({TaskInstance.state: State.SKIPPED, TaskInstance.start_date: now, TaskInstance.end_date: now}, synchronize_session=False) @@ -2732,8 +2735,7 @@ class BaseOperator(LoggingMixin): return self._downstream_task_ids @provide_session - def clear( - self, + def clear(self, start_date=None, end_date=None, upstream=False, @@ -2810,7 +2812,6 @@ class BaseOperator(LoggingMixin): return list(map(lambda task_id: self._dag.task_dict[task_id], self.get_flat_relative_ids(upstream))) - def run( self, start_date=None, @@ -2970,8 +2971,9 @@ class DagModel(Base): dag_id = Column(String(ID_LEN), primary_key=True) # A DAG can be paused from the UI / DB # Set this default value of is_paused based on a configuration value! - is_paused_at_creation = configuration.conf.getboolean('core', - 'dags_are_paused_at_creation') + is_paused_at_creation = configuration.conf\ + .getboolean('core', + 'dags_are_paused_at_creation') is_paused = Column(Boolean, default=is_paused_at_creation) # Whether the DAG is a subdag is_subdag = Column(Boolean, default=False) @@ -3072,7 +3074,8 @@ class DAG(BaseDag, LoggingMixin): :param sla_miss_callback: specify a function to call when reporting SLA timeouts. :type sla_miss_callback: types.FunctionType - :param default_view: Specify DAG default view (tree, graph, duration, gantt, landing_times) + :param default_view: Specify DAG default view (tree, graph, duration, + gantt, landing_times) :type default_view: string :param orientation: Specify DAG orientation in graph view (LR, TB, RL, BT) :type orientation: string @@ -3539,14 +3542,14 @@ class DAG(BaseDag, LoggingMixin): # Check SubDag for class but don't check class directly, see # https://github.com/airbnb/airflow/issues/1168 from airflow.operators.subdag_operator import SubDagOperator - l = [] + subdag_lst = [] for task in self.tasks: if (isinstance(task, SubDagOperator) or - #TODO remove in Airflow 2.0 - type(task).__name__ == 'SubDagOperator'): - l.append(task.subdag) - l += task.subdag.subdags - return l + # TODO remove in Airflow 2.0 + type(task).__name__ == 'SubDagOperator'): + subdag_lst.append(task.subdag) + subdag_lst += task.subdag.subdags + return subdag_lst def resolve_template_files(self): for t in self.tasks: @@ -4276,13 +4279,13 @@ class Variable(Base, LoggingMixin): if self._val and self.is_encrypted: try: fernet = get_fernet() - except: + except Exception: log.error("Can't decrypt _val for key={}, FERNET_KEY " "configuration missing".format(self.key)) return None try: return fernet.decrypt(bytes(self._val, 'utf-8')).decode() - except: + except cryptography.fernet.InvalidToken: log.error("Can't decrypt _val for key={}, invalid token " "or value".format(self.key)) return None @@ -4297,7 +4300,8 @@ class Variable(Base, LoggingMixin): self.is_encrypted = True except AirflowException: self.log.exception( - "Failed to load fernet while encrypting value, using non-encrypted value." + "Failed to load fernet while encrypting value, " + "using non-encrypted value." ) self._val = value self.is_encrypted = False @@ -4323,7 +4327,8 @@ class Variable(Base, LoggingMixin): :return: Mixed """ default_sentinel = object() - obj = Variable.get(key, default_var=default_sentinel, deserialize_json=deserialize_json) + obj = Variable.get(key, default_var=default_sentinel, + deserialize_json=deserialize_json) if obj is default_sentinel: if default is not None: Variable.set(key, default, serialize_json=deserialize_json) @@ -4449,8 +4454,7 @@ class XCom(Base, LoggingMixin): @classmethod @provide_session - def get_one( - cls, + def get_one(cls, execution_date, key=None, task_id=None, @@ -4460,9 +4464,11 @@ class XCom(Base, LoggingMixin): session=None): """ Retrieve an XCom value, optionally meeting certain criteria. - TODO: "pickling" has been deprecated and JSON is preferred. "pickling" will be removed in Airflow 2.0. + TODO: "pickling" has been deprecated and JSON is preferred. + "pickling" will be removed in Airflow 2.0. - :param enable_pickling: If pickling is not enabled, the XCOM value will be parsed to JSON instead. + :param enable_pickling: If pickling is not enabled, + the XCOM value will be parsed to JSON instead. :return: XCom value """ filters = [] @@ -4478,9 +4484,8 @@ class XCom(Base, LoggingMixin): filters.append(cls.execution_date == execution_date) query = ( - session.query(cls.value) - .filter(and_(*filters)) - .order_by(cls.execution_date.desc(), cls.timestamp.desc())) + session.query(cls.value).filter(and_(*filters)) + .order_by(cls.execution_date.desc(), cls.timestamp.desc())) result = query.first() if result: @@ -4504,19 +4509,19 @@ class XCom(Base, LoggingMixin): @classmethod @provide_session - def get_many( - cls, - execution_date, - key=None, - task_ids=None, - dag_ids=None, - include_prior_dates=False, - limit=100, - enable_pickling=None, - session=None): + def get_many(cls, + execution_date, + key=None, + task_ids=None, + dag_ids=None, + include_prior_dates=False, + limit=100, + enable_pickling=None, + session=None): """ Retrieve an XCom value, optionally meeting certain criteria - TODO: "pickling" has been deprecated and JSON is preferred. "pickling" will be removed in Airflow 2.0. + TODO: "pickling" has been deprecated and JSON is preferred. + "pickling" will be removed in Airflow 2.0. """ filters = [] if key: @@ -4531,10 +4536,9 @@ class XCom(Base, LoggingMixin): filters.append(cls.execution_date == execution_date) query = ( - session.query(cls) - .filter(and_(*filters)) - .order_by(cls.execution_date.desc(), cls.timestamp.desc()) - .limit(limit)) + session.query(cls).filter(and_(*filters)) + .order_by(cls.execution_date.desc(), cls.timestamp.desc()) + .limit(limit)) results = query.all() if enable_pickling is None: enable_pickling = configuration.conf.getboolean( @@ -4625,7 +4629,7 @@ class DagStat(Base): if dag_ids: qry = qry.filter(DagStat.dag_id.in_(set(dag_ids))) if dirty_only: - qry = qry.filter(DagStat.dirty == True) + qry = qry.filter(DagStat.dirty == True) # noqa qry = qry.with_for_update().all() @@ -4919,7 +4923,8 @@ class DagRun(Base, LoggingMixin): session=session ) none_depends_on_past = all(not t.task.depends_on_past for t in unfinished_tasks) - none_task_concurrency = all(t.task.task_concurrency is None for t in unfinished_tasks) + none_task_concurrency = all(t.task.task_concurrency is None + for t in unfinished_tasks) # small speed up if unfinished_tasks and none_depends_on_past and none_task_concurrency: # todo: this can actually get pretty slow: one task costs between 0.01-015s @@ -5036,7 +5041,7 @@ class DagRun(Base, LoggingMixin): """ qry = session.query(DagRun).filter( DagRun.dag_id == dag_id, - DagRun.external_trigger == False, + DagRun.external_trigger == False, # noqa DagRun.execution_date == execution_date, ) return qry.first() diff --git a/airflow/version.py b/airflow/version.py index 750da36d9e..d11d76608d 100644 --- a/airflow/version.py +++ b/airflow/version.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY