This commit is contained in:
Maxime Beauchemin 2015-05-26 15:47:58 -04:00
Родитель bffb403077
Коммит 59d17609a7
5 изменённых файлов: 125 добавлений и 33 удалений

Просмотреть файл

@ -1,3 +1,4 @@
from collections import defaultdict
from datetime import datetime
import getpass
import logging
@ -305,7 +306,7 @@ class SchedulerJob(BaseJob):
execution_date=next_schedule,
)
ti.refresh_from_db()
if ti.is_runnable():
if ti.is_queueable():
logging.debug('Queuing next run: ' + str(ti))
executor.queue_task_instance(ti)
# Releasing the lock
@ -321,6 +322,37 @@ class SchedulerJob(BaseJob):
session.close()
@utils.provide_session
def prioritize_queued(self, session, executor, dagbag):
    """
    Promote QUEUED task instances into the executor, by pool priority.

    Groups all QUEUED task instances by pool, and for each pool with open
    slots, re-queues the highest ``priority_weight`` instances first, up to
    the number of open slots.

    :param session: SQLAlchemy session (injected by ``provide_session``)
    :param executor: the executor to queue runnable task instances on
    :param dagbag: DagBag used to resolve each TI's task object
    """
    # Snapshot all pools once so we don't query per task instance.
    pools = {p.pool: p for p in session.query(models.Pool).all()}
    TI = models.TaskInstance
    queued_tis = (
        session.query(TI)
        .filter(TI.state == State.QUEUED)
        .all()
    )
    # Bucket queued task instances by the pool they want to run in.
    d = defaultdict(list)
    for ti in queued_tis:
        d[ti.pool].append(ti)
    for pool, tis in d.items():
        # BUG FIX: the original looked up pools[ti.pool], using the `ti`
        # variable leaked from the previous loop, not the current pool key.
        pool_obj = pools.get(pool)
        if pool_obj is None:
            # Pool was deleted (or TI has an unknown pool); skip this bucket
            # rather than crashing the scheduler loop.
            logging.error(
                "Pool {} referenced by queued tasks was not found".format(pool))
            continue
        open_slots = pool_obj.open_slots(session=session)
        if open_slots > 0:
            # Highest priority first; only fill the available slots.
            tis = sorted(
                tis, key=lambda ti: ti.priority_weight, reverse=True)
            for ti in tis[:open_slots]:
                task = None
                try:
                    task = dagbag.dags[ti.dag_id].get_task(ti.task_id)
                except Exception:
                    # Narrowed from a bare except: the DAG or task may have
                    # been removed since the TI was queued.
                    logging.error("Queued task {} seems gone".format(ti))
                if task:
                    ti.task = task
                    executor.queue_task_instance(ti)
def _execute(self):
dag_id = self.dag_id
@ -331,17 +363,19 @@ class SchedulerJob(BaseJob):
utils.pessimistic_connection_handling()
# Sleep time (seconds) between scheduler runs
logging.basicConfig(level=logging.DEBUG)
logging.info("Starting the scheduler")
# This should get new code
dagbag = models.DagBag(self.subdir, sync_to_db=True)
executor = dagbag.executor
executor.start()
i = 0
while (not self.test_mode) or i < 1:
try:
self.prioritize_queued(executor=executor, dagbag=dagbag)
except Exception as e:
logging.exception(e)
i += 1
try:
if i % self.refresh_dags_every == 0:

Просмотреть файл

@ -24,7 +24,7 @@ from airflow.configuration import conf
from airflow import settings
from airflow import utils
from airflow.utils import State
from airflow.utils import apply_defaults
from airflow.utils import apply_defaults, provide_session
Base = declarative_base()
ID_LEN = 250
@ -314,6 +314,8 @@ class Connection(Base):
return hooks.PrestoHook(presto_conn_id=self.conn_id)
elif self.conn_type == 'hiveserver2':
return hooks.HiveServer2Hook(hiveserver2_conn_id=self.conn_id)
elif self.conn_type == 'sqlite':
return hooks.SqliteHook(sqlite_conn_id=self.conn_id)
except:
return None
@ -515,12 +517,13 @@ class TaskInstance(Base):
"""
return (self.dag_id, self.task_id, self.execution_date)
def is_runnable(self):
def is_queueable(self):
"""
Returns a boolean on whether the task instance has met all dependencies
and is ready to run. It considers the task's state, the state
of its dependencies, depends_on_past and makes sure the execution
isn't in the future.
isn't in the future. It doesn't take into
account whether the pool has a slot for it to run.
"""
if self.execution_date > datetime.now() - self.task.schedule_interval:
return False
@ -530,12 +533,18 @@ class TaskInstance(Base):
return False
elif (
self.state in State.runnable() and
self.are_dependencies_met() and
not self.pool_full()):
self.are_dependencies_met()):
return True
else:
return False
def is_runnable(self):
    """
    Returns whether a task is ready to run AND there's room in the
    queue (i.e. the task's pool still has an open slot).
    """
    if not self.is_queueable():
        return False
    return not self.pool_full()
def are_dependents_done(self, main_session=None):
"""
Checks whether the dependents of this task instance have all succeeded.
@ -626,7 +635,8 @@ class TaskInstance(Base):
return self.state == State.UP_FOR_RETRY and \
self.end_date + self.task.retry_delay < datetime.now()
def pool_full(self, session=None):
@provide_session
def pool_full(self, session):
"""
Returns a boolean as to whether the slot pool has room for this
task to run
@ -634,18 +644,6 @@ class TaskInstance(Base):
if not self.task.pool:
return False
close_session = False
if not session:
session = settings.Session()
close_session = True
running = (
session
.query(TaskInstance)
.filter(TaskInstance.pool == self.task.pool)
.filter(TaskInstance.state == State.RUNNING)
.count()
)
pool = (
session
.query(Pool)
@ -654,12 +652,9 @@ class TaskInstance(Base):
)
if not pool:
return False
open_slots = pool.open_slots(session=session)
if close_session:
session.expunge_all()
session.commit()
session.close()
return running >= pool.slots
return open_slots <= 0
def run(
@ -707,6 +702,7 @@ class TaskInstance(Base):
"Next run after {0}".format(next_run)
)
elif force or self.state in State.runnable():
self.start_date = datetime.now()
if not force and task.pool and self.pool_full(session):
self.state = State.QUEUED
session.commit()
@ -719,7 +715,6 @@ class TaskInstance(Base):
if not test_mode:
session.add(Log(State.RUNNING, self))
self.state = State.RUNNING
self.start_date = datetime.now()
self.end_date = None
if not test_mode:
session.merge(self)
@ -1804,4 +1799,26 @@ class Pool(Base):
description = Column(Text)
def __repr__(self):
return self.id
return self.pool
@provide_session
def used_slots(self, session):
    """
    Returns the number of slots used at the moment, i.e. how many
    task instances assigned to this pool are currently RUNNING.
    """
    in_flight = (
        session
        .query(TaskInstance)
        .filter(
            TaskInstance.pool == self.pool,
            TaskInstance.state == State.RUNNING)
        .count()
    )
    return in_flight
@provide_session
def open_slots(self, session):
    """
    Returns the number of slots open at the moment: the pool's total
    capacity minus the slots currently in use.
    """
    return self.slots - self.used_slots(session=session)

Просмотреть файл

@ -35,7 +35,7 @@ class State(object):
UP_FOR_RETRY = "up_for_retry"
state_color = {
QUEUED: 'grey',
QUEUED: 'gray',
RUNNING: 'lime',
SUCCESS: 'green',
SHUTDOWN: 'orange',
@ -120,6 +120,14 @@ def initdb():
port=10001))
session.commit()
conn = session.query(C).filter(C.conn_id == 'mysql_default').first()
if not conn:
session.add(
models.Connection(
conn_id='mysql_default', conn_type='mysql',
host='localhost'))
session.commit()
conn = session.query(C).filter(C.conn_id == 'sqlite_default').first()
if not conn:
home = conf.get('core', 'AIRFLOW_HOME')
@ -215,8 +223,31 @@ def readfile(filepath):
return content
def provide_session(func):
    """
    Function decorator that provides a session if it isn't provided.
    If you want to reuse a session or run the function as part of a
    database transaction, you pass it to the function, if not this wrapper
    will create one and close it for you.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        needs_session = 'session' not in kwargs
        if needs_session:
            # No session supplied by the caller: own the full lifecycle.
            session = settings.Session()
            kwargs['session'] = session
            try:
                result = func(*args, **kwargs)
                # Detach ORM objects so they stay usable after close, then
                # persist whatever the wrapped function did.
                session.expunge_all()
                session.commit()
            finally:
                # BUG FIX: the original leaked the session if func raised;
                # always close what we created.
                session.close()
            return result
        # Caller-provided session: the caller manages commit/close.
        return func(*args, **kwargs)
    return wrapper
def apply_defaults(func):
'''
"""
Function decorator that Looks for an argument named "default_args", and
fills the unspecified arguments from it.
@ -224,7 +255,7 @@ def apply_defaults(func):
calling a function, and that this can be quite confusing with multi-level
inheritance and argument defaults, this decorator also alerts with
specific information about the missing arguments.
'''
"""
@wraps(func)
def wrapper(*args, **kwargs):
if len(args) > 1:

Просмотреть файл

@ -1509,6 +1509,7 @@ class ConnectionModelView(SuperUserMixin, ModelView):
('presto', 'Presto',),
('s3', 'S3',),
('samba', 'Samba',),
('sqlite', 'Sqlite',),
]
}
mv = ConnectionModelView(
@ -1719,7 +1720,13 @@ mv = VariableView(
admin.add_view(mv)
def pool_link(v, c, m, p):
    """Column formatter: render a pool name as a link to its filtered TI list."""
    href = '/admin/taskinstance/?flt1_pool_equals=' + m.pool
    return Markup("<a href='{url}'>{m.pool}</a>".format(url=href, m=m))
class PoolModelView(SuperUserMixin, ModelView):
    """Admin view for Pools; pool names link to their queued/running TIs."""
    # NOTE: the stray `pass` in the diff residue is dead code once these
    # class attributes exist; it is dropped here.
    named_filter_urls = True
    column_formatters = dict(pool=pool_link)
mv = PoolModelView(models.Pool, Session, name="Pools", category="Admin")
admin.add_view(mv)

Просмотреть файл

@ -44,6 +44,9 @@ span.started{
span.error{
background-color:red;
}
span.queued{
background-color:gray;
}
.tooltip-inner {
text-align:left !important;
font-size: 12px;