diff --git a/airflow/models.py b/airflow/models.py
index b1c589582e..36342f13ad 100644
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -927,10 +927,11 @@ class DAG(Base):
             self, dag_id,
             schedule_interval=timedelta(days=1),
             start_date=None, end_date=None,
             parallelism=0,
-            full_filepath=None):
+            full_filepath=None, executor=DEFAULT_EXECUTOR):
         utils.validate_key(dag_id)
         self.dag_id = dag_id
+        self.executor = executor
         self.end_date = end_date or datetime.now()
         self.parallelism = parallelism
         self.schedule_interval = schedule_interval
@@ -1088,11 +1089,9 @@ class DAG(Base):
         session.merge(self)
         session.commit()
 
-    def run(
-            self, start_date=None, end_date=None, mark_success=False,
-            executor=DEFAULT_EXECUTOR):
+    def run(self, start_date=None, end_date=None, mark_success=False):
         session = settings.Session()
-        job = BackfillJob(executor=executor)
+        job = BackfillJob(executor=self.executor)
         session.add(job)
         session.commit()
         job.run(self, start_date, end_date, mark_success)
diff --git a/dags/examples/example1.py b/dags/examples/example1.py
index 6a23778e8d..d7438ae9b9 100644
--- a/dags/examples/example1.py
+++ b/dags/examples/example1.py
@@ -1,5 +1,7 @@
 from airflow.operators import BashOperator, MySqlOperator, DummyOperator
 from airflow.models import DAG
+from airflow.executors import SequentialExecutor
+from airflow.executors import LocalExecutor
 from datetime import datetime
 
 default_args = {
@@ -8,7 +10,8 @@ default_args = {
     'mysql_dbid': 'local_mysql',
 }
 
-dag = DAG(dag_id='example_1')
+dag = DAG(dag_id='example_1', executor=LocalExecutor())
+# dag = DAG(dag_id='example_1', executor=SequentialExecutor())
 
 cmd = 'ls -l'
 run_this_last = DummyOperator(
@@ -45,3 +48,4 @@ task = MySqlOperator(task_id='also_run_mysql', sql=sql, **default_args)
 dag.add_task(task)
 task.set_downstream(run_this_last)
 task.set_upstream(create_table)
+
diff --git a/dags/examples/simple.py b/dags/examples/simple.py
index 24c419edd6..3ffa6e39e0 100644
--- a/dags/examples/simple.py
+++ b/dags/examples/simple.py
@@ -1,16 +1,17 @@
 from airflow.operators import MySqlOperator
 from airflow.executors import SequentialExecutor
+from airflow.executors import LocalExecutor
 from airflow import DAG
 from datetime import datetime
 
 # Setting some default operator parameters
 default_args = {
-    'owner': 'max', 
+    'owner': 'max',
     'mysql_dbid': 'local_mysql',
 }
 
 # Initializing a directed acyclic graph
-dag = DAG(dag_id='simple', )
+dag = DAG(dag_id='simple', executor=LocalExecutor())
 
 # MySQL Operator
 sql = "TRUNCATE TABLE tmp;"
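A minimal usage sketch of the API after this change: the executor is now bound to the DAG at construction time, and dag.run() no longer accepts an executor argument (the BackfillJob reads dag.executor instead). The dag_id and the task below are hypothetical and only illustrate the pattern; the imports mirror the example DAGs touched in this patch.

from airflow.models import DAG
from airflow.operators import BashOperator
from airflow.executors import LocalExecutor, SequentialExecutor

# The executor now travels with the DAG object itself.
dag = DAG(dag_id='executor_demo', executor=LocalExecutor())
# dag = DAG(dag_id='executor_demo', executor=SequentialExecutor())  # serial alternative

# 'print_date' and its bash_command are illustrative only.
task = BashOperator(task_id='print_date', bash_command='date', owner='max')
dag.add_task(task)

# Before this change: dag.run(executor=SequentialExecutor())
# After this change: dag.run() builds a BackfillJob with executor=dag.executor.
dag.run()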