diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index 36fb6a997a..b4a53ca957 100755
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -4,6 +4,7 @@ import logging
 import os
 import subprocess
 import sys
+from datetime import datetime
 from builtins import input
 
 import argparse
@@ -13,7 +14,7 @@ import airflow
 from airflow import jobs, settings, utils
 from airflow import configuration
 from airflow.executors import DEFAULT_EXECUTOR
-from airflow.models import DagBag, TaskInstance, DagPickle
+from airflow.models import DagBag, TaskInstance, DagPickle, DagRun
 from airflow.utils import AirflowException
 
 DAGS_FOLDER = os.path.expanduser(configuration.get('core', 'DAGS_FOLDER'))
@@ -89,6 +90,21 @@ def backfill(args):
         pool=args.pool)
 
 
+def trigger_dag(args):
+    session = settings.Session()
+    # TODO: verify dag_id
+    dag_id = args.dag_id
+    run_id = args.run_id or None
+    execution_date = datetime.now()
+    trigger = DagRun(
+        dag_id=dag_id,
+        run_id=run_id,
+        execution_date=execution_date,
+        external_trigger=True)
+    session.add(trigger)
+    session.commit()
+
+
 def run(args):
 
     utils.pessimistic_connection_handling()
@@ -487,6 +503,14 @@ def get_parser():
         "-c", "--no_confirm", help=ht, action="store_true")
     parser_clear.set_defaults(func=clear)
 
+    ht = "Trigger a DAG"
+    parser_trigger_dag = subparsers.add_parser('trigger_dag', help=ht)
+    parser_trigger_dag.add_argument("dag_id", help="The id of the dag to run")
+    parser_trigger_dag.add_argument(
+        "-r", "--run_id",
+        help="Helps to identify this run")
+    parser_trigger_dag.set_defaults(func=trigger_dag)
+
     ht = "Run a single task instance"
     parser_run = subparsers.add_parser('run', help=ht)
     parser_run.add_argument("dag_id", help="The id of the dag to run")
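Note (not part of the patch): the new trigger_dag subcommand creates an
externally triggered DagRun stamped with the current time. A minimal usage
sketch, mirroring what trigger_dag() above does; the run_id value is made up
and only labels the run:

# Shell usage:
#     airflow trigger_dag example_bash_operator -r manual_run_1
#
# Programmatic equivalent, assuming a configured Airflow metadata database:
from datetime import datetime
from airflow import settings
from airflow.models import DagRun

session = settings.Session()
session.add(DagRun(
    dag_id='example_bash_operator',
    run_id='manual_run_1',
    execution_date=datetime.now(),
    external_trigger=True))
session.commit()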
diff --git a/airflow/example_dags/example_bash_operator.py b/airflow/example_dags/example_bash_operator.py
index 94dd2fc572..e3dafbfa20 100644
--- a/airflow/example_dags/example_bash_operator.py
+++ b/airflow/example_dags/example_bash_operator.py
@@ -10,7 +10,9 @@ args = {
     'start_date': seven_days_ago,
 }
 
-dag = DAG(dag_id='example_bash_operator', default_args=args)
+dag = DAG(
+    dag_id='example_bash_operator', default_args=args,
+    schedule_interval='0 0 * * *')
 
 cmd = 'ls -l'
 run_this_last = DummyOperator(task_id='run_this_last', dag=dag)
diff --git a/airflow/jobs.py b/airflow/jobs.py
index ee448e6b4f..4cb23b3e8c 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -7,6 +7,7 @@ from builtins import str
 from past.builtins import basestring
 from collections import defaultdict
 from datetime import datetime
+from itertools import product
 import getpass
 import logging
 import signal
@@ -259,15 +260,16 @@ class SchedulerJob(BaseJob):
             task = dag.get_task(ti.task_id)
             dttm = ti.execution_date
             if task.sla:
-                dttm += dag.schedule_interval
+                dttm = dag.following_schedule(dttm)
                 while dttm < datetime.now():
-                    if dttm + task.sla + dag.schedule_interval < datetime.now():
+                    following_schedule = dag.following_schedule(dttm)
+                    if following_schedule + task.sla < datetime.now():
                         session.merge(models.SlaMiss(
                             task_id=ti.task_id,
                             dag_id=ti.dag_id,
                             execution_date=dttm,
                             timestamp=ts))
-                    dttm += dag.schedule_interval
+                    dttm = dag.following_schedule(dttm)
         session.commit()
 
         slas = (
@@ -349,7 +351,7 @@ class SchedulerJob(BaseJob):
         last_scheduled_run = qry.scalar()
         if not last_scheduled_run or last_scheduled_run <= datetime.now():
             if last_scheduled_run:
-                next_run_date = last_scheduled_run + dag.schedule_interval
+                next_run_date = dag.following_schedule(last_scheduled_run)
             else:
                 next_run_date = dag.default_args['start_date']
             if not next_run_date:
@@ -358,6 +360,7 @@ class SchedulerJob(BaseJob):
                 dag_id=dag.dag_id,
                 run_id='scheduled',
                 execution_date=next_run_date,
+                state=State.RUNNING,
                 external_trigger=False
             )
             session.add(next_run)
@@ -374,6 +377,7 @@ class SchedulerJob(BaseJob):
         function takes a lock on the DAG and timestamps the last run
         in ``last_scheduler_run``.
         """
+        TI = models.TaskInstance
         DagModel = models.DagModel
         session = settings.Session()
 
@@ -399,76 +403,19 @@ class SchedulerJob(BaseJob):
         db_dag.last_scheduler_run = datetime.now()
         session.commit()
 
-        TI = models.TaskInstance
-        logging.info(
-            "Getting latest instance "
-            "for all tasks in dag " + dag.dag_id)
-        sq = (
-            session
-            .query(
-                TI.task_id,
-                func.max(TI.execution_date).label('max_ti'))
-            .filter(TI.dag_id == dag.dag_id)
-            .group_by(TI.task_id).subquery('sq')
-        )
+        active_runs = dag.get_active_runs()
 
-        qry = session.query(TI).filter(
-            TI.dag_id == dag.dag_id,
-            TI.task_id == sq.c.task_id,
-            TI.execution_date == sq.c.max_ti,
-        )
-        logging.debug("Querying max dates for each task")
-        latest_ti = qry.all()
-        ti_dict = {ti.task_id: ti for ti in latest_ti}
-        session.expunge_all()
-        session.commit()
-        logging.debug("{} rows returned".format(len(latest_ti)))
-
-        for task in dag.tasks:
+        for task, dttm in product(dag.tasks, active_runs):
             if task.adhoc:
                 continue
-            if task.task_id not in ti_dict:
-                # TODO: Needs this be changed with DagRun refactoring
-                # Brand new task, let's get started
-                ti = TI(task, task.start_date)
-                ti.refresh_from_db()
-                if ti.is_queueable(flag_upstream_failed=True):
-                    logging.info(
-                        'First run for {ti}'.format(**locals()))
-                    executor.queue_task_instance(ti, pickle_id=pickle_id)
-            else:
-                ti = ti_dict[task.task_id]
-                ti.task = task  # Hacky but worky
-                if ti.state == State.RUNNING:
-                    continue  # Only one task at a time
-                elif ti.state == State.UP_FOR_RETRY:
-                    # If task instance if up for retry, make sure
-                    # the retry delay is met
-                    if ti.is_runnable():
-                        logging.debug('Triggering retry: ' + str(ti))
-                        executor.queue_task_instance(ti, pickle_id=pickle_id)
-                elif ti.state == State.QUEUED:
-                    # If was queued we skipped so that in gets prioritized
-                    # in self.prioritize_queued
-                    continue
-                else:
-                    # Checking whether there is a dag for which no task exists
-                    # up to now
-                    qry = session.query(func.min(models.DagRun.execution_date)).filter(
-                        and_(models.DagRun.dag_id == dag.dag_id,
-                             models.DagRun.execution_date > ti.execution_date))
-                    next_schedule = qry.scalar()
-                    if not next_schedule:
-                        continue
+            ti = TI(task, dttm)
+            ti.refresh_from_db()
+            if ti.state in (State.RUNNING, State.QUEUED, State.SUCCESS):
+                continue
+            elif ti.is_runnable(flag_upstream_failed=True):
+                logging.debug('Queuing next run: ' + str(ti))
+                executor.queue_task_instance(ti, pickle_id=pickle_id)
 
-                    ti = TI(
-                        task=task,
-                        execution_date=next_schedule,
-                    )
-                    ti.refresh_from_db()
-                    if ti.is_queueable(flag_upstream_failed=True):
-                        logging.debug('Queuing next run: ' + str(ti))
-                        executor.queue_task_instance(ti, pickle_id=pickle_id)
         # Releasing the lock
         logging.debug("Unlocking DAG (scheduler_lock)")
         db_dag = (
diff --git a/airflow/migrations/versions/19054f4ff36_add_dagrun.py b/airflow/migrations/versions/19054f4ff36_add_dagrun.py
index 3be07c6306..01803ea4db 100644
--- a/airflow/migrations/versions/19054f4ff36_add_dagrun.py
+++ b/airflow/migrations/versions/19054f4ff36_add_dagrun.py
@@ -21,6 +21,7 @@ def upgrade():
         'dag_run',
         sa.Column('dag_id', sa.String(length=250), nullable=False),
         sa.Column('execution_date', sa.DateTime(), nullable=False),
+        sa.Column('state', sa.String(length=50), nullable=True),
         sa.Column('run_id', sa.String(length=250), nullable=True),
         sa.Column('external_trigger', sa.Boolean(), nullable=True),
         sa.PrimaryKeyConstraint('dag_id', 'execution_date')
diff --git a/airflow/models.py b/airflow/models.py
index 59f3bf99a5..535115d501 100644
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -32,6 +32,9 @@ from sqlalchemy.ext.declarative import declarative_base, declared_attr
 from sqlalchemy.dialects.mysql import LONGTEXT
 from sqlalchemy.orm import relationship, synonym
 
+from croniter import croniter
+import six
+
 from airflow import settings, utils
 from airflow.executors import DEFAULT_EXECUTOR, LocalExecutor
 from airflow import configuration
@@ -677,7 +680,7 @@ class TaskInstance(Base):
             return False
         elif self.task.end_date and self.execution_date > self.task.end_date:
             return False
-        elif self.state == State.SKIPPED:
+        elif self.state in (State.SKIPPED, State.QUEUED):
             return False
         elif (
                 self.state in State.runnable() and
@@ -748,7 +751,7 @@ class TaskInstance(Base):
                     TI.dag_id == self.dag_id,
                     TI.task_id == task.task_id,
                     TI.execution_date ==
-                        self.execution_date-task.schedule_interval,
+                        self.task.dag.previous_schedule(self.execution_date),
                     TI.state == State.SUCCESS,
                 ).first()
                 if not previous_ti:
@@ -1858,7 +1861,8 @@ class DAG(object):
         timedelta object gets added to your latest task instance's
         execution_date to figure out the next schedule
     :type schedule_interval: datetime.timedelta or
-        dateutil.relativedelta.relativedelta
+        dateutil.relativedelta.relativedelta or str that acts as a cron
+        expression
     :param start_date: The timestamp from which the scheduler will
         attempt to backfill
     :type start_date: datetime.datetime
@@ -1953,6 +1957,20 @@ class DAG(object):
             hash_components.append(repr(val))
         return hash(tuple(hash_components))
 
+    def following_schedule(self, dttm):
+        if isinstance(self.schedule_interval, six.string_types):
+            cron = croniter(self.schedule_interval, dttm)
+            return cron.get_next(datetime)
+        else:
+            return dttm + self.schedule_interval
+
+    def previous_schedule(self, dttm):
+        if isinstance(self.schedule_interval, six.string_types):
+            cron = croniter(self.schedule_interval, dttm)
+            return cron.get_prev(datetime)
+        else:
+            return dttm - self.schedule_interval
+
     @property
     def task_ids(self):
         return [t.task_id for t in self.tasks]
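Note (not part of the patch): a minimal standalone sketch of what the cron
branch of following_schedule()/previous_schedule() computes, using croniter
directly; the example date and expression are illustrative:

from datetime import datetime, timedelta
from croniter import croniter

dttm = datetime(2016, 1, 15, 6, 30)
cron = croniter('0 0 * * *', dttm)   # daily at midnight
print(cron.get_next(datetime))       # 2016-01-16 00:00:00
cron = croniter('0 0 * * *', dttm)   # fresh iterator for get_prev
print(cron.get_prev(datetime))       # 2016-01-15 00:00:00

# With a timedelta schedule_interval the same methods reduce to arithmetic:
print(dttm + timedelta(days=1))      # 2016-01-16 06:30:00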
@@ -2006,6 +2024,41 @@ class DAG(object):
                 l += task.subdag.subdags
         return l
 
+    def get_active_runs(self):
+        """
+        Maintains and returns the currently active runs as a list of dates
+        """
+        TI = TaskInstance
+        session = settings.Session()
+        # Checking state of active DagRuns
+        active_runs = []
+        for run in (
+                session.query(DagRun)
+                .filter(
+                    DagRun.dag_id == self.dag_id,
+                    DagRun.state == State.RUNNING)
+                .all()):
+            logging.info("Checking state for " + str(run))
+            task_instances = session.query(TI).filter(
+                TI.dag_id == run.dag_id,
+                TI.task_id.in_(self.task_ids),
+                TI.execution_date == run.execution_date,
+            ).all()
+            if len(task_instances) == len(self.tasks):
+                task_states = [ti.state for ti in task_instances]
+                if State.FAILED in task_states:
+                    logging.info('Marking run {} failed'.format(run))
+                    run.state = State.FAILED
+                elif set(task_states) == set([State.SUCCESS]):
+                    logging.info('Marking run {} successful'.format(run))
+                    run.state = State.SUCCESS
+                else:
+                    active_runs.append(run.execution_date)
+            else:
+                active_runs.append(run.execution_date)
+        session.commit()
+        return active_runs
+
     def resolve_template_files(self):
         for t in self.tasks:
             t.resolve_template_files()
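Note (not part of the patch): the state bookkeeping in get_active_runs()
reduces to a small rule over one run's task instance states. A self-contained
sketch of that rule; the function name and plain-string states are
hypothetical stand-ins for the State.* constants:

def resolve_run_state(task_states, expected_count):
    """Return 'failed', 'success', or None if the run is still active."""
    if len(task_states) < expected_count:
        return None                    # some tasks not instantiated yet
    if 'failed' in task_states:
        return 'failed'                # any failed task fails the run
    if set(task_states) == {'success'}:
        return 'success'               # every task succeeded
    return None                        # mixed states: still running

assert resolve_run_state(['success', 'failed'], 2) == 'failed'
assert resolve_run_state(['success', 'success'], 2) == 'success'
assert resolve_run_state(['success', 'running'], 2) is None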
@@ -2298,7 +2351,7 @@ class Chart(Base):
     id = Column(Integer, primary_key=True)
     label = Column(String(200))
     conn_id = Column(String(ID_LEN), nullable=False)
-    user_id = Column(Integer(), ForeignKey('user.id'),)
+    user_id = Column(Integer(), ForeignKey('user.id'), nullable=True)
     chart_type = Column(String(100), default="line")
     sql_layout = Column(String(50), default="series")
     sql = Column(Text, default="SELECT series, x, y FROM table")
@@ -2514,24 +2567,23 @@ class XCom(Base):
 
 
 class DagRun(Base):
     """
     DagRun describes an instance of a Dag. It can be created
-    by a scheduled of a Dag or by an external trigger
+    by the scheduler (for regular runs) or by an external trigger
     """
     __tablename__ = "dag_run"
 
     dag_id = Column(String(ID_LEN), primary_key=True)
     execution_date = Column(DateTime, primary_key=True)
+    state = Column(String(50))
     run_id = Column(String(ID_LEN))
     external_trigger = Column(Boolean, default=False)
 
     def __repr__(self):
         return '<DagRun {dag_id} @ {execution_date}: {run_id}, externally triggered: {external_trigger}>'.format(
-            task_id=self.task_id,
+            dag_id=self.dag_id,
             execution_date=self.execution_date,
             run_id=self.run_id,
             external_trigger=self.external_trigger)
-        return str((
-            self.dag_id, self.run_id, self.execution_date.isoformat()))
 
 
 class Pool(Base):
diff --git a/airflow/utils.py b/airflow/utils.py
index 5a40b3e49e..483da6eb82 100644
--- a/airflow/utils.py
+++ b/airflow/utils.py
@@ -21,6 +21,7 @@ import os
 import re
 import shutil
 import signal
+import six
 import smtplib
 from tempfile import mkdtemp
 
@@ -34,6 +35,7 @@ from sqlalchemy import event, exc
 from sqlalchemy.pool import Pool
 
 import numpy as np
+from croniter import croniter
 
 from airflow import settings
 from airflow import configuration
@@ -283,11 +285,36 @@ def validate_key(k, max_length=250):
 
 
 def date_range(start_date, end_date=datetime.now(), delta=timedelta(1)):
+    """
+    Get a set of dates as a list based on a start, end and delta; delta
+    can be something that can be added to ``datetime.datetime``
+    or a cron expression as a ``str``
+
+    >>> date_range(datetime(2016, 1, 1), datetime(2016, 1, 3))
+    [datetime.datetime(2016, 1, 1, 0, 0),
+     datetime.datetime(2016, 1, 2, 0, 0),
+     datetime.datetime(2016, 1, 3, 0, 0)]
+    >>> date_range(datetime(2016, 1, 1), datetime(2016, 1, 3), '0 0 * * *')
+    [datetime.datetime(2016, 1, 1, 0, 0),
+     datetime.datetime(2016, 1, 2, 0, 0),
+     datetime.datetime(2016, 1, 3, 0, 0)]
+    >>> date_range(datetime(2016, 1, 1), datetime(2016, 3, 3), delta="0 0 0 * *")
+    [datetime.datetime(2016, 1, 1, 0, 0),
+     datetime.datetime(2016, 2, 1, 0, 0),
+     datetime.datetime(2016, 3, 1, 0, 0)]
+    """
+    delta_iscron = False
+    if isinstance(delta, six.string_types):
+        delta_iscron = True
+        cron = croniter(delta, start_date)
     l = []
     if end_date >= start_date:
         while start_date <= end_date:
             l.append(start_date)
-            start_date += delta
+            if delta_iscron:
+                start_date = cron.get_next(datetime)
+            else:
+                start_date += delta
     else:
         raise AirflowException("start_date can't be after end_date")
     return l
@@ -566,6 +593,16 @@ def round_time(dt, delta, start_date=datetime.min):
     >>> round_time(datetime(2015, 9, 13, 0, 0), timedelta(1), datetime(2015, 9, 14, 0, 0))
     datetime.datetime(2015, 9, 14, 0, 0)
     """
+
+    if isinstance(delta, six.string_types):
+        # It's cron based, so it's easy
+        cron = croniter(delta, start_date)
+        prev = cron.get_prev(datetime)
+        if prev == start_date:
+            return start_date
+        else:
+            return prev
+
     # Ignore the microseconds of dt
     dt -= timedelta(microseconds = dt.microsecond)
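Note (not part of the patch): a quick runnable check of the new cron branch
in date_range(), assuming this patch is applied and croniter is installed;
the monthly expression '0 0 1 * *' (midnight on the 1st of each month) is an
illustrative choice:

from datetime import datetime
from airflow.utils import date_range

print(date_range(datetime(2016, 1, 1), datetime(2016, 3, 3),
                 delta='0 0 1 * *'))
# [datetime.datetime(2016, 1, 1, 0, 0),
#  datetime.datetime(2016, 2, 1, 0, 0),
#  datetime.datetime(2016, 3, 1, 0, 0)]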
diff --git a/airflow/www/app.py b/airflow/www/app.py
index bfa53308de..75282df731 100644
--- a/airflow/www/app.py
+++ b/airflow/www/app.py
@@ -45,23 +45,17 @@ def create_app(config=None):
     admin.add_view(views.Airflow(name='DAGs'))
 
     admin.add_view(views.SlaMissModelView(models.SlaMiss, Session, name="SLA Misses", category="Browse"))
-    admin.add_view(
-        views.TaskInstanceModelView(models.TaskInstance, Session, name="Task Instances", category="Browse")
-    )
-
+    admin.add_view(views.TaskInstanceModelView(models.TaskInstance, Session, name="Task Instances", category="Browse"))
     admin.add_view(views.LogModelView(models.Log, Session, name="Logs", category="Browse"))
     admin.add_view(views.JobModelView(jobs.BaseJob, Session, name="Jobs", category="Browse"))
-
     admin.add_view(views.QueryView(name='Ad Hoc Query', category="Data Profiling"))
     admin.add_view(views.ChartModelView(models.Chart, Session, name="Charts", category="Data Profiling"))
     admin.add_view(views.KnowEventView(models.KnownEvent, Session, name="Known Events", category="Data Profiling"))
-
     admin.add_view(views.PoolModelView(models.Pool, Session, name="Pools", category="Admin"))
     admin.add_view(views.ConfigurationView(name='Configuration', category="Admin"))
     admin.add_view(views.UserModelView(models.User, Session, name="Users", category="Admin"))
     admin.add_view(views.ConnectionModelView(models.Connection, Session, name="Connections", category="Admin"))
     admin.add_view(views.VariableView(models.Variable, Session, name="Variables", category="Admin"))
-
     admin.add_link(base.MenuLink(category='Docs', name='Documentation', url='http://pythonhosted.org/airflow/'))
     admin.add_link(base.MenuLink(category='Docs',name='Github',url='https://github.com/airbnb/airflow'))
diff --git a/requirements.txt b/requirements.txt
index a71a05995c..5d42fc1cff 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -5,6 +5,7 @@ chartkick
 cryptography
 coverage
 coveralls
+croniter
 dill
 flake8
 flask
diff --git a/setup.py b/setup.py
index 66771f4d55..29dd0b0eb4 100644
--- a/setup.py
+++ b/setup.py
@@ -68,6 +68,7 @@ setup(
     install_requires=[
         'alembic>=0.8.0, <0.9',
         'chartkick>=0.4.2, < 0.5',
+        'croniter>=0.3.8, <0.4',
         'dill>=0.2.2, <0.3',
         'flask>=0.10.1, <0.11',
         'flask-admin==1.2.0',