Fixing jobs

Lining up db revisions

Adding CRUD in the admin

Success, backend running, next is UI changes

Updating the docs to reflect the new behavior

Got the UI to show externally triggered runs, root object for DAG Run

UI improvements, mostly functional

DAG concurrency limit

Commit resets dag runs

More unit tests

Adapting the UI

Fixing a brutal amount of merge conflicts

Polish around UI and events

Adding DB migration script

Fixed the charts

Adding schedule info in the dag page

Adding cron presets

Fixing up the tests

Adding @once as a schedule_interval option

A layer of polish and bug fixes
Maxime Beauchemin 2015-10-21 16:36:16 -07:00
Parent f5e1ae7abc
Commit b63b42429f
28 changed files with 742 additions and 353 deletions

View file

@ -1,5 +1,10 @@
TODO
-----
### Current
* Test @once
* Test ``run_id`` in template and ``dag_run``
#### UI
* Backfill form
* Better task filtering in duration and landing time charts (operator toggle, task regex, uncheck all button)

View file

@ -15,7 +15,7 @@ from airflow import jobs, settings, utils
from airflow import configuration
from airflow.executors import DEFAULT_EXECUTOR
from airflow.models import DagBag, TaskInstance, DagPickle, DagRun
from airflow.utils import AirflowException
from airflow.utils import AirflowException, State
DAGS_FOLDER = os.path.expanduser(configuration.get('core', 'DAGS_FOLDER'))
@ -91,17 +91,23 @@ def backfill(args):
def trigger_dag(args):
log_to_stdout()
session = settings.Session()
# TODO: verify dag_id
dag_id = args.dag_id
run_id = args.run_id or None
execution_date = datetime.now()
trigger = DagRun(
dag_id=dag_id,
run_id=run_id,
execution_date=execution_date,
external_trigger=True)
session.add(trigger)
dr = session.query(DagRun).filter(
DagRun.dag_id==args.dag_id, DagRun.run_id==args.run_id).first()
if dr:
logging.error("This run_id already exists")
else:
trigger = DagRun(
dag_id=args.dag_id,
run_id=args.run_id,
execution_date=execution_date,
state=State.RUNNING,
external_trigger=True)
session.add(trigger)
logging.info("Created {}".format(trigger))
session.commit()

View file

@ -53,7 +53,8 @@ defaults = {
'plugins_folder': None,
'security': None,
'donot_pickle': False,
's3_log_folder': ''
's3_log_folder': '',
'dag_concurrency': 16,
},
'webserver': {
'base_url': 'http://localhost:8080',
@ -120,6 +121,9 @@ sql_alchemy_conn = sqlite:///{AIRFLOW_HOME}/airflow.db
# on this airflow installation
parallelism = 32
# The number of task instances allowed to run concurrently by the scheduler
dag_concurrency = 16
# Whether to load the examples that ship with Airflow. It's good to
# get started, but you probably want to set this to False in a production
# environment
@ -271,6 +275,7 @@ sql_alchemy_conn = sqlite:///{AIRFLOW_HOME}/unittests.db
unit_test_mode = True
load_examples = True
donot_pickle = False
dag_concurrency = 16
[webserver]
base_url = http://localhost:8080

View file

@ -31,6 +31,6 @@ for i in range(3):
task = BashOperator(
task_id='also_run_this',
bash_command='echo "{{ macros.uuid.uuid1() }}"',
bash_command='echo "run_id={{ run_id }} | dag_run={{ dag_run }}"',
dag=dag)
task.set_downstream(run_this_last)

View file

@ -10,7 +10,10 @@ args = {
'start_date': seven_days_ago,
}
dag = DAG(dag_id='example_branch_operator', default_args=args)
dag = DAG(
dag_id='example_branch_operator',
default_args=args,
schedule_interval="@daily")
cmd = 'ls -l'
run_this_first = DummyOperator(task_id='run_this_first', dag=dag)

View file

@ -67,10 +67,9 @@ t4 = SimpleHttpOperator(
sensor = HttpSensor(
task_id='http_sensor_check',
conn_id='http_default',
endpoint='api/v1.0/apps',
endpoint='',
params={},
headers={"Content-Type": "application/json"},
response_check=lambda response: True if "collation" in response.content else False,
response_check=lambda response: True if "Google" in response.content else False,
poke_interval=5,
dag=dag)

View file

@ -7,15 +7,17 @@ from datetime import datetime, timedelta
import time
from pprint import pprint
seven_days_ago = datetime.combine(datetime.today() - timedelta(7),
datetime.min.time())
seven_days_ago = datetime.combine(
datetime.today() - timedelta(7), datetime.min.time())
args = {
'owner': 'airflow',
'start_date': seven_days_ago,
}
dag = DAG(dag_id='example_python_operator', default_args=args)
dag = DAG(
dag_id='example_python_operator', default_args=args,
schedule_interval=None)
def my_sleeping_function(random_base):

View file

@ -1,11 +1,21 @@
from __future__ import print_function
import airflow
import datetime
from datetime import datetime, timedelta
seven_days_ago = datetime.combine(
datetime.today() - timedelta(7),
datetime.min.time())
args = {
'owner': 'airflow',
'start_date': seven_days_ago,
'provide_context': True
}
dag = airflow.DAG(
'example_xcom',
start_date=datetime.datetime(2015, 1, 1),
default_args={'owner': 'airflow', 'provide_context': True})
start_date=datetime(2015, 1, 1),
schedule_interval="@once",
default_args=args)
value_1 = [1, 2, 3]
value_2 = {'a': 'b'}
@ -42,4 +52,4 @@ push2 = airflow.operators.PythonOperator(
pull = airflow.operators.PythonOperator(
task_id='puller', dag=dag, python_callable=puller)
pull.set_upstream([push1, push2])
pull.set_upstream([push1, push2])

View file

@ -342,31 +342,32 @@ class SchedulerJob(BaseJob):
This method checks whether a new DagRun needs to be created
for a DAG based on scheduling interval
"""
DagRun = models.DagRun
session = settings.Session()
qry = session.query(func.max(DagRun.execution_date)).filter(and_(
DagRun.dag_id == dag.dag_id,
DagRun.external_trigger == False
))
last_scheduled_run = qry.scalar()
if not last_scheduled_run or last_scheduled_run <= datetime.now():
if last_scheduled_run:
if dag.schedule_interval:
DagRun = models.DagRun
session = settings.Session()
qry = session.query(func.max(DagRun.execution_date)).filter(and_(
DagRun.dag_id == dag.dag_id,
DagRun.external_trigger == False
))
last_scheduled_run = qry.scalar()
next_run_date = None
if not last_scheduled_run:
next_run_date = min([t.start_date for t in dag.tasks])
elif dag.schedule_interval != '@once':
next_run_date = dag.following_schedule(last_scheduled_run)
else:
next_run_date = dag.default_args['start_date']
if not next_run_date:
raise Exception('no next_run_date defined!')
next_run = DagRun(
dag_id=dag.dag_id,
run_id='scheduled',
execution_date=next_run_date,
state=State.RUNNING,
external_trigger=False
)
session.add(next_run)
session.commit()
elif dag.schedule_interval == '@once' and not last_scheduled_run:
next_run_date = datetime.now()
if next_run_date and next_run_date <= datetime.now():
next_run = DagRun(
dag_id=dag.dag_id,
run_id='scheduled__' + next_run_date.isoformat(),
execution_date=next_run_date,
state=State.RUNNING,
external_trigger=False
)
session.add(next_run)
session.commit()
def process_dag(self, dag, executor):
"""
@ -387,8 +388,7 @@ class SchedulerJob(BaseJob):
executors.LocalExecutor, executors.SequentialExecutor):
pickle_id = dag.pickle(session).id
db_dag = session.query(
DagModel).filter(DagModel.dag_id == dag.dag_id).first()
db_dag = session.query(DagModel).filter_by(dag_id=dag.dag_id).first()
last_scheduler_run = db_dag.last_scheduler_run or datetime(2000, 1, 1)
secs_since_last = (
datetime.now() - last_scheduler_run).total_seconds()
@ -404,7 +404,6 @@ class SchedulerJob(BaseJob):
session.commit()
active_runs = dag.get_active_runs()
for task, dttm in product(dag.tasks, active_runs):
if task.adhoc:
continue
@ -413,7 +412,7 @@ class SchedulerJob(BaseJob):
if ti.state in (State.RUNNING, State.QUEUED, State.SUCCESS):
continue
elif ti.is_runnable(flag_upstream_failed=True):
logging.debug('Queuing next run: ' + str(ti))
logging.debug('Firing task: {}'.format(ti))
executor.queue_task_instance(ti, pickle_id=pickle_id)
# Releasing the lock
@ -454,15 +453,37 @@ class SchedulerJob(BaseJob):
for pool, tis in list(d.items()):
open_slots = pools[pool].open_slots(session=session)
if open_slots > 0:
tis = sorted(
tis, key=lambda ti: (-ti.priority_weight, ti.start_date))
for ti in tis[:open_slots]:
task = None
try:
task = dagbag.dags[ti.dag_id].get_task(ti.task_id)
except:
logging.error("Queued task {} seems gone".format(ti))
if not open_slots:
return
tis = sorted(
tis, key=lambda ti: (-ti.priority_weight, ti.start_date))
for ti in tis:
if not open_slots:
return
task = None
try:
task = dagbag.dags[ti.dag_id].get_task(ti.task_id)
except:
logging.error("Queued task {} seems gone".format(ti))
session.delete(ti)
if task:
ti.task = task
# picklin'
dag = dagbag.dags[ti.dag_id]
pickle_id = None
if self.do_pickle and self.executor.__class__ not in (
executors.LocalExecutor,
executors.SequentialExecutor):
pickle_id = dag.pickle(session).id
if (
ti.are_dependencies_met() and
not task.dag.concurrency_reached):
executor.queue_task_instance(
ti, force=True, pickle_id=pickle_id)
open_slots -= 1
else:
session.delete(ti)
continue
ti.task = task
@ -627,8 +648,7 @@ class BackfillJob(BaseJob):
start_date = start_date or task.start_date
end_date = end_date or task.end_date or datetime.now()
for dttm in utils.date_range(
start_date, end_date, task.dag.schedule_interval):
for dttm in self.dag.date_range(start_date, end_date=end_date):
ti = models.TaskInstance(task, dttm)
tasks_to_run[ti.key] = ti
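
To summarize the scheduler change above: the next ``execution_date`` is the
earliest task ``start_date`` when a DAG has never run, nothing further for an
``@once`` DAG that already ran, and otherwise the cron tick following the
last scheduled run. A minimal standalone sketch of that decision, assuming a
cron-string ``schedule_interval`` and the same ``croniter`` call that
``DAG.following_schedule`` uses in the models.py hunk below (the helper name
is hypothetical):

from datetime import datetime
from croniter import croniter

def next_run_date(last_scheduled_run, task_start_dates, schedule_interval):
    # First run ever: anchor on the earliest task start_date.
    if last_scheduled_run is None:
        return min(task_start_dates)
    # '@once' DAGs never get a second scheduled run.
    if schedule_interval == '@once':
        return None
    # Otherwise, the next cron tick strictly after the last scheduled run.
    return croniter(schedule_interval, last_scheduled_run).get_next(datetime)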

View file

@ -25,6 +25,8 @@ target_metadata = models.Base.metadata
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
COMPARE_TYPE = False
def run_migrations_offline():
"""Run migrations in 'offline' mode.
@ -40,7 +42,8 @@ def run_migrations_offline():
"""
url = configuration.get('core', 'SQL_ALCHEMY_CONN')
context.configure(
url=url, target_metadata=target_metadata, literal_binds=True)
url=url, target_metadata=target_metadata, literal_binds=True,
compare_type=COMPARE_TYPE)
with context.begin_transaction():
context.run_migrations()
@ -58,7 +61,8 @@ def run_migrations_online():
with connectable.connect() as connection:
context.configure(
connection=connection,
target_metadata=target_metadata
target_metadata=target_metadata,
compare_type=COMPARE_TYPE,
)
with context.begin_transaction():

View file

@ -1,32 +0,0 @@
"""add DagRun
Revision ID: 19054f4ff36
Revises: 338e90f54d61
Create Date: 2015-10-12 09:55:52.475712
"""
# revision identifiers, used by Alembic.
revision = '19054f4ff36'
down_revision = '338e90f54d61'
branch_labels = None
depends_on = None
from alembic import op
import sqlalchemy as sa
def upgrade():
op.create_table(
'dag_run',
sa.Column('dag_id', sa.String(length=250), nullable=False),
sa.Column('execution_date', sa.DateTime(), nullable=False),
sa.Column('state', sa.String(length=50), nullable=True),
sa.Column('run_id', sa.String(length=250), nullable=True),
sa.Column('external_trigger', sa.Boolean(), nullable=True),
sa.PrimaryKeyConstraint('dag_id', 'execution_date')
)
def downgrade():
op.drop_table('dag_run')

View file

@ -0,0 +1,35 @@
"""add dagrun
Revision ID: 1b38cef5b76e
Revises: 52d714495f0
Create Date: 2015-10-27 08:31:48.475140
"""
# revision identifiers, used by Alembic.
revision = '1b38cef5b76e'
down_revision = '52d714495f0'
branch_labels = None
depends_on = None
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import mysql
def upgrade():
op.create_table('dag_run',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('dag_id', sa.String(length=250), nullable=True),
sa.Column('execution_date', sa.DateTime(), nullable=True),
sa.Column('state', sa.String(length=50), nullable=True),
sa.Column('run_id', sa.String(length=250), nullable=True),
sa.Column('external_trigger', sa.Boolean(), nullable=True),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('dag_id', 'execution_date'),
sa.UniqueConstraint('dag_id', 'run_id'),
)
def downgrade():
op.drop_table('dag_run')

View file

@ -0,0 +1,28 @@
"""task_duration
Revision ID: 2e541a1dcfed
Revises: 1b38cef5b76e
Create Date: 2015-10-28 20:38:41.266143
"""
# revision identifiers, used by Alembic.
revision = '2e541a1dcfed'
down_revision = '1b38cef5b76e'
branch_labels = None
depends_on = None
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import mysql
def upgrade():
op.alter_column('task_instance', 'duration',
existing_type=mysql.INTEGER(display_width=11),
type_=sa.Float(),
existing_nullable=True)
def downgrade():
pass

View file

@ -0,0 +1,25 @@
"""dagrun_config
Revision ID: 40e67319e3a9
Revises: 2e541a1dcfed
Create Date: 2015-10-29 08:36:31.726728
"""
# revision identifiers, used by Alembic.
revision = '40e67319e3a9'
down_revision = '2e541a1dcfed'
branch_labels = None
depends_on = None
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import mysql
def upgrade():
op.add_column('dag_run', sa.Column('conf', sa.PickleType(), nullable=True))
def downgrade():
op.drop_column('dag_run', 'conf')

View file

@ -6,7 +6,6 @@ from __future__ import unicode_literals
from future.standard_library import install_aliases
install_aliases()
from builtins import str
from past.builtins import basestring
from builtins import object, bytes
import copy
from datetime import datetime, timedelta
@ -26,7 +25,7 @@ from urllib.parse import urlparse
from sqlalchemy import (
Column, Integer, String, DateTime, Text, Boolean, ForeignKey, PickleType,
Index, BigInteger)
Index, Float)
from sqlalchemy import case, func, or_, and_
from sqlalchemy.ext.declarative import declarative_base, declared_attr
from sqlalchemy.dialects.mysql import LONGTEXT
@ -62,7 +61,7 @@ else:
LongText = Text
def clear_task_instances(tis, session):
def clear_task_instances(tis, session, activate_dag_runs=True):
'''
Clears a set of task instances, but makes sure the running ones
get killed.
@ -79,6 +78,15 @@ def clear_task_instances(tis, session):
from airflow.jobs import BaseJob as BJ
for job in session.query(BJ).filter(BJ.id.in_(job_ids)).all():
job.state = State.SHUTDOWN
if activate_dag_runs:
execution_dates = {ti.execution_date for ti in tis}
dag_ids = {ti.dag_id for ti in tis}
drs = session.query(DagRun).filter(
DagRun.dag_id.in_(dag_ids),
DagRun.execution_date.in_(execution_dates),
).all()
for dr in drs:
dr.state = State.RUNNING
class DagBag(object):
@ -195,7 +203,6 @@ class DagBag(object):
dag.full_filepath = filepath
dag.is_subdag = False
self.bag_dag(dag, parent_dag=dag, root_dag=dag)
# dag.pickle()
self.file_last_changed[filepath] = dttm
@ -504,7 +511,7 @@ class TaskInstance(Base):
execution_date = Column(DateTime, primary_key=True)
start_date = Column(DateTime)
end_date = Column(DateTime)
duration = Column(Integer)
duration = Column(Float)
state = Column(String(20))
try_number = Column(Integer)
hostname = Column(String(1000))
@ -909,7 +916,7 @@ class TaskInstance(Base):
logging.info(msg.format(**locals()))
self.start_date = datetime.now()
if not force and self.pool:
if not force and (self.pool or self.task.dag.concurrency_reached):
# If a pool is set for this task, or the DAG's concurrency limit
# has been reached, mark the task instance as QUEUED
self.state = State.QUEUED
@ -1043,7 +1050,8 @@ class TaskInstance(Base):
session.commit()
logging.error(str(error))
def get_template_context(self):
@provide_session
def get_template_context(self, session=None):
task = self.task
from airflow import macros
tables = None
@ -1057,8 +1065,22 @@ class TaskInstance(Base):
ti_key_str = ti_key_str.format(**locals())
params = {}
if hasattr(task, 'dag') and task.dag.params:
params.update(task.dag.params)
run_id = ''
dag_run = None
if hasattr(task, 'dag'):
if task.dag.params:
params.update(task.dag.params)
dag_run = (
session.query(DagRun)
.filter_by(
dag_id=task.dag.dag_id,
execution_date=self.execution_date)
.first()
)
run_id = dag_run.run_id if dag_run else None
session.expunge_all()
session.commit()
if task.params:
params.update(task.params)
@ -1070,6 +1092,8 @@ class TaskInstance(Base):
'END_DATE': ds,
'ds_nodash': ds_nodash,
'end_date': ds,
'dag_run': dag_run,
'run_id': run_id,
'execution_date': self.execution_date,
'latest_date': ds,
'macros': macros,
@ -1114,7 +1138,7 @@ class TaskInstance(Base):
def set_duration(self):
if self.end_date and self.start_date:
self.duration = (self.end_date - self.start_date).seconds
self.duration = (self.end_date - self.start_date).total_seconds()
else:
self.duration = None
@ -1481,7 +1505,7 @@ class BaseOperator(object):
schedule_interval as it may not be attached to a DAG.
"""
if hasattr(self, 'dag') and self.dag:
return self.dag.schedule_interval
return self.dag._schedule_interval
else:
return self._schedule_interval
@ -1549,7 +1573,7 @@ class BaseOperator(object):
all strings in it.
'''
rt = self.render_template
if isinstance(content, basestring):
if isinstance(content, six.string_types):
result = jinja_env.from_string(content).render(**context)
elif isinstance(content, (list, tuple)):
result = [rt(e, context) for e in content]
@ -1575,9 +1599,12 @@ class BaseOperator(object):
else jinja2.Environment(cache_size=0)
exts = self.__class__.template_ext
return jinja_env.get_template(content).render(**context) \
if isinstance(content, basestring) and any([content.endswith(ext) for ext in exts]) \
else self.render_template_from_field(content, context, jinja_env)
if (
isinstance(content, six.string_types) and
any([content.endswith(ext) for ext in exts])):
return jinja_env.get_template(content).render(**context)
else:
return self.render_template_from_field(content, context, jinja_env)
def prepare_template(self):
'''
@ -1592,7 +1619,7 @@ class BaseOperator(object):
# Getting the content of files for template_field / template_ext
for attr in self.template_fields:
content = getattr(self, attr)
if (content and isinstance(content, basestring) and
if (content and isinstance(content, six.string_types) and
any([content.endswith(ext) for ext in self.template_ext])):
env = self.dag.get_template_env()
try:
@ -1698,8 +1725,7 @@ class BaseOperator(object):
start_date = start_date or self.start_date
end_date = end_date or self.end_date or datetime.now()
for dt in utils.date_range(
start_date, end_date, self.schedule_interval):
for dt in self.dag.date_range(start_date, end_date=end_date):
TaskInstance(self, dt).run(
mark_success=mark_success,
ignore_dependencies=ignore_dependencies,
@ -1709,7 +1735,7 @@ class BaseOperator(object):
logging.info('Dry run')
for attr in self.template_fields:
content = getattr(self, attr)
if content and isinstance(content, basestring):
if content and isinstance(content, six.string_types):
logging.info('Rendering template for {0}'.format(attr))
logging.info(content)
@ -1891,6 +1917,9 @@ class DAG(object):
accessible in templates, namespaced under `params`. These
params can be overridden at the task level.
:type params: dict
:param concurrency: the number of task instances allowed to run
concurrently
:type concurrency: int
"""
def __init__(
@ -1901,6 +1930,7 @@ class DAG(object):
template_searchpath=None,
user_defined_macros=None,
default_args=None,
concurrency=configuration.getint('core', 'dag_concurrency'),
params=None):
self.user_defined_macros = user_defined_macros
@ -1912,13 +1942,20 @@ class DAG(object):
self.start_date = start_date
self.end_date = end_date or datetime.now()
self.schedule_interval = schedule_interval
if schedule_interval in utils.cron_presets:
self._schedule_interval = utils.cron_presets.get(schedule_interval)
elif schedule_interval == '@once':
self._schedule_interval = None
else:
self._schedule_interval = schedule_interval
self.full_filepath = full_filepath if full_filepath else ''
if isinstance(template_searchpath, basestring):
if isinstance(template_searchpath, six.string_types):
template_searchpath = [template_searchpath]
self.template_searchpath = template_searchpath
self.parent_dag = None # Gets set when DAGs are loaded
self.last_loaded = datetime.now()
self.safe_dag_id = dag_id.replace('.', '__dot__')
self.concurrency = concurrency
self._comps = {
'dag_id',
@ -1957,19 +1994,26 @@ class DAG(object):
hash_components.append(repr(val))
return hash(tuple(hash_components))
def date_range(self, start_date, num=None, end_date=datetime.now()):
if num:
end_date = None
return utils.date_range(
start_date=start_date, end_date=end_date,
num=num, delta=self._schedule_interval)
def following_schedule(self, dttm):
if isinstance(self.schedule_interval, six.string_types):
cron = croniter(self.schedule_interval, dttm)
if isinstance(self._schedule_interval, six.string_types):
cron = croniter(self._schedule_interval, dttm)
return cron.get_next(datetime)
else:
return dttm + self.schedule_interval
elif isinstance(self._schedule_interval, timedelta):
return dttm + self._schedule_interval
def previous_schedule(self, dttm):
if isinstance(self.schedule_interval, six.string_types):
cron = croniter(self.schedule_interval, dttm)
if isinstance(self._schedule_interval, six.string_types):
cron = croniter(self._schedule_interval, dttm)
return cron.get_prev(datetime)
else:
return dttm - self.schedule_interval
elif isinstance(self._schedule_interval, timedelta):
return dttm - self._schedule_interval
@property
def task_ids(self):
@ -1995,6 +2039,21 @@ class DAG(object):
def owner(self):
return ", ".join(list(set([t.owner for t in self.tasks])))
@property
@provide_session
def concurrency_reached(self, session=None):
"""
Returns a boolean as to whether the concurrency limit for this DAG
has been reached
"""
TI = TaskInstance
qry = session.query(func.count(TI.task_id)).filter(
TI.dag_id == self.dag_id,
TI.task_id.in_(self.task_ids),
TI.state == State.RUNNING,
)
return qry.scalar() >= self.concurrency
@property
def latest_execution_date(self):
"""
@ -2030,8 +2089,7 @@ class DAG(object):
"""
TI = TaskInstance
session = settings.Session()
# Checking state of active DagRuns
active_runs = []
active_dates = []
active_runs = (
session.query(DagRun)
.filter(
@ -2040,7 +2098,7 @@ class DAG(object):
.all()
)
for run in active_runs:
logging.info("Checking state for " + str(run))
logging.info("Checking state for {}".format(run))
task_instances = session.query(TI).filter(
TI.dag_id == run.dag_id,
TI.task_id.in_(self.task_ids),
@ -2051,13 +2109,18 @@ class DAG(object):
if State.FAILED in task_states:
logging.info('Marking run {} failed'.format(run))
run.state = State.FAILED
elif set(task_states) == set([State.SUCCESS]):
elif len(
set(task_states) |
set([State.SUCCESS, State.SKIPPED])
) == 2:
logging.info('Marking run {} successful'.format(run))
run.state = State.SUCCESS
else:
active_runs.append(run.execution_date)
active_dates.append(run.execution_date)
else:
active_dates.append(run.execution_date)
session.commit()
return active_runs
return active_dates
def resolve_template_files(self):
for t in self.tasks:
@ -2129,19 +2192,27 @@ class DAG(object):
def roots(self):
return [t for t in self.tasks if not t.downstream_list]
@provide_session
def set_dag_runs_state(
self, start_date, end_date, state=State.RUNNING, session=None):
dates = utils.date_range(start_date, end_date)
drs = session.query(DagRun).filter_by(dag_id=self.dag_id).all()
for dr in drs:
dr.state = state
def clear(
self, start_date=None, end_date=None,
only_failed=False,
only_running=False,
confirm_prompt=False,
include_subdags=True,
reset_dag_runs=True,
dry_run=False):
session = settings.Session()
"""
Clears a set of task instances associated with the current dag for
a specified date range.
"""
TI = TaskInstance
tis = session.query(TI)
if include_subdags:
@ -2171,6 +2242,7 @@ class DAG(object):
return tis
count = tis.count()
do_it = True
if count == 0:
print("Nothing to clear.")
return 0
@ -2180,13 +2252,15 @@ class DAG(object):
"You are about to delete these {count} tasks:\n"
"{ti_list}\n\n"
"Are you sure? (yes/no): ").format(**locals())
if utils.ask_yesno(question):
clear_task_instances(tis, session)
else:
count = 0
print("Bail. Nothing was cleared.")
else:
do_it = utils.ask_yesno(question)
if do_it:
clear_task_instances(tis, session)
if reset_dag_runs:
self.set_dag_runs_state(start_date, end_date, session=session)
else:
count = 0
print("Bail. Nothing was cleared.")
session.commit()
session.close()
@ -2571,15 +2645,23 @@ class DagRun(Base):
"""
__tablename__ = "dag_run"
dag_id = Column(String(ID_LEN), primary_key=True)
execution_date = Column(DateTime, primary_key=True)
state = Column(String(50))
id = Column(Integer, primary_key=True)
dag_id = Column(String(ID_LEN))
execution_date = Column(DateTime, default=datetime.now)
state = Column(String(50), default=State.RUNNING)
run_id = Column(String(ID_LEN))
external_trigger = Column(Boolean, default=False)
external_trigger = Column(Boolean, default=True)
conf = Column(PickleType)
__table_args__ = (
Index('dr_run_id', dag_id, run_id, unique=True),
)
def __repr__(self):
return '<DagRun {dag_id} @ {execution_date}: {run_id}, \
externally triggered: {external_trigger}>'.format(
return (
'<DagRun {dag_id} @ {execution_date}: {run_id}, '
'externally triggered: {external_trigger}>'
).format(
dag_id=self.dag_id,
execution_date=self.execution_date,
run_id=self.run_id,

View file

@ -71,12 +71,9 @@ class SqlSensor(BaseSensorOperator):
@apply_defaults
def __init__(self, conn_id, sql, *args, **kwargs):
super(SqlSensor, self).__init__(*args, **kwargs)
self.sql = sql
self.conn_id = conn_id
super(SqlSensor, self).__init__(*args, **kwargs)
def poke(self, context):
hook = BaseHook.get_connection(self.conn_id).get_hook()
@ -447,10 +444,9 @@ class TimeDeltaSensor(BaseSensorOperator):
self.delta = delta
def poke(self, context):
target_dttm = (
context['execution_date'] +
context['dag'].schedule_interval +
self.delta)
dag = context['dag']
target_dttm = dag.following_schedule(context['execution_date'])
target_dttm += self.delta
logging.info('Checking if the time ({0}) has come'.format(target_dttm))
return datetime.now() > target_dttm
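
With the change above, ``TimeDeltaSensor`` now waits relative to the run's
*following* schedule rather than adding the DAG's interval itself, which also
works for cron-based schedules. A hedged usage sketch (the DAG and task names
are hypothetical):

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators import TimeDeltaSensor

dag = DAG(
    'example_timedelta_sensor',  # hypothetical dag_id
    start_date=datetime(2015, 1, 1),
    schedule_interval='@daily')

# Pokes until five minutes past the following schedule tick have passed.
wait = TimeDeltaSensor(
    task_id='wait_five_past_midnight',
    delta=timedelta(minutes=5),
    dag=dag)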

View file

@ -85,7 +85,10 @@ class State(object):
@classmethod
def color(cls, state):
return cls.state_color[state]
if state in cls.state_color:
return cls.state_color[state]
else:
return 'white'
@classmethod
def runnable(cls):
@ -94,6 +97,14 @@ class State(object):
cls.SKIPPED]
cron_presets = {
'@hourly': '0 * * * *',
'@daily': '0 0 * * *',
'@weekly': '0 0 * * 0',
'@monthly': '0 0 1 * *',
'@yearly': '0 0 1 1 *',
}
def provide_session(func):
"""
Function decorator that provides a session if it isn't provided.
@ -284,31 +295,49 @@ def validate_key(k, max_length=250):
return True
def date_range(start_date, end_date=datetime.now(), delta=timedelta(1)):
def date_range(
start_date,
end_date=None,
num=None,
delta=None):
"""
Get a set of dates as a list based on a start, end and delta, delta
can be something that can be added to ``datetime.datetime``
or a cron expression as a ``str``
>>> date_range(datetime(2016, 1, 1), datetime(2016, 1, 3))
[datetime.datetime(2016, 1, 1, 0, 0),
datetime.datetime(2016, 1, 2, 0, 0),
datetime.datetime(2016, 1, 3, 0, 0)]
>>> date_range(datetime(2016, 1, 1), datetime(2016, 1, 3), '0 0 * * *')
[datetime.datetime(2016, 1, 1, 0, 0),
datetime.datetime(2016, 1, 2, 0, 0),
datetime.datetime(2016, 1, 3, 0, 0)]
:param start_date: anchor date to start the series from
:type start_date: datetime.datetime
:param end_date: right boundary for the date range
:type end_date: datetime.datetime
:param num: alternatively to end_date, you can specify the number of
number of entries you want in the range. This number can be negative,
output will always be sorted regardless
:type num: int
>>> date_range(datetime(2016, 1, 1), datetime(2016, 1, 3), delta=timedelta(1))
[datetime.datetime(2016, 1, 1, 0, 0), datetime.datetime(2016, 1, 2, 0, 0), datetime.datetime(2016, 1, 3, 0, 0)]
>>> date_range(datetime(2016, 1, 1), datetime(2016, 1, 3), delta='0 0 * * *')
[datetime.datetime(2016, 1, 1, 0, 0), datetime.datetime(2016, 1, 2, 0, 0), datetime.datetime(2016, 1, 3, 0, 0)]
>>> date_range(datetime(2016, 1, 1), datetime(2016, 3, 3), delta="0 0 0 * *")
[datetime.datetime(2016, 1, 1, 0, 0),
datetime.datetime(2016, 2, 1, 0, 0),
datetime.datetime(2016, 3, 1, 0, 0)]
[datetime.datetime(2016, 1, 1, 0, 0), datetime.datetime(2016, 2, 1, 0, 0), datetime.datetime(2016, 3, 1, 0, 0)]
"""
if not delta:
return []
if end_date and start_date > end_date:
raise Exception("Wait. start_date needs to be before end_date")
if end_date and num:
raise Exception("Wait. Either specify end_date OR num")
if not end_date and not num:
end_date = datetime.now()
delta_iscron = False
if isinstance(delta, six.string_types):
delta_iscron = True
cron = croniter(delta, start_date)
else:
delta = abs(delta)
l = []
if end_date >= start_date:
if end_date:
while start_date <= end_date:
l.append(start_date)
if delta_iscron:
@ -316,8 +345,19 @@ def date_range(start_date, end_date=datetime.now(), delta=timedelta(1)):
else:
start_date += delta
else:
raise AirflowException("start_date can't be after end_date")
return l
for i in range(abs(num)):
l.append(start_date)
if delta_iscron:
if num > 0:
start_date = cron.get_next(datetime)
else:
start_date = cron.get_prev(datetime)
else:
if num > 0:
start_date += delta
else:
start_date -= delta
return sorted(l)
def json_ser(obj):
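
A quick usage sketch of the reworked ``date_range`` above, with values taken
from its doctests; the signed ``num`` path is the new behavior:

from datetime import datetime, timedelta
from airflow.utils import date_range

# Bounded by end_date, stepping with a timedelta:
print(date_range(datetime(2016, 1, 1), datetime(2016, 1, 3),
                 delta=timedelta(1)))
# [datetime(2016, 1, 1, 0, 0), datetime(2016, 1, 2, 0, 0),
#  datetime(2016, 1, 3, 0, 0)]

# Three ticks backward from the anchor using a cron string; the output
# is sorted ascending regardless of the sign of num:
print(date_range(datetime(2016, 3, 1), num=-3, delta="0 0 1 * *"))
# [datetime(2016, 1, 1, 0, 0), datetime(2016, 2, 1, 0, 0),
#  datetime(2016, 3, 1, 0, 0)]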

View file

@ -41,25 +41,46 @@ def create_app(config=None):
index_view=views.HomeView(endpoint='', url='/admin'),
template_mode='bootstrap3',
)
av = admin.add_view
vs = views
av(vs.Airflow(name='DAGs'))
admin.add_view(views.Airflow(name='DAGs'))
av(vs.QueryView(name='Ad Hoc Query', category="Data Profiling"))
av(vs.ChartModelView(
models.Chart, Session, name="Charts", category="Data Profiling"))
av(vs.KnowEventView(
models.KnownEvent,
Session, name="Known Events", category="Data Profiling"))
av(vs.SlaMissModelView(
models.SlaMiss,
Session, name="SLA Misses", category="Browse"))
av(vs.TaskInstanceModelView(models.TaskInstance,
Session, name="Task Instances", category="Browse"))
av(vs.LogModelView(
models.Log, Session, name="Logs", category="Browse"))
av(vs.JobModelView(
jobs.BaseJob, Session, name="Jobs", category="Browse"))
av(vs.PoolModelView(
models.Pool, Session, name="Pools", category="Admin"))
av(vs.ConfigurationView(
name='Configuration', category="Admin"))
av(vs.UserModelView(
models.User, Session, name="Users", category="Admin"))
av(vs.ConnectionModelView(
models.Connection, Session, name="Connections", category="Admin"))
av(vs.VariableView(
models.Variable, Session, name="Variables", category="Admin"))
admin.add_view(views.SlaMissModelView(models.SlaMiss, Session, name="SLA Misses", category="Browse"))
admin.add_view(views.TaskInstanceModelView(models.TaskInstance, Session, name="Task Instances", category="Browse"))
admin.add_view(views.LogModelView(models.Log, Session, name="Logs", category="Browse"))
admin.add_view(views.JobModelView(jobs.BaseJob, Session, name="Jobs", category="Browse"))
admin.add_view(views.QueryView(name='Ad Hoc Query', category="Data Profiling"))
admin.add_view(views.ChartModelView(models.Chart, Session, name="Charts", category="Data Profiling"))
admin.add_view(views.KnowEventView(models.KnownEvent, Session, name="Known Events", category="Data Profiling"))
admin.add_view(views.PoolModelView(models.Pool, Session, name="Pools", category="Admin"))
admin.add_view(views.ConfigurationView(name='Configuration', category="Admin"))
admin.add_view(views.UserModelView(models.User, Session, name="Users", category="Admin"))
admin.add_view(views.ConnectionModelView(models.Connection, Session, name="Connections", category="Admin"))
admin.add_view(views.VariableView(models.Variable, Session, name="Variables", category="Admin"))
admin.add_link(base.MenuLink(category='Docs', name='Documentation', url='http://pythonhosted.org/airflow/'))
admin.add_link(base.MenuLink(category='Docs',name='Github',url='https://github.com/airbnb/airflow'))
admin.add_link(base.MenuLink(
category='Docs', name='Documentation',
url='http://pythonhosted.org/airflow/'))
admin.add_link(
base.MenuLink(category='Docs',
name='Github', url='https://github.com/airbnb/airflow'))
admin.add_view(views.DagModelView(models.DagModel, Session, name=None))
av(vs.DagRunModelView(
models.DagRun, Session, name="DAG Runs", category="Browse"))
av(vs.DagModelView(models.DagModel, Session, name=None))
# Hack to not add this view to the menu
admin._menu = admin._menu[:-1]
@ -70,7 +91,6 @@ def create_app(config=None):
for v in admin_views:
admin.add_view(v)
for bp in flask_blueprints:
print(bp)
app.register_blueprint(bp)
for ml in menu_links:
admin.add_link(ml)

View file

@ -16,17 +16,6 @@ class DateTimeForm(Form):
"Execution date", widget=DateTimePickerWidget())
class GraphForm(Form):
execution_date = DateTimeField(
"Execution date", widget=DateTimePickerWidget())
arrange = SelectField("Layout", choices=(
('LR', "Left->Right"),
('RL', "Right->Left"),
('TB', "Top->Bottom"),
('BT', "Bottom->Top"),
))
class TreeForm(Form):
base_date = DateTimeField(
"Anchor date", widget=DateTimePickerWidget(), default=datetime.now())

View file

@ -7,54 +7,64 @@
{% endblock %}
{% block body %}
<h3>
{% if dag.parent_dag %}
<span style='color:#AAA;'>SUBDAG:</span> <span>{{ dag.dag_id }}</span>
{% else %}
<span style='color:#AAA;'>DAG:</span> <span>{{ dag.dag_id }}</span>
{% endif %}
{% if root %}
<span style='color:#AAA;'>ROOT:</span> <span>{{ root }}</span>
{% endif %}
</h3>
<ul class="nav nav-pills">
{% if dag.parent_dag %}
<li class="never_active"><a href="{{ url_for("airflow.graph", dag_id=dag.parent_dag.dag_id) }}">
<span class="glyphicon glyphicon-arrow-left" aria-hidden="true"></span>
Back to {{ dag.parent_dag.dag_id }}</a>
</li>
{% endif %}
<li><a href="{{ url_for("airflow.graph", dag_id=dag.dag_id, root=root) }}">
<span class="glyphicon glyphicon-certificate" aria-hidden="true"></span>
Graph View</a></li>
<li><a href="{{ url_for("airflow.tree", dag_id=dag.dag_id, num_runs=25, root=root) }}">
<span class="glyphicon glyphicon-tree-deciduous" aria-hidden="true"></span>
Tree View
</a></li>
<li><a href="{{ url_for("airflow.duration", dag_id=dag.dag_id, days=30, root=root) }}">
<span class="glyphicon glyphicon-stats" aria-hidden="true"></span>
Task Duration
</a></li>
<li>
<a href="{{ url_for("airflow.landing_times", dag_id=dag.dag_id, days=30, root=root) }}">
<span class="glyphicon glyphicon-plane" aria-hidden="true"></span>
Landing Times
<div>
<h3 class="pull-left">
{% if dag.parent_dag %}
<span class="pull-left" style='color:#AAA;'>SUBDAG: </span> <span> {{ dag.dag_id }}</span>
{% else %}
<span class="pull-left" style='color:#AAA;'>DAG: </span> <span> {{ dag.dag_id }}</span>
{% endif %}
{% if root %}
<span class="pull-left" style='color:#AAA;'>ROOT: </span> <span> {{ root }}</span>
{% endif %}
</h3>
<h4 class="pull-right">
<a class="label label-default" href="/admin/dagrun/?flt2_dag_id_equals={{ dag.dag_id }}">
schedule: {{ dag.schedule_interval }}
</a>
</li>
<li>
<a href="{{ url_for("airflow.gantt", dag_id=dag.dag_id, root=root) }}">
<span class="glyphicon glyphicon-align-left" aria-hidden="true"></span>
<i class="icon-align-left"></i>
Gantt
</a>
</li>
<li>
<a href="{{ url_for("airflow.code", dag_id=dag.dag_id, root=root) }}">
<span class="glyphicon glyphicon-flash" aria-hidden="true"></span>
Code
</a>
</li>
</ul>
</h4>
</div>
<div class="clearfix"></div>
<div>
<ul class="nav nav-pills">
{% if dag.parent_dag %}
<li class="never_active"><a href="{{ url_for("airflow.graph", dag_id=dag.parent_dag.dag_id) }}">
<span class="glyphicon glyphicon-arrow-left" aria-hidden="true"></span>
Back to {{ dag.parent_dag.dag_id }}</a>
</li>
{% endif %}
<li><a href="{{ url_for("airflow.graph", dag_id=dag.dag_id, root=root) }}">
<span class="glyphicon glyphicon-certificate" aria-hidden="true"></span>
Graph View</a></li>
<li><a href="{{ url_for("airflow.tree", dag_id=dag.dag_id, num_runs=25, root=root) }}">
<span class="glyphicon glyphicon-tree-deciduous" aria-hidden="true"></span>
Tree View
</a></li>
<li><a href="{{ url_for("airflow.duration", dag_id=dag.dag_id, days=30, root=root) }}">
<span class="glyphicon glyphicon-stats" aria-hidden="true"></span>
Task Duration
</a></li>
<li>
<a href="{{ url_for("airflow.landing_times", dag_id=dag.dag_id, days=30, root=root) }}">
<span class="glyphicon glyphicon-plane" aria-hidden="true"></span>
Landing Times
</a>
</li>
<li>
<a href="{{ url_for("airflow.gantt", dag_id=dag.dag_id, root=root) }}">
<span class="glyphicon glyphicon-align-left" aria-hidden="true"></span>
<i class="icon-align-left"></i>
Gantt
</a>
</li>
<li>
<a href="{{ url_for("airflow.code", dag_id=dag.dag_id, root=root) }}">
<span class="glyphicon glyphicon-flash" aria-hidden="true"></span>
Code
</a>
</li>
</ul>
</div>
<hr>
<!-- Modal -->
<div class="modal fade" id="myModal"

View file

@ -18,6 +18,7 @@
<th></th>
<th width="12"><span id="pause_header"class="glyphicon glyphicon-info-sign" title="Use this toggle to pause a DAG. The scheduler won't schedule new tasks instances for a paused DAG. Tasks already running at pause time won't be affected."></span></th>
<th>DAG</th>
<th>Schedule</th>
<th>Owner</th>
<th style="padding-left: 5px;">Statuses
<img id="loading" width="15" src="{{ url_for("static", filename="loading.gif") }}">
@ -54,7 +55,10 @@
<span class="glyphicon glyphicon-info-sign" class="info" aria-hidden="true" title="This DAG seems to be existing only locally. The master scheduler doesn't seem to be aware of its existence."></span>
{% endif %}
</td>
<td>{{ dag.owner if dag else orm_dags[dag_id].owners }}</td>
<td>
<a class="label label-default" href="/admin/dagrun/?flt2_dag_id_equals={{ dag.dag_id }}">
{{ dag.schedule_interval }}</a></td>
<td>{{ dag.owner if dag else orm_dags[dag_id].owners }}</td>
<td style="padding:0px; width:180px; height:10px;">
<svg height="10" width="10" id='{{ dag.safe_dag_id }}' style="display: block;"></svg>
</td>

View file

@ -20,6 +20,7 @@
{% endif %}
<div class="form-inline">
<form method="get" style="float:left;">
{{ state_token }}
Run:
{{ form.execution_date(class_="form-control") | safe }}
Layout:

View file

@ -119,7 +119,7 @@ var svg = d3.select("svg")
d3.svg.axis()
.scale(xScale)
.orient("top")
.ticks(num_square/5)
.ticks(2)
);
function node_class(d) {
@ -174,13 +174,16 @@ function update(source) {
.attr("class", "task")
.attr("data-toggle", "tooltip")
.attr("title", function(d){
tt = "operator: " + d.operator + "<br/>";
tt += "depends_on_past: " + d.depends_on_past + "<br/>";
tt += "upstream: " + d.num_dep + "<br/>";
tt += "retries: " + d.retries + "<br/>";
tt += "owner: " + d.owner + "<br/>";
tt += "start_date: " + d.start_date + "<br/>";
tt += "end_date: " + d.end_date + "<br/>";
var tt = "";
if (d.operator != undefined) {
tt += "operator: " + d.operator + "<br/>";
tt += "depends_on_past: " + d.depends_on_past + "<br/>";
tt += "upstream: " + d.num_dep + "<br/>";
tt += "retries: " + d.retries + "<br/>";
tt += "owner: " + d.owner + "<br/>";
tt += "start_date: " + d.start_date + "<br/>";
tt += "end_date: " + d.end_date + "<br/>";
}
return tt;
})
.attr("height", barHeight)
@ -205,15 +208,25 @@ function update(source) {
.enter()
.append('rect')
.on("click", function(d){
if(nodeobj[d.task_id].operator=='SubDagOperator')
if(d.task_id === undefined){
window.location = '/admin/dagrun/edit/?id=' + d.id;
}
else if(nodeobj[d.task_id].operator=='SubDagOperator')
call_modal(d.task_id, d.execution_date, true);
else
call_modal(d.task_id, d.execution_date);
})
.attr("class", function(d) {return "state " + d.state})
.attr("data-toggle", "tooltip")
.attr("rx", function(d) {return (d.run_id != undefined)? "5": "0"})
.attr("ry", function(d) {return (d.run_id != undefined)? "5": "0"})
.style("stroke-width", function(d) {return (d.run_id != undefined)? "2": "1"})
.style("stroke-opacity", function(d) {return d.external_trigger ? "0": "1"})
.attr("title", function(d){
s = "Run: " + d.execution_date + "<br>";
if(d.run_id != undefined){
s += "run_id: <nobr>" + d.run_id + "</nobr><br>";
}
if(d.start_date != undefined){
s += "Started: " + d.start_date + "<br>";
s += "Ended: " + d.end_date + "<br>";
@ -232,8 +245,7 @@ function update(source) {
})
.on('mouseout', function(d,i) {
d3.select(this).transition()
.style('stroke', "black")
.style('stroke-width', 1)
.style("stroke-width", function(d) {return (d.run_id != undefined)? "2": "1"})
}) ;

View file

@ -22,6 +22,7 @@ from sqlalchemy import or_
from flask import redirect, url_for, request, Markup, Response, current_app, render_template
from flask.ext.admin import BaseView, expose, AdminIndexView
from flask.ext.admin.contrib.sqla import ModelView
from flask.ext.admin.actions import action
from flask.ext.login import current_user, flash, logout_user, login_required
from flask._compat import PY2
@ -31,7 +32,7 @@ import json
from wtforms import (
widgets,
Form, DateTimeField, SelectField, TextAreaField, PasswordField, StringField)
Form, SelectField, TextAreaField, PasswordField, StringField)
from pygments import highlight, lexers
from pygments.formatters import HtmlFormatter
@ -40,13 +41,14 @@ import airflow
from airflow import models
from airflow.settings import Session
from airflow import configuration
from airflow import login
from airflow import utils
from airflow.utils import AirflowException
from airflow.www import utils as wwwutils
from airflow import settings
from airflow.models import State
from airflow.www.forms import DateTimeForm, TreeForm, GraphForm
from airflow.www.forms import DateTimeForm, TreeForm
QUERY_LIMIT = 100000
CHART_LIMIT = 200000
@ -106,11 +108,15 @@ def task_instance_link(v, c, m, p):
""".format(**locals()))
def state_f(v, c, m, p):
color = State.color(m.state)
def state_token(state):
color = State.color(state)
return Markup(
'<span class="label" style="background-color:{color};">'
'{m.state}</span>'.format(**locals()))
'{state}</span>'.format(**locals()))
def state_f(v, c, m, p):
return state_token(m.state)
def duration_f(v, c, m, p):
@ -930,7 +936,6 @@ class Airflow(BaseView):
downstream = request.args.get('downstream') == "true"
future = request.args.get('future') == "true"
past = request.args.get('past') == "true"
MAX_PERIODS = 1000
# Flagging tasks as successful
@ -1042,47 +1047,42 @@ class Airflow(BaseView):
session = settings.Session()
start_date = dag.start_date
if not start_date and 'start_date' in dag.default_args:
start_date = dag.default_args['start_date']
base_date = request.args.get('base_date')
num_runs = request.args.get('num_runs')
num_runs = int(num_runs) if num_runs else 25
if not base_date:
# New DAGs will not have a latest execution date
if dag.latest_execution_date:
base_date = dag.latest_execution_date + 2 * dag.schedule_interval
else:
base_date = datetime.now()
else:
if base_date:
base_date = dateutil.parser.parse(base_date)
else:
base_date = dag.latest_execution_date or datetime.now()
start_date = dag.start_date
if not start_date and 'start_date' in dag.default_args:
start_date = dag.default_args['start_date']
dates = dag.date_range(base_date, num=-abs(num_runs))
min_date = dates[0] if dates else datetime(2000, 1, 1)
# if a specific base_date is requested, don't round it
if not request.args.get('base_date'):
if start_date:
base_date = utils.round_time(
base_date, dag.schedule_interval, start_date)
else:
base_date = utils.round_time(base_date, dag.schedule_interval)
DR = models.DagRun
dag_runs = (
session.query(DR)
.filter(
DR.dag_id==dag.dag_id,
DR.execution_date<=base_date,
DR.execution_date>=min_date)
.all()
)
dag_runs = {
dr.execution_date: utils.alchemy_to_dict(dr) for dr in dag_runs}
form = TreeForm(data={'base_date': base_date, 'num_runs': num_runs})
from_date = (base_date - (num_runs * dag.schedule_interval))
dates = utils.date_range(
from_date, base_date, dag.schedule_interval)
tis = dag.get_task_instances(
session, start_date=min_date, end_date=base_date)
dates = sorted(list({ti.execution_date for ti in tis}))
max_date = max([ti.execution_date for ti in tis]) if dates else None
task_instances = {}
for ti in dag.get_task_instances(session, from_date):
task_instances[(ti.task_id, ti.execution_date)] = ti
for ti in tis:
tid = utils.alchemy_to_dict(ti)
dr = dag_runs.get(ti.execution_date)
tid['external_trigger'] = dr['external_trigger'] if dr else False
task_instances[(ti.task_id, ti.execution_date)] = tid
expanded = []
# The default recursion traces every path so that tree view has full
# expand/collapse functionality. After 5,000 nodes we stop and fall
# back on a quick DFS search for performance. See PR #320.
@ -1109,11 +1109,10 @@ class Airflow(BaseView):
return {
'name': task.task_id,
'instances': [
utils.alchemy_to_dict(
task_instances.get((task.task_id, d))) or {
'execution_date': d.isoformat(),
'task_id': task.task_id
}
task_instances.get((task.task_id, d)) or {
'execution_date': d.isoformat(),
'task_id': task.task_id
}
for d in dates],
children_key: children,
'num_dep': len(task.upstream_list),
@ -1125,24 +1124,19 @@ class Airflow(BaseView):
'depends_on_past': task.depends_on_past,
'ui_color': task.ui_color,
}
if len(dag.roots) > 1:
# d3 likes a single root
data = {
'name': 'root',
'instances': [],
'children': [recurse_nodes(t, set()) for t in dag.roots]
}
elif len(dag.roots) == 1:
data = recurse_nodes(dag.roots[0], set())
else:
flash("No tasks found.", "error")
data = []
data = {
'name': '[DAG]',
'children': [recurse_nodes(t, set()) for t in dag.roots],
'instances': [
dag_runs.get(d) or {'execution_date': d.isoformat()}
for d in dates],
}
data = json.dumps(data, indent=4, default=utils.json_ser)
session.commit()
session.close()
form = TreeForm(data={'base_date': max_date, 'num_runs': num_runs})
return self.render(
'airflow/tree.html',
operators=sorted(
@ -1205,7 +1199,25 @@ class Airflow(BaseView):
else:
dttm = dag.latest_execution_date or datetime.now().date()
form = GraphForm(data={'execution_date': dttm, 'arrange': arrange})
DR = models.DagRun
drs = session.query(DR).filter_by(dag_id=dag_id).all()
dr_choices = []
dr_state = None
for dr in drs:
dr_choices.append((dr.execution_date.isoformat(), dr.run_id))
if dttm == dr.execution_date:
dr_state = dr.state
class GraphForm(Form):
execution_date = SelectField("DAG run", choices=dr_choices)
arrange = SelectField("Layout", choices=(
('LR', "Left->Right"),
('RL', "Right->Left"),
('TB', "Top->Bottom"),
('BT', "Bottom->Top"),
))
form = GraphForm(
data={'execution_date': dttm.isoformat(), 'arrange': arrange})
task_instances = {
ti.task_id: utils.alchemy_to_dict(ti)
@ -1229,6 +1241,7 @@ class Airflow(BaseView):
width=request.args.get('width', "100%"),
height=request.args.get('height', "800"),
execution_date=dttm.isoformat(),
state_token=state_token(dr_state),
doc_md=doc_md,
arrange=arrange,
operators=sorted(
@ -1308,12 +1321,11 @@ class Airflow(BaseView):
data = []
for ti in task.get_task_instances(session, from_date):
if ti.end_date:
data.append([
ti.execution_date.isoformat(), old_div((
ti.end_date - (
ti.execution_date + task.schedule_interval)
).total_seconds(),(60*60))
])
ts = ti.execution_date
if dag.schedule_interval:
ts = dag.following_schedule(ts)
secs = old_div((ti.end_date - ts).total_seconds(), 60*60)
data.append([ti.execution_date.isoformat(), secs])
all_data.append({'data': data, 'name': task.task_id})
session.commit()
@ -1797,6 +1809,46 @@ class JobModelView(ModelViewOnly):
latest_heartbeat=datetime_f)
class DagRunModelView(ModelViewOnly):
verbose_name_plural = "DAG Runs"
can_delete = True
can_edit = True
can_create = True
column_editable_list = ('state',)
verbose_name = "dag run"
column_default_sort = ('execution_date', True)
form_choices = {
'state': [
('success', 'success'),
('running', 'running'),
('failed', 'failed'),
],
}
column_list = (
'state', 'dag_id', 'execution_date', 'run_id', 'external_trigger')
column_filters = column_list
column_searchable_list = ('dag_id', 'state', 'run_id')
column_formatters = dict(
execution_date=datetime_f,
state=state_f,
start_date=datetime_f,
dag_id=dag_link)
@action(
'set_running', "Set state to 'running'", None)
@utils.provide_session
def action_set_running(self, ids, session=None):
try:
DR = models.DagRun
count = 0
for dr in session.query(DR).filter(DR.id.in_(ids)).all():
count += 1
dr.state = State.RUNNING
session.commit()
flash("{} dag runs were set to 'running'".format(count))
except Exception as ex:
if not self.handle_view_exception(ex):
raise Exception("Ooops")
flash('Failed to set state', 'error')
class LogModelView(ModelViewOnly):
verbose_name_plural = "logs"
verbose_name = "log"

View file

@ -1,5 +1,5 @@
Code / API
==========
API Reference
=============
Operators
---------
@ -124,6 +124,8 @@ Variable Description
``airflow.configuration.conf`` which
represents the content of your
``airflow.cfg``
``run_id`` the ``run_id`` of the current DAG run
``dag_run`` a reference to the DAG run object
================================= ====================================
Note that you can access the object's attributes and methods with simple
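
For instance, mirroring the ``example_bash_operator`` change earlier in this
commit, the new ``run_id`` and ``dag_run`` entries can be referenced from any
templated field (the dag_id and task_id here are hypothetical):

from datetime import datetime
from airflow import DAG
from airflow.operators import BashOperator

dag = DAG('example_run_id_macro',  # hypothetical dag_id
          start_date=datetime(2015, 1, 1))

# run_id and dag_run are rendered from the template context at runtime.
task = BashOperator(
    task_id='echo_run_id',
    bash_command='echo "run_id={{ run_id }} | dag_run={{ dag_run }}"',
    dag=dag)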

View file

@ -1,11 +1,16 @@
The Scheduler
=============
Scheduling & Triggers
=====================
The Airflow scheduler monitors all tasks and all DAGs and schedules the
The Airflow scheduler monitors all tasks and all DAGs, and triggers the
task instances whose dependencies have been met. Behind the scenes,
it monitors a folder for all DAG objects it may contain,
and periodically inspects all tasks to see whether it can schedule the
next run.
it monitors and stays in sync with a folder for all DAG objects it may contain,
and periodically (every minute or so) inspects active tasks to see whether
they can be triggered.
The Airflow scheduler is designed to run as a persistent service in an
Airflow production environment. To kick it off, all you need to do is
execute ``airflow scheduler``. It will use the configuration specified in
``airflow.cfg``.
Note that if you run a DAG on a ``schedule_interval`` of one day,
the run stamped ``2016-01-01`` will be triggered soon after ``2016-01-01T23:59``.
@ -13,9 +18,9 @@ In other words, the job instance is started once the period it covers
has ended.
The scheduler starts an instance of the executor specified in your
``airflow.cfg``. If it happens to be the LocalExecutor, tasks will be
executed as subprocesses; in the case of CeleryExecutor and MesosExecutor, tasks are
executed remotely.
``airflow.cfg``. If it happens to be the ``LocalExecutor``, tasks will be
executed as subprocesses; in the case of ``CeleryExecutor`` and
``MesosExecutor``, tasks are executed remotely.
To start a scheduler, simply run the command:
@ -23,15 +28,66 @@ To start a scheduler, simply run the command:
airflow scheduler
Note that:
* It **won't parallelize** multiple instances of the same tasks; it always waits for the previous schedule to be done before moving forward
* It will **not fill in gaps**; it only moves forward in time from the latest task instance on that task
* If a task instance failed and the task is set to ``depends_on_past=True``, it won't move forward from that point until the error state is cleared and the task runs successfully, or is marked as successful
* If no task history exists for a task, it will attempt to run it on the task's ``start_date``
DAG Runs
''''''''
Understanding this, you should be able to comprehend what is keeping your
tasks from running or moving forward. To allow the scheduler to move forward, you may want to clear the state of some task instances, or mark them as successful.
A DAG Run is an object representing an instantiation of the DAG in time.
Each DAG may or may not have a schedule, which informs how ``DAG Runs`` are
created. ``schedule_interval`` is defined as a DAG argument, and preferably
receives a `cron expression <https://en.wikipedia.org/wiki/Cron#CRON_expression>`_
as a ``str``, or a ``datetime.timedelta`` object. Alternatively, you can also
use one of these cron "presets":
+--------------+-------------------------------------------------------------+---------------+
| preset       | meaning                                                     | cron          |
+==============+=============================================================+===============+
| ``@once``    | Schedule once and only once                                 |               |
+--------------+-------------------------------------------------------------+---------------+
| ``@hourly``  | Run once an hour at the beginning of the hour               | ``0 * * * *`` |
+--------------+-------------------------------------------------------------+---------------+
| ``@daily``   | Run once a day at midnight                                  | ``0 0 * * *`` |
+--------------+-------------------------------------------------------------+---------------+
| ``@weekly``  | Run once a week at midnight on Sunday morning               | ``0 0 * * 0`` |
+--------------+-------------------------------------------------------------+---------------+
| ``@monthly`` | Run once a month at midnight of the first day of the month | ``0 0 1 * *`` |
+--------------+-------------------------------------------------------------+---------------+
| ``@yearly``  | Run once a year at midnight of January 1                    | ``0 0 1 1 *`` |
+--------------+-------------------------------------------------------------+---------------+
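
For example, a DAG meant to run at midnight every day can be declared with a
preset, mirroring the ``example_branch_operator`` change in this commit (the
dag_id here is hypothetical):

from datetime import datetime
from airflow import DAG

dag = DAG(
    dag_id='example_daily_dag',  # hypothetical dag_id
    default_args={'owner': 'airflow', 'start_date': datetime(2015, 1, 1)},
    schedule_interval='@daily')  # expands to the '0 0 * * *' cron expression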
Your DAG will be instantiated for each schedule, and a ``DAG Run`` entry
is created for each one. DAG runs have a state associated with them
(running, failed, success) that informs the scheduler which set of
schedules should be evaluated for task submissions. Without this metadata
at the DAG run level, the Airflow scheduler would have much more work to
do to figure out which tasks should be triggered, and would slow to a
crawl. It might also create undesired processing when the shape of your
DAG changes, by, say, adding in new tasks.
External Triggers
'''''''''''''''''
Note that ``DAG Runs`` can also be created manually through the CLI by
running an ``airflow trigger_dag`` command, where you can define a
specific ``run_id``. The ``DAG Runs`` created externally to the
scheduler get associated with the trigger's timestamp, and will be
displayed in the UI alongside scheduled ``DAG runs``.
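
Under the hood, an external trigger is simply a ``DagRun`` row with
``external_trigger=True``. A minimal sketch of what ``airflow trigger_dag``
does, mirroring the cli.py hunk at the top of this commit (the run_id is
hypothetical, and must be unique per dag_id given the new unique constraint):

from datetime import datetime
from airflow import settings
from airflow.models import DagRun
from airflow.utils import State

session = settings.Session()
session.add(DagRun(
    dag_id='example_bash_operator',
    run_id='my_manual_run__2015-10-21',  # hypothetical run_id
    execution_date=datetime.now(),
    state=State.RUNNING,
    external_trigger=True))
session.commit()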
To Keep in Mind
'''''''''''''''
* The first ``DAG Run`` is created based on the minimum ``start_date`` for the
tasks in your DAG.
* Subsequent ``DAG Runs`` are created by the scheduler process, based on
your DAG's ``schedule_interval``, sequentially.
* When clearing a set of tasks' state in the hope of getting them to re-run,
it is important to keep in mind the ``DAG Run``'s state too, as it defines
whether the scheduler should look into triggering tasks for that run.
Here are some of the ways you can **unblock tasks**:
@ -40,7 +96,3 @@ Here are some of the ways you can **unblock tasks**:
* Marking task instances as successful can be done through the UI. This is mostly to fix false negatives, or for instance when the fix has been applied outside of Airflow.
* The ``airflow backfill`` CLI subcommand has a flag to ``--mark_success`` and allows selecting subsections of the DAG as well as specifying date ranges.
The Airflow scheduler is designed to run as a persistent service in an
Airflow production environment. To kick it off, all you need to do is
execute ``airflow scheduler``. It will use the configuration specified in
``airflow.cfg``.

View file

@ -26,6 +26,7 @@ pygments
pyhive
pydruid
PySmbClient
psycopg2
python-dateutil
# pyhs2 -> not compatible with Python 3 because of sasl
redis

View file

@ -13,6 +13,8 @@ from airflow.settings import Session
NUM_EXAMPLE_DAGS = 7
DEV_NULL = '/dev/null'
DEFAULT_DATE = datetime(2015, 1, 1)
DEFAULT_DATE_ISO = DEFAULT_DATE.isoformat()
DEFAULT_DATE_DS = DEFAULT_DATE_ISO[:10]
TEST_DAG_ID = 'unit_tests'
configuration.test_mode()
@ -39,7 +41,7 @@ class CoreTest(unittest.TestCase):
configuration.test_mode()
self.dagbag = models.DagBag(
dag_folder=DEV_NULL, include_examples=True)
self.args = {'owner': 'airflow', 'start_date': datetime(2015, 1, 1)}
self.args = {'owner': 'airflow', 'start_date': DEFAULT_DATE}
dag = DAG(TEST_DAG_ID, default_args=self.args)
self.dag = dag
self.dag_bash = self.dagbag.dags['example_bash_operator']
@ -379,16 +381,17 @@ class WebUiTests(unittest.TestCase):
response = self.app.get(
'/admin/airflow/rendered?'
'task_id=runme_1&dag_id=example_bash_operator&'
'execution_date=2015-01-07T00:00:00')
assert "example_bash_operator__runme_1__20150107" in response.data.decode('utf-8')
'execution_date={}'.format(DEFAULT_DATE_DS))
assert "example_bash_operator__runme_1" in response.data.decode('utf-8')
response = self.app.get(
'/admin/airflow/log?task_id=run_this_last&'
'dag_id=example_bash_operator&execution_date=2015-01-01T00:00:00')
'dag_id=example_bash_operator&execution_date={}'
''.format(DEFAULT_DATE_ISO))
assert "run_this_last" in response.data.decode('utf-8')
response = self.app.get(
'/admin/airflow/task?'
'task_id=runme_0&dag_id=example_bash_operator&'
'execution_date=2015-01-01')
'execution_date={}'.format(DEFAULT_DATE_DS))
assert "Attributes" in response.data.decode('utf-8')
response = self.app.get(
'/admin/airflow/dag_stats')
@ -412,15 +415,22 @@ class WebUiTests(unittest.TestCase):
"/admin/airflow/clear?task_id=runme_1&"
"dag_id=example_bash_operator&future=false&past=false&"
"upstream=false&downstream=true&"
"execution_date=2017-01-12T00:00:00&"
"origin=/admin")
response = self.app.get(url)
"execution_date={}&"
"origin=/admin".format(DEFAULT_DATE_DS))
clear_url_confirmed = clear_url + "&confirmed=true"
response = self.app.get(success_url_confirmed)
response = self.app.get(clear_url)
assert "Wait a minute" in response.data.decode('utf-8')
response = self.app.get(clear_url_confirmed)
response = self.app.get(success_url)
print("clear_url_confirmed: " + clear_url_confirmed)
print("success_url: " + success_url)
assert "Wait a minute" in response.data.decode('utf-8')
response = self.app.get(url + "&confirmed=true")
url = (
"/admin/airflow/run?task_id=runme_0&"
"dag_id=example_bash_operator&force=true&deps=true&"
"execution_date=2015-08-12T00:00:00&origin=/admin")
"execution_date={}&origin=/admin").format(DEFAULT_DATE_ISO)
response = self.app.get(url)
response = self.app.get(
"/admin/airflow/refresh?dag_id=example_bash_operator")
@ -512,7 +522,7 @@ if 'MySqlOperator' in dir(operators):
args = {
'owner': 'airflow',
'mysql_conn_id': 'airflow_db',
'start_date': datetime(2015, 1, 1)
'start_date': DEFAULT_DATE
}
dag = DAG(TEST_DAG_ID, default_args=args)
self.dag = dag
@ -557,6 +567,14 @@ if 'MySqlOperator' in dir(operators):
dag=self.dag)
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
def test_sql_sensor(self):
t = operators.SqlSensor(
task_id='sql_sensor_check',
conn_id='mysql_default',
sql="SELECT count(1) FROM INFORMATION_SCHEMA.TABLES",
dag=self.dag)
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
if 'PostgresOperator' in dir(operators):
# Only testing if the operator is installed
@ -564,7 +582,7 @@ if 'PostgresOperator' in dir(operators):
def setUp(self):
configuration.test_mode()
args = {'owner': 'airflow', 'start_date': datetime(2015, 1, 1)}
args = {'owner': 'airflow', 'start_date': DEFAULT_DATE}
dag = DAG(TEST_DAG_ID, default_args=args)
self.dag = dag
@ -593,7 +611,7 @@ class HttpOpSensorTest(unittest.TestCase):
def setUp(self):
configuration.test_mode()
args = {'owner': 'airflow', 'start_date': datetime(2015, 1, 1)}
args = {'owner': 'airflow', 'start_date': DEFAULT_DATE_ISO}
dag = DAG(TEST_DAG_ID, default_args=args)
self.dag = dag
@ -696,7 +714,7 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
def setUp(self):
configuration.test_mode()
args = {'owner': 'airflow', 'start_date': datetime(2015, 1, 1)}
args = {'owner': 'airflow', 'start_date': DEFAULT_DATE_ISO}
dag = DAG(TEST_DAG_ID, default_args=args)
self.dag = dag
@ -721,7 +739,7 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
mysql_conn_id='airflow_db',
sql=sql,
hive_table='airflow.test_mysql_to_hive_part',
partition={'ds': '2015-01-02'},
partition={'ds': DEFAULT_DATE_DS},
recreate=False,
create=True,
dag=self.dag)
@ -732,7 +750,7 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
def setUp(self):
configuration.test_mode()
args = {'owner': 'airflow', 'start_date': datetime(2015, 1, 1)}
args = {'owner': 'airflow', 'start_date': DEFAULT_DATE}
dag = DAG(TEST_DAG_ID, default_args=args)
self.dag = dag
self.hql = """
@ -801,7 +819,7 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
t = operators.HiveStatsCollectionOperator(
task_id='hive_stats_check',
table="airflow.static_babynames_partitioned",
partition={'ds': '2015-01-01'},
partition={'ds': DEFAULT_DATE_DS},
dag=self.dag)
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
@ -816,7 +834,7 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
t = operators.MetastorePartitionSensor(
task_id='hive_partition_check',
table='airflow.static_babynames_partitioned',
partition_name='ds=2015-01-01',
partition_name='ds={}'.format(DEFAULT_DATE_DS),
dag=self.dag)
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)