Merge pull request #26 from airbnb/backfill_start_date

Backfill start_date to override the tasks's start_date
This commit is contained in:
Maxime Beauchemin 2015-06-13 15:44:20 -07:00
Родитель 62ad66ebb1 e894cb77f0
Коммит 2dc75fa4c0
4 изменённых файлов: 18 добавлений и 1 удалений

Просмотреть файл

@ -100,6 +100,10 @@ def run(args):
dag = dag_pickle.pickle
task = dag.get_task(task_id=args.task_id)
task_start_date = None
if args.task_start_date:
task_start_date = dateutil.parser.parse(args.task_start_date)
task.start_date = task_start_date
ti = TaskInstance(task, args.execution_date)
if args.local:
@ -109,6 +113,7 @@ def run(args):
mark_success=args.mark_success,
force=args.force,
pickle_id=args.pickle,
task_start_date=task_start_date,
ignore_dependencies=args.ignore_dependencies)
run_job.run()
elif args.raw:
@ -394,6 +399,9 @@ def get_parser():
parser_run.add_argument(
"-sd", "--subdir", help=subdir_help,
default=DAGS_FOLDER)
parser_run.add_argument(
"-s", "--task_start_date",
help="Override the tasks's start_date (used internally)",)
parser_run.add_argument(
"-m", "--mark_success", help=mark_success_help, action="store_true")
parser_run.add_argument(

Просмотреть файл

@ -36,12 +36,13 @@ class BaseExecutor(object):
def queue_task_instance(
self, task_instance, mark_success=False, pickle_id=None,
force=False, ignore_dependencies=False):
force=False, ignore_dependencies=False, task_start_date=None):
command = task_instance.command(
local=True,
mark_success=mark_success,
force=force,
ignore_dependencies=ignore_dependencies,
task_start_date=task_start_date,
pickle_id=pickle_id)
self.queue_command(
task_instance.key,

Просмотреть файл

@ -507,6 +507,7 @@ class BackfillJob(BaseJob):
executor.queue_task_instance(
ti,
mark_success=self.mark_success,
task_start_date=self.bf_start_date,
pickle_id=pickle_id)
ti.state = State.RUNNING
if key not in started:
@ -555,12 +556,14 @@ class LocalTaskJob(BaseJob):
force=False,
mark_success=False,
pickle_id=None,
task_start_date=None,
*args, **kwargs):
self.task_instance = task_instance
self.ignore_dependencies = ignore_dependencies
self.force = force
self.pickle_id = pickle_id
self.mark_success = mark_success
self.task_start_date = task_start_date
super(LocalTaskJob, self).__init__(*args, **kwargs)
def _execute(self):
@ -570,6 +573,7 @@ class LocalTaskJob(BaseJob):
force=self.force,
pickle_id=self.pickle_id,
mark_success=self.mark_success,
task_start_date=self.task_start_date,
job_id=self.id,
)
self.process = subprocess.Popen(['bash', '-c', command])

Просмотреть файл

@ -411,6 +411,7 @@ class TaskInstance(Base):
local=False,
pickle_id=None,
raw=False,
task_start_date=None,
job_id=None):
"""
Returns a command that can be executed anywhere where airflow is
@ -424,6 +425,8 @@ class TaskInstance(Base):
ignore_dependencies = "-i" if ignore_dependencies else ""
force = "--force" if force else ""
local = "--local" if local else ""
task_start_date = \
"-s " + task_start_date.isoformat() if task_start_date else ""
raw = "--raw" if raw else ""
subdir = ""
if not pickle and self.task.dag and self.task.dag.full_filepath:
@ -439,6 +442,7 @@ class TaskInstance(Base):
"{job_id} "
"{raw} "
"{subdir} "
"{task_start_date} "
).format(**locals())
@property