diff --git a/.gitignore b/.gitignore
index fc1d931b80..74db03efdb 100644
--- a/.gitignore
+++ b/.gitignore
@@ -5,6 +5,7 @@ build
 dist
 env
 initdb.py
+dbinit.py
 logs
 *.cfg
 MANIFEST
diff --git a/airflow/bin/airflow b/airflow/bin/airflow
index 7ff587ec58..57efa80923 100755
--- a/airflow/bin/airflow
+++ b/airflow/bin/airflow
@@ -228,6 +228,7 @@ def initdb(args):
 
 
 if __name__ == '__main__':
+    reload(logging)
     logging.root.handlers = []
 
     parser = argparse.ArgumentParser()
diff --git a/airflow/executors/local_executor.py b/airflow/executors/local_executor.py
index d157fbbecc..6ffa5016d2 100644
--- a/airflow/executors/local_executor.py
+++ b/airflow/executors/local_executor.py
@@ -45,7 +45,7 @@ class LocalWorker(multiprocessing.Process):
 
 
 class LocalExecutor(BaseExecutor):
 
-    def __init__(self, parallelism=4):
+    def __init__(self, parallelism=16):
         super(LocalExecutor, self).__init__()
         self.parallelism = parallelism
diff --git a/airflow/models.py b/airflow/models.py
index 3506fd94cf..aed9a5fdb2 100644
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -377,8 +377,7 @@ class TaskInstance(Base):
             self, verbose=True,
             ignore_dependencies=False,
             force=False,
-            mark_success=False,
-            ):
+            mark_success=False,):
         """
         Runs the task instnace.
         """
@@ -601,7 +600,6 @@ class BackfillJob(BaseJob):
                 if ti.state == State.SUCCESS and key in task_instances:
                     del task_instances[key]
                 elif ti.is_runnable():
-                    print "Runnable: " + ti.task_id
                     executor.queue_command(
                         key=ti.key, command=ti.command(
                             mark_success=mark_success,
@@ -1101,7 +1099,12 @@ class DAG(Base):
             self.add_task(task)
 
     def db_merge(self):
+        BO = BaseOperator
         session = settings.Session()
+        tasks = session.query(BO).filter(BO.dag_id==self.dag_id).all()
+        for t in tasks:
+            session.delete(t)
+        session.commit()
         session.merge(self)
         session.commit()
 
diff --git a/airflow/settings.py b/airflow/settings.py
index cb95e8c19c..415615f0e3 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -22,7 +22,7 @@ if BASE_FOLDER not in sys.path:
 Session = sessionmaker()
 #engine = create_engine('mysql://airflow:airflow@localhost/airflow')
 engine = create_engine(
-    SQL_ALCHEMY_CONN, pool_size=25)
+    SQL_ALCHEMY_CONN, pool_size=50)
 Session.configure(bind=engine)
 
 # can't move this to configuration due to ConfigParser interpolation
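A few notes on the changes above; the snippets that follow are illustrative sketches, not part of the patch.

The reload(logging) added to airflow/bin/airflow is a Python 2 idiom (reload is a builtin there; Python 3 moved it to importlib) that re-executes the logging module so any configuration applied at import time is discarded before the CLI installs its own. Clearing logging.root.handlers and calling basicConfig again achieves the same reset portably:

    import logging

    # Drop any handlers attached while modules were imported, so the
    # configuration below is the only one in effect.
    logging.root.handlers = []
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s %(levelname)s - %(message)s')

    logging.info('logging reconfigured from a clean slate')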
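Raising LocalExecutor's default parallelism from 4 to 16 allows up to 16 task subprocesses to run concurrently on the host. A minimal sketch of the queue-and-worker-processes pattern the executor is built on, not Airflow's actual implementation:

    import multiprocessing
    import subprocess


    def worker(queue):
        """Pull shell commands off the queue until a None sentinel arrives."""
        while True:
            command = queue.get()
            if command is None:
                break
            subprocess.call(command, shell=True)


    if __name__ == '__main__':
        parallelism = 16  # the new default in this patch; was 4
        queue = multiprocessing.Queue()
        workers = [
            multiprocessing.Process(target=worker, args=(queue,))
            for _ in range(parallelism)]
        for w in workers:
            w.start()
        for cmd in ['echo task_a', 'echo task_b']:
            queue.put(cmd)
        for _ in workers:
            queue.put(None)  # one sentinel per worker
        for w in workers:
            w.join()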
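The db_merge() change works around a property of session.merge(): merge only inserts or updates rows, it never deletes rows for tasks that were removed from the DAG definition, so stale task records would accumulate. Deleting the DAG's existing task rows first turns the merge into a true replace. A sketch of the same delete-then-merge pattern, where Task, dag, and Session are hypothetical stand-ins for BaseOperator, the DAG instance, and settings.Session:

    def db_merge(dag, Task, Session):
        # merge() alone would leave rows behind for tasks that no longer
        # exist in the DAG, so clear the old rows before merging.
        session = Session()
        for stale in session.query(Task).filter(Task.dag_id == dag.dag_id):
            session.delete(stale)
        session.commit()
        session.merge(dag)  # upsert the DAG and its current set of tasks
        session.commit()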
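The pool_size bump in settings.py, from 25 to 50, lets the SQLAlchemy engine keep up to 50 open connections in its pool, in line with the higher executor parallelism. For reference, pool sizing on create_engine looks like this; the URL is the commented-out example from settings.py, and max_overflow is an assumption included only to show the related knob:

    from sqlalchemy import create_engine

    engine = create_engine(
        'mysql://airflow:airflow@localhost/airflow',
        pool_size=50,      # connections kept open in the pool
        max_overflow=10,   # temporary extra connections beyond pool_size
    )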