This commit is contained in:
Maxime Beauchemin 2014-11-21 23:54:05 +00:00
Родитель 477e2460da
Коммит b109ba4532
5 изменённых файлов: 10 добавлений и 5 удалений

1
.gitignore поставляемый
Просмотреть файл

@ -5,6 +5,7 @@ build
dist dist
env env
initdb.py initdb.py
dbinit.py
logs logs
*.cfg *.cfg
MANIFEST MANIFEST

Просмотреть файл

@ -228,6 +228,7 @@ def initdb(args):
if __name__ == '__main__': if __name__ == '__main__':
reload(logging)
logging.root.handlers = [] logging.root.handlers = []
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()

Просмотреть файл

@ -45,7 +45,7 @@ class LocalWorker(multiprocessing.Process):
class LocalExecutor(BaseExecutor): class LocalExecutor(BaseExecutor):
def __init__(self, parallelism=4): def __init__(self, parallelism=16):
super(LocalExecutor, self).__init__() super(LocalExecutor, self).__init__()
self.parallelism = parallelism self.parallelism = parallelism

Просмотреть файл

@ -377,8 +377,7 @@ class TaskInstance(Base):
self, verbose=True, self, verbose=True,
ignore_dependencies=False, ignore_dependencies=False,
force=False, force=False,
mark_success=False, mark_success=False,):
):
""" """
Runs the task instnace. Runs the task instnace.
""" """
@ -601,7 +600,6 @@ class BackfillJob(BaseJob):
if ti.state == State.SUCCESS and key in task_instances: if ti.state == State.SUCCESS and key in task_instances:
del task_instances[key] del task_instances[key]
elif ti.is_runnable(): elif ti.is_runnable():
print "Runnable: " + ti.task_id
executor.queue_command( executor.queue_command(
key=ti.key, command=ti.command( key=ti.key, command=ti.command(
mark_success=mark_success, mark_success=mark_success,
@ -1101,7 +1099,12 @@ class DAG(Base):
self.add_task(task) self.add_task(task)
def db_merge(self): def db_merge(self):
BO = BaseOperator
session = settings.Session() session = settings.Session()
tasks = session.query(BO).filter(BO.dag_id==self.dag_id).all()
for t in tasks:
session.delete(t)
session.commit()
session.merge(self) session.merge(self)
session.commit() session.commit()

Просмотреть файл

@ -22,7 +22,7 @@ if BASE_FOLDER not in sys.path:
Session = sessionmaker() Session = sessionmaker()
#engine = create_engine('mysql://airflow:airflow@localhost/airflow') #engine = create_engine('mysql://airflow:airflow@localhost/airflow')
engine = create_engine( engine = create_engine(
SQL_ALCHEMY_CONN, pool_size=25) SQL_ALCHEMY_CONN, pool_size=50)
Session.configure(bind=engine) Session.configure(bind=engine)
# can't move this to configuration due to ConfigParser interpolation # can't move this to configuration due to ConfigParser interpolation