Reviewing the master command's logic

This commit is contained in:
Maxime Beauchemin 2014-11-20 09:09:12 -08:00
Родитель 74018b52bf
Коммит 477e2460da
3 изменённых файлов: 13 добавлений и 11 удалений

Просмотреть файл

@ -152,7 +152,6 @@ def master(args):
for task in dag.tasks:
if task.task_id not in ti_dict:
# Brand new task, let's get started
print "SD:" + str(task.start_date)
ti = TI(task, task.start_date)
executor.queue_command(ti.key, ti.command())
else:

Просмотреть файл

@ -1,5 +1,7 @@
import logging
from airflow.utils import State
class BaseExecutor(object):
@ -17,9 +19,10 @@ class BaseExecutor(object):
def queue_command(self, key, command):
"""
"""
logging.info("Adding to queue: " + command)
self.commands[key] = "running"
self.execute_async(key, command)
if key not in self.commands or self.commands[key] in State.runnable():
logging.info("Adding to queue: " + command)
self.commands[key] = State.RUNNING
self.execute_async(key, command)
def change_state(self, key, state):
self.commands[key] = state

Просмотреть файл

@ -17,15 +17,13 @@ class LocalWorker(multiprocessing.Process):
self.result_queue = result_queue
def run(self):
proc_name = self.name
while True:
key, command = self.task_queue.get()
if key is None:
# Received poison pill, no more tasks to run
self.task_queue.task_done()
break
BASE_FOLDER = getconf().get('core', 'BASE_FOLDER')
print command
logging.info(command)
command = (
"exec bash -c '"
"cd $AIRFLOW_HOME;\n" +
@ -34,11 +32,13 @@ class LocalWorker(multiprocessing.Process):
"'"
).format(**locals())
try:
sp = subprocess.Popen(command, shell=True).wait()
subprocess.Popen(command, shell=True).wait()
except Exception as e:
self.result_queue.put((key, State.FAILED))
raise e
self.result_queue.put((key, State.SUCCESS))
state = State.FAILED
logging.error(str(e))
# raise e
state = State.SUCCESS
self.result_queue.put((key, state))
self.task_queue.task_done()
time.sleep(1)