diff --git a/TODO.md b/TODO.md
index 431d8b6b2c..a22e68bbca 100644
--- a/TODO.md
+++ b/TODO.md
@@ -1,5 +1,10 @@
 TODO
 -----
+
+### Current
+* Test @once
+* Test ``run_id`` in template and ``dag_run``
+
 #### UI
 * Backfill form
 * Better task filtering in duration and landing time charts (operator toggle, task regex, uncheck all button)
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index b4a53ca957..4429f1a7f6 100755
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -15,7 +15,7 @@ from airflow import jobs, settings, utils
 from airflow import configuration
 from airflow.executors import DEFAULT_EXECUTOR
 from airflow.models import DagBag, TaskInstance, DagPickle, DagRun
-from airflow.utils import AirflowException
+from airflow.utils import AirflowException, State
 
 DAGS_FOLDER = os.path.expanduser(configuration.get('core', 'DAGS_FOLDER'))
 
@@ -91,17 +91,23 @@ def backfill(args):
 
 
 def trigger_dag(args):
+    log_to_stdout()
     session = settings.Session()
     # TODO: verify dag_id
-    dag_id = args.dag_id
-    run_id = args.run_id or None
     execution_date = datetime.now()
-    trigger = DagRun(
-        dag_id=dag_id,
-        run_id=run_id,
-        execution_date=execution_date,
-        external_trigger=True)
-    session.add(trigger)
+    dr = session.query(DagRun).filter(
+        DagRun.dag_id == args.dag_id, DagRun.run_id == args.run_id).first()
+    if dr:
+        logging.error("This run_id already exists")
+    else:
+        trigger = DagRun(
+            dag_id=args.dag_id,
+            run_id=args.run_id,
+            execution_date=execution_date,
+            state=State.RUNNING,
+            external_trigger=True)
+        session.add(trigger)
+        logging.info("Created {}".format(trigger))
     session.commit()
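Reviewer note: the guarded create path above is easy to exercise outside the CLI. A minimal sketch, assuming an initialized metadata DB and this patch's import paths (`trigger` and its arguments are hypothetical names, not part of the change):

```python
from datetime import datetime

from airflow import settings
from airflow.models import DagRun
from airflow.utils import State


def trigger(dag_id, run_id):
    # Create a RUNNING DagRun unless (dag_id, run_id) already exists,
    # mirroring the new trigger_dag subcommand.
    session = settings.Session()
    existing = session.query(DagRun).filter(
        DagRun.dag_id == dag_id, DagRun.run_id == run_id).first()
    if existing:
        print("This run_id already exists")
        return None
    dr = DagRun(
        dag_id=dag_id,
        run_id=run_id,
        execution_date=datetime.now(),
        state=State.RUNNING,
        external_trigger=True)
    session.add(dr)
    session.commit()
    return dr
```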
diff --git a/airflow/configuration.py b/airflow/configuration.py
index 63f6dbbf0a..82fa3a40ae 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -53,7 +53,8 @@ defaults = {
         'plugins_folder': None,
         'security': None,
         'donot_pickle': False,
-        's3_log_folder': ''
+        's3_log_folder': '',
+        'dag_concurrency': 16,
     },
     'webserver': {
         'base_url': 'http://localhost:8080',
@@ -120,6 +121,9 @@ sql_alchemy_conn = sqlite:///{AIRFLOW_HOME}/airflow.db
 # on this airflow installation
 parallelism = 32
 
+# The number of task instances allowed to run concurrently by the scheduler
+dag_concurrency = 16
+
 # Whether to load the examples that ship with Airflow. It's good to
 # get started, but you probably want to set this to False in a production
 # environment
@@ -271,6 +275,7 @@ sql_alchemy_conn = sqlite:///{AIRFLOW_HOME}/unittests.db
 unit_test_mode = True
 load_examples = True
 donot_pickle = False
+dag_concurrency = 16
 
 [webserver]
 base_url = http://localhost:8080
diff --git a/airflow/example_dags/example_bash_operator.py b/airflow/example_dags/example_bash_operator.py
index e3dafbfa20..2a7144bea6 100644
--- a/airflow/example_dags/example_bash_operator.py
+++ b/airflow/example_dags/example_bash_operator.py
@@ -31,6 +31,6 @@ for i in range(3):
 
 task = BashOperator(
     task_id='also_run_this',
-    bash_command='echo "{{ macros.uuid.uuid1() }}"',
+    bash_command='echo "run_id={{ run_id }} | dag_run={{ dag_run }}"',
     dag=dag)
 task.set_downstream(run_this_last)
diff --git a/airflow/example_dags/example_branch_operator.py b/airflow/example_dags/example_branch_operator.py
index fa94e087fc..f576d20866 100644
--- a/airflow/example_dags/example_branch_operator.py
+++ b/airflow/example_dags/example_branch_operator.py
@@ -10,7 +10,10 @@ args = {
     'start_date': seven_days_ago,
 }
 
-dag = DAG(dag_id='example_branch_operator', default_args=args)
+dag = DAG(
+    dag_id='example_branch_operator',
+    default_args=args,
+    schedule_interval="@daily")
 
 cmd = 'ls -l'
 run_this_first = DummyOperator(task_id='run_this_first', dag=dag)
diff --git a/airflow/example_dags/example_http_operator.py b/airflow/example_dags/example_http_operator.py
index 04869373d2..2e20516ffa 100644
--- a/airflow/example_dags/example_http_operator.py
+++ b/airflow/example_dags/example_http_operator.py
@@ -67,10 +67,9 @@ t4 = SimpleHttpOperator(
 sensor = HttpSensor(
     task_id='http_sensor_check',
     conn_id='http_default',
-    endpoint='api/v1.0/apps',
+    endpoint='',
     params={},
-    headers={"Content-Type": "application/json"},
-    response_check=lambda response: True if "collation" in response.content else False,
+    response_check=lambda response: "Google" in response.content,
     poke_interval=5,
     dag=dag)
 
diff --git a/airflow/example_dags/example_python_operator.py b/airflow/example_dags/example_python_operator.py
index 8355cc1232..0fc2180d63 100644
--- a/airflow/example_dags/example_python_operator.py
+++ b/airflow/example_dags/example_python_operator.py
@@ -7,15 +7,17 @@ from datetime import datetime, timedelta
 import time
 from pprint import pprint
 
-seven_days_ago = datetime.combine(datetime.today() - timedelta(7),
-                                  datetime.min.time())
+seven_days_ago = datetime.combine(
+    datetime.today() - timedelta(7), datetime.min.time())
 
 args = {
     'owner': 'airflow',
     'start_date': seven_days_ago,
 }
 
-dag = DAG(dag_id='example_python_operator', default_args=args)
+dag = DAG(
+    dag_id='example_python_operator', default_args=args,
+    schedule_interval=None)
 
 
 def my_sleeping_function(random_base):
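Reviewer note: the example_bash_operator change is the quickest way to see the new template variables in action. A minimal standalone sketch, assuming this era's import paths (`run_id_demo` and the task id are hypothetical):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators import BashOperator

dag = DAG(
    dag_id='run_id_demo',              # hypothetical dag_id
    start_date=datetime(2015, 10, 1),
    schedule_interval='@daily')        # preset, resolved via utils.cron_presets

# run_id and dag_run are the two context entries added by this patch;
# they render to None/empty for task instances with no matching DagRun.
task = BashOperator(
    task_id='echo_run_id',
    bash_command='echo "run_id={{ run_id }} | dag_run={{ dag_run }}"',
    dag=dag)
```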
diff --git a/airflow/example_dags/example_xcom.py b/airflow/example_dags/example_xcom.py
index 4bd334b0fa..2d9c087c7a 100644
--- a/airflow/example_dags/example_xcom.py
+++ b/airflow/example_dags/example_xcom.py
@@ -1,11 +1,21 @@
 from __future__ import print_function
 import airflow
-import datetime
+from datetime import datetime, timedelta
+
+seven_days_ago = datetime.combine(
+    datetime.today() - timedelta(7),
+    datetime.min.time())
+
+args = {
+    'owner': 'airflow',
+    'start_date': seven_days_ago,
+    'provide_context': True
+}
 
 dag = airflow.DAG(
     'example_xcom',
-    start_date=datetime.datetime(2015, 1, 1),
-    default_args={'owner': 'airflow', 'provide_context': True})
+    start_date=datetime(2015, 1, 1),
+    schedule_interval="@once",
+    default_args=args)
 
 value_1 = [1, 2, 3]
 value_2 = {'a': 'b'}
@@ -42,4 +52,4 @@ push2 = airflow.operators.PythonOperator(
 pull = airflow.operators.PythonOperator(
     task_id='puller', dag=dag, python_callable=puller)
 
-pull.set_upstream([push1, push2])
\ No newline at end of file
+pull.set_upstream([push1, push2])
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 4cb23b3e8c..bbe0cd7966 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -342,31 +342,32 @@ class SchedulerJob(BaseJob):
         This method checks whether a new DagRun needs to be created
         for a DAG based on scheduling interval
         """
-        DagRun = models.DagRun
-        session = settings.Session()
-        qry = session.query(func.max(DagRun.execution_date)).filter(and_(
-            DagRun.dag_id == dag.dag_id,
-            DagRun.external_trigger == False
-        ))
-        last_scheduled_run = qry.scalar()
-        if not last_scheduled_run or last_scheduled_run <= datetime.now():
-            if last_scheduled_run:
+        if dag.schedule_interval:
+            DagRun = models.DagRun
+            session = settings.Session()
+            qry = session.query(func.max(DagRun.execution_date)).filter(and_(
+                DagRun.dag_id == dag.dag_id,
+                DagRun.external_trigger == False
+            ))
+            last_scheduled_run = qry.scalar()
+            next_run_date = None
+            if not last_scheduled_run:
+                next_run_date = min([t.start_date for t in dag.tasks])
+            elif dag.schedule_interval != '@once':
                 next_run_date = dag.following_schedule(last_scheduled_run)
-            else:
-                next_run_date = dag.default_args['start_date']
-            if not next_run_date:
-                raise Exception('no next_run_date defined!')
-            next_run = DagRun(
-                dag_id=dag.dag_id,
-                run_id='scheduled',
-                execution_date=next_run_date,
-                state=State.RUNNING,
-                external_trigger=False
-            )
-            session.add(next_run)
-            session.commit()
-
+            elif dag.schedule_interval == '@once' and not last_scheduled_run:
+                next_run_date = datetime.now()
+            if next_run_date and next_run_date <= datetime.now():
+                next_run = DagRun(
+                    dag_id=dag.dag_id,
+                    run_id='scheduled__' + next_run_date.isoformat(),
+                    execution_date=next_run_date,
+                    state=State.RUNNING,
+                    external_trigger=False
+                )
+                session.add(next_run)
+                session.commit()
 
     def process_dag(self, dag, executor):
         """
@@ -387,8 +388,7 @@ class SchedulerJob(BaseJob):
                 executors.LocalExecutor, executors.SequentialExecutor):
             pickle_id = dag.pickle(session).id
 
-        db_dag = session.query(
-            DagModel).filter(DagModel.dag_id == dag.dag_id).first()
+        db_dag = session.query(DagModel).filter_by(dag_id=dag.dag_id).first()
         last_scheduler_run = db_dag.last_scheduler_run or datetime(2000, 1, 1)
         secs_since_last = (
             datetime.now() - last_scheduler_run).total_seconds()
@@ -404,7 +404,6 @@ class SchedulerJob(BaseJob):
         session.commit()
 
         active_runs = dag.get_active_runs()
-
         for task, dttm in product(dag.tasks, active_runs):
             if task.adhoc:
                 continue
@@ -413,7 +412,7 @@ class SchedulerJob(BaseJob):
             if ti.state in (State.RUNNING, State.QUEUED, State.SUCCESS):
                 continue
             elif ti.is_runnable(flag_upstream_failed=True):
-                logging.debug('Queuing next run: ' + str(ti))
+                logging.debug('Firing task: {}'.format(ti))
                 executor.queue_task_instance(ti, pickle_id=pickle_id)
 
         # Releasing the lock
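Reviewer note: the new scheduling rule in `schedule_dag` is worth restating in isolation. A condensed sketch of the decision (function name and signature are hypothetical; the real method also checks that the date is not in the future before creating the run):

```python
def next_run_date(dag, last_scheduled_run):
    # Unscheduled DAGs (schedule_interval=None) never get automatic runs.
    if not dag.schedule_interval:
        return None
    # First run: anchor on the earliest task start_date.
    if not last_scheduled_run:
        return min(t.start_date for t in dag.tasks)
    # '@once' DAGs stop after their single run.
    if dag.schedule_interval == '@once':
        return None
    # Otherwise step forward one schedule interval.
    return dag.following_schedule(last_scheduled_run)
```

The run id becomes `'scheduled__' + next_run_date.isoformat()`, so scheduled runs no longer collide on the constant `'scheduled'` id.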
@@ -454,15 +453,37 @@ class SchedulerJob(BaseJob):
         for pool, tis in list(d.items()):
             open_slots = pools[pool].open_slots(session=session)
-            if open_slots > 0:
-                tis = sorted(
-                    tis, key=lambda ti: (-ti.priority_weight, ti.start_date))
-                for ti in tis[:open_slots]:
-                    task = None
-                    try:
-                        task = dagbag.dags[ti.dag_id].get_task(ti.task_id)
-                    except:
-                        logging.error("Queued task {} seems gone".format(ti))
+            if not open_slots:
+                continue
+            tis = sorted(
+                tis, key=lambda ti: (-ti.priority_weight, ti.start_date))
+            for ti in tis:
+                if not open_slots:
+                    break
+                task = None
+                try:
+                    task = dagbag.dags[ti.dag_id].get_task(ti.task_id)
+                except:
+                    logging.error("Queued task {} seems gone".format(ti))
+                    session.delete(ti)
+                if task:
+                    ti.task = task
+
+                    # picklin'
+                    dag = dagbag.dags[ti.dag_id]
+                    pickle_id = None
+                    if self.do_pickle and self.executor.__class__ not in (
+                            executors.LocalExecutor,
+                            executors.SequentialExecutor):
+                        pickle_id = dag.pickle(session).id
+
+                    if (
+                            ti.are_dependencies_met() and
+                            not task.dag.concurrency_reached):
+                        executor.queue_task_instance(
+                            ti, force=True, pickle_id=pickle_id)
+                        open_slots -= 1
+                else:
                     session.delete(ti)
-                    continue
-                ti.task = task
@@ -627,8 +648,7 @@ class BackfillJob(BaseJob):
         start_date = start_date or task.start_date
         end_date = end_date or task.end_date or datetime.now()
 
-        for dttm in utils.date_range(
-                start_date, end_date, task.dag.schedule_interval):
+        for dttm in self.dag.date_range(start_date, end_date=end_date):
             ti = models.TaskInstance(task, dttm)
             tasks_to_run[ti.key] = ti
diff --git a/airflow/migrations/env.py b/airflow/migrations/env.py
index 80a9a1b440..42ca29fdac 100644
--- a/airflow/migrations/env.py
+++ b/airflow/migrations/env.py
@@ -25,6 +25,8 @@ target_metadata = models.Base.metadata
 # my_important_option = config.get_main_option("my_important_option")
 # ... etc.
 
+COMPARE_TYPE = False
+
 
 def run_migrations_offline():
     """Run migrations in 'offline' mode.
@@ -40,7 +42,8 @@ def run_migrations_offline():
     """
     url = configuration.get('core', 'SQL_ALCHEMY_CONN')
     context.configure(
-        url=url, target_metadata=target_metadata, literal_binds=True)
+        url=url, target_metadata=target_metadata, literal_binds=True,
+        compare_type=COMPARE_TYPE)
 
     with context.begin_transaction():
         context.run_migrations()
@@ -58,7 +61,8 @@ def run_migrations_online():
     with connectable.connect() as connection:
         context.configure(
             connection=connection,
-            target_metadata=target_metadata
+            target_metadata=target_metadata,
+            compare_type=COMPARE_TYPE,
         )
 
         with context.begin_transaction():
diff --git a/airflow/migrations/versions/19054f4ff36_add_dagrun.py b/airflow/migrations/versions/19054f4ff36_add_dagrun.py
deleted file mode 100644
index 01803ea4db..0000000000
--- a/airflow/migrations/versions/19054f4ff36_add_dagrun.py
+++ /dev/null
@@ -1,32 +0,0 @@
-"""add DagRun
-
-Revision ID: 19054f4ff36
-Revises: 338e90f54d61
-Create Date: 2015-10-12 09:55:52.475712
-
-"""
-
-# revision identifiers, used by Alembic.
-revision = '19054f4ff36'
-down_revision = '338e90f54d61'
-branch_labels = None
-depends_on = None
-
-from alembic import op
-import sqlalchemy as sa
-
-
-def upgrade():
-    op.create_table(
-        'dag_run',
-        sa.Column('dag_id', sa.String(length=250), nullable=False),
-        sa.Column('execution_date', sa.DateTime(), nullable=False),
-        sa.Column('state', sa.String(length=50), nullable=True),
-        sa.Column('run_id', sa.String(length=250), nullable=True),
-        sa.Column('external_trigger', sa.Boolean(), nullable=True),
-        sa.PrimaryKeyConstraint('dag_id', 'execution_date')
-    )
-
-
-def downgrade():
-    op.drop_table('dag_run')
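Reviewer note: the queue-ordering key in `prioritize_queued` above is easy to misread because of the negated weight. A toy, self-contained demonstration (the namedtuple stands in for a TaskInstance):

```python
from collections import namedtuple
from datetime import datetime

TI = namedtuple('TI', 'task_id priority_weight start_date')

queued = [
    TI('low', 1, datetime(2015, 10, 1)),
    TI('high', 10, datetime(2015, 10, 2)),
    TI('high_old', 10, datetime(2015, 10, 1)),
]

# Highest priority_weight first; earlier start_date breaks ties.
ordered = sorted(queued, key=lambda ti: (-ti.priority_weight, ti.start_date))
print([ti.task_id for ti in ordered])  # ['high_old', 'high', 'low']
```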
diff --git a/airflow/migrations/versions/1b38cef5b76e_add_dagrun.py b/airflow/migrations/versions/1b38cef5b76e_add_dagrun.py
new file mode 100644
index 0000000000..d6f18af966
--- /dev/null
+++ b/airflow/migrations/versions/1b38cef5b76e_add_dagrun.py
@@ -0,0 +1,35 @@
+"""add dagrun
+
+Revision ID: 1b38cef5b76e
+Revises: 52d714495f0
+Create Date: 2015-10-27 08:31:48.475140
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = '1b38cef5b76e'
+down_revision = '52d714495f0'
+branch_labels = None
+depends_on = None
+
+from alembic import op
+import sqlalchemy as sa
+from sqlalchemy.dialects import mysql
+
+
+def upgrade():
+    op.create_table(
+        'dag_run',
+        sa.Column('id', sa.Integer(), nullable=False),
+        sa.Column('dag_id', sa.String(length=250), nullable=True),
+        sa.Column('execution_date', sa.DateTime(), nullable=True),
+        sa.Column('state', sa.String(length=50), nullable=True),
+        sa.Column('run_id', sa.String(length=250), nullable=True),
+        sa.Column('external_trigger', sa.Boolean(), nullable=True),
+        sa.PrimaryKeyConstraint('id'),
+        sa.UniqueConstraint('dag_id', 'execution_date'),
+        sa.UniqueConstraint('dag_id', 'run_id'),
+    )
+
+
+def downgrade():
+    op.drop_table('dag_run')
diff --git a/airflow/migrations/versions/2e541a1dcfed_task_duration.py b/airflow/migrations/versions/2e541a1dcfed_task_duration.py
new file mode 100644
index 0000000000..851d863547
--- /dev/null
+++ b/airflow/migrations/versions/2e541a1dcfed_task_duration.py
@@ -0,0 +1,28 @@
+"""task_duration
+
+Revision ID: 2e541a1dcfed
+Revises: 1b38cef5b76e
+Create Date: 2015-10-28 20:38:41.266143
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = '2e541a1dcfed'
+down_revision = '1b38cef5b76e'
+branch_labels = None
+depends_on = None
+
+from alembic import op
+import sqlalchemy as sa
+from sqlalchemy.dialects import mysql
+
+
+def upgrade():
+    op.alter_column('task_instance', 'duration',
+                    existing_type=mysql.INTEGER(display_width=11),
+                    type_=sa.Float(),
+                    existing_nullable=True)
+
+
+def downgrade():
+    pass
diff --git a/airflow/migrations/versions/40e67319e3a9_dagrun_config.py b/airflow/migrations/versions/40e67319e3a9_dagrun_config.py
new file mode 100644
index 0000000000..7a6eed2eea
--- /dev/null
+++ b/airflow/migrations/versions/40e67319e3a9_dagrun_config.py
@@ -0,0 +1,25 @@
+"""dagrun_config
+
+Revision ID: 40e67319e3a9
+Revises: 2e541a1dcfed
+Create Date: 2015-10-29 08:36:31.726728
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = '40e67319e3a9'
+down_revision = '2e541a1dcfed'
+branch_labels = None
+depends_on = None
+
+from alembic import op
+import sqlalchemy as sa
+from sqlalchemy.dialects import mysql
+
+
+def upgrade():
+    op.add_column('dag_run', sa.Column('conf', sa.PickleType(), nullable=True))
+
+
+def downgrade():
+    op.drop_column('dag_run', 'conf')
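Reviewer note: the rebuilt `dag_run` table now has a surrogate primary key plus two natural unique keys, `(dag_id, execution_date)` and `(dag_id, run_id)`. A sketch of the lookup both the CLI and the template context rely on, assuming an initialized metadata DB (the identifiers are hypothetical):

```python
from airflow import settings
from airflow.models import DagRun

session = settings.Session()
# At most one row can match thanks to the (dag_id, run_id) unique constraint;
# returns None when no such run exists.
dr = session.query(DagRun).filter_by(
    dag_id='example_xcom', run_id='manual_1').first()
```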
diff --git a/airflow/models.py b/airflow/models.py
index 535115d501..198488e68a 100644
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -6,7 +6,6 @@ from __future__ import unicode_literals
 from future.standard_library import install_aliases
 install_aliases()
 from builtins import str
-from past.builtins import basestring
 from builtins import object, bytes
 import copy
 from datetime import datetime, timedelta
@@ -26,7 +25,7 @@ from urllib.parse import urlparse
 
 from sqlalchemy import (
     Column, Integer, String, DateTime, Text, Boolean, ForeignKey, PickleType,
-    Index, BigInteger)
+    Index, Float)
 from sqlalchemy import case, func, or_, and_
 from sqlalchemy.ext.declarative import declarative_base, declared_attr
 from sqlalchemy.dialects.mysql import LONGTEXT
@@ -62,7 +61,7 @@ else:
     LongText = Text
 
 
-def clear_task_instances(tis, session):
+def clear_task_instances(tis, session, activate_dag_runs=True):
     '''
     Clears a set of task instances, but makes sure the running ones
     get killed.
@@ -79,6 +78,15 @@ def clear_task_instances(tis, session):
         from airflow.jobs import BaseJob as BJ
         for job in session.query(BJ).filter(BJ.id.in_(job_ids)).all():
             job.state = State.SHUTDOWN
+    if activate_dag_runs:
+        execution_dates = {ti.execution_date for ti in tis}
+        dag_ids = {ti.dag_id for ti in tis}
+        drs = session.query(DagRun).filter(
+            DagRun.dag_id.in_(dag_ids),
+            DagRun.execution_date.in_(execution_dates),
+        ).all()
+        for dr in drs:
+            dr.state = State.RUNNING
 
 
 class DagBag(object):
@@ -195,7 +203,6 @@ class DagBag(object):
                     dag.full_filepath = filepath
                     dag.is_subdag = False
                     self.bag_dag(dag, parent_dag=dag, root_dag=dag)
-                    # dag.pickle()
 
         self.file_last_changed[filepath] = dttm
 
@@ -504,7 +511,7 @@ class TaskInstance(Base):
     execution_date = Column(DateTime, primary_key=True)
     start_date = Column(DateTime)
     end_date = Column(DateTime)
-    duration = Column(Integer)
+    duration = Column(Float)
     state = Column(String(20))
     try_number = Column(Integer)
     hostname = Column(String(1000))
@@ -909,7 +916,7 @@ class TaskInstance(Base):
             logging.info(msg.format(**locals()))
 
         self.start_date = datetime.now()
-        if not force and self.pool:
+        if not force and (self.pool or self.task.dag.concurrency_reached):
             # If a pool is set for this task, marking the task instance
             # as QUEUED
             self.state = State.QUEUED
@@ -1043,7 +1050,8 @@ class TaskInstance(Base):
             session.commit()
         logging.error(str(error))
 
-    def get_template_context(self):
+    @provide_session
+    def get_template_context(self, session=None):
         task = self.task
         from airflow import macros
         tables = None
@@ -1057,8 +1065,22 @@ class TaskInstance(Base):
         ti_key_str = ti_key_str.format(**locals())
 
         params = {}
-        if hasattr(task, 'dag') and task.dag.params:
-            params.update(task.dag.params)
+        run_id = ''
+        dag_run = None
+        if hasattr(task, 'dag'):
+            if task.dag.params:
+                params.update(task.dag.params)
+            dag_run = (
+                session.query(DagRun)
+                .filter_by(
+                    dag_id=task.dag.dag_id,
+                    execution_date=self.execution_date)
+                .first()
+            )
+            run_id = dag_run.run_id if dag_run else None
+            session.expunge_all()
+            session.commit()
+
         if task.params:
             params.update(task.params)
 
@@ -1070,6 +1092,8 @@ class TaskInstance(Base):
             'END_DATE': ds,
             'ds_nodash': ds_nodash,
             'end_date': ds,
+            'dag_run': dag_run,
+            'run_id': run_id,
             'execution_date': self.execution_date,
             'latest_date': ds,
             'macros': macros,
@@ -1114,7 +1138,7 @@ class TaskInstance(Base):
 
     def set_duration(self):
         if self.end_date and self.start_date:
-            self.duration = (self.end_date - self.start_date).seconds
+            self.duration = (self.end_date - self.start_date).total_seconds()
         else:
             self.duration = None
 
@@ -1481,7 +1505,7 @@ class BaseOperator(object):
         schedule_interval as it may not be attached to a DAG.
         """
         if hasattr(self, 'dag') and self.dag:
-            return self.dag.schedule_interval
+            return self.dag._schedule_interval
         else:
             return self._schedule_interval
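Reviewer note: with `provide_context=True`, the new `run_id` and `dag_run` context entries are passed straight into the callable. A minimal sketch using this era's import paths (`context_demo` and the task id are hypothetical):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators import PythonOperator

dag = DAG(
    dag_id='context_demo',             # hypothetical
    start_date=datetime(2015, 10, 1),
    schedule_interval='@once')


def show_run(**context):
    # Both keys are new in this change; run_id is None when no DagRun
    # row matches the task instance's execution_date.
    print('run_id:', context['run_id'])
    print('dag_run:', context['dag_run'])


PythonOperator(
    task_id='show_run', python_callable=show_run,
    provide_context=True, dag=dag)
```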
@@ -1549,7 +1573,7 @@ class BaseOperator(object):
         all strings in it.
         '''
         rt = self.render_template
-        if isinstance(content, basestring):
+        if isinstance(content, six.string_types):
             result = jinja_env.from_string(content).render(**context)
         elif isinstance(content, (list, tuple)):
             result = [rt(e, context) for e in content]
@@ -1575,9 +1599,12 @@ class BaseOperator(object):
             else jinja2.Environment(cache_size=0)
 
         exts = self.__class__.template_ext
-        return jinja_env.get_template(content).render(**context) \
-            if isinstance(content, basestring) and any([content.endswith(ext) for ext in exts]) \
-            else self.render_template_from_field(content, context, jinja_env)
+        if (
+                isinstance(content, six.string_types) and
+                any([content.endswith(ext) for ext in exts])):
+            return jinja_env.get_template(content).render(**context)
+        else:
+            return self.render_template_from_field(content, context, jinja_env)
 
     def prepare_template(self):
         '''
@@ -1592,7 +1619,7 @@ class BaseOperator(object):
         # Getting the content of files for template_field / template_ext
         for attr in self.template_fields:
             content = getattr(self, attr)
-            if (content and isinstance(content, basestring) and
+            if (content and isinstance(content, six.string_types) and
                     any([content.endswith(ext) for ext in self.template_ext])):
                 env = self.dag.get_template_env()
                 try:
@@ -1698,8 +1725,7 @@ class BaseOperator(object):
         start_date = start_date or self.start_date
         end_date = end_date or self.end_date or datetime.now()
 
-        for dt in utils.date_range(
-                start_date, end_date, self.schedule_interval):
+        for dt in self.dag.date_range(start_date, end_date=end_date):
             TaskInstance(self, dt).run(
                 mark_success=mark_success,
                 ignore_dependencies=ignore_dependencies,
@@ -1709,7 +1735,7 @@ class BaseOperator(object):
         logging.info('Dry run')
         for attr in self.template_fields:
             content = getattr(self, attr)
-            if content and isinstance(content, basestring):
+            if content and isinstance(content, six.string_types):
                 logging.info('Rendering template for {0}'.format(attr))
                 logging.info(content)
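Reviewer note: the rewritten `render_template` dispatch reads more clearly as a standalone sketch. Assuming a configured Jinja environment and a `template_ext` tuple (the recursion over lists and dicts handled by `render_template_from_field` is omitted here):

```python
import six


def render(content, template_ext, jinja_env, context):
    # Strings whose suffix matches template_ext (e.g. 'script.sql' with
    # template_ext=('.sql',)) are loaded as template files; any other
    # string is rendered inline.
    if (isinstance(content, six.string_types) and
            any(content.endswith(ext) for ext in template_ext)):
        return jinja_env.get_template(content).render(**context)
    return jinja_env.from_string(content).render(**context)
```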
@@ -1891,6 +1917,9 @@ class DAG(object):
         accessible in templates, namespaced under `params`. These params
         can be overridden at the task level.
     :type params: dict
+    :param concurrency: the number of task instances allowed to run
+        concurrently
+    :type concurrency: int
     """
 
     def __init__(
@@ -1901,6 +1930,7 @@ class DAG(object):
             template_searchpath=None,
             user_defined_macros=None,
             default_args=None,
+            concurrency=configuration.getint('core', 'dag_concurrency'),
             params=None):
 
         self.user_defined_macros = user_defined_macros
@@ -1912,13 +1942,20 @@ class DAG(object):
         self.start_date = start_date
         self.end_date = end_date or datetime.now()
         self.schedule_interval = schedule_interval
+        if schedule_interval in utils.cron_presets:
+            self._schedule_interval = utils.cron_presets.get(schedule_interval)
+        elif schedule_interval == '@once':
+            self._schedule_interval = None
+        else:
+            self._schedule_interval = schedule_interval
         self.full_filepath = full_filepath if full_filepath else ''
-        if isinstance(template_searchpath, basestring):
+        if isinstance(template_searchpath, six.string_types):
             template_searchpath = [template_searchpath]
         self.template_searchpath = template_searchpath
         self.parent_dag = None  # Gets set when DAGs are loaded
         self.last_loaded = datetime.now()
         self.safe_dag_id = dag_id.replace('.', '__dot__')
+        self.concurrency = concurrency
 
         self._comps = {
             'dag_id',
@@ -1957,19 +1994,26 @@ class DAG(object):
             hash_components.append(repr(val))
         return hash(tuple(hash_components))
 
+    def date_range(self, start_date, num=None, end_date=None):
+        if num:
+            end_date = None
+        return utils.date_range(
+            start_date=start_date, end_date=end_date,
+            num=num, delta=self._schedule_interval)
+
     def following_schedule(self, dttm):
-        if isinstance(self.schedule_interval, six.string_types):
-            cron = croniter(self.schedule_interval, dttm)
+        if isinstance(self._schedule_interval, six.string_types):
+            cron = croniter(self._schedule_interval, dttm)
             return cron.get_next(datetime)
-        else:
-            return dttm + self.schedule_interval
+        elif isinstance(self._schedule_interval, timedelta):
+            return dttm + self._schedule_interval
 
     def previous_schedule(self, dttm):
-        if isinstance(self.schedule_interval, six.string_types):
-            cron = croniter(self.schedule_interval, dttm)
+        if isinstance(self._schedule_interval, six.string_types):
+            cron = croniter(self._schedule_interval, dttm)
             return cron.get_prev(datetime)
-        else:
-            return dttm - self.schedule_interval
+        elif isinstance(self._schedule_interval, timedelta):
+            return dttm - self._schedule_interval
 
     @property
     def task_ids(self):
@@ -1995,6 +2039,21 @@ class DAG(object):
     def owner(self):
         return ", ".join(list(set([t.owner for t in self.tasks])))
 
+    @property
+    @provide_session
+    def concurrency_reached(self, session=None):
+        """
+        Returns a boolean as to whether the concurrency limit for this DAG
+        has been reached
+        """
+        TI = TaskInstance
+        qry = session.query(func.count(TI.task_id)).filter(
+            TI.dag_id == self.dag_id,
+            TI.task_id.in_(self.task_ids),
+            TI.state == State.RUNNING,
+        )
+        return qry.scalar() >= self.concurrency
+
     @property
     def latest_execution_date(self):
         """
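Reviewer note: the preset normalization in `DAG.__init__` is small but central, since `following_schedule`/`previous_schedule` now operate on `_schedule_interval` only. A self-contained restatement (the `normalize` function is illustrative, not part of the patch):

```python
# Mirrors utils.cron_presets introduced by this change.
cron_presets = {
    '@hourly': '0 * * * *',
    '@daily': '0 0 * * *',
    '@weekly': '0 0 * * 0',
    '@monthly': '0 0 1 * *',
    '@yearly': '0 0 1 1 *',
}


def normalize(schedule_interval):
    if schedule_interval in cron_presets:
        return cron_presets[schedule_interval]
    if schedule_interval == '@once':
        return None  # no recurring schedule
    return schedule_interval  # cron string or timedelta passes through


assert normalize('@daily') == '0 0 * * *'
assert normalize('@once') is None
```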
@@ -2030,8 +2089,7 @@ class DAG(object):
         """
         TI = TaskInstance
         session = settings.Session()
-        # Checking state of active DagRuns
-        active_runs = []
+        active_dates = []
         active_runs = (
             session.query(DagRun)
             .filter(
@@ -2040,7 +2098,7 @@ class DAG(object):
             .all()
         )
         for run in active_runs:
-            logging.info("Checking state for " + str(run))
+            logging.info("Checking state for {}".format(run))
             task_instances = session.query(TI).filter(
                 TI.dag_id == run.dag_id,
                 TI.task_id.in_(self.task_ids),
@@ -2051,13 +2109,18 @@ class DAG(object):
                 if State.FAILED in task_states:
                     logging.info('Marking run {} failed'.format(run))
                     run.state = State.FAILED
-                elif set(task_states) == set([State.SUCCESS]):
+                elif len(
+                        set(task_states) |
+                        set([State.SUCCESS, State.SKIPPED])
+                ) == 2:
                     logging.info('Marking run {} successful'.format(run))
                     run.state = State.SUCCESS
                 else:
-                    active_runs.append(run.execution_date)
+                    active_dates.append(run.execution_date)
+            else:
+                active_dates.append(run.execution_date)
         session.commit()
-        return active_runs
+        return active_dates
 
     def resolve_template_files(self):
         for t in self.tasks:
@@ -2129,19 +2192,27 @@ class DAG(object):
     def roots(self):
         return [t for t in self.tasks if not t.downstream_list]
 
+    @provide_session
+    def set_dag_runs_state(
+            self, start_date, end_date, state=State.RUNNING, session=None):
+        drs = session.query(DagRun).filter(
+            DagRun.dag_id == self.dag_id,
+            DagRun.execution_date >= start_date,
+            DagRun.execution_date <= end_date,
+        ).all()
+        for dr in drs:
+            dr.state = state
+
     def clear(
             self, start_date=None, end_date=None,
             only_failed=False,
             only_running=False,
             confirm_prompt=False,
             include_subdags=True,
+            reset_dag_runs=True,
             dry_run=False):
         session = settings.Session()
         """
         Clears a set of task instances associated with the current dag for
         a specified date range.
         """
-
         TI = TaskInstance
         tis = session.query(TI)
         if include_subdags:
@@ -2171,6 +2242,7 @@ class DAG(object):
             return tis
 
         count = tis.count()
+        do_it = True
         if count == 0:
             print("Nothing to clear.")
             return 0
@@ -2180,13 +2252,15 @@ class DAG(object):
                 "You are about to delete these {count} tasks:\n"
                 "{ti_list}\n\n"
                 "Are you sure? (yes/no): ").format(**locals())
-            if utils.ask_yesno(question):
-                clear_task_instances(tis, session)
-            else:
-                count = 0
-                print("Bail. Nothing was cleared.")
-        else:
+            do_it = utils.ask_yesno(question)
+
+        if do_it:
             clear_task_instances(tis, session)
+            if reset_dag_runs:
+                self.set_dag_runs_state(start_date, end_date, session=session)
+        else:
+            count = 0
+            print("Bail. Nothing was cleared.")
 
         session.commit()
         session.close()
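Reviewer note: the union-based success check in `get_active_runs` is compact but opaque. A condensed, self-contained restatement of the run-state rule (plain strings stand in for the State constants; `resolve_run_state` is illustrative):

```python
def resolve_run_state(task_states, all_tasks_have_instances):
    # Any failed task fails the run.
    if 'failed' in task_states:
        return 'failed'
    # Every task finished in SUCCESS or SKIPPED: the run is complete.
    if all_tasks_have_instances and \
            set(task_states) <= {'success', 'skipped'}:
        return 'success'
    # Otherwise the run stays active and keeps feeding the scheduler.
    return 'running'


assert resolve_run_state({'success', 'skipped'}, True) == 'success'
assert resolve_run_state({'success', 'running'}, True) == 'running'
```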
@@ -2571,15 +2645,23 @@ class DagRun(Base):
     """
     __tablename__ = "dag_run"
 
-    dag_id = Column(String(ID_LEN), primary_key=True)
-    execution_date = Column(DateTime, primary_key=True)
-    state = Column(String(50))
+    id = Column(Integer, primary_key=True)
+    dag_id = Column(String(ID_LEN))
+    execution_date = Column(DateTime, default=datetime.now)
+    state = Column(String(50), default=State.RUNNING)
     run_id = Column(String(ID_LEN))
-    external_trigger = Column(Boolean, default=False)
+    external_trigger = Column(Boolean, default=True)
+    conf = Column(PickleType)
+
+    __table_args__ = (
+        Index('dr_run_id', dag_id, run_id, unique=True),
+    )
 
     def __repr__(self):
-        return '<DagRun {dag_id} @ {execution_date}: {run_id}>'.format(
+        return (
+            '<DagRun {dag_id} @ {execution_date}: {run_id}, '
+            'externally triggered: {external_trigger}>'
+        ).format(
             dag_id=self.dag_id,
             execution_date=self.execution_date,
             run_id=self.run_id,
diff --git a/airflow/operators/sensors.py b/airflow/operators/sensors.py
index 6c6d92b1ca..b1c14ed30e 100644
--- a/airflow/operators/sensors.py
+++ b/airflow/operators/sensors.py
@@ -71,12 +71,9 @@ class SqlSensor(BaseSensorOperator):
 
     @apply_defaults
     def __init__(self, conn_id, sql, *args, **kwargs):
-
-        super(SqlSensor, self).__init__(*args, **kwargs)
-
         self.sql = sql
         self.conn_id = conn_id
-
+        super(SqlSensor, self).__init__(*args, **kwargs)
 
     def poke(self, context):
         hook = BaseHook.get_connection(self.conn_id).get_hook()
@@ -447,10 +444,9 @@ class TimeDeltaSensor(BaseSensorOperator):
         self.delta = delta
 
     def poke(self, context):
-        target_dttm = (
-            context['execution_date'] +
-            context['dag'].schedule_interval +
-            self.delta)
+        dag = context['dag']
+        target_dttm = dag.following_schedule(context['execution_date'])
+        target_dttm += self.delta
         logging.info('Checking if the time ({0}) has come'.format(target_dttm))
         return datetime.now() > target_dttm
 
diff --git a/airflow/utils.py b/airflow/utils.py
index 483da6eb82..21a6e7a989 100644
--- a/airflow/utils.py
+++ b/airflow/utils.py
@@ -85,7 +85,10 @@ class State(object):
 
     @classmethod
     def color(cls, state):
-        return cls.state_color[state]
+        if state in cls.state_color:
+            return cls.state_color[state]
+        else:
+            return 'white'
 
     @classmethod
     def runnable(cls):
@@ -94,6 +97,14 @@ class State(object):
             cls.SKIPPED]
 
 
+cron_presets = {
+    '@hourly': '0 * * * *',
+    '@daily': '0 0 * * *',
+    '@weekly': '0 0 * * 0',
+    '@monthly': '0 0 1 * *',
+    '@yearly': '0 0 1 1 *',
+}
+
+
 def provide_session(func):
     """
     Function decorator that provides a session if it isn't provided.
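Reviewer note: the TimeDeltaSensor change means cron and preset schedules now work, since the target is computed via the DAG rather than by adding `schedule_interval` directly. A minimal sketch, assuming this era's import paths (dag_id and task id are hypothetical):

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.sensors import TimeDeltaSensor

dag = DAG(
    dag_id='delta_demo',               # hypothetical
    start_date=datetime(2015, 10, 1),
    schedule_interval='@daily')

# Succeeds once wall-clock time passes
# dag.following_schedule(execution_date) + delta, i.e. two hours after
# the end of the schedule interval.
wait = TimeDeltaSensor(task_id='wait_2h', delta=timedelta(hours=2), dag=dag)
```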
@@ -284,31 +295,49 @@ def validate_key(k, max_length=250):
     return True
 
 
-def date_range(start_date, end_date=datetime.now(), delta=timedelta(1)):
+def date_range(
+        start_date,
+        end_date=None,
+        num=None,
+        delta=None):
     """
     Get a set of dates as a list based on a start, end and delta, delta
     can be something that can be added to ``datetime.datetime``
     or a cron expression as a ``str``
 
-    >>> date_range(datetime(2016, 1, 1), datetime(2016, 1, 3))
-    [datetime.datetime(2016, 1, 1, 0, 0),
-     datetime.datetime(2016, 1, 2, 0, 0),
-     datetime.datetime(2016, 1, 3, 0, 0)]
-    >>> date_range(datetime(2016, 1, 1), datetime(2016, 1, 3), '0 0 * * *')
-    [datetime.datetime(2016, 1, 1, 0, 0),
-     datetime.datetime(2016, 1, 2, 0, 0),
-     datetime.datetime(2016, 1, 3, 0, 0)]
+    :param start_date: anchor date to start the series from
+    :type start_date: datetime.datetime
+    :param end_date: right boundary for the date range
+    :type end_date: datetime.datetime
+    :param num: as an alternative to end_date, you can specify the number of
+        entries you want in the range. This number can be negative,
+        output will always be sorted regardless
+    :type num: int
+    :param delta: step between entries; a ``datetime.timedelta`` or a cron
+        expression as a ``str``
+    :type delta: datetime.timedelta or str
+
+    >>> date_range(datetime(2016, 1, 1), datetime(2016, 1, 3), delta=timedelta(1))
+    [datetime.datetime(2016, 1, 1, 0, 0), datetime.datetime(2016, 1, 2, 0, 0), datetime.datetime(2016, 1, 3, 0, 0)]
+    >>> date_range(datetime(2016, 1, 1), datetime(2016, 1, 3), delta='0 0 * * *')
+    [datetime.datetime(2016, 1, 1, 0, 0), datetime.datetime(2016, 1, 2, 0, 0), datetime.datetime(2016, 1, 3, 0, 0)]
     >>> date_range(datetime(2016, 1, 1), datetime(2016, 3, 3), delta="0 0 0 * *")
-    [datetime.datetime(2016, 1, 1, 0, 0),
-     datetime.datetime(2016, 2, 1, 0, 0),
-     datetime.datetime(2016, 3, 1, 0, 0)]
+    [datetime.datetime(2016, 1, 1, 0, 0), datetime.datetime(2016, 2, 1, 0, 0), datetime.datetime(2016, 3, 1, 0, 0)]
     """
+    if not delta:
+        return []
+    if end_date and start_date > end_date:
+        raise Exception("Wait. start_date needs to be before end_date")
+    if end_date and num:
+        raise Exception("Wait. Either specify end_date OR num")
+    if not end_date and not num:
+        end_date = datetime.now()
+
     delta_iscron = False
     if isinstance(delta, six.string_types):
         delta_iscron = True
         cron = croniter(delta, start_date)
+    else:
+        delta = abs(delta)
     l = []
-    if end_date >= start_date:
+    if end_date:
         while start_date <= end_date:
             l.append(start_date)
             if delta_iscron:
@@ -316,8 +345,19 @@ def date_range(start_date, end_date=datetime.now(), delta=timedelta(1)):
             else:
                 start_date += delta
     else:
-        raise AirflowException("start_date can't be after end_date")
-    return l
+        for i in range(abs(num)):
+            l.append(start_date)
+            if delta_iscron:
+                if num > 0:
+                    start_date = cron.get_next(datetime)
+                else:
+                    start_date = cron.get_prev(datetime)
+            else:
+                if num > 0:
+                    start_date += delta
+                else:
+                    start_date -= delta
+    return sorted(l)
 
 
 def json_ser(obj):
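Reviewer note: the rewritten helper accepts either `end_date` or `num` (never both), and a negative `num` walks backwards from the anchor while the result stays sorted ascending. Usage, grounded in the doctests above:

```python
from datetime import datetime, timedelta

from airflow.utils import date_range

# Three daily entries ending at the anchor, via a negative num.
dates = date_range(datetime(2016, 1, 3), num=-3, delta=timedelta(days=1))
print(dates)  # [2016-01-01, 2016-01-02, 2016-01-03]

# A cron-driven range between two boundary dates.
dates = date_range(
    datetime(2016, 1, 1), datetime(2016, 1, 3), delta='0 0 * * *')
```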
diff --git a/airflow/www/app.py b/airflow/www/app.py
index 75282df731..9e485ba2ff 100644
--- a/airflow/www/app.py
+++ b/airflow/www/app.py
@@ -41,25 +41,46 @@ def create_app(config=None):
         index_view=views.HomeView(endpoint='', url='/admin'),
         template_mode='bootstrap3',
     )
+    av = admin.add_view
+    vs = views
+    av(vs.Airflow(name='DAGs'))
 
-    admin.add_view(views.Airflow(name='DAGs'))
+    av(vs.QueryView(name='Ad Hoc Query', category="Data Profiling"))
+    av(vs.ChartModelView(
+        models.Chart, Session, name="Charts", category="Data Profiling"))
+    av(vs.KnowEventView(
+        models.KnownEvent,
+        Session, name="Known Events", category="Data Profiling"))
+    av(vs.SlaMissModelView(
+        models.SlaMiss,
+        Session, name="SLA Misses", category="Browse"))
+    av(vs.TaskInstanceModelView(
+        models.TaskInstance,
+        Session, name="Task Instances", category="Browse"))
+    av(vs.LogModelView(
+        models.Log, Session, name="Logs", category="Browse"))
+    av(vs.JobModelView(
+        jobs.BaseJob, Session, name="Jobs", category="Browse"))
+    av(vs.PoolModelView(
+        models.Pool, Session, name="Pools", category="Admin"))
+    av(vs.ConfigurationView(
+        name='Configuration', category="Admin"))
+    av(vs.UserModelView(
+        models.User, Session, name="Users", category="Admin"))
+    av(vs.ConnectionModelView(
+        models.Connection, Session, name="Connections", category="Admin"))
+    av(vs.VariableView(
+        models.Variable, Session, name="Variables", category="Admin"))
 
-    admin.add_view(views.SlaMissModelView(models.SlaMiss, Session, name="SLA Misses", category="Browse"))
-    admin.add_view(views.TaskInstanceModelView(models.TaskInstance, Session, name="Task Instances", category="Browse"))
-    admin.add_view(views.LogModelView(models.Log, Session, name="Logs", category="Browse"))
-    admin.add_view(views.JobModelView(jobs.BaseJob, Session, name="Jobs", category="Browse"))
-    admin.add_view(views.QueryView(name='Ad Hoc Query', category="Data Profiling"))
-    admin.add_view(views.ChartModelView(models.Chart, Session, name="Charts", category="Data Profiling"))
-    admin.add_view(views.KnowEventView(models.KnownEvent, Session, name="Known Events", category="Data Profiling"))
-    admin.add_view(views.PoolModelView(models.Pool, Session, name="Pools", category="Admin"))
-    admin.add_view(views.ConfigurationView(name='Configuration', category="Admin"))
-    admin.add_view(views.UserModelView(models.User, Session, name="Users", category="Admin"))
-    admin.add_view(views.ConnectionModelView(models.Connection, Session, name="Connections", category="Admin"))
-    admin.add_view(views.VariableView(models.Variable, Session, name="Variables", category="Admin"))
-    admin.add_link(base.MenuLink(category='Docs', name='Documentation', url='http://pythonhosted.org/airflow/'))
-    admin.add_link(base.MenuLink(category='Docs',name='Github',url='https://github.com/airbnb/airflow'))
+    admin.add_link(base.MenuLink(
+        category='Docs', name='Documentation',
+        url='http://pythonhosted.org/airflow/'))
+    admin.add_link(base.MenuLink(
+        category='Docs', name='Github',
+        url='https://github.com/airbnb/airflow'))
 
-    admin.add_view(views.DagModelView(models.DagModel, Session, name=None))
+    av(vs.DagRunModelView(
+        models.DagRun, Session, name="DAG Runs", category="Browse"))
+    av(vs.DagModelView(models.DagModel, Session, name=None))
 
     # Hack to not add this view to the menu
     admin._menu = admin._menu[:-1]
@@ -70,7 +91,6 @@ def create_app(config=None):
         for v in admin_views:
             admin.add_view(v)
         for bp in flask_blueprints:
-            print(bp)
             app.register_blueprint(bp)
         for ml in menu_links:
             admin.add_link(ml)
diff --git a/airflow/www/forms.py b/airflow/www/forms.py
index 74a5729968..be74589af2 100644
--- a/airflow/www/forms.py
+++ b/airflow/www/forms.py
@@ -16,17 +16,6 @@ class DateTimeForm(Form):
         "Execution date", widget=DateTimePickerWidget())
 
 
-class GraphForm(Form):
-    execution_date = DateTimeField(
-        "Execution date", widget=DateTimePickerWidget())
-    arrange = SelectField("Layout", choices=(
-        ('LR', "Left->Right"),
-        ('RL', "Right->Left"),
-        ('TB', "Top->Bottom"),
-        ('BT', "Bottom->Top"),
-    ))
-
-
 class TreeForm(Form):
     base_date = DateTimeField(
         "Anchor date", widget=DateTimePickerWidget(), default=datetime.now())
diff --git a/airflow/www/templates/airflow/dag.html b/airflow/www/templates/airflow/dag.html
index 7773f7027a..f73ac8686b 100644
--- a/airflow/www/templates/airflow/dag.html
+++ b/airflow/www/templates/airflow/dag.html
@@ -7,54 +7,64 @@
[hunk body lost to extraction: the template's HTML tags were stripped. The surviving fragments show the page header block being restructured; the "SUBDAG: {{ dag.dag_id }}" / "DAG: {{ dag.dag_id }}" title, the optional "ROOT: {{ root }}" label, and the view-navigation buttons were regrouped in the {% block body %} section.]