add CLI to insert new DagRuns

fix typos in comments

add missing import

add state to DagRun

Adding cron support for schedule_interval

Allow null on charts

Refactoring like a mad man

add dag_run table
Christian Trebing 2015-10-12 15:12:20 +02:00 committed by Maxime Beauchemin
Parent 566da59ecf
Commit f5e1ae7abc
9 changed files with 147 additions and 88 deletions

View file

@@ -4,6 +4,7 @@ import logging
import os
import subprocess
import sys
from datetime import datetime
from builtins import input
import argparse
@@ -13,7 +14,7 @@ import airflow
from airflow import jobs, settings, utils
from airflow import configuration
from airflow.executors import DEFAULT_EXECUTOR
from airflow.models import DagBag, TaskInstance, DagPickle
from airflow.models import DagBag, TaskInstance, DagPickle, DagRun
from airflow.utils import AirflowException
DAGS_FOLDER = os.path.expanduser(configuration.get('core', 'DAGS_FOLDER'))
@@ -89,6 +90,21 @@ def backfill(args):
pool=args.pool)
def trigger_dag(args):
session = settings.Session()
# TODO: verify dag_id
dag_id = args.dag_id
run_id = args.run_id or None
execution_date = datetime.now()
trigger = DagRun(
dag_id=dag_id,
run_id=run_id,
execution_date=execution_date,
external_trigger=True)
session.add(trigger)
session.commit()
def run(args):
utils.pessimistic_connection_handling()
@@ -487,6 +503,14 @@ def get_parser():
"-c", "--no_confirm", help=ht, action="store_true")
parser_clear.set_defaults(func=clear)
ht = "Trigger a DAG"
parser_trigger_dag = subparsers.add_parser('trigger_dag', help=ht)
parser_trigger_dag.add_argument("dag_id", help="The id of the dag to trigger")
parser_trigger_dag.add_argument(
"-r", "--run_id",
help="Helps to indentify this run")
parser_trigger_dag.set_defaults(func=trigger_dag)
ht = "Run a single task instance"
parser_run = subparsers.add_parser('run', help=ht)
parser_run.add_argument("dag_id", help="The id of the dag to run")
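
The new subcommand boils down to inserting a DagRun row with external_trigger=True; from the shell that is something like "airflow trigger_dag example_bash_operator -r manual__2015-10-12". A minimal sketch of the equivalent programmatic trigger, assuming a configured Airflow environment (the dag_id and run_id values are hypothetical):

    from datetime import datetime

    from airflow import settings
    from airflow.models import DagRun

    session = settings.Session()
    session.add(DagRun(
        dag_id='example_bash_operator',  # hypothetical; not validated (see TODO above)
        run_id='manual__2015-10-12',     # hypothetical identifier
        execution_date=datetime.now(),
        external_trigger=True))
    session.commit()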

View file

@@ -10,7 +10,9 @@ args = {
'start_date': seven_days_ago,
}
dag = DAG(dag_id='example_bash_operator', default_args=args)
dag = DAG(
dag_id='example_bash_operator', default_args=args,
schedule_interval='0 0 * * *')
cmd = 'ls -l'
run_this_last = DummyOperator(task_id='run_this_last', dag=dag)
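
With cron support, schedule_interval now accepts a timedelta, a relativedelta, or a cron string; the example above switches to the cron form. A quick sketch of the two equivalent ways to express a daily midnight cadence (the dag ids are hypothetical):

    from datetime import timedelta

    from airflow.models import DAG

    dag_cron = DAG(dag_id='daily_cron', schedule_interval='0 0 * * *')
    dag_delta = DAG(dag_id='daily_delta', schedule_interval=timedelta(days=1))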

View file

@@ -7,6 +7,7 @@ from builtins import str
from past.builtins import basestring
from collections import defaultdict
from datetime import datetime
from itertools import product
import getpass
import logging
import signal
@@ -259,15 +260,16 @@ class SchedulerJob(BaseJob):
task = dag.get_task(ti.task_id)
dttm = ti.execution_date
if task.sla:
dttm += dag.schedule_interval
dttm = dag.following_schedule(dttm)
following_schedule = dag.following_schedule(dttm)
while dttm < datetime.now():
if dttm + task.sla + dag.schedule_interval < datetime.now():
if following_schedule + task.sla < datetime.now():
session.merge(models.SlaMiss(
task_id=ti.task_id,
dag_id=ti.dag_id,
execution_date=dttm,
timestamp=ts))
dttm += dag.schedule_interval
dttm = dag.following_schedule(dttm)
session.commit()
slas = (
@@ -349,7 +351,7 @@ class SchedulerJob(BaseJob):
last_scheduled_run = qry.scalar()
if not last_scheduled_run or last_scheduled_run <= datetime.now():
if last_scheduled_run:
next_run_date = last_scheduled_run + dag.schedule_interval
next_run_date = dag.following_schedule(last_scheduled_run)
else:
next_run_date = dag.default_args['start_date']
if not next_run_date:
@@ -358,6 +360,7 @@ class SchedulerJob(BaseJob):
dag_id=dag.dag_id,
run_id='scheduled',
execution_date=next_run_date,
state=State.RUNNING,
external_trigger=False
)
session.add(next_run)
@@ -374,6 +377,7 @@ class SchedulerJob(BaseJob):
function takes a lock on the DAG and timestamps the last run
in ``last_scheduler_run``.
"""
TI = models.TaskInstance
DagModel = models.DagModel
session = settings.Session()
@@ -399,76 +403,19 @@ class SchedulerJob(BaseJob):
db_dag.last_scheduler_run = datetime.now()
session.commit()
TI = models.TaskInstance
logging.info(
"Getting latest instance "
"for all tasks in dag " + dag.dag_id)
sq = (
session
.query(
TI.task_id,
func.max(TI.execution_date).label('max_ti'))
.filter(TI.dag_id == dag.dag_id)
.group_by(TI.task_id).subquery('sq')
)
active_runs = dag.get_active_runs()
qry = session.query(TI).filter(
TI.dag_id == dag.dag_id,
TI.task_id == sq.c.task_id,
TI.execution_date == sq.c.max_ti,
)
logging.debug("Querying max dates for each task")
latest_ti = qry.all()
ti_dict = {ti.task_id: ti for ti in latest_ti}
session.expunge_all()
session.commit()
logging.debug("{} rows returned".format(len(latest_ti)))
for task in dag.tasks:
for task, dttm in product(dag.tasks, active_runs):
if task.adhoc:
continue
if task.task_id not in ti_dict:
# TODO: Does this need to change with the DagRun refactoring?
# Brand new task, let's get started
ti = TI(task, task.start_date)
ti.refresh_from_db()
if ti.is_queueable(flag_upstream_failed=True):
logging.info(
'First run for {ti}'.format(**locals()))
executor.queue_task_instance(ti, pickle_id=pickle_id)
else:
ti = ti_dict[task.task_id]
ti.task = task # Hacky but worky
if ti.state == State.RUNNING:
continue # Only one task at a time
elif ti.state == State.UP_FOR_RETRY:
# If the task instance is up for retry, make sure
# the retry delay is met
if ti.is_runnable():
logging.debug('Triggering retry: ' + str(ti))
executor.queue_task_instance(ti, pickle_id=pickle_id)
elif ti.state == State.QUEUED:
# If it was queued, we skip it here so that it gets prioritized
# in self.prioritize_queued
continue
else:
# Check whether a later dag run exists for which no task
# instance has been created yet
qry = session.query(func.min(models.DagRun.execution_date)).filter(
and_(models.DagRun.dag_id == dag.dag_id,
models.DagRun.execution_date > ti.execution_date))
next_schedule = qry.scalar()
if not next_schedule:
continue
ti = TI(task, dttm)
ti.refresh_from_db()
if ti.state in (State.RUNNING, State.QUEUED, State.SUCCESS):
continue
elif ti.is_runnable(flag_upstream_failed=True):
logging.debug('Queuing next run: ' + str(ti))
executor.queue_task_instance(ti, pickle_id=pickle_id)
ti = TI(
task=task,
execution_date=next_schedule,
)
ti.refresh_from_db()
if ti.is_queueable(flag_upstream_failed=True):
logging.debug('Queuing next run: ' + str(ti))
executor.queue_task_instance(ti, pickle_id=pickle_id)
# Releasing the lock
logging.debug("Unlocking DAG (scheduler_lock)")
db_dag = (
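
Note the shape change in the scheduler: instead of queuing only the latest task instance per task, it now fans out over the cross product of tasks and active run dates. A toy illustration of that product fan-out, with hypothetical task ids and dates:

    from datetime import datetime
    from itertools import product

    task_ids = ['extract', 'load']  # hypothetical
    run_dates = [datetime(2015, 10, 11), datetime(2015, 10, 12)]
    for task_id, dttm in product(task_ids, run_dates):
        # one candidate TaskInstance per (task, active run) pair
        print(task_id, dttm)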

View file

@@ -21,6 +21,7 @@ def upgrade():
'dag_run',
sa.Column('dag_id', sa.String(length=250), nullable=False),
sa.Column('execution_date', sa.DateTime(), nullable=False),
sa.Column('state', sa.String(length=50), nullable=True),
sa.Column('run_id', sa.String(length=250), nullable=True),
sa.Column('external_trigger', sa.Boolean(), nullable=True),
sa.PrimaryKeyConstraint('dag_id', 'execution_date')
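
The new nullable state column is what get_active_runs() below filters on. A minimal query sketch, assuming the usual Session setup and that State is importable from airflow.utils as elsewhere in this era of the codebase (the dag_id is hypothetical):

    from airflow import settings
    from airflow.models import DagRun
    from airflow.utils import State

    session = settings.Session()
    running = (
        session.query(DagRun)
        .filter(DagRun.dag_id == 'example_bash_operator',  # hypothetical
                DagRun.state == State.RUNNING)
        .count())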

View file

@@ -32,6 +32,9 @@ from sqlalchemy.ext.declarative import declarative_base, declared_attr
from sqlalchemy.dialects.mysql import LONGTEXT
from sqlalchemy.orm import relationship, synonym
from croniter import croniter
import six
from airflow import settings, utils
from airflow.executors import DEFAULT_EXECUTOR, LocalExecutor
from airflow import configuration
@@ -677,7 +680,7 @@ class TaskInstance(Base):
return False
elif self.task.end_date and self.execution_date > self.task.end_date:
return False
elif self.state == State.SKIPPED:
elif self.state in (State.SKIPPED, State.QUEUED):
return False
elif (
self.state in State.runnable() and
@@ -748,7 +751,7 @@ class TaskInstance(Base):
TI.dag_id == self.dag_id,
TI.task_id == task.task_id,
TI.execution_date ==
self.execution_date-task.schedule_interval,
self.task.dag.previous_schedule(self.execution_date),
TI.state == State.SUCCESS,
).first()
if not previous_ti:
@@ -1858,7 +1861,8 @@ class DAG(object):
timedelta object gets added to your latest task instance's
execution_date to figure out the next schedule
:type schedule_interval: datetime.timedelta or
dateutil.relativedelta.relativedelta
dateutil.relativedelta.relativedelta or str that acts as a cron
expression
:param start_date: The timestamp from which the scheduler will
attempt to backfill
:type start_date: datetime.datetime
@@ -1953,6 +1957,20 @@ class DAG(object):
hash_components.append(repr(val))
return hash(tuple(hash_components))
def following_schedule(self, dttm):
if isinstance(self.schedule_interval, six.string_types):
cron = croniter(self.schedule_interval, dttm)
return cron.get_next(datetime)
else:
return dttm + self.schedule_interval
def previous_schedule(self, dttm):
if isinstance(self.schedule_interval, six.string_types):
cron = croniter(self.schedule_interval, dttm)
return cron.get_prev(datetime)
else:
return dttm - self.schedule_interval
@property
def task_ids(self):
return [t.task_id for t in self.tasks]
@@ -2006,6 +2024,41 @@ class DAG(object):
l += task.subdag.subdags
return l
def get_active_runs(self):
"""
Maintains and returns the currently active runs as a list of dates
"""
TI = TaskInstance
session = settings.Session()
# Checking state of active DagRuns
active_runs = []
# renamed accumulator vs. query result to avoid shadowing: the loop
# below appends dates to active_runs while iterating the query rows
running_dag_runs = (
    session.query(DagRun)
    .filter(
        DagRun.dag_id == self.dag_id,
        DagRun.state == State.RUNNING)
    .all()
)
for run in running_dag_runs:
logging.info("Checking state for " + str(run))
task_instances = session.query(TI).filter(
TI.dag_id == run.dag_id,
TI.task_id.in_(self.task_ids),
TI.execution_date == run.execution_date,
).all()
if len(task_instances) == len(self.tasks):
task_states = [ti.state for ti in task_instances]
if State.FAILED in task_states:
logging.info('Marking run {} failed'.format(run))
run.state = State.FAILED
elif set(task_states) == set([State.SUCCESS]):
logging.info('Marking run {} successful'.format(run))
run.state = State.SUCCESS
else:
active_runs.append(run.execution_date)
session.commit()
return active_runs
def resolve_template_files(self):
for t in self.tasks:
t.resolve_template_files()
@@ -2298,7 +2351,7 @@ class Chart(Base):
id = Column(Integer, primary_key=True)
label = Column(String(200))
conn_id = Column(String(ID_LEN), nullable=False)
user_id = Column(Integer(), ForeignKey('user.id'),)
user_id = Column(Integer(), ForeignKey('user.id'), nullable=True)
chart_type = Column(String(100), default="line")
sql_layout = Column(String(50), default="series")
sql = Column(Text, default="SELECT series, x, y FROM table")
@@ -2514,24 +2567,23 @@ class XCom(Base):
class DagRun(Base):
"""
DagRun describes an instance of a Dag. It can be created
by a scheduled of a Dag or by an external trigger
by the scheduler (for regular runs) or by an external trigger
"""
__tablename__ = "dag_run"
dag_id = Column(String(ID_LEN), primary_key=True)
execution_date = Column(DateTime, primary_key=True)
state = Column(String(50))
run_id = Column(String(ID_LEN))
external_trigger = Column(Boolean, default=False)
def __repr__(self):
return '<DagRun {dag_id} @ {execution_date}: {run_id}, \
externally triggered: {external_trigger}>'.format(
task_id=self.task_id,
dag_id=self.dag_id,
execution_date=self.execution_date,
run_id=self.run_id,
external_trigger=self.external_trigger)
return str((
self.dag_id, self.run_id, self.execution_date.isoformat()))
class Pool(Base):
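
For the cron case, following_schedule and previous_schedule lean on croniter, which steps strictly past the given timestamp. A small demonstration of both branches (values illustrative only):

    from datetime import datetime, timedelta
    from croniter import croniter

    dttm = datetime(2015, 10, 12, 6, 30)
    print(croniter('0 0 * * *', dttm).get_next(datetime))  # 2015-10-13 00:00:00
    print(croniter('0 0 * * *', dttm).get_prev(datetime))  # 2015-10-12 00:00:00
    print(dttm + timedelta(days=1))  # timedelta branch: 2015-10-13 06:30:00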

View file

@@ -21,6 +21,7 @@ import os
import re
import shutil
import signal
import six
import smtplib
from tempfile import mkdtemp
@@ -34,6 +35,7 @@ from sqlalchemy import event, exc
from sqlalchemy.pool import Pool
import numpy as np
from croniter import croniter
from airflow import settings
from airflow import configuration
@@ -283,11 +285,36 @@ def validate_key(k, max_length=250):
def date_range(start_date, end_date=datetime.now(), delta=timedelta(1)):
"""
Get a list of dates based on a start date, an end date and a delta. The
delta can be anything that can be added to ``datetime.datetime``,
or a cron expression as a ``str``
>>> date_range(datetime(2016, 1, 1), datetime(2016, 1, 3))
[datetime.datetime(2016, 1, 1, 0, 0),
datetime.datetime(2016, 1, 2, 0, 0),
datetime.datetime(2016, 1, 3, 0, 0)]
>>> date_range(datetime(2016, 1, 1), datetime(2016, 1, 3), '0 0 * * *')
[datetime.datetime(2016, 1, 1, 0, 0),
datetime.datetime(2016, 1, 2, 0, 0),
datetime.datetime(2016, 1, 3, 0, 0)]
>>> date_range(datetime(2016, 1, 1), datetime(2016, 3, 3), delta="0 0 1 * *")
[datetime.datetime(2016, 1, 1, 0, 0),
datetime.datetime(2016, 2, 1, 0, 0),
datetime.datetime(2016, 3, 1, 0, 0)]
"""
delta_iscron = False
if isinstance(delta, six.string_types):
delta_iscron = True
cron = croniter(delta, start_date)
l = []
if end_date >= start_date:
while start_date <= end_date:
l.append(start_date)
start_date += delta
if delta_iscron:
start_date = cron.get_next(datetime)
else:
start_date += delta
else:
raise AirflowException("start_date can't be after end_date")
return l
@@ -566,6 +593,16 @@ def round_time(dt, delta, start_date=datetime.min):
>>> round_time(datetime(2015, 9, 13, 0, 0), timedelta(1), datetime(2015, 9, 14, 0, 0))
datetime.datetime(2015, 9, 14, 0, 0)
"""
if isinstance(delta, six.string_types):
# It's cron based, so it's easy
cron = croniter(delta, start_date)
prev = cron.get_prev(datetime)
if prev == start_date:
return start_date
else:
return prev
# Ignore the microseconds of dt
dt -= timedelta(microseconds=dt.microsecond)
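
A quick check of the cron branch in date_range, mirroring the docstring above (assuming date_range is importable from airflow.utils):

    from datetime import datetime

    from airflow.utils import date_range

    # monthly cadence via a cron delta; start_date itself is always included
    print(date_range(datetime(2016, 1, 1), datetime(2016, 3, 3), delta='0 0 1 * *'))
    # [datetime.datetime(2016, 1, 1, 0, 0), datetime.datetime(2016, 2, 1, 0, 0),
    #  datetime.datetime(2016, 3, 1, 0, 0)]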

View file

@@ -45,23 +45,17 @@ def create_app(config=None):
admin.add_view(views.Airflow(name='DAGs'))
admin.add_view(views.SlaMissModelView(models.SlaMiss, Session, name="SLA Misses", category="Browse"))
admin.add_view(
views.TaskInstanceModelView(models.TaskInstance, Session, name="Task Instances", category="Browse")
)
admin.add_view(views.TaskInstanceModelView(models.TaskInstance, Session, name="Task Instances", category="Browse"))
admin.add_view(views.LogModelView(models.Log, Session, name="Logs", category="Browse"))
admin.add_view(views.JobModelView(jobs.BaseJob, Session, name="Jobs", category="Browse"))
admin.add_view(views.QueryView(name='Ad Hoc Query', category="Data Profiling"))
admin.add_view(views.ChartModelView(models.Chart, Session, name="Charts", category="Data Profiling"))
admin.add_view(views.KnowEventView(models.KnownEvent, Session, name="Known Events", category="Data Profiling"))
admin.add_view(views.PoolModelView(models.Pool, Session, name="Pools", category="Admin"))
admin.add_view(views.ConfigurationView(name='Configuration', category="Admin"))
admin.add_view(views.UserModelView(models.User, Session, name="Users", category="Admin"))
admin.add_view(views.ConnectionModelView(models.Connection, Session, name="Connections", category="Admin"))
admin.add_view(views.VariableView(models.Variable, Session, name="Variables", category="Admin"))
admin.add_link(base.MenuLink(category='Docs', name='Documentation', url='http://pythonhosted.org/airflow/'))
admin.add_link(base.MenuLink(category='Docs',name='Github',url='https://github.com/airbnb/airflow'))

View file

@@ -5,6 +5,7 @@ chartkick
cryptography
coverage
coveralls
croniter
dill
flake8
flask

View file

@@ -68,6 +68,7 @@ setup(
install_requires=[
'alembic>=0.8.0, <0.9',
'chartkick>=0.4.2, < 0.5',
'croniter>=0.3.8, <0.4',
'dill>=0.2.2, <0.3',
'flask>=0.10.1, <0.11',
'flask-admin==1.2.0',