[AIRFLOW-1808] Convert all utcnow() to time zone aware
datetime.utcnow() does not set time zone information.
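
For illustration, a minimal sketch of the difference (assuming only that
timezone.utcnow() in airflow.utils.timezone returns an aware datetime in UTC,
which is the contract this change relies on):

    import datetime

    naive = datetime.datetime.utcnow()
    print(naive.tzinfo)  # None: comparing this against an aware datetime
                         # raises TypeError, and the UTC offset is lost

    from airflow.utils import timezone

    aware = timezone.utcnow()
    print(aware.tzinfo)  # a UTC tzinfo: safe to compare, convert and persist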
Parent: a47255fb2d
Commit: c857436b75
airflow/api/common/experimental/mark_tasks.py

@@ -12,16 +12,16 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import datetime
 
 from airflow.jobs import BackfillJob
 from airflow.models import DagRun, TaskInstance
 from airflow.operators.subdag_operator import SubDagOperator
 from airflow.settings import Session
+from airflow.utils import timezone
 from airflow.utils.state import State
 
 from sqlalchemy import or_
 
 
 def _create_dagruns(dag, execution_dates, state, run_id_template):
     """
     Infers from the dates which dag runs need to be created and does so.
@@ -39,7 +39,7 @@ def _create_dagruns(dag, execution_dates, state, run_id_template):
         dr = dag.create_dagrun(
             run_id=run_id_template.format(date.isoformat()),
             execution_date=date,
-            start_date=datetime.datetime.utcnow(),
+            start_date=timezone.utcnow(),
             external_trigger=False,
             state=state,
         )
@@ -67,7 +67,7 @@ def set_state(task, execution_date, upstream=False, downstream=False,
     :param commit: Commit tasks to be altered to the database
     :return: list of tasks that have been created and updated
     """
-    assert isinstance(execution_date, datetime.datetime)
+    assert timezone.is_localized(execution_date)
 
     # microseconds are supported by the database, but is not handled
    # correctly by airflow on e.g. the filesystem and in other places
@@ -185,6 +185,7 @@ def set_state(task, execution_date, upstream=False, downstream=False,
 
     return tis_altered
 
+
 def set_dag_run_state(dag, execution_date, state=State.SUCCESS, commit=False):
     """
     Set the state of a dag run and all task instances associated with the dag

airflow/api/common/experimental/trigger_dag.py

@@ -12,11 +12,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import datetime
 import json
 
 from airflow.exceptions import AirflowException
 from airflow.models import DagRun, DagBag
+from airflow.utils import timezone
 from airflow.utils.state import State
 
 
@@ -29,9 +29,9 @@ def trigger_dag(dag_id, run_id=None, conf=None, execution_date=None):
     dag = dagbag.get_dag(dag_id)
 
     if not execution_date:
-        execution_date = datetime.datetime.utcnow()
+        execution_date = timezone.utcnow()
 
-    assert isinstance(execution_date, datetime.datetime)
+    assert timezone.is_localized(execution_date)
     execution_date = execution_date.replace(microsecond=0)
 
     if not run_id:

airflow/jobs.py

@@ -28,8 +28,9 @@ import socket
 import sys
 import threading
 import time
+import datetime
 
 from collections import defaultdict
-from datetime import datetime
 from past.builtins import basestring
 from sqlalchemy import (
     Column, Integer, String, DateTime, func, Index, or_, and_, not_)
@@ -46,7 +47,7 @@ from airflow.models import DAG, DagRun
 from airflow.settings import Stats
 from airflow.task_runner import get_task_runner
 from airflow.ti_deps.dep_context import DepContext, QUEUE_DEPS, RUN_DEPS
-from airflow.utils import asciiart
+from airflow.utils import asciiart, timezone
 from airflow.utils.dag_processing import (AbstractDagFileProcessor,
                                           DagFileProcessorManager,
                                           SimpleDag,
@@ -100,22 +101,22 @@ class BaseJob(Base, LoggingMixin):
         self.hostname = socket.getfqdn()
         self.executor = executor
         self.executor_class = executor.__class__.__name__
-        self.start_date = datetime.utcnow()
-        self.latest_heartbeat = datetime.utcnow()
+        self.start_date = timezone.utcnow()
+        self.latest_heartbeat = timezone.utcnow()
         self.heartrate = heartrate
         self.unixname = getpass.getuser()
         super(BaseJob, self).__init__(*args, **kwargs)
 
     def is_alive(self):
         return (
-            (datetime.utcnow() - self.latest_heartbeat).seconds <
+            (timezone.utcnow() - self.latest_heartbeat).seconds <
             (conf.getint('scheduler', 'JOB_HEARTBEAT_SEC') * 2.1)
         )
 
     @provide_session
     def kill(self, session=None):
         job = session.query(BaseJob).filter(BaseJob.id == self.id).first()
-        job.end_date = datetime.utcnow()
+        job.end_date = timezone.utcnow()
         try:
             self.on_kill()
         except:
@@ -165,14 +166,14 @@ class BaseJob(Base, LoggingMixin):
             if job.latest_heartbeat:
                 sleep_for = max(
                     0,
-                    self.heartrate - (datetime.utcnow() - job.latest_heartbeat).total_seconds())
+                    self.heartrate - (timezone.utcnow() - job.latest_heartbeat).total_seconds())
 
             sleep(sleep_for)
 
         # Update last heartbeat time
         with create_session() as session:
             job = session.query(BaseJob).filter(BaseJob.id == self.id).first()
-            job.latest_heartbeat = datetime.utcnow()
+            job.latest_heartbeat = timezone.utcnow()
             session.merge(job)
             session.commit()
 
@@ -194,7 +195,7 @@ class BaseJob(Base, LoggingMixin):
             self._execute()
 
             # Marking the success in the DB
-            self.end_date = datetime.utcnow()
+            self.end_date = timezone.utcnow()
             self.state = State.SUCCESS
             session.merge(self)
             session.commit()
@@ -399,7 +400,7 @@ class DagFileProcessor(AbstractDagFileProcessor, LoggingMixin):
             self._pickle_dags,
             self._dag_id_white_list,
             "DagFileProcessor{}".format(self._instance_id))
-        self._start_time = datetime.utcnow()
+        self._start_time = timezone.utcnow()
 
     def terminate(self, sigkill=False):
         """
@@ -615,16 +616,16 @@ class SchedulerJob(BaseJob):
             TI.execution_date == sq.c.max_ti,
         ).all()
 
-        ts = datetime.utcnow()
+        ts = timezone.utcnow()
         SlaMiss = models.SlaMiss
         for ti in max_tis:
             task = dag.get_task(ti.task_id)
             dttm = ti.execution_date
             if task.sla:
                 dttm = dag.following_schedule(dttm)
-                while dttm < datetime.utcnow():
+                while dttm < timezone.utcnow():
                     following_schedule = dag.following_schedule(dttm)
-                    if following_schedule + task.sla < datetime.utcnow():
+                    if following_schedule + task.sla < timezone.utcnow():
                         session.merge(models.SlaMiss(
                             task_id=ti.task_id,
                             dag_id=ti.dag_id,
@@ -772,9 +773,9 @@ class SchedulerJob(BaseJob):
             for dr in active_runs:
                 if (
                         dr.start_date and dag.dagrun_timeout and
-                        dr.start_date < datetime.utcnow() - dag.dagrun_timeout):
+                        dr.start_date < timezone.utcnow() - dag.dagrun_timeout):
                     dr.state = State.FAILED
-                    dr.end_date = datetime.utcnow()
+                    dr.end_date = timezone.utcnow()
                     timedout_runs += 1
             session.commit()
             if len(active_runs) - timedout_runs >= dag.max_active_runs:
@@ -799,9 +800,9 @@ class SchedulerJob(BaseJob):
         # don't do scheduler catchup for dag's that don't have dag.catchup = True
         if not dag.catchup:
             # The logic is that we move start_date up until
-            # one period before, so that datetime.utcnow() is AFTER
+            # one period before, so that timezone.utcnow() is AFTER
             # the period end, and the job can be created...
-            now = datetime.utcnow()
+            now = timezone.utcnow()
             next_start = dag.following_schedule(now)
             last_start = dag.previous_schedule(now)
             if next_start <= now:
@@ -847,7 +848,7 @@ class SchedulerJob(BaseJob):
             )
 
         # don't ever schedule in the future
-        if next_run_date > datetime.utcnow():
+        if next_run_date > timezone.utcnow():
             return
 
         # this structure is necessary to avoid a TypeError from concatenating
@@ -870,11 +871,11 @@ class SchedulerJob(BaseJob):
         if next_run_date and min_task_end_date and next_run_date > min_task_end_date:
             return
 
-        if next_run_date and period_end and period_end <= datetime.utcnow():
+        if next_run_date and period_end and period_end <= timezone.utcnow():
             next_run = dag.create_dagrun(
                 run_id=DagRun.ID_PREFIX + next_run_date.isoformat(),
                 execution_date=next_run_date,
-                start_date=datetime.utcnow(),
+                start_date=timezone.utcnow(),
                 state=State.RUNNING,
                 external_trigger=False
             )
@@ -894,7 +895,7 @@ class SchedulerJob(BaseJob):
         for run in dag_runs:
             self.log.info("Examining DAG run %s", run)
             # don't consider runs that are executed in the future
-            if run.execution_date > datetime.utcnow():
+            if run.execution_date > timezone.utcnow():
                 self.log.error(
                     "Execution date is in future: %s",
                     run.execution_date
@@ -1231,7 +1232,7 @@ class SchedulerJob(BaseJob):
             # set TIs to queued state
             for task_instance in tis_to_set_to_queued:
                 task_instance.state = State.QUEUED
-                task_instance.queued_dttm = (datetime.utcnow()
+                task_instance.queued_dttm = (timezone.utcnow()
                                              if not task_instance.queued_dttm
                                              else task_instance.queued_dttm)
                 session.merge(task_instance)
@@ -1468,7 +1469,7 @@ class SchedulerJob(BaseJob):
             last_runtime = processor_manager.get_last_runtime(file_path)
             processor_pid = processor_manager.get_pid(file_path)
             processor_start_time = processor_manager.get_start_time(file_path)
-            runtime = ((datetime.utcnow() - processor_start_time).total_seconds()
+            runtime = ((timezone.utcnow() - processor_start_time).total_seconds()
                        if processor_start_time else None)
             last_run = processor_manager.get_last_finish_time(file_path)
 
@@ -1585,34 +1586,34 @@ class SchedulerJob(BaseJob):
         self.log.info("Resetting orphaned tasks for active dag runs")
         self.reset_state_for_orphaned_tasks()
 
-        execute_start_time = datetime.utcnow()
+        execute_start_time = timezone.utcnow()
 
         # Last time stats were printed
-        last_stat_print_time = datetime(2000, 1, 1)
+        last_stat_print_time = datetime.datetime(2000, 1, 1, tzinfo=timezone.utc)
         # Last time that self.heartbeat() was called.
-        last_self_heartbeat_time = datetime.utcnow()
+        last_self_heartbeat_time = timezone.utcnow()
         # Last time that the DAG dir was traversed to look for files
-        last_dag_dir_refresh_time = datetime.utcnow()
+        last_dag_dir_refresh_time = timezone.utcnow()
 
         # Use this value initially
         known_file_paths = processor_manager.file_paths
 
         # For the execute duration, parse and schedule DAGs
-        while (datetime.utcnow() - execute_start_time).total_seconds() < \
+        while (timezone.utcnow() - execute_start_time).total_seconds() < \
                 self.run_duration or self.run_duration < 0:
             self.log.debug("Starting Loop...")
             loop_start_time = time.time()
 
             # Traverse the DAG directory for Python files containing DAGs
             # periodically
-            elapsed_time_since_refresh = (datetime.utcnow() -
+            elapsed_time_since_refresh = (timezone.utcnow() -
                                           last_dag_dir_refresh_time).total_seconds()
 
             if elapsed_time_since_refresh > self.dag_dir_list_interval:
                 # Build up a list of Python files that could contain DAGs
                 self.log.info("Searching for files in %s", self.subdir)
                 known_file_paths = list_py_file_paths(self.subdir)
-                last_dag_dir_refresh_time = datetime.utcnow()
+                last_dag_dir_refresh_time = timezone.utcnow()
                 self.log.info("There are %s files in %s", len(known_file_paths), self.subdir)
                 processor_manager.set_file_paths(known_file_paths)
 
@@ -1662,20 +1663,20 @@ class SchedulerJob(BaseJob):
             self._process_executor_events(simple_dag_bag)
 
             # Heartbeat the scheduler periodically
-            time_since_last_heartbeat = (datetime.utcnow() -
+            time_since_last_heartbeat = (timezone.utcnow() -
                                          last_self_heartbeat_time).total_seconds()
             if time_since_last_heartbeat > self.heartrate:
                 self.log.info("Heartbeating the scheduler")
                 self.heartbeat()
-                last_self_heartbeat_time = datetime.utcnow()
+                last_self_heartbeat_time = timezone.utcnow()
 
             # Occasionally print out stats about how fast the files are getting processed
-            if ((datetime.utcnow() - last_stat_print_time).total_seconds() >
+            if ((timezone.utcnow() - last_stat_print_time).total_seconds() >
                     self.print_stats_interval):
                 if len(known_file_paths) > 0:
                     self._log_file_processing_stats(known_file_paths,
                                                     processor_manager)
-                last_stat_print_time = datetime.utcnow()
+                last_stat_print_time = timezone.utcnow()
 
             loop_end_time = time.time()
             self.log.debug("Ran scheduling loop in %.2f seconds", loop_end_time - loop_start_time)
@@ -2049,7 +2050,7 @@ class BackfillJob(BaseJob):
             run = run or self.dag.create_dagrun(
                 run_id=run_id,
                 execution_date=run_date,
-                start_date=datetime.utcnow(),
+                start_date=timezone.utcnow(),
                 state=State.RUNNING,
                 external_trigger=False,
                 session=session

airflow/models.py

@@ -24,7 +24,8 @@ from builtins import str
 from builtins import object, bytes
 import copy
 from collections import namedtuple
-from datetime import datetime, timedelta
+from datetime import timedelta
 
 import dill
 import functools
 import getpass
@@ -69,6 +70,7 @@ from airflow.ti_deps.deps.trigger_rule_dep import TriggerRuleDep
 from airflow.ti_deps.deps.task_concurrency_dep import TaskConcurrencyDep
 
 from airflow.ti_deps.dep_context import DepContext, QUEUE_DEPS, RUN_DEPS
+from airflow.utils import timezone
 from airflow.utils.dates import cron_presets, date_range as utils_date_range
 from airflow.utils.db import provide_session
 from airflow.utils.decorators import apply_defaults
@@ -154,7 +156,7 @@ def clear_task_instances(tis, session, activate_dag_runs=True, dag=None):
         ).all()
         for dr in drs:
             dr.state = State.RUNNING
-            dr.start_date = datetime.utcnow()
+            dr.start_date = timezone.utcnow()
 
 
 class DagBag(BaseDagBag, LoggingMixin):
@@ -341,7 +343,7 @@ class DagBag(BaseDagBag, LoggingMixin):
         self.log.info("Finding 'running' jobs without a recent heartbeat")
         TI = TaskInstance
         secs = configuration.getint('scheduler', 'scheduler_zombie_task_threshold')
-        limit_dttm = datetime.utcnow() - timedelta(seconds=secs)
+        limit_dttm = timezone.utcnow() - timedelta(seconds=secs)
         self.log.info("Failing jobs without heartbeat after %s", limit_dttm)
 
         tis = (
@@ -373,7 +375,7 @@ class DagBag(BaseDagBag, LoggingMixin):
         """
         self.dags[dag.dag_id] = dag
         dag.resolve_template_files()
-        dag.last_loaded = datetime.utcnow()
+        dag.last_loaded = timezone.utcnow()
 
         for task in dag.tasks:
             settings.policy(task)
@@ -398,7 +400,7 @@ class DagBag(BaseDagBag, LoggingMixin):
         ignoring files that match any of the regex patterns specified
         in the file.
         """
-        start_dttm = datetime.utcnow()
+        start_dttm = timezone.utcnow()
         dag_folder = dag_folder or self.dag_folder
 
         # Used to store stats around DagBag processing
@@ -426,11 +428,11 @@ class DagBag(BaseDagBag, LoggingMixin):
                         continue
                     if not any(
                             [re.findall(p, filepath) for p in patterns]):
-                        ts = datetime.utcnow()
+                        ts = timezone.utcnow()
                         found_dags = self.process_file(
                             filepath, only_if_updated=only_if_updated)
 
-                        td = datetime.utcnow() - ts
+                        td = timezone.utcnow() - ts
                         td = td.total_seconds() + (
                             float(td.microseconds) / 1000000)
                         stats.append(FileLoadStat(
@@ -443,7 +445,7 @@ class DagBag(BaseDagBag, LoggingMixin):
             except Exception as e:
                 self.log.exception(e)
         Stats.gauge(
-            'collect_dags', (datetime.utcnow() - start_dttm).total_seconds(), 1)
+            'collect_dags', (timezone.utcnow() - start_dttm).total_seconds(), 1)
         Stats.gauge(
             'dagbag_size', len(self.dags), 1)
         Stats.gauge(
@@ -1065,8 +1067,8 @@ class TaskInstance(Base, LoggingMixin):
     @provide_session
     def set_state(self, state, session=None):
         self.state = state
-        self.start_date = datetime.utcnow()
-        self.end_date = datetime.utcnow()
+        self.start_date = timezone.utcnow()
+        self.end_date = timezone.utcnow()
         session.merge(self)
         session.commit()
 
@@ -1231,7 +1233,7 @@ class TaskInstance(Base, LoggingMixin):
         to be retried.
         """
         return (self.state == State.UP_FOR_RETRY and
-                self.next_retry_datetime() < datetime.utcnow())
+                self.next_retry_datetime() < timezone.utcnow())
 
     @provide_session
     def pool_full(self, session):
@@ -1339,7 +1341,7 @@ class TaskInstance(Base, LoggingMixin):
             msg = "Starting attempt {attempt} of {total}".format(
                 attempt=self.try_number + 1,
                 total=self.max_tries + 1)
-            self.start_date = datetime.utcnow()
+            self.start_date = timezone.utcnow()
 
             dep_context = DepContext(
                 deps=RUN_DEPS - QUEUE_DEPS,
@@ -1363,7 +1365,7 @@ class TaskInstance(Base, LoggingMixin):
                 total=self.max_tries + 1)
             self.log.warning(hr + msg + hr)
 
-            self.queued_dttm = datetime.utcnow()
+            self.queued_dttm = timezone.utcnow()
             self.log.info("Queuing into pool %s", self.pool)
             session.merge(self)
             session.commit()
@@ -1508,7 +1510,7 @@ class TaskInstance(Base, LoggingMixin):
             raise
 
         # Recording SUCCESS
-        self.end_date = datetime.utcnow()
+        self.end_date = timezone.utcnow()
         self.set_duration()
         if not test_mode:
             session.add(Log(self.state, self))
@@ -1569,7 +1571,7 @@ class TaskInstance(Base, LoggingMixin):
     def handle_failure(self, error, test_mode=False, context=None, session=None):
         self.log.exception(error)
         task = self.task
-        self.end_date = datetime.utcnow()
+        self.end_date = timezone.utcnow()
         self.set_duration()
         Stats.incr('operator_failures_{}'.format(task.__class__.__name__), 1, 1)
         Stats.incr('ti_failures')
@@ -1891,7 +1893,7 @@ class Log(Base):
     extra = Column(Text)
 
     def __init__(self, event, task_instance, owner=None, extra=None, **kwargs):
-        self.dttm = datetime.utcnow()
+        self.dttm = timezone.utcnow()
         self.event = event
         self.extra = extra
 
@@ -1929,7 +1931,7 @@ class SkipMixin(LoggingMixin):
             return
 
         task_ids = [d.task_id for d in tasks]
-        now = datetime.utcnow()
+        now = timezone.utcnow()
 
         if dag_run:
             session.query(TaskInstance).filter(
@@ -2544,7 +2546,7 @@ class BaseOperator(LoggingMixin):
         range.
         """
         TI = TaskInstance
-        end_date = end_date or datetime.utcnow()
+        end_date = end_date or timezone.utcnow()
         return session.query(TI).filter(
             TI.dag_id == self.dag_id,
             TI.task_id == self.task_id,
@@ -2591,7 +2593,7 @@ class BaseOperator(LoggingMixin):
         Run a set of task instances for a date range.
         """
         start_date = start_date or self.start_date
-        end_date = end_date or self.end_date or datetime.utcnow()
+        end_date = end_date or self.end_date or timezone.utcnow()
 
         for dt in self.dag.date_range(start_date, end_date=end_date):
             TaskInstance(self, dt).run(
@@ -2883,8 +2885,28 @@ class DAG(BaseDag, LoggingMixin):
         # set file location to caller source path
         self.fileloc = sys._getframe().f_back.f_code.co_filename
         self.task_dict = dict()
-        self.start_date = start_date
-        self.end_date = end_date
+
+        # set timezone
+        if start_date and start_date.tzinfo:
+            self.timezone = start_date.tzinfo
+        elif 'start_date' in self.default_args and self.default_args['start_date'].tzinfo:
+            self.timezone = self.default_args['start_date'].tzinfo
+        else:
+            self.timezone = settings.TIMEZONE
+
+        self.start_date = timezone.convert_to_utc(start_date)
+        self.end_date = timezone.convert_to_utc(end_date)
+
+        # also convert tasks
+        if 'start_date' in self.default_args:
+            self.default_args['start_date'] = (
+                timezone.convert_to_utc(self.default_args['start_date'])
+            )
+        if 'end_date' in self.default_args:
+            self.default_args['end_date'] = (
+                timezone.convert_to_utc(self.default_args['end_date'])
+            )
+
         self.schedule_interval = schedule_interval
         if schedule_interval in cron_presets:
             self._schedule_interval = cron_presets.get(schedule_interval)
@@ -2896,7 +2918,7 @@ class DAG(BaseDag, LoggingMixin):
             template_searchpath = [template_searchpath]
         self.template_searchpath = template_searchpath
         self.parent_dag = None  # Gets set when DAGs are loaded
-        self.last_loaded = datetime.utcnow()
+        self.last_loaded = timezone.utcnow()
         self.safe_dag_id = dag_id.replace('.', '__dot__')
         self.max_active_runs = max_active_runs
         self.dagrun_timeout = dagrun_timeout
@@ -2965,7 +2987,7 @@ class DAG(BaseDag, LoggingMixin):
 
     # /Context Manager ----------------------------------------------
 
-    def date_range(self, start_date, num=None, end_date=datetime.utcnow()):
+    def date_range(self, start_date, num=None, end_date=timezone.utcnow()):
         if num:
             end_date = None
         return utils_date_range(
@@ -2993,7 +3015,7 @@ class DAG(BaseDag, LoggingMixin):
 
         :param start_date: the start date of the interval
         :type start_date: datetime
-        :param end_date: the end date of the interval, defaults to datetime.utcnow()
+        :param end_date: the end date of the interval, defaults to timezone.utcnow()
         :type end_date: datetime
         :return: a list of dates within the interval following the dag's schedule
         :rtype: list
@@ -3005,7 +3027,7 @@ class DAG(BaseDag, LoggingMixin):
 
         # dates for dag runs
         using_start_date = using_start_date or min([t.start_date for t in self.tasks])
-        using_end_date = using_end_date or datetime.utcnow()
+        using_end_date = using_end_date or timezone.utcnow()
 
         # next run date for a subdag isn't relevant (schedule_interval for subdags
         # is ignored) so we use the dag run's start date in the case of a subdag
@@ -3274,9 +3296,9 @@ class DAG(BaseDag, LoggingMixin):
             self, session, start_date=None, end_date=None, state=None):
         TI = TaskInstance
         if not start_date:
-            start_date = (datetime.utcnow() - timedelta(30)).date()
+            start_date = (timezone.utcnow() - timedelta(30)).date()
             start_date = datetime.combine(start_date, datetime.min.time())
-        end_date = end_date or datetime.utcnow()
+        end_date = end_date or timezone.utcnow()
         tis = session.query(TI).filter(
             TI.dag_id == self.dag_id,
             TI.execution_date >= start_date,
@@ -3536,10 +3558,10 @@ class DAG(BaseDag, LoggingMixin):
         d = {}
         d['is_picklable'] = True
         try:
-            dttm = datetime.utcnow()
+            dttm = timezone.utcnow()
             pickled = pickle.dumps(self)
             d['pickle_len'] = len(pickled)
-            d['pickling_duration'] = "{}".format(datetime.utcnow() - dttm)
+            d['pickling_duration'] = "{}".format(timezone.utcnow() - dttm)
         except Exception as e:
             self.log.debug(e)
             d['is_picklable'] = False
@@ -3557,7 +3579,7 @@ class DAG(BaseDag, LoggingMixin):
         if not dp or dp.pickle != self:
             dp = DagPickle(dag=self)
             session.add(dp)
-            self.last_pickled = datetime.utcnow()
+            self.last_pickled = timezone.utcnow()
             session.commit()
             self.pickle_id = dp.id
 
@@ -3773,7 +3795,7 @@ class DAG(BaseDag, LoggingMixin):
         if owner is None:
             owner = self.owner
         if sync_time is None:
-            sync_time = datetime.utcnow()
+            sync_time = timezone.utcnow()
 
         orm_dag = session.query(
             DagModel).filter(DagModel.dag_id == self.dag_id).first()
@@ -4566,7 +4588,7 @@ class DagRun(Base, LoggingMixin):
 
         # pre-calculate
         # db is faster
-        start_dttm = datetime.utcnow()
+        start_dttm = timezone.utcnow()
         unfinished_tasks = self.get_task_instances(
             state=State.unfinished(),
             session=session
@@ -4590,7 +4612,7 @@ class DagRun(Base, LoggingMixin):
                     no_dependencies_met = False
                     break
 
-        duration = (datetime.utcnow() - start_dttm).total_seconds() * 1000
+        duration = (timezone.utcnow() - start_dttm).total_seconds() * 1000
         Stats.timing("dagrun.dependency-check.{}".format(self.dag_id), duration)
 
         # future: remove the check on adhoc tasks (=active_tasks)

airflow/operators/dagrun_operator.py

@@ -12,9 +12,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from datetime import datetime
-
 from airflow.models import BaseOperator, DagBag
+from airflow.utils import timezone
 from airflow.utils.db import create_session
 from airflow.utils.decorators import apply_defaults
 from airflow.utils.state import State
@@ -59,7 +58,7 @@ class TriggerDagRunOperator(BaseOperator):
         self.trigger_dag_id = trigger_dag_id
 
     def execute(self, context):
-        dro = DagRunOrder(run_id='trig__' + datetime.utcnow().isoformat())
+        dro = DagRunOrder(run_id='trig__' + timezone.utcnow().isoformat())
         dro = self.python_callable(context, dro)
         if dro:
             with create_session() as session:

airflow/operators/latest_only_operator.py

@@ -12,9 +12,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import datetime
 
 from airflow.models import BaseOperator, SkipMixin
+from airflow.utils import timezone
 
 
 class LatestOnlyOperator(BaseOperator, SkipMixin):
@@ -35,7 +35,7 @@ class LatestOnlyOperator(BaseOperator, SkipMixin):
             self.log.info("Externally triggered DAG_Run: allowing execution to proceed.")
             return
 
-        now = datetime.datetime.utcnow()
+        now = timezone.utcnow()
         left_window = context['dag'].following_schedule(
             context['execution_date'])
         right_window = context['dag'].following_schedule(left_window)

airflow/operators/sensors.py

@@ -21,7 +21,7 @@ standard_library.install_aliases()
 from builtins import str
 from past.builtins import basestring
 
-from datetime import datetime
+from airflow.utils import timezone
 from urllib.parse import urlparse
 from time import sleep
 import re
@@ -75,9 +75,9 @@ class BaseSensorOperator(BaseOperator):
         raise AirflowException('Override me.')
 
     def execute(self, context):
-        started_at = datetime.utcnow()
+        started_at = timezone.utcnow()
         while not self.poke(context):
-            if (datetime.utcnow() - started_at).total_seconds() > self.timeout:
+            if (timezone.utcnow() - started_at).total_seconds() > self.timeout:
                 if self.soft_fail:
                     raise AirflowSkipException('Snap. Time is OUT.')
                 else:
@@ -602,7 +602,7 @@ class TimeSensor(BaseSensorOperator):
 
     def poke(self, context):
         self.log.info('Checking if the time (%s) has come', self.target_time)
-        return datetime.utcnow().time() > self.target_time
+        return timezone.utcnow().time() > self.target_time
 
 
 class TimeDeltaSensor(BaseSensorOperator):
@@ -627,7 +627,7 @@ class TimeDeltaSensor(BaseSensorOperator):
         target_dttm = dag.following_schedule(context['execution_date'])
         target_dttm += self.delta
         self.log.info('Checking if the time (%s) has come', target_dttm)
-        return datetime.utcnow() > target_dttm
+        return timezone.utcnow() > target_dttm
 
 
 class HttpSensor(BaseSensorOperator):

airflow/ti_deps/deps/not_in_retry_period_dep.py

@@ -11,9 +11,9 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from datetime import datetime
 
 from airflow.ti_deps.deps.base_ti_dep import BaseTIDep
+from airflow.utils import timezone
 from airflow.utils.db import provide_session
 from airflow.utils.state import State
 
@@ -38,7 +38,7 @@ class NotInRetryPeriodDep(BaseTIDep):
 
         # Calculate the date first so that it is always smaller than the timestamp used by
         # ready_for_retry
-        cur_date = datetime.utcnow()
+        cur_date = timezone.utcnow()
         next_task_retry_date = ti.next_retry_datetime()
         if ti.is_premature:
             yield self._failing_status(

airflow/ti_deps/deps/runnable_exec_date_dep.py

@@ -11,9 +11,9 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from datetime import datetime
 
 from airflow.ti_deps.deps.base_ti_dep import BaseTIDep
+from airflow.utils import timezone
 from airflow.utils.db import provide_session
 
 
@@ -23,7 +23,7 @@ class RunnableExecDateDep(BaseTIDep):
 
     @provide_session
     def _get_dep_statuses(self, ti, session, dep_context):
-        cur_date = datetime.utcnow()
+        cur_date = timezone.utcnow()
 
         if ti.execution_date > cur_date:
             yield self._failing_status(

airflow/utils/dag_processing.py

@@ -23,10 +23,10 @@ import time
 import zipfile
 from abc import ABCMeta, abstractmethod
 from collections import defaultdict
-from datetime import datetime
 
 from airflow.dag.base_dag import BaseDag, BaseDagBag
 from airflow.exceptions import AirflowException
+from airflow.utils import timezone
 from airflow.utils.log.logging_mixin import LoggingMixin
 
 
@@ -376,7 +376,7 @@ class DagFileProcessorManager(LoggingMixin):
         being processed
         """
         if file_path in self._processors:
-            return (datetime.utcnow() - self._processors[file_path].start_time)\
+            return (timezone.utcnow() - self._processors[file_path].start_time)\
                 .total_seconds()
         return None
 
@@ -466,7 +466,7 @@ class DagFileProcessorManager(LoggingMixin):
         for file_path, processor in self._processors.items():
             if processor.done:
                 self.log.info("Processor for %s finished", file_path)
-                now = datetime.utcnow()
+                now = timezone.utcnow()
                 finished_processors[file_path] = processor
                 self._last_runtime[file_path] = (now -
                                                  processor.start_time).total_seconds()
@@ -494,7 +494,7 @@ class DagFileProcessorManager(LoggingMixin):
         # If the file path is already being processed, or if a file was
         # processed recently, wait until the next batch
         file_paths_in_progress = self._processors.keys()
-        now = datetime.utcnow()
+        now = timezone.utcnow()
         file_paths_recently_processed = []
         for file_path in self._file_paths:
             last_finish_time = self.get_last_finish_time(file_path)

airflow/utils/dates.py

@@ -17,6 +17,7 @@ from __future__ import division
 from __future__ import print_function
 from __future__ import unicode_literals
 
+from airflow.utils import timezone
 from datetime import datetime, timedelta
 from dateutil.relativedelta import relativedelta  # for doctest
 import six
@@ -66,7 +67,7 @@ def date_range(
     if end_date and num:
         raise Exception("Wait. Either specify end_date OR num")
     if not end_date and not num:
-        end_date = datetime.utcnow()
+        end_date = timezone.utcnow()
 
     delta_iscron = False
     if isinstance(delta, six.string_types):
@@ -219,7 +220,7 @@ def days_ago(n, hour=0, minute=0, second=0, microsecond=0):
     Get a datetime object representing `n` days ago. By default the time is
     set to midnight.
     """
-    today = datetime.utcnow().replace(
+    today = timezone.utcnow().replace(
         hour=hour,
         minute=minute,
         second=second,

airflow/www/forms.py

@@ -17,7 +17,7 @@ from __future__ import division
 from __future__ import print_function
 from __future__ import unicode_literals
 
-from datetime import datetime
+from airflow.utils import timezone
 from flask_admin.form import DateTimePickerWidget
 from wtforms import DateTimeField, SelectField
 from flask_wtf import Form
@@ -33,7 +33,7 @@ class DateTimeWithNumRunsForm(Form):
     # Date time and number of runs form for tree view, task duration
     # and landing times
    base_date = DateTimeField(
-        "Anchor date", widget=DateTimePickerWidget(), default=datetime.utcnow())
+        "Anchor date", widget=DateTimePickerWidget(), default=timezone.utcnow())
    num_runs = SelectField("Number of runs", default=25, choices=(
        (5, "5"),
        (25, "25"),

airflow/www/utils.py

@@ -21,7 +21,7 @@ from cgi import escape
 from io import BytesIO as IO
 import functools
 import gzip
-import dateutil.parser as dateparser
+import iso8601
 import json
 import time
 
@@ -46,6 +46,7 @@ DEFAULT_SENSITIVE_VARIABLE_FIELDS = (
     'access_token',
 )
 
 
 def should_hide_value_for_key(key_name):
     return any(s in key_name.lower() for s in DEFAULT_SENSITIVE_VARIABLE_FIELDS) \
         and configuration.getboolean('admin', 'hide_sensitive_variable_fields')
@@ -252,8 +253,8 @@ def action_logging(f):
             dag_id=request.args.get('dag_id'))
 
         if 'execution_date' in request.args:
-            log.execution_date = dateparser.parse(
-                request.args.get('execution_date'))
+            log.execution_date = iso8601.parse_date(
+                request.args.get('execution_date'), settings.TIMEZONE)
 
         with create_session() as session:
             session.add(log)

airflow/www/views.py

@@ -21,7 +21,7 @@ import os
 import pkg_resources
 import socket
 from functools import wraps
-from datetime import datetime, timedelta
+from datetime import timedelta
 import dateutil.parser
 import copy
 import math
@@ -72,6 +72,7 @@ from airflow.ti_deps.dep_context import DepContext, QUEUE_DEPS, SCHEDULER_DEPS
 from airflow.models import BaseOperator
 from airflow.operators.subdag_operator import SubDagOperator
 
+from airflow.utils import timezone
 from airflow.utils.json import json_ser
 from airflow.utils.state import State
 from airflow.utils.db import create_session, provide_session
@@ -170,7 +171,7 @@ def duration_f(v, c, m, p):
 def datetime_f(v, c, m, p):
     attr = getattr(m, p)
     dttm = attr.isoformat() if attr else ''
-    if datetime.utcnow().isoformat()[:4] == dttm[:4]:
+    if timezone.utcnow().isoformat()[:4] == dttm[:4]:
         dttm = dttm[5:]
     return Markup("<nobr>{}</nobr>".format(dttm))
 
@@ -922,7 +923,7 @@ class Airflow(BaseView):
             flash("Cannot find dag {}".format(dag_id))
             return redirect(origin)
 
-        execution_date = datetime.utcnow()
+        execution_date = timezone.utcnow()
         run_id = "manual__{0}".format(execution_date.isoformat())
 
         dr = DagRun.find(dag_id=dag_id, run_id=run_id)
@@ -1161,7 +1162,7 @@ class Airflow(BaseView):
         if base_date:
             base_date = dateutil.parser.parse(base_date)
         else:
-            base_date = dag.latest_execution_date or datetime.utcnow()
+            base_date = dag.latest_execution_date or timezone.utcnow()
 
         dates = dag.date_range(base_date, num=-abs(num_runs))
         min_date = dates[0] if dates else datetime(2000, 1, 1)
@@ -1217,7 +1218,7 @@ class Airflow(BaseView):
         def set_duration(tid):
             if (isinstance(tid, dict) and tid.get("state") == State.RUNNING and
                     tid["start_date"] is not None):
-                d = datetime.utcnow() - dateutil.parser.parse(tid["start_date"])
+                d = timezone.utcnow() - dateutil.parser.parse(tid["start_date"])
                 tid["duration"] = d.total_seconds()
             return tid
 
@@ -1314,7 +1315,7 @@ class Airflow(BaseView):
         if dttm:
             dttm = dateutil.parser.parse(dttm)
         else:
-            dttm = dag.latest_execution_date or datetime.utcnow().date()
+            dttm = dag.latest_execution_date or timezone.utcnow().date()
 
         DR = models.DagRun
         drs = (
@@ -1390,7 +1391,7 @@ class Airflow(BaseView):
         if base_date:
             base_date = dateutil.parser.parse(base_date)
         else:
-            base_date = dag.latest_execution_date or datetime.utcnow()
+            base_date = dag.latest_execution_date or timezone.utcnow()
 
         dates = dag.date_range(base_date, num=-abs(num_runs))
         min_date = dates[0] if dates else datetime(2000, 1, 1)
@@ -1497,7 +1498,7 @@ class Airflow(BaseView):
         if base_date:
             base_date = dateutil.parser.parse(base_date)
         else:
-            base_date = dag.latest_execution_date or datetime.utcnow()
+            base_date = dag.latest_execution_date or timezone.utcnow()
 
         dates = dag.date_range(base_date, num=-abs(num_runs))
         min_date = dates[0] if dates else datetime(2000, 1, 1)
@@ -1560,7 +1561,7 @@ class Airflow(BaseView):
         if base_date:
             base_date = dateutil.parser.parse(base_date)
         else:
-            base_date = dag.latest_execution_date or datetime.utcnow()
+            base_date = dag.latest_execution_date or timezone.utcnow()
 
         dates = dag.date_range(base_date, num=-abs(num_runs))
         min_date = dates[0] if dates else datetime(2000, 1, 1)
@@ -1651,7 +1652,7 @@ class Airflow(BaseView):
             DagModel).filter(DagModel.dag_id == dag_id).first()
 
         if orm_dag:
-            orm_dag.last_expired = datetime.utcnow()
+            orm_dag.last_expired = timezone.utcnow()
             session.merge(orm_dag)
             session.commit()
 
@@ -1687,7 +1688,7 @@ class Airflow(BaseView):
         if dttm:
             dttm = dateutil.parser.parse(dttm)
         else:
-            dttm = dag.latest_execution_date or datetime.utcnow().date()
+            dttm = dag.latest_execution_date or timezone.utcnow().date()
 
         form = DateTimeForm(data={'execution_date': dttm})
 
@@ -1698,7 +1699,7 @@ class Airflow(BaseView):
 
         tasks = []
         for ti in tis:
-            end_date = ti.end_date if ti.end_date else datetime.utcnow()
+            end_date = ti.end_date if ti.end_date else timezone.utcnow()
             tasks.append({
                 'startDate': wwwutils.epoch(ti.start_date),
                 'endDate': wwwutils.epoch(end_date),
@@ -2172,7 +2173,7 @@ class ChartModelView(wwwutils.DataProfilingMixin, AirflowModelView):
         model.iteration_no += 1
         if not model.user_id and current_user and hasattr(current_user, 'id'):
             model.user_id = current_user.id
-        model.last_modified = datetime.utcnow()
+        model.last_modified = timezone.utcnow()
 
 
 chart_mapping = (
@@ -2433,9 +2434,9 @@ class DagRunModelView(ModelViewOnly):
                 count += 1
                 dr.state = target_state
                 if target_state == State.RUNNING:
-                    dr.start_date = datetime.utcnow()
+                    dr.start_date = timezone.utcnow()
                 else:
-                    dr.end_date = datetime.utcnow()
+                    dr.end_date = timezone.utcnow()
             session.commit()
             models.DagStat.update(dirty_ids, session=session)
             flash(
1
setup.py
1
setup.py
|
@ -222,6 +222,7 @@ def do_setup():
|
||||||
'future>=0.16.0, <0.17',
|
'future>=0.16.0, <0.17',
|
||||||
'gitpython>=2.0.2',
|
'gitpython>=2.0.2',
|
||||||
'gunicorn>=19.4.0, <20.0',
|
'gunicorn>=19.4.0, <20.0',
|
||||||
|
'iso8601>=0.1.12',
|
||||||
'jinja2>=2.7.3, <2.9.0',
|
'jinja2>=2.7.3, <2.9.0',
|
||||||
'lxml>=3.6.0, <4.0',
|
'lxml>=3.6.0, <4.0',
|
||||||
'markdown>=2.5.2, <3.0',
|
'markdown>=2.5.2, <3.0',
|
||||||
|
|
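
For reference, the airflow.utils.timezone helpers that this diff switches
callers to can be approximated as below. This is a hedged sketch, not the
module's actual implementation (which may use a different UTC tzinfo object
and a configurable default timezone); it only illustrates the semantics the
call sites above depend on:

    import datetime

    # Assumption: the module exposes a UTC tzinfo object named `utc`.
    utc = datetime.timezone.utc

    def utcnow():
        # Aware current time in UTC, unlike naive datetime.datetime.utcnow().
        return datetime.datetime.now(tz=utc)

    def is_localized(value):
        # True when the datetime carries a usable UTC offset.
        return value.utcoffset() is not None

    def convert_to_utc(value):
        # Pass None through, localize naive values, then normalize to UTC.
        if value is None:
            return value
        if not is_localized(value):
            value = value.replace(tzinfo=utc)  # assumes the default tz is UTC
        return value.astimezone(utc)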