diff --git a/airflow/api/common/experimental/mark_tasks.py b/airflow/api/common/experimental/mark_tasks.py index e0ea313b33..e9366e017f 100644 --- a/airflow/api/common/experimental/mark_tasks.py +++ b/airflow/api/common/experimental/mark_tasks.py @@ -12,16 +12,16 @@ # See the License for the specific language governing permissions and # limitations under the License. -import datetime - from airflow.jobs import BackfillJob from airflow.models import DagRun, TaskInstance from airflow.operators.subdag_operator import SubDagOperator from airflow.settings import Session +from airflow.utils import timezone from airflow.utils.state import State from sqlalchemy import or_ + def _create_dagruns(dag, execution_dates, state, run_id_template): """ Infers from the dates which dag runs need to be created and does so. @@ -39,7 +39,7 @@ def _create_dagruns(dag, execution_dates, state, run_id_template): dr = dag.create_dagrun( run_id=run_id_template.format(date.isoformat()), execution_date=date, - start_date=datetime.datetime.utcnow(), + start_date=timezone.utcnow(), external_trigger=False, state=state, ) @@ -67,7 +67,7 @@ def set_state(task, execution_date, upstream=False, downstream=False, :param commit: Commit tasks to be altered to the database :return: list of tasks that have been created and updated """ - assert isinstance(execution_date, datetime.datetime) + assert timezone.is_localized(execution_date) # microseconds are supported by the database, but is not handled # correctly by airflow on e.g. the filesystem and in other places @@ -185,6 +185,7 @@ def set_state(task, execution_date, upstream=False, downstream=False, return tis_altered + def set_dag_run_state(dag, execution_date, state=State.SUCCESS, commit=False): """ Set the state of a dag run and all task instances associated with the dag diff --git a/airflow/api/common/experimental/trigger_dag.py b/airflow/api/common/experimental/trigger_dag.py index bfb6ad47c1..9d9934d2f4 100644 --- a/airflow/api/common/experimental/trigger_dag.py +++ b/airflow/api/common/experimental/trigger_dag.py @@ -12,11 +12,11 @@ # See the License for the specific language governing permissions and # limitations under the License. -import datetime import json from airflow.exceptions import AirflowException from airflow.models import DagRun, DagBag +from airflow.utils import timezone from airflow.utils.state import State @@ -29,9 +29,9 @@ def trigger_dag(dag_id, run_id=None, conf=None, execution_date=None): dag = dagbag.get_dag(dag_id) if not execution_date: - execution_date = datetime.datetime.utcnow() + execution_date = timezone.utcnow() - assert isinstance(execution_date, datetime.datetime) + assert timezone.is_localized(execution_date) execution_date = execution_date.replace(microsecond=0) if not run_id: diff --git a/airflow/jobs.py b/airflow/jobs.py index 664fab555f..4e1864e8a9 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -28,8 +28,9 @@ import socket import sys import threading import time +import datetime + from collections import defaultdict -from datetime import datetime from past.builtins import basestring from sqlalchemy import ( Column, Integer, String, DateTime, func, Index, or_, and_, not_) @@ -46,7 +47,7 @@ from airflow.models import DAG, DagRun from airflow.settings import Stats from airflow.task_runner import get_task_runner from airflow.ti_deps.dep_context import DepContext, QUEUE_DEPS, RUN_DEPS -from airflow.utils import asciiart +from airflow.utils import asciiart, timezone from airflow.utils.dag_processing import (AbstractDagFileProcessor, DagFileProcessorManager, SimpleDag, @@ -100,22 +101,22 @@ class BaseJob(Base, LoggingMixin): self.hostname = socket.getfqdn() self.executor = executor self.executor_class = executor.__class__.__name__ - self.start_date = datetime.utcnow() - self.latest_heartbeat = datetime.utcnow() + self.start_date = timezone.utcnow() + self.latest_heartbeat = timezone.utcnow() self.heartrate = heartrate self.unixname = getpass.getuser() super(BaseJob, self).__init__(*args, **kwargs) def is_alive(self): return ( - (datetime.utcnow() - self.latest_heartbeat).seconds < + (timezone.utcnow() - self.latest_heartbeat).seconds < (conf.getint('scheduler', 'JOB_HEARTBEAT_SEC') * 2.1) ) @provide_session def kill(self, session=None): job = session.query(BaseJob).filter(BaseJob.id == self.id).first() - job.end_date = datetime.utcnow() + job.end_date = timezone.utcnow() try: self.on_kill() except: @@ -165,14 +166,14 @@ class BaseJob(Base, LoggingMixin): if job.latest_heartbeat: sleep_for = max( 0, - self.heartrate - (datetime.utcnow() - job.latest_heartbeat).total_seconds()) + self.heartrate - (timezone.utcnow() - job.latest_heartbeat).total_seconds()) sleep(sleep_for) # Update last heartbeat time with create_session() as session: job = session.query(BaseJob).filter(BaseJob.id == self.id).first() - job.latest_heartbeat = datetime.utcnow() + job.latest_heartbeat = timezone.utcnow() session.merge(job) session.commit() @@ -194,7 +195,7 @@ class BaseJob(Base, LoggingMixin): self._execute() # Marking the success in the DB - self.end_date = datetime.utcnow() + self.end_date = timezone.utcnow() self.state = State.SUCCESS session.merge(self) session.commit() @@ -399,7 +400,7 @@ class DagFileProcessor(AbstractDagFileProcessor, LoggingMixin): self._pickle_dags, self._dag_id_white_list, "DagFileProcessor{}".format(self._instance_id)) - self._start_time = datetime.utcnow() + self._start_time = timezone.utcnow() def terminate(self, sigkill=False): """ @@ -615,16 +616,16 @@ class SchedulerJob(BaseJob): TI.execution_date == sq.c.max_ti, ).all() - ts = datetime.utcnow() + ts = timezone.utcnow() SlaMiss = models.SlaMiss for ti in max_tis: task = dag.get_task(ti.task_id) dttm = ti.execution_date if task.sla: dttm = dag.following_schedule(dttm) - while dttm < datetime.utcnow(): + while dttm < timezone.utcnow(): following_schedule = dag.following_schedule(dttm) - if following_schedule + task.sla < datetime.utcnow(): + if following_schedule + task.sla < timezone.utcnow(): session.merge(models.SlaMiss( task_id=ti.task_id, dag_id=ti.dag_id, @@ -772,9 +773,9 @@ class SchedulerJob(BaseJob): for dr in active_runs: if ( dr.start_date and dag.dagrun_timeout and - dr.start_date < datetime.utcnow() - dag.dagrun_timeout): + dr.start_date < timezone.utcnow() - dag.dagrun_timeout): dr.state = State.FAILED - dr.end_date = datetime.utcnow() + dr.end_date = timezone.utcnow() timedout_runs += 1 session.commit() if len(active_runs) - timedout_runs >= dag.max_active_runs: @@ -799,9 +800,9 @@ class SchedulerJob(BaseJob): # don't do scheduler catchup for dag's that don't have dag.catchup = True if not dag.catchup: # The logic is that we move start_date up until - # one period before, so that datetime.utcnow() is AFTER + # one period before, so that timezone.utcnow() is AFTER # the period end, and the job can be created... - now = datetime.utcnow() + now = timezone.utcnow() next_start = dag.following_schedule(now) last_start = dag.previous_schedule(now) if next_start <= now: @@ -847,7 +848,7 @@ class SchedulerJob(BaseJob): ) # don't ever schedule in the future - if next_run_date > datetime.utcnow(): + if next_run_date > timezone.utcnow(): return # this structure is necessary to avoid a TypeError from concatenating @@ -870,11 +871,11 @@ class SchedulerJob(BaseJob): if next_run_date and min_task_end_date and next_run_date > min_task_end_date: return - if next_run_date and period_end and period_end <= datetime.utcnow(): + if next_run_date and period_end and period_end <= timezone.utcnow(): next_run = dag.create_dagrun( run_id=DagRun.ID_PREFIX + next_run_date.isoformat(), execution_date=next_run_date, - start_date=datetime.utcnow(), + start_date=timezone.utcnow(), state=State.RUNNING, external_trigger=False ) @@ -894,7 +895,7 @@ class SchedulerJob(BaseJob): for run in dag_runs: self.log.info("Examining DAG run %s", run) # don't consider runs that are executed in the future - if run.execution_date > datetime.utcnow(): + if run.execution_date > timezone.utcnow(): self.log.error( "Execution date is in future: %s", run.execution_date @@ -1231,7 +1232,7 @@ class SchedulerJob(BaseJob): # set TIs to queued state for task_instance in tis_to_set_to_queued: task_instance.state = State.QUEUED - task_instance.queued_dttm = (datetime.utcnow() + task_instance.queued_dttm = (timezone.utcnow() if not task_instance.queued_dttm else task_instance.queued_dttm) session.merge(task_instance) @@ -1468,7 +1469,7 @@ class SchedulerJob(BaseJob): last_runtime = processor_manager.get_last_runtime(file_path) processor_pid = processor_manager.get_pid(file_path) processor_start_time = processor_manager.get_start_time(file_path) - runtime = ((datetime.utcnow() - processor_start_time).total_seconds() + runtime = ((timezone.utcnow() - processor_start_time).total_seconds() if processor_start_time else None) last_run = processor_manager.get_last_finish_time(file_path) @@ -1585,34 +1586,34 @@ class SchedulerJob(BaseJob): self.log.info("Resetting orphaned tasks for active dag runs") self.reset_state_for_orphaned_tasks() - execute_start_time = datetime.utcnow() + execute_start_time = timezone.utcnow() # Last time stats were printed - last_stat_print_time = datetime(2000, 1, 1) + last_stat_print_time = datetime.datetime(2000, 1, 1, tzinfo=timezone.utc) # Last time that self.heartbeat() was called. - last_self_heartbeat_time = datetime.utcnow() + last_self_heartbeat_time = timezone.utcnow() # Last time that the DAG dir was traversed to look for files - last_dag_dir_refresh_time = datetime.utcnow() + last_dag_dir_refresh_time = timezone.utcnow() # Use this value initially known_file_paths = processor_manager.file_paths # For the execute duration, parse and schedule DAGs - while (datetime.utcnow() - execute_start_time).total_seconds() < \ + while (timezone.utcnow() - execute_start_time).total_seconds() < \ self.run_duration or self.run_duration < 0: self.log.debug("Starting Loop...") loop_start_time = time.time() # Traverse the DAG directory for Python files containing DAGs # periodically - elapsed_time_since_refresh = (datetime.utcnow() - + elapsed_time_since_refresh = (timezone.utcnow() - last_dag_dir_refresh_time).total_seconds() if elapsed_time_since_refresh > self.dag_dir_list_interval: # Build up a list of Python files that could contain DAGs self.log.info("Searching for files in %s", self.subdir) known_file_paths = list_py_file_paths(self.subdir) - last_dag_dir_refresh_time = datetime.utcnow() + last_dag_dir_refresh_time = timezone.utcnow() self.log.info("There are %s files in %s", len(known_file_paths), self.subdir) processor_manager.set_file_paths(known_file_paths) @@ -1662,20 +1663,20 @@ class SchedulerJob(BaseJob): self._process_executor_events(simple_dag_bag) # Heartbeat the scheduler periodically - time_since_last_heartbeat = (datetime.utcnow() - + time_since_last_heartbeat = (timezone.utcnow() - last_self_heartbeat_time).total_seconds() if time_since_last_heartbeat > self.heartrate: self.log.info("Heartbeating the scheduler") self.heartbeat() - last_self_heartbeat_time = datetime.utcnow() + last_self_heartbeat_time = timezone.utcnow() # Occasionally print out stats about how fast the files are getting processed - if ((datetime.utcnow() - last_stat_print_time).total_seconds() > + if ((timezone.utcnow() - last_stat_print_time).total_seconds() > self.print_stats_interval): if len(known_file_paths) > 0: self._log_file_processing_stats(known_file_paths, processor_manager) - last_stat_print_time = datetime.utcnow() + last_stat_print_time = timezone.utcnow() loop_end_time = time.time() self.log.debug("Ran scheduling loop in %.2f seconds", loop_end_time - loop_start_time) @@ -2049,7 +2050,7 @@ class BackfillJob(BaseJob): run = run or self.dag.create_dagrun( run_id=run_id, execution_date=run_date, - start_date=datetime.utcnow(), + start_date=timezone.utcnow(), state=State.RUNNING, external_trigger=False, session=session diff --git a/airflow/models.py b/airflow/models.py index 37a49cf601..fe62ac54dc 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -24,7 +24,8 @@ from builtins import str from builtins import object, bytes import copy from collections import namedtuple -from datetime import datetime, timedelta +from datetime import timedelta + import dill import functools import getpass @@ -69,6 +70,7 @@ from airflow.ti_deps.deps.trigger_rule_dep import TriggerRuleDep from airflow.ti_deps.deps.task_concurrency_dep import TaskConcurrencyDep from airflow.ti_deps.dep_context import DepContext, QUEUE_DEPS, RUN_DEPS +from airflow.utils import timezone from airflow.utils.dates import cron_presets, date_range as utils_date_range from airflow.utils.db import provide_session from airflow.utils.decorators import apply_defaults @@ -154,7 +156,7 @@ def clear_task_instances(tis, session, activate_dag_runs=True, dag=None): ).all() for dr in drs: dr.state = State.RUNNING - dr.start_date = datetime.utcnow() + dr.start_date = timezone.utcnow() class DagBag(BaseDagBag, LoggingMixin): @@ -341,7 +343,7 @@ class DagBag(BaseDagBag, LoggingMixin): self.log.info("Finding 'running' jobs without a recent heartbeat") TI = TaskInstance secs = configuration.getint('scheduler', 'scheduler_zombie_task_threshold') - limit_dttm = datetime.utcnow() - timedelta(seconds=secs) + limit_dttm = timezone.utcnow() - timedelta(seconds=secs) self.log.info("Failing jobs without heartbeat after %s", limit_dttm) tis = ( @@ -373,7 +375,7 @@ class DagBag(BaseDagBag, LoggingMixin): """ self.dags[dag.dag_id] = dag dag.resolve_template_files() - dag.last_loaded = datetime.utcnow() + dag.last_loaded = timezone.utcnow() for task in dag.tasks: settings.policy(task) @@ -398,7 +400,7 @@ class DagBag(BaseDagBag, LoggingMixin): ignoring files that match any of the regex patterns specified in the file. """ - start_dttm = datetime.utcnow() + start_dttm = timezone.utcnow() dag_folder = dag_folder or self.dag_folder # Used to store stats around DagBag processing @@ -426,11 +428,11 @@ class DagBag(BaseDagBag, LoggingMixin): continue if not any( [re.findall(p, filepath) for p in patterns]): - ts = datetime.utcnow() + ts = timezone.utcnow() found_dags = self.process_file( filepath, only_if_updated=only_if_updated) - td = datetime.utcnow() - ts + td = timezone.utcnow() - ts td = td.total_seconds() + ( float(td.microseconds) / 1000000) stats.append(FileLoadStat( @@ -443,7 +445,7 @@ class DagBag(BaseDagBag, LoggingMixin): except Exception as e: self.log.exception(e) Stats.gauge( - 'collect_dags', (datetime.utcnow() - start_dttm).total_seconds(), 1) + 'collect_dags', (timezone.utcnow() - start_dttm).total_seconds(), 1) Stats.gauge( 'dagbag_size', len(self.dags), 1) Stats.gauge( @@ -1065,8 +1067,8 @@ class TaskInstance(Base, LoggingMixin): @provide_session def set_state(self, state, session=None): self.state = state - self.start_date = datetime.utcnow() - self.end_date = datetime.utcnow() + self.start_date = timezone.utcnow() + self.end_date = timezone.utcnow() session.merge(self) session.commit() @@ -1231,7 +1233,7 @@ class TaskInstance(Base, LoggingMixin): to be retried. """ return (self.state == State.UP_FOR_RETRY and - self.next_retry_datetime() < datetime.utcnow()) + self.next_retry_datetime() < timezone.utcnow()) @provide_session def pool_full(self, session): @@ -1339,7 +1341,7 @@ class TaskInstance(Base, LoggingMixin): msg = "Starting attempt {attempt} of {total}".format( attempt=self.try_number + 1, total=self.max_tries + 1) - self.start_date = datetime.utcnow() + self.start_date = timezone.utcnow() dep_context = DepContext( deps=RUN_DEPS - QUEUE_DEPS, @@ -1363,7 +1365,7 @@ class TaskInstance(Base, LoggingMixin): total=self.max_tries + 1) self.log.warning(hr + msg + hr) - self.queued_dttm = datetime.utcnow() + self.queued_dttm = timezone.utcnow() self.log.info("Queuing into pool %s", self.pool) session.merge(self) session.commit() @@ -1508,7 +1510,7 @@ class TaskInstance(Base, LoggingMixin): raise # Recording SUCCESS - self.end_date = datetime.utcnow() + self.end_date = timezone.utcnow() self.set_duration() if not test_mode: session.add(Log(self.state, self)) @@ -1569,7 +1571,7 @@ class TaskInstance(Base, LoggingMixin): def handle_failure(self, error, test_mode=False, context=None, session=None): self.log.exception(error) task = self.task - self.end_date = datetime.utcnow() + self.end_date = timezone.utcnow() self.set_duration() Stats.incr('operator_failures_{}'.format(task.__class__.__name__), 1, 1) Stats.incr('ti_failures') @@ -1891,7 +1893,7 @@ class Log(Base): extra = Column(Text) def __init__(self, event, task_instance, owner=None, extra=None, **kwargs): - self.dttm = datetime.utcnow() + self.dttm = timezone.utcnow() self.event = event self.extra = extra @@ -1929,7 +1931,7 @@ class SkipMixin(LoggingMixin): return task_ids = [d.task_id for d in tasks] - now = datetime.utcnow() + now = timezone.utcnow() if dag_run: session.query(TaskInstance).filter( @@ -2544,7 +2546,7 @@ class BaseOperator(LoggingMixin): range. """ TI = TaskInstance - end_date = end_date or datetime.utcnow() + end_date = end_date or timezone.utcnow() return session.query(TI).filter( TI.dag_id == self.dag_id, TI.task_id == self.task_id, @@ -2591,7 +2593,7 @@ class BaseOperator(LoggingMixin): Run a set of task instances for a date range. """ start_date = start_date or self.start_date - end_date = end_date or self.end_date or datetime.utcnow() + end_date = end_date or self.end_date or timezone.utcnow() for dt in self.dag.date_range(start_date, end_date=end_date): TaskInstance(self, dt).run( @@ -2883,8 +2885,28 @@ class DAG(BaseDag, LoggingMixin): # set file location to caller source path self.fileloc = sys._getframe().f_back.f_code.co_filename self.task_dict = dict() - self.start_date = start_date - self.end_date = end_date + + # set timezone + if start_date and start_date.tzinfo: + self.timezone = start_date.tzinfo + elif 'start_date' in self.default_args and self.default_args['start_date'].tzinfo: + self.timezone = self.default_args['start_date'].tzinfo + else: + self.timezone = settings.TIMEZONE + + self.start_date = timezone.convert_to_utc(start_date) + self.end_date = timezone.convert_to_utc(end_date) + + # also convert tasks + if 'start_date' in self.default_args: + self.default_args['start_date'] = ( + timezone.convert_to_utc(self.default_args['start_date']) + ) + if 'end_date' in self.default_args: + self.default_args['end_date'] = ( + timezone.convert_to_utc(self.default_args['end_date']) + ) + self.schedule_interval = schedule_interval if schedule_interval in cron_presets: self._schedule_interval = cron_presets.get(schedule_interval) @@ -2896,7 +2918,7 @@ class DAG(BaseDag, LoggingMixin): template_searchpath = [template_searchpath] self.template_searchpath = template_searchpath self.parent_dag = None # Gets set when DAGs are loaded - self.last_loaded = datetime.utcnow() + self.last_loaded = timezone.utcnow() self.safe_dag_id = dag_id.replace('.', '__dot__') self.max_active_runs = max_active_runs self.dagrun_timeout = dagrun_timeout @@ -2965,7 +2987,7 @@ class DAG(BaseDag, LoggingMixin): # /Context Manager ---------------------------------------------- - def date_range(self, start_date, num=None, end_date=datetime.utcnow()): + def date_range(self, start_date, num=None, end_date=timezone.utcnow()): if num: end_date = None return utils_date_range( @@ -2993,7 +3015,7 @@ class DAG(BaseDag, LoggingMixin): :param start_date: the start date of the interval :type start_date: datetime - :param end_date: the end date of the interval, defaults to datetime.utcnow() + :param end_date: the end date of the interval, defaults to timezone.utcnow() :type end_date: datetime :return: a list of dates within the interval following the dag's schedule :rtype: list @@ -3005,7 +3027,7 @@ class DAG(BaseDag, LoggingMixin): # dates for dag runs using_start_date = using_start_date or min([t.start_date for t in self.tasks]) - using_end_date = using_end_date or datetime.utcnow() + using_end_date = using_end_date or timezone.utcnow() # next run date for a subdag isn't relevant (schedule_interval for subdags # is ignored) so we use the dag run's start date in the case of a subdag @@ -3274,9 +3296,9 @@ class DAG(BaseDag, LoggingMixin): self, session, start_date=None, end_date=None, state=None): TI = TaskInstance if not start_date: - start_date = (datetime.utcnow() - timedelta(30)).date() + start_date = (timezone.utcnow() - timedelta(30)).date() start_date = datetime.combine(start_date, datetime.min.time()) - end_date = end_date or datetime.utcnow() + end_date = end_date or timezone.utcnow() tis = session.query(TI).filter( TI.dag_id == self.dag_id, TI.execution_date >= start_date, @@ -3536,10 +3558,10 @@ class DAG(BaseDag, LoggingMixin): d = {} d['is_picklable'] = True try: - dttm = datetime.utcnow() + dttm = timezone.utcnow() pickled = pickle.dumps(self) d['pickle_len'] = len(pickled) - d['pickling_duration'] = "{}".format(datetime.utcnow() - dttm) + d['pickling_duration'] = "{}".format(timezone.utcnow() - dttm) except Exception as e: self.log.debug(e) d['is_picklable'] = False @@ -3557,7 +3579,7 @@ class DAG(BaseDag, LoggingMixin): if not dp or dp.pickle != self: dp = DagPickle(dag=self) session.add(dp) - self.last_pickled = datetime.utcnow() + self.last_pickled = timezone.utcnow() session.commit() self.pickle_id = dp.id @@ -3773,7 +3795,7 @@ class DAG(BaseDag, LoggingMixin): if owner is None: owner = self.owner if sync_time is None: - sync_time = datetime.utcnow() + sync_time = timezone.utcnow() orm_dag = session.query( DagModel).filter(DagModel.dag_id == self.dag_id).first() @@ -4566,7 +4588,7 @@ class DagRun(Base, LoggingMixin): # pre-calculate # db is faster - start_dttm = datetime.utcnow() + start_dttm = timezone.utcnow() unfinished_tasks = self.get_task_instances( state=State.unfinished(), session=session @@ -4590,7 +4612,7 @@ class DagRun(Base, LoggingMixin): no_dependencies_met = False break - duration = (datetime.utcnow() - start_dttm).total_seconds() * 1000 + duration = (timezone.utcnow() - start_dttm).total_seconds() * 1000 Stats.timing("dagrun.dependency-check.{}".format(self.dag_id), duration) # future: remove the check on adhoc tasks (=active_tasks) diff --git a/airflow/operators/dagrun_operator.py b/airflow/operators/dagrun_operator.py index 923b8a4ad7..2b5a814111 100644 --- a/airflow/operators/dagrun_operator.py +++ b/airflow/operators/dagrun_operator.py @@ -12,9 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -from datetime import datetime - from airflow.models import BaseOperator, DagBag +from airflow.utils import timezone from airflow.utils.db import create_session from airflow.utils.decorators import apply_defaults from airflow.utils.state import State @@ -59,7 +58,7 @@ class TriggerDagRunOperator(BaseOperator): self.trigger_dag_id = trigger_dag_id def execute(self, context): - dro = DagRunOrder(run_id='trig__' + datetime.utcnow().isoformat()) + dro = DagRunOrder(run_id='trig__' + timezone.utcnow().isoformat()) dro = self.python_callable(context, dro) if dro: with create_session() as session: diff --git a/airflow/operators/latest_only_operator.py b/airflow/operators/latest_only_operator.py index 7abd92d451..7b4e0ca6bb 100644 --- a/airflow/operators/latest_only_operator.py +++ b/airflow/operators/latest_only_operator.py @@ -12,9 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. -import datetime from airflow.models import BaseOperator, SkipMixin +from airflow.utils import timezone class LatestOnlyOperator(BaseOperator, SkipMixin): @@ -35,7 +35,7 @@ class LatestOnlyOperator(BaseOperator, SkipMixin): self.log.info("Externally triggered DAG_Run: allowing execution to proceed.") return - now = datetime.datetime.utcnow() + now = timezone.utcnow() left_window = context['dag'].following_schedule( context['execution_date']) right_window = context['dag'].following_schedule(left_window) diff --git a/airflow/operators/sensors.py b/airflow/operators/sensors.py index bd073b885b..c8a8df67ee 100644 --- a/airflow/operators/sensors.py +++ b/airflow/operators/sensors.py @@ -21,7 +21,7 @@ standard_library.install_aliases() from builtins import str from past.builtins import basestring -from datetime import datetime +from airflow.utils import timezone from urllib.parse import urlparse from time import sleep import re @@ -75,9 +75,9 @@ class BaseSensorOperator(BaseOperator): raise AirflowException('Override me.') def execute(self, context): - started_at = datetime.utcnow() + started_at = timezone.utcnow() while not self.poke(context): - if (datetime.utcnow() - started_at).total_seconds() > self.timeout: + if (timezone.utcnow() - started_at).total_seconds() > self.timeout: if self.soft_fail: raise AirflowSkipException('Snap. Time is OUT.') else: @@ -602,7 +602,7 @@ class TimeSensor(BaseSensorOperator): def poke(self, context): self.log.info('Checking if the time (%s) has come', self.target_time) - return datetime.utcnow().time() > self.target_time + return timezone.utcnow().time() > self.target_time class TimeDeltaSensor(BaseSensorOperator): @@ -627,7 +627,7 @@ class TimeDeltaSensor(BaseSensorOperator): target_dttm = dag.following_schedule(context['execution_date']) target_dttm += self.delta self.log.info('Checking if the time (%s) has come', target_dttm) - return datetime.utcnow() > target_dttm + return timezone.utcnow() > target_dttm class HttpSensor(BaseSensorOperator): diff --git a/airflow/ti_deps/deps/not_in_retry_period_dep.py b/airflow/ti_deps/deps/not_in_retry_period_dep.py index 7f9bff65f4..6628ff3333 100644 --- a/airflow/ti_deps/deps/not_in_retry_period_dep.py +++ b/airflow/ti_deps/deps/not_in_retry_period_dep.py @@ -11,9 +11,9 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from datetime import datetime from airflow.ti_deps.deps.base_ti_dep import BaseTIDep +from airflow.utils import timezone from airflow.utils.db import provide_session from airflow.utils.state import State @@ -38,7 +38,7 @@ class NotInRetryPeriodDep(BaseTIDep): # Calculate the date first so that it is always smaller than the timestamp used by # ready_for_retry - cur_date = datetime.utcnow() + cur_date = timezone.utcnow() next_task_retry_date = ti.next_retry_datetime() if ti.is_premature: yield self._failing_status( diff --git a/airflow/ti_deps/deps/runnable_exec_date_dep.py b/airflow/ti_deps/deps/runnable_exec_date_dep.py index 13e5345835..69321d97db 100644 --- a/airflow/ti_deps/deps/runnable_exec_date_dep.py +++ b/airflow/ti_deps/deps/runnable_exec_date_dep.py @@ -11,9 +11,9 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from datetime import datetime from airflow.ti_deps.deps.base_ti_dep import BaseTIDep +from airflow.utils import timezone from airflow.utils.db import provide_session @@ -23,7 +23,7 @@ class RunnableExecDateDep(BaseTIDep): @provide_session def _get_dep_statuses(self, ti, session, dep_context): - cur_date = datetime.utcnow() + cur_date = timezone.utcnow() if ti.execution_date > cur_date: yield self._failing_status( diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py index 68cee7601e..965e88b581 100644 --- a/airflow/utils/dag_processing.py +++ b/airflow/utils/dag_processing.py @@ -23,10 +23,10 @@ import time import zipfile from abc import ABCMeta, abstractmethod from collections import defaultdict -from datetime import datetime from airflow.dag.base_dag import BaseDag, BaseDagBag from airflow.exceptions import AirflowException +from airflow.utils import timezone from airflow.utils.log.logging_mixin import LoggingMixin @@ -376,7 +376,7 @@ class DagFileProcessorManager(LoggingMixin): being processed """ if file_path in self._processors: - return (datetime.utcnow() - self._processors[file_path].start_time)\ + return (timezone.utcnow() - self._processors[file_path].start_time)\ .total_seconds() return None @@ -466,7 +466,7 @@ class DagFileProcessorManager(LoggingMixin): for file_path, processor in self._processors.items(): if processor.done: self.log.info("Processor for %s finished", file_path) - now = datetime.utcnow() + now = timezone.utcnow() finished_processors[file_path] = processor self._last_runtime[file_path] = (now - processor.start_time).total_seconds() @@ -494,7 +494,7 @@ class DagFileProcessorManager(LoggingMixin): # If the file path is already being processed, or if a file was # processed recently, wait until the next batch file_paths_in_progress = self._processors.keys() - now = datetime.utcnow() + now = timezone.utcnow() file_paths_recently_processed = [] for file_path in self._file_paths: last_finish_time = self.get_last_finish_time(file_path) diff --git a/airflow/utils/dates.py b/airflow/utils/dates.py index 81e1c2cb06..7d0d9d9775 100644 --- a/airflow/utils/dates.py +++ b/airflow/utils/dates.py @@ -17,6 +17,7 @@ from __future__ import division from __future__ import print_function from __future__ import unicode_literals +from airflow.utils import timezone from datetime import datetime, timedelta from dateutil.relativedelta import relativedelta # for doctest import six @@ -66,7 +67,7 @@ def date_range( if end_date and num: raise Exception("Wait. Either specify end_date OR num") if not end_date and not num: - end_date = datetime.utcnow() + end_date = timezone.utcnow() delta_iscron = False if isinstance(delta, six.string_types): @@ -219,7 +220,7 @@ def days_ago(n, hour=0, minute=0, second=0, microsecond=0): Get a datetime object representing `n` days ago. By default the time is set to midnight. """ - today = datetime.utcnow().replace( + today = timezone.utcnow().replace( hour=hour, minute=minute, second=second, diff --git a/airflow/www/forms.py b/airflow/www/forms.py index 2c6118c0da..f5af35a6f3 100644 --- a/airflow/www/forms.py +++ b/airflow/www/forms.py @@ -17,7 +17,7 @@ from __future__ import division from __future__ import print_function from __future__ import unicode_literals -from datetime import datetime +from airflow.utils import timezone from flask_admin.form import DateTimePickerWidget from wtforms import DateTimeField, SelectField from flask_wtf import Form @@ -33,7 +33,7 @@ class DateTimeWithNumRunsForm(Form): # Date time and number of runs form for tree view, task duration # and landing times base_date = DateTimeField( - "Anchor date", widget=DateTimePickerWidget(), default=datetime.utcnow()) + "Anchor date", widget=DateTimePickerWidget(), default=timezone.utcnow()) num_runs = SelectField("Number of runs", default=25, choices=( (5, "5"), (25, "25"), diff --git a/airflow/www/utils.py b/airflow/www/utils.py index 52b22fca79..ae1fb5f687 100644 --- a/airflow/www/utils.py +++ b/airflow/www/utils.py @@ -21,7 +21,7 @@ from cgi import escape from io import BytesIO as IO import functools import gzip -import dateutil.parser as dateparser +import iso8601 import json import time @@ -46,6 +46,7 @@ DEFAULT_SENSITIVE_VARIABLE_FIELDS = ( 'access_token', ) + def should_hide_value_for_key(key_name): return any(s in key_name.lower() for s in DEFAULT_SENSITIVE_VARIABLE_FIELDS) \ and configuration.getboolean('admin', 'hide_sensitive_variable_fields') @@ -252,8 +253,8 @@ def action_logging(f): dag_id=request.args.get('dag_id')) if 'execution_date' in request.args: - log.execution_date = dateparser.parse( - request.args.get('execution_date')) + log.execution_date = iso8601.parse_date( + request.args.get('execution_date'), settings.TIMEZONE) with create_session() as session: session.add(log) diff --git a/airflow/www/views.py b/airflow/www/views.py index 1191bde405..a6378bfb2e 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -21,7 +21,7 @@ import os import pkg_resources import socket from functools import wraps -from datetime import datetime, timedelta +from datetime import timedelta import dateutil.parser import copy import math @@ -72,6 +72,7 @@ from airflow.ti_deps.dep_context import DepContext, QUEUE_DEPS, SCHEDULER_DEPS from airflow.models import BaseOperator from airflow.operators.subdag_operator import SubDagOperator +from airflow.utils import timezone from airflow.utils.json import json_ser from airflow.utils.state import State from airflow.utils.db import create_session, provide_session @@ -170,7 +171,7 @@ def duration_f(v, c, m, p): def datetime_f(v, c, m, p): attr = getattr(m, p) dttm = attr.isoformat() if attr else '' - if datetime.utcnow().isoformat()[:4] == dttm[:4]: + if timezone.utcnow().isoformat()[:4] == dttm[:4]: dttm = dttm[5:] return Markup("{}".format(dttm)) @@ -922,7 +923,7 @@ class Airflow(BaseView): flash("Cannot find dag {}".format(dag_id)) return redirect(origin) - execution_date = datetime.utcnow() + execution_date = timezone.utcnow() run_id = "manual__{0}".format(execution_date.isoformat()) dr = DagRun.find(dag_id=dag_id, run_id=run_id) @@ -1161,7 +1162,7 @@ class Airflow(BaseView): if base_date: base_date = dateutil.parser.parse(base_date) else: - base_date = dag.latest_execution_date or datetime.utcnow() + base_date = dag.latest_execution_date or timezone.utcnow() dates = dag.date_range(base_date, num=-abs(num_runs)) min_date = dates[0] if dates else datetime(2000, 1, 1) @@ -1217,7 +1218,7 @@ class Airflow(BaseView): def set_duration(tid): if (isinstance(tid, dict) and tid.get("state") == State.RUNNING and tid["start_date"] is not None): - d = datetime.utcnow() - dateutil.parser.parse(tid["start_date"]) + d = timezone.utcnow() - dateutil.parser.parse(tid["start_date"]) tid["duration"] = d.total_seconds() return tid @@ -1314,7 +1315,7 @@ class Airflow(BaseView): if dttm: dttm = dateutil.parser.parse(dttm) else: - dttm = dag.latest_execution_date or datetime.utcnow().date() + dttm = dag.latest_execution_date or timezone.utcnow().date() DR = models.DagRun drs = ( @@ -1390,7 +1391,7 @@ class Airflow(BaseView): if base_date: base_date = dateutil.parser.parse(base_date) else: - base_date = dag.latest_execution_date or datetime.utcnow() + base_date = dag.latest_execution_date or timezone.utcnow() dates = dag.date_range(base_date, num=-abs(num_runs)) min_date = dates[0] if dates else datetime(2000, 1, 1) @@ -1497,7 +1498,7 @@ class Airflow(BaseView): if base_date: base_date = dateutil.parser.parse(base_date) else: - base_date = dag.latest_execution_date or datetime.utcnow() + base_date = dag.latest_execution_date or timezone.utcnow() dates = dag.date_range(base_date, num=-abs(num_runs)) min_date = dates[0] if dates else datetime(2000, 1, 1) @@ -1560,7 +1561,7 @@ class Airflow(BaseView): if base_date: base_date = dateutil.parser.parse(base_date) else: - base_date = dag.latest_execution_date or datetime.utcnow() + base_date = dag.latest_execution_date or timezone.utcnow() dates = dag.date_range(base_date, num=-abs(num_runs)) min_date = dates[0] if dates else datetime(2000, 1, 1) @@ -1651,7 +1652,7 @@ class Airflow(BaseView): DagModel).filter(DagModel.dag_id == dag_id).first() if orm_dag: - orm_dag.last_expired = datetime.utcnow() + orm_dag.last_expired = timezone.utcnow() session.merge(orm_dag) session.commit() @@ -1687,7 +1688,7 @@ class Airflow(BaseView): if dttm: dttm = dateutil.parser.parse(dttm) else: - dttm = dag.latest_execution_date or datetime.utcnow().date() + dttm = dag.latest_execution_date or timezone.utcnow().date() form = DateTimeForm(data={'execution_date': dttm}) @@ -1698,7 +1699,7 @@ class Airflow(BaseView): tasks = [] for ti in tis: - end_date = ti.end_date if ti.end_date else datetime.utcnow() + end_date = ti.end_date if ti.end_date else timezone.utcnow() tasks.append({ 'startDate': wwwutils.epoch(ti.start_date), 'endDate': wwwutils.epoch(end_date), @@ -2172,7 +2173,7 @@ class ChartModelView(wwwutils.DataProfilingMixin, AirflowModelView): model.iteration_no += 1 if not model.user_id and current_user and hasattr(current_user, 'id'): model.user_id = current_user.id - model.last_modified = datetime.utcnow() + model.last_modified = timezone.utcnow() chart_mapping = ( @@ -2433,9 +2434,9 @@ class DagRunModelView(ModelViewOnly): count += 1 dr.state = target_state if target_state == State.RUNNING: - dr.start_date = datetime.utcnow() + dr.start_date = timezone.utcnow() else: - dr.end_date = datetime.utcnow() + dr.end_date = timezone.utcnow() session.commit() models.DagStat.update(dirty_ids, session=session) flash( diff --git a/setup.py b/setup.py index 9408192d25..de2bd540f3 100644 --- a/setup.py +++ b/setup.py @@ -222,6 +222,7 @@ def do_setup(): 'future>=0.16.0, <0.17', 'gitpython>=2.0.2', 'gunicorn>=19.4.0, <20.0', + 'iso8601>=0.1.12', 'jinja2>=2.7.3, <2.9.0', 'lxml>=3.6.0, <4.0', 'markdown>=2.5.2, <3.0',