[AIRFLOW-1807] Force use of time zone aware db fields

This change will check if all date times being stored are
indeed timezone aware.
This commit is contained in:
Bolke de Bruin 2017-11-11 13:32:02 +01:00
Родитель c857436b75
Коммит 2f168634aa
5 изменённых файлов: 45 добавлений и 27 удалений

Просмотреть файл

@ -33,9 +33,10 @@ import datetime
from collections import defaultdict
from past.builtins import basestring
from sqlalchemy import (
Column, Integer, String, DateTime, func, Index, or_, and_, not_)
Column, Integer, String, func, Index, or_, and_, not_)
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm.session import make_transient
from sqlalchemy_utc import UtcDateTime
from tabulate import tabulate
from time import sleep
@ -77,9 +78,9 @@ class BaseJob(Base, LoggingMixin):
dag_id = Column(String(ID_LEN),)
state = Column(String(20))
job_type = Column(String(30))
start_date = Column(DateTime())
end_date = Column(DateTime())
latest_heartbeat = Column(DateTime())
start_date = Column(UtcDateTime())
end_date = Column(UtcDateTime())
latest_heartbeat = Column(UtcDateTime())
executor_class = Column(String(500))
hostname = Column(String(500))
unixname = Column(String(1000))

Просмотреть файл

@ -55,6 +55,7 @@ from sqlalchemy import func, or_, and_
from sqlalchemy.ext.declarative import declarative_base, declared_attr
from sqlalchemy.dialects.mysql import LONGTEXT
from sqlalchemy.orm import reconstructor, relationship, synonym
from sqlalchemy_utc import UtcDateTime
from croniter import croniter
import six
@ -732,7 +733,7 @@ class DagPickle(Base):
"""
id = Column(Integer, primary_key=True)
pickle = Column(PickleType(pickler=dill))
created_dttm = Column(DateTime, default=func.now())
created_dttm = Column(UtcDateTime, default=func.now())
pickle_hash = Column(Text)
__tablename__ = "dag_pickle"
@ -763,9 +764,9 @@ class TaskInstance(Base, LoggingMixin):
task_id = Column(String(ID_LEN), primary_key=True)
dag_id = Column(String(ID_LEN), primary_key=True)
execution_date = Column(DateTime, primary_key=True)
start_date = Column(DateTime)
end_date = Column(DateTime)
execution_date = Column(UtcDateTime, primary_key=True)
start_date = Column(UtcDateTime)
end_date = Column(UtcDateTime)
duration = Column(Float)
state = Column(String(20))
try_number = Column(Integer, default=0)
@ -777,7 +778,7 @@ class TaskInstance(Base, LoggingMixin):
queue = Column(String(50))
priority_weight = Column(Integer)
operator = Column(String(1000))
queued_dttm = Column(DateTime)
queued_dttm = Column(UtcDateTime)
pid = Column(Integer)
__table_args__ = (
@ -1862,9 +1863,9 @@ class TaskFail(Base):
task_id = Column(String(ID_LEN), primary_key=True)
dag_id = Column(String(ID_LEN), primary_key=True)
execution_date = Column(DateTime, primary_key=True)
start_date = Column(DateTime)
end_date = Column(DateTime)
execution_date = Column(UtcDateTime, primary_key=True)
start_date = Column(UtcDateTime)
end_date = Column(UtcDateTime)
duration = Column(Float)
def __init__(self, task, execution_date, start_date, end_date):
@ -1884,11 +1885,11 @@ class Log(Base):
__tablename__ = "log"
id = Column(Integer, primary_key=True)
dttm = Column(DateTime)
dttm = Column(UtcDateTime)
dag_id = Column(String(ID_LEN))
task_id = Column(String(ID_LEN))
event = Column(String(30))
execution_date = Column(DateTime)
execution_date = Column(UtcDateTime)
owner = Column(String(500))
extra = Column(Text)
@ -2741,12 +2742,12 @@ class DagModel(Base):
# Whether that DAG was seen on the last DagBag load
is_active = Column(Boolean, default=False)
# Last time the scheduler started
last_scheduler_run = Column(DateTime)
last_scheduler_run = Column(UtcDateTime)
# Last time this DAG was pickled
last_pickled = Column(DateTime)
last_pickled = Column(UtcDateTime)
# Time when the DAG last received a refresh signal
# (e.g. the DAG's "refresh" button was clicked in the web UI)
last_expired = Column(DateTime)
last_expired = Column(UtcDateTime)
# Whether (one of) the scheduler is scheduling this DAG at the moment
scheduler_lock = Column(Boolean)
# Foreign key to the latest pickle_id
@ -3904,7 +3905,7 @@ class Chart(Base):
"User", cascade=False, cascade_backrefs=False, backref='charts')
x_is_date = Column(Boolean, default=True)
iteration_no = Column(Integer, default=0)
last_modified = Column(DateTime, default=func.now())
last_modified = Column(UtcDateTime, default=func.now())
def __repr__(self):
return self.label
@ -3925,8 +3926,8 @@ class KnownEvent(Base):
id = Column(Integer, primary_key=True)
label = Column(String(200))
start_date = Column(DateTime)
end_date = Column(DateTime)
start_date = Column(UtcDateTime)
end_date = Column(UtcDateTime)
user_id = Column(Integer(), ForeignKey('users.id'),)
known_event_type_id = Column(Integer(), ForeignKey('known_event_type.id'),)
reported_by = relationship(
@ -4054,7 +4055,7 @@ class XCom(Base, LoggingMixin):
value = Column(LargeBinary)
timestamp = Column(
DateTime, default=func.now(), nullable=False)
execution_date = Column(DateTime, nullable=False)
execution_date = Column(UtcDateTime, nullable=False)
# source information
task_id = Column(String(ID_LEN), nullable=False)
@ -4372,9 +4373,9 @@ class DagRun(Base, LoggingMixin):
id = Column(Integer, primary_key=True)
dag_id = Column(String(ID_LEN))
execution_date = Column(DateTime, default=func.now())
start_date = Column(DateTime, default=func.now())
end_date = Column(DateTime)
execution_date = Column(UtcDateTime, default=func.now())
start_date = Column(UtcDateTime, default=func.now())
end_date = Column(UtcDateTime)
_state = Column('state', String(50), default=State.RUNNING)
run_id = Column(String(ID_LEN))
external_trigger = Column(Boolean, default=True)
@ -4790,9 +4791,9 @@ class SlaMiss(Base):
task_id = Column(String(ID_LEN), primary_key=True)
dag_id = Column(String(ID_LEN), primary_key=True)
execution_date = Column(DateTime, primary_key=True)
execution_date = Column(UtcDateTime, primary_key=True)
email_sent = Column(Boolean, default=False)
timestamp = Column(DateTime)
timestamp = Column(UtcDateTime)
description = Column(Text)
notification_sent = Column(Boolean, default=False)
@ -4804,6 +4805,6 @@ class SlaMiss(Base):
class ImportError(Base):
__tablename__ = "import_error"
id = Column(Integer, primary_key=True)
timestamp = Column(DateTime)
timestamp = Column(UtcDateTime)
filename = Column(String(1024))
stacktrace = Column(Text)

Просмотреть файл

@ -26,6 +26,8 @@ import json
import time
from flask import after_this_request, request, Response
from flask_admin.contrib.sqla.filters import FilterConverter
from flask_admin.model import filters
from flask_login import current_user
import wtforms
from wtforms.compat import text_type
@ -386,3 +388,9 @@ class AceEditorWidget(wtforms.widgets.TextArea):
form_name=field.id,
)
return wtforms.widgets.core.HTMLString(html)
class UtcFilterConverter(FilterConverter):
@filters.convert('utcdatetime')
def conv_utcdatetime(self, column, name, **kwargs):
return self.conv_datetime(column, name, **kwargs)

Просмотреть файл

@ -2055,6 +2055,7 @@ class SlaMissModelView(wwwutils.SuperUserMixin, ModelViewOnly):
column_searchable_list = ('dag_id', 'task_id',)
column_filters = (
'dag_id', 'task_id', 'email_sent', 'timestamp', 'execution_date')
filter_converter = wwwutils.UtcFilterConverter()
form_widget_args = {
'email_sent': {'disabled': True},
'timestamp': {'disabled': True},
@ -2349,6 +2350,7 @@ class XComView(wwwutils.SuperUserMixin, AirflowModelView):
column_filters = ('key', 'timestamp', 'execution_date', 'task_id', 'dag_id')
column_searchable_list = ('key', 'timestamp', 'execution_date', 'task_id', 'dag_id')
filter_converter = wwwutils.UtcFilterConverter()
class JobModelView(ModelViewOnly):
@ -2365,6 +2367,7 @@ class JobModelView(ModelViewOnly):
hostname=nobr_f,
state=state_f,
latest_heartbeat=datetime_f)
filter_converter = wwwutils.UtcFilterConverter()
class DagRunModelView(ModelViewOnly):
@ -2387,6 +2390,7 @@ class DagRunModelView(ModelViewOnly):
column_list = (
'state', 'dag_id', 'execution_date', 'run_id', 'external_trigger')
column_filters = column_list
filter_converter = wwwutils.UtcFilterConverter()
column_searchable_list = ('dag_id', 'state', 'run_id')
column_formatters = dict(
execution_date=datetime_f,
@ -2453,6 +2457,7 @@ class LogModelView(ModelViewOnly):
column_display_actions = False
column_default_sort = ('dttm', True)
column_filters = ('dag_id', 'task_id', 'execution_date')
filter_converter = wwwutils.UtcFilterConverter()
column_formatters = dict(
dttm=datetime_f, execution_date=datetime_f, dag_id=dag_link)
@ -2463,6 +2468,7 @@ class TaskInstanceModelView(ModelViewOnly):
column_filters = (
'state', 'dag_id', 'task_id', 'execution_date', 'hostname',
'queue', 'pool', 'operator', 'start_date', 'end_date')
filter_converter = wwwutils.UtcFilterConverter()
named_filter_urls = True
column_formatters = dict(
log_url=log_url_formatter,
@ -2752,6 +2758,7 @@ class DagModelView(wwwutils.SuperUserMixin, ModelView):
column_filters = (
'dag_id', 'owners', 'is_paused', 'is_active', 'is_subdag',
'last_scheduler_run', 'last_expired')
filter_converter = wwwutils.UtcFilterConverter()
form_widget_args = {
'last_scheduler_run': {'disabled': True},
'fileloc': {'disabled': True},

Просмотреть файл

@ -236,6 +236,7 @@ def do_setup():
'requests>=2.5.1, <3',
'setproctitle>=1.1.8, <2',
'sqlalchemy>=0.9.8',
'sqlalchemy-utc>=0.9.0',
'tabulate>=0.7.5, <0.8.0',
'thrift>=0.9.2',
'tzlocal>=1.4',