From 477e2460dada072986e758943859699ca0021ac2 Mon Sep 17 00:00:00 2001 From: Maxime Beauchemin Date: Thu, 20 Nov 2014 09:09:12 -0800 Subject: [PATCH] Reviewing the master command's logic --- airflow/bin/airflow | 1 - airflow/executors/base_executor.py | 9 ++++++--- airflow/executors/local_executor.py | 14 +++++++------- 3 files changed, 13 insertions(+), 11 deletions(-) diff --git a/airflow/bin/airflow b/airflow/bin/airflow index 8267162660..7ff587ec58 100755 --- a/airflow/bin/airflow +++ b/airflow/bin/airflow @@ -152,7 +152,6 @@ def master(args): for task in dag.tasks: if task.task_id not in ti_dict: # Brand new task, let's get started - print "SD:" + str(task.start_date) ti = TI(task, task.start_date) executor.queue_command(ti.key, ti.command()) else: diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py index 7422bdb942..2240a0b500 100644 --- a/airflow/executors/base_executor.py +++ b/airflow/executors/base_executor.py @@ -1,5 +1,7 @@ import logging +from airflow.utils import State + class BaseExecutor(object): @@ -17,9 +19,10 @@ class BaseExecutor(object): def queue_command(self, key, command): """ """ - logging.info("Adding to queue: " + command) - self.commands[key] = "running" - self.execute_async(key, command) + if key not in self.commands or self.commands[key] in State.runnable(): + logging.info("Adding to queue: " + command) + self.commands[key] = State.RUNNING + self.execute_async(key, command) def change_state(self, key, state): self.commands[key] = state diff --git a/airflow/executors/local_executor.py b/airflow/executors/local_executor.py index 6fbcfd714f..d157fbbecc 100644 --- a/airflow/executors/local_executor.py +++ b/airflow/executors/local_executor.py @@ -17,15 +17,13 @@ class LocalWorker(multiprocessing.Process): self.result_queue = result_queue def run(self): - proc_name = self.name while True: key, command = self.task_queue.get() if key is None: # Received poison pill, no more tasks to run self.task_queue.task_done() break - BASE_FOLDER = getconf().get('core', 'BASE_FOLDER') - print command + logging.info(command) command = ( "exec bash -c '" "cd $AIRFLOW_HOME;\n" + @@ -34,11 +32,13 @@ class LocalWorker(multiprocessing.Process): "'" ).format(**locals()) try: - sp = subprocess.Popen(command, shell=True).wait() + subprocess.Popen(command, shell=True).wait() except Exception as e: - self.result_queue.put((key, State.FAILED)) - raise e - self.result_queue.put((key, State.SUCCESS)) + state = State.FAILED + logging.error(str(e)) + # raise e + state = State.SUCCESS + self.result_queue.put((key, state)) self.task_queue.task_done() time.sleep(1)