Merge pull request #61 from mistercrunch/backfill_dep_past

depends_on_past backfill fix
This commit is contained in:
Maxime Beauchemin 2014-12-17 08:37:37 -08:00
Родитель 224a9a0563 e8d005058b
Коммит ae92cde2e7
3 изменённых файла: 19 добавлений и 10 удалений

Просмотреть файл

@@ -248,6 +248,7 @@ class BackfillJob(BaseJob):
dag, start_date=None, end_date=None, mark_success=False, dag, start_date=None, end_date=None, mark_success=False,
*args, **kwargs): *args, **kwargs):
self.dag = dag self.dag = dag
dag.override_start_date(start_date)
self.dag_id = dag.dag_id self.dag_id = dag.dag_id
self.bf_start_date = start_date self.bf_start_date = start_date
self.bf_end_date = end_date self.bf_end_date = end_date

Просмотреть файл

@@ -392,9 +392,7 @@ class TaskInstance(Base):
self.execution_date-task.schedule_interval, self.execution_date-task.schedule_interval,
TI.state == State.SUCCESS, TI.state == State.SUCCESS,
).first() ).first()
if previous_ti: if not previous_ti:
previous_ti.task = task
if previous_ti.state != State.SUCCESS:
return False return False
# Applying wait_for_downstream # Applying wait_for_downstream
@@ -957,6 +955,15 @@ class DAG(Base):
def pickle(self): def pickle(self):
return pickle.dumps(self) return pickle.dumps(self)
def override_start_date(self, start_date):
    """
    Set start_date on every task of this DAG and on the DAG itself.

    This is used by BackfillJob.

    :param start_date: value assigned to ``start_date`` on each task in
        ``self.tasks`` and then on the DAG. It is stored as-is; no
        validation or conversion is performed here.
    """
    # NOTE(review): a None start_date is assigned like any other value,
    # which would clear whatever start_date the tasks already carried.
    # Confirm that callers always pass a real date.
    for task in self.tasks:
        task.start_date = start_date
    # The DAG-level attribute is updated after the tasks, as in the
    # original statement order.
    self.start_date = start_date
def set_dependency(self, upstream_task_id, downstream_task_id): def set_dependency(self, upstream_task_id, downstream_task_id):
""" """
Simple utility method to set dependency between two tasks that Simple utility method to set dependency between two tasks that

Просмотреть файл

@@ -5,6 +5,7 @@ from datetime import datetime
default_args = { default_args = {
'owner': 'mistercrunch', 'owner': 'mistercrunch',
'start_date': datetime(2014, 10, 1), 'start_date': datetime(2014, 10, 1),
'depends_on_past': True,
} }
dag = DAG(dag_id='example_2') dag = DAG(dag_id='example_2')
@@ -21,8 +22,8 @@ for i in range(10):
i = str(i) i = str(i)
task = BashOperator( task = BashOperator(
task_id='runme_'+i, task_id='runme_'+i,
bash_command='echo "'+str(i)+': {{ ti.execution_date }}"', bash_command='sleep 10',
**default_args) default_args=default_args)
task.set_downstream(run_this) task.set_downstream(run_this)
dag.add_task(task) dag.add_task(task)