Adding --local option to run command

This commit is contained in:
Maxime Beauchemin 2015-01-21 21:54:21 -08:00
Родитель 100198983e
Коммит 4a12bd5838
4 изменённых файлов: 24 добавлений и 13 удалений

Просмотреть файл

@ -1,6 +1,7 @@
#!/usr/bin/env python
from airflow.configuration import conf
from airflow.executors import DEFAULT_EXECUTOR
from airflow import settings
from airflow import utils
from airflow import jobs
@ -10,7 +11,6 @@ import dateutil.parser
from datetime import datetime
import logging
import os
import signal
import sys
import argparse
@ -67,7 +67,6 @@ def run(args):
logging.basicConfig(
filename=filename, level=logging.INFO,
format=settings.LOG_FORMAT)
print("Logging into: " + filename)
if not args.pickle:
dagbag = DagBag(args.subdir)
@ -85,13 +84,19 @@ def run(args):
# TODO: add run_local and fire it with the right executor from run
ti = TaskInstance(task, args.execution_date)
run_job = jobs.LocalTaskJob(
task_instance=ti,
mark_success=args.mark_success,
force=args.force,
ignore_dependencies=args.ignore_dependencies)
run_job.run()
if args.local:
print("Logging into: " + filename)
run_job = jobs.LocalTaskJob(
task_instance=ti,
mark_success=args.mark_success,
force=args.force,
ignore_dependencies=args.ignore_dependencies)
run_job.run()
else:
executor = DEFAULT_EXECUTOR
executor.start()
executor.queue_command(ti.key, ti.command())
executor.end()
def list_dags(args):
@ -344,6 +349,10 @@ if __name__ == '__main__':
"-f", "--force",
help="Force a run regardless or previous success",
action="store_true")
parser_run.add_argument(
"-l", "--local",
help="Runs the task locally, don't use the executor",
action="store_true")
parser_run.add_argument(
"-i", "--ignore_dependencies",
help="Ignore upstream and depends_on_past dependencies",

Просмотреть файл

@ -23,7 +23,8 @@ class SequentialExecutor(BaseExecutor):
def heartbeat(self):
for key, command in self.commands_to_run:
try:
sp = subprocess.Popen(command, shell=True).wait()
sp = subprocess.Popen(command, shell=True)
sp.wait()
except Exception as e:
self.change_state(key, State.FAILED)
raise e
@ -31,4 +32,4 @@ class SequentialExecutor(BaseExecutor):
self.commands_to_run = []
def end(self):
pass
self.heartbeat()

Просмотреть файл

@ -394,7 +394,6 @@ class LocalTaskJob(BaseJob):
})
self.thr = thr
thr.start()
while thr.is_alive():
self.heartbeat()

Просмотреть файл

@ -256,18 +256,19 @@ class TaskInstance(Base):
mark_success=False,
ignore_dependencies=False,
force=False,
local=True,
pickle_id=None):
"""
Returns a command that can be executed anywhere where airflow is
installed. This command is part of the message sent to executors by
the orchestrator.
"""
iso = self.execution_date.isoformat()
mark_success = "--mark_success" if mark_success else ""
pickle = "--pickle {0}".format(pickle_id) if pickle_id else ""
ignore_dependencies = "-i" if ignore_dependencies else ""
force = "--force" if force else ""
local = "--local" if local else ""
subdir = ""
if not pickle and self.task.dag and self.task.dag.full_filepath:
subdir = "-sd {0}".format(self.task.dag.full_filepath)
@ -276,6 +277,7 @@ class TaskInstance(Base):
"{self.dag_id} {self.task_id} {iso} "
"{mark_success} "
"{pickle} "
"{local} "
"{ignore_dependencies} "
"{force} "
"{subdir} "