diff --git a/airflow/bin/airflow b/airflow/bin/airflow
index ea0ca8919a..ac5568716b 100755
--- a/airflow/bin/airflow
+++ b/airflow/bin/airflow
@@ -1,6 +1,7 @@
 #!/usr/bin/env python
 
 from airflow.configuration import conf
+from airflow.executors import DEFAULT_EXECUTOR
 from airflow import settings
 from airflow import utils
 from airflow import jobs
@@ -10,7 +11,6 @@
 import dateutil.parser
 from datetime import datetime
 import logging
 import os
-import signal
 import sys
 import argparse
@@ -67,7 +67,6 @@ def run(args):
     logging.basicConfig(
         filename=filename,
         level=logging.INFO, format=settings.LOG_FORMAT)
-    print("Logging into: " + filename)
 
     if not args.pickle:
         dagbag = DagBag(args.subdir)
@@ -85,13 +84,19 @@
 
     # TODO: add run_local and fire it with the right executor from run
     ti = TaskInstance(task, args.execution_date)
-
-    run_job = jobs.LocalTaskJob(
-        task_instance=ti,
-        mark_success=args.mark_success,
-        force=args.force,
-        ignore_dependencies=args.ignore_dependencies)
-    run_job.run()
+    if args.local:
+        print("Logging into: " + filename)
+        run_job = jobs.LocalTaskJob(
+            task_instance=ti,
+            mark_success=args.mark_success,
+            force=args.force,
+            ignore_dependencies=args.ignore_dependencies)
+        run_job.run()
+    else:
+        executor = DEFAULT_EXECUTOR
+        executor.start()
+        executor.queue_command(ti.key, ti.command())
+        executor.end()
 
 
 def list_dags(args):
@@ -344,6 +349,10 @@
         "-f", "--force",
         help="Force a run regardless or previous success",
         action="store_true")
+    parser_run.add_argument(
+        "-l", "--local",
+        help="Runs the task locally, don't use the executor",
+        action="store_true")
     parser_run.add_argument(
         "-i", "--ignore_dependencies",
         help="Ignore upstream and depends_on_past dependencies",
diff --git a/airflow/executors/sequential_executor.py b/airflow/executors/sequential_executor.py
index e1a8a88547..3209f0cfd4 100644
--- a/airflow/executors/sequential_executor.py
+++ b/airflow/executors/sequential_executor.py
@@ -23,7 +23,8 @@ class SequentialExecutor(BaseExecutor):
     def heartbeat(self):
         for key, command in self.commands_to_run:
             try:
-                sp = subprocess.Popen(command, shell=True).wait()
+                sp = subprocess.Popen(command, shell=True)
+                sp.wait()
             except Exception as e:
                 self.change_state(key, State.FAILED)
                 raise e
@@ -31,4 +32,4 @@
         self.commands_to_run = []
 
     def end(self):
-        pass
+        self.heartbeat()
diff --git a/airflow/jobs.py b/airflow/jobs.py
index c5b4d7ec1a..c35616a642 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -394,7 +394,6 @@ class LocalTaskJob(BaseJob):
         })
         self.thr = thr
-        thr.start()
 
         while thr.is_alive():
             self.heartbeat()
 
diff --git a/airflow/models.py b/airflow/models.py
index a2702f33a1..3fc7239408 100644
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -256,18 +256,19 @@ class TaskInstance(Base):
             mark_success=False,
             ignore_dependencies=False,
             force=False,
+            local=True,
             pickle_id=None):
         """
         Returns a command that can be executed anywhere where airflow is
         installed. This command is part of the message sent to executors
         by the orchestrator.
         """
-
         iso = self.execution_date.isoformat()
         mark_success = "--mark_success" if mark_success else ""
         pickle = "--pickle {0}".format(pickle_id) if pickle_id else ""
         ignore_dependencies = "-i" if ignore_dependencies else ""
         force = "--force" if force else ""
+        local = "--local" if local else ""
         subdir = ""
         if not pickle and self.task.dag and self.task.dag.full_filepath:
             subdir = "-sd {0}".format(self.task.dag.full_filepath)
@@ -276,6 +277,7 @@
             "{self.dag_id} {self.task_id} {iso} "
             "{mark_success} "
             "{pickle} "
+            "{local} "
             "{ignore_dependencies} "
             "{force} "
             "{subdir} "