Merge pull request #13 from mistercrunch/tweaks

Fixes
This commit is contained in:
Maxime Beauchemin 2014-11-18 13:45:42 -08:00
Родитель 9ef4fbdaf6 e2b8a3cffc
Коммит 13fb79cf7b
3 изменённых файлов: 18 добавлений и 16 удалений

Просмотреть файл

@ -332,13 +332,14 @@ class TaskInstance(Base):
# Checking that all upstream dependencies have succeeded
if task._upstream_list:
upstream_task_ids = [t.task_id for t in task._upstream_list]
ti = session.query(TaskInstance).filter(
ti = session.query(func.count(TaskInstance.task_id)).filter(
TaskInstance.dag_id == self.dag_id,
TaskInstance.task_id.in_(upstream_task_ids),
TaskInstance.execution_date == self.execution_date,
TaskInstance.state == State.SUCCESS,
)
if ti.count() < len(task._upstream_list):
count = ti[0][0]
if count < len(task._upstream_list):
return False
if not main_session:
@ -590,16 +591,16 @@ class BackfillJob(BaseJob):
for dttm in utils.date_range(
start_date, end_date, task.dag.schedule_interval):
ti = TaskInstance(task, dttm)
ti.refresh_from_db()
if ti.state != State.SUCCESS:
task_instances[ti.key] = ti
task_instances[ti.key] = ti
# Triggering what is ready to get triggered
while task_instances:
for key, ti in task_instances.items():
ti.refresh_from_db()
if ti.state == State.SUCCESS and key in task_instances:
del task_instances[key]
elif ti.is_runnable():
print "Runnable: " + ti.task_id
executor.queue_command(
key=ti.key, command=ti.command(
mark_success=mark_success,
@ -616,15 +617,12 @@ class BackfillJob(BaseJob):
ti = task_instances[key]
ti.refresh_from_db()
if ti.state == State.FAILED:
# Removing downstream tasks from the one that has failed
logging.error("Task instance " + str(key) + " failed")
downstream = [
t.task_id
for t in dag.get_task(task_id).get_flat_relatives(
upstream=False)]
del task_instances[key]
for task_id in downstream:
key = (ti.dag_id, task_id, execution_date)
# Removing downstream tasks from the one that has failed
for t in dag.get_task(task_id).get_flat_relatives(
upstream=False):
key = (ti.dag_id, t.task_id, execution_date)
if key in task_instances:
del task_instances[key]
elif ti.state == State.SUCCESS:
@ -990,18 +988,22 @@ class DAG(Base):
self.get_task(downstream_task_id))
def get_task_instances(self, start_date=None, end_date=None):
session = settings.Session()
session = settings.Session(expire_on_commit=False)
TI = TaskInstance
if not start_date:
start_date = (datetime.today()-timedelta(30)).date()
start_date = datetime.combine(start_date, datetime.min.time())
if not end_date:
end_date = datetime.now()
return session.query(TI).filter(
tis = session.query(TI).filter(
TI.dag_id == self.dag_id,
TI.execution_date >= start_date,
TI.execution_date <= end_date,
).all()
session.commit()
session.close()
return tis
@property
def roots(self):

Просмотреть файл

@ -189,8 +189,8 @@ class Airflow(BaseView):
@expose('/tree')
def tree(self):
dag_id = request.args.get('dag_id')
dag = dagbag.dags[dag_id]
action = request.args.get('action')
dag = dagbag.dags[dag_id]
base_date = request.args.get('base_date')
if not base_date:

Просмотреть файл

@ -57,7 +57,7 @@
// Assigning css classes based on state to nodes for border coloring
$.each(task_instances, function(task_id, ti) {
$("tspan:contains(" + task_id + ")")
$('tspan').filter(function(index) { return $(this).text() === task_id; })
.parent().parent().parent()
.attr("class", "node enter " + ti.state)
.attr("data-toggle", "tooltip")