This commit is contained in:
Maxime 2015-01-23 21:11:02 +00:00
Родитель 450049ba43
Коммит 3d5a649bd2
3 изменённых файлов: 34 добавлений и 11 удалений

Просмотреть файл

@ -108,6 +108,7 @@ def run(args):
'as pickle_id:{pickle_id}').format(**locals())
executor = DEFAULT_EXECUTOR
executor.start()
print("Sending run command to executor.")
executor.queue_command(ti.key, ti.command(pickle_id=pickle_id))
executor.end()

Просмотреть файл

@ -66,9 +66,7 @@ class CeleryExecutor(BaseExecutor):
self.last_state[key] = async.state
def end(self):
print('entering end')
while any([
async.state not in celery_states.READY_STATES
for async in self.tasks.values()]):
print str([async.state for async in self.tasks.values()])
time.sleep(5)

Просмотреть файл

@ -468,13 +468,8 @@ class TaskInstance(Base):
self.end_date + self.task.retry_delay < datetime.now()
def get_template(self, source):
searchpath = []
if hasattr(self, 'task') and hasattr(self.task, 'dag'):
searchpath = [self.task.dag.folder]
if self.task.dag.template_searchpath:
searchpath += self.task.dag.template_searchpath
env = jinja2.Environment(loader=jinja2.FileSystemLoader(searchpath))
env = self.task.dag.get_template_env()
template = None
for ext in self.task.__class__.template_ext:
# if field has the right extension, look for the file.
@ -783,9 +778,10 @@ class BaseOperator(Base):
content = getattr(self, field)
for ext in self.template_ext:
if content.endswith(ext):
fp = os.path.join(
os.path.dirname(self.dag.full_filepath), content)
f = open(fp, 'r')
env = self.dag.get_template_env()
template = env.get_template(content)
f = open(template.filename, 'r')
setattr(self, field, f.read())
f.close()
@ -956,6 +952,25 @@ class DAG(Base):
DAGs essentially act as namespaces for tasks. A task_id can only be
added once to a DAG.
:param dag_id: The id of the DAG
:type dag_id: string
:param schedule_interval: Defines how often that DAG runs
:type schedule_interval: datetime.timedelta
:param start_date: The timestamp from which the scheduler will
attempt to backfill
:type start_date: datetime.datetime
:param end_date: A date beyond which your DAG won't run, leave to None
for open ended scheduling
:type end_date: datetime.datetime
:param executor: The executor to use, default stays in sync with how
your environment is setup
:type executor: derivative of airflow.executors.BaseExecutor
:param template_searchpath: This list of folders (non relative)
defines where jinja will look for your templates. Order matters.
Note that jinja/airflow includes the path of your DAG file by
default
:type template_searchpath: string or list of strings
"""
__tablename__ = "dag"
@ -981,6 +996,8 @@ class DAG(Base):
self.parallelism = parallelism
self.schedule_interval = schedule_interval
self.full_filepath = full_filepath if full_filepath else ''
if isinstance(template_searchpath, basestring):
template_searchpath = [template_searchpath]
self.template_searchpath = template_searchpath
def __repr__(self):
@ -1037,6 +1054,13 @@ class DAG(Base):
t.start_date = start_date
self.start_date = start_date
def get_template_env(self):
    """
    Build the jinja2 environment used to load this DAG's templates.

    The loader is given a list that starts with ``self.folder`` and is
    followed by the entries of ``self.template_searchpath`` when that
    attribute is set and non-empty.

    :return: a ``jinja2.Environment`` backed by a ``FileSystemLoader``
    """
    # A fresh list is built on every call, so extending it never
    # touches the DAG's own ``template_searchpath`` attribute.
    template_dirs = [self.folder]
    extra_dirs = self.template_searchpath
    if extra_dirs:
        template_dirs.extend(extra_dirs)
    loader = jinja2.FileSystemLoader(template_dirs)
    return jinja2.Environment(loader=loader)
def set_dependency(self, upstream_task_id, downstream_task_id):
"""
Simple utility method to set dependency between two tasks that