Merge pull request #3 from mistercrunch/executor_fix

Providing a way to specify the executor to use while constructing the DAG object
This commit is contained in:
Krishna Puttaswamy 2014-11-07 13:40:38 -08:00
Родитель 88815ad3c8 25cb7264b9
Коммит cc94aa007d
3 изменённых файлов: 12 добавлений и 8 удалений

Просмотреть файл

@ -927,10 +927,11 @@ class DAG(Base):
self, dag_id,
schedule_interval=timedelta(days=1),
start_date=None, end_date=None, parallelism=0,
full_filepath=None):
full_filepath=None, executor=DEFAULT_EXECUTOR):
utils.validate_key(dag_id)
self.dag_id = dag_id
self.executor = executor
self.end_date = end_date or datetime.now()
self.parallelism = parallelism
self.schedule_interval = schedule_interval
@ -1088,11 +1089,9 @@ class DAG(Base):
session.merge(self)
session.commit()
def run(
self, start_date=None, end_date=None, mark_success=False,
executor=DEFAULT_EXECUTOR):
def run(self, start_date=None, end_date=None, mark_success=False):
session = settings.Session()
job = BackfillJob(executor=executor)
job = BackfillJob(executor=self.executor)
session.add(job)
session.commit()
job.run(self, start_date, end_date, mark_success)

Просмотреть файл

@ -1,5 +1,7 @@
from airflow.operators import BashOperator, MySqlOperator, DummyOperator
from airflow.models import DAG
from airflow.executors import SequentialExecutor
from airflow.executors import LocalExecutor
from datetime import datetime
default_args = {
@ -8,7 +10,8 @@ default_args = {
'mysql_dbid': 'local_mysql',
}
dag = DAG(dag_id='example_1')
dag = DAG(dag_id='example_1', executor=LocalExecutor())
# dag = DAG(dag_id='example_1', executor=SequentialExecutor())
cmd = 'ls -l'
run_this_last = DummyOperator(
@ -45,3 +48,4 @@ task = MySqlOperator(task_id='also_run_mysql', sql=sql, **default_args)
dag.add_task(task)
task.set_downstream(run_this_last)
task.set_upstream(create_table)

Просмотреть файл

@ -1,16 +1,17 @@
from airflow.operators import MySqlOperator
from airflow.executors import SequentialExecutor
from airflow.executors import LocalExecutor
from airflow import DAG
from datetime import datetime
# Setting some default operator parameters
default_args = {
'owner': 'max',
'owner': 'max',
'mysql_dbid': 'local_mysql',
}
# Initializing a directed acyclic graph
dag = DAG(dag_id='simple', )
dag = DAG(dag_id='simple', executor=LocalExecutor())
# MySQL Operator
sql = "TRUNCATE TABLE tmp;"