From 199425e35a65d0e6113b79521d95fe75a8b95886 Mon Sep 17 00:00:00 2001
From: Maxime
Date: Mon, 19 Jan 2015 06:39:03 +0000
Subject: [PATCH] Pickling the content of files referenced instead of file locations

---
 airflow/models.py | 25 +++++++++++++++++++++++++
 1 file changed, 25 insertions(+)

diff --git a/airflow/models.py b/airflow/models.py
index c5a29d21eb..895208f311 100644
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -206,6 +206,8 @@ class DagPickle(Base):
     def __init__(self, dag, job):
         self.dag_id = dag.dag_id
         self.job = job
+        for t in dag.tasks:
+            t.materialize_files()
         self.pickle = dumps(dag)
 
     def get_object(self):
@@ -767,6 +769,29 @@ class BaseOperator(Base):
         else:
             return self._schedule_interval
 
+    def materialize_files(self):
+        """
+        Replace each template field that names a template file with the
+        content of that file, so the content (not the path) gets pickled.
+
+        A field qualifies when its value ends with one of the extensions
+        in ``template_ext``; the path is resolved relative to the
+        directory of ``self.dag.full_filepath``.
+        """
+        extensions = tuple(self.template_ext)
+        for field in self.template_fields:
+            content = getattr(self, field)
+            # Fields holding None or other non-string values cannot name
+            # a file; calling endswith on them would raise AttributeError
+            if not hasattr(content, 'endswith'):
+                continue
+            if content.endswith(extensions):
+                fp = os.path.join(
+                    os.path.dirname(self.dag.full_filepath), content)
+                # The context manager closes the file even if read() fails
+                with open(fp, 'r') as f:
+                    setattr(self, field, f.read())
+
     @property
     def upstream_list(self):
         """@property: list of tasks directly upstream"""