[AIRFLOW-74] SubdagOperators can consume all celeryd worker processes

Closes #3251 from feng-tao/airflow-74
This commit is contained in:
Tao feng 2018-04-24 10:13:25 -07:00 коммит произвёл Arthur Wiedmer
Родитель be886b986c
Коммит 64d9501667
3 изменённых файлов: 29 добавлений и 14 удалений

Просмотреть файл

@ -5,6 +5,8 @@ assists users migrating to a new version.
## Airflow Master
### Default executor for SubDagOperator is changed to SequentialExecutor
### New Webserver UI with Role-Based Access Control
The current webserver UI uses the Flask-Admin extension. The new webserver UI uses the [Flask-AppBuilder (FAB)](https://github.com/dpgaspar/Flask-AppBuilder) extension. FAB has built-in authentication support and Role-Based Access Control (RBAC), which provides configurable roles and permissions for individual users.

Просмотреть файл

@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#
# http://www.apache.org/licenses/LICENSE-2.0
#
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@ -18,10 +18,10 @@
# under the License.
from airflow.exceptions import AirflowException
from airflow.executors.sequential_executor import SequentialExecutor
from airflow.models import BaseOperator, Pool
from airflow.utils.decorators import apply_defaults
from airflow.utils.db import provide_session
from airflow.executors import GetDefaultExecutor
class SubDagOperator(BaseOperator):
@ -35,16 +35,19 @@ class SubDagOperator(BaseOperator):
def __init__(
self,
subdag,
executor=GetDefaultExecutor(),
executor=SequentialExecutor(),
*args, **kwargs):
"""
Yo dawg. This runs a sub dag. By convention, a sub dag's dag_id
This runs a sub dag. By convention, a sub dag's dag_id
should be prefixed by its parent and a dot. As in `parent.child`.
:param subdag: the DAG object to run as a subdag of the current DAG.
:type subdag: airflow.DAG
:param dag: the parent DAG
:type subdag: airflow.DAG
:type subdag: airflow.DAG.
:param dag: the parent DAG for the subdag.
:type dag: airflow.DAG.
:param executor: the executor for this subdag. Defaults to SequentialExecutor.
See AIRFLOW-74 for more details.
:type executor: airflow.executors.
"""
import airflow.models
dag = kwargs.get('dag') or airflow.models._CONTEXT_MANAGER_DAG
@ -88,6 +91,9 @@ class SubDagOperator(BaseOperator):
)
self.subdag = subdag
# Airflow pool is not honored by SubDagOperator.
# Hence resources could be consumed by SubDagOperators.
# Use another executor at your own risk.
self.executor = executor
def execute(self, context):

Просмотреть файл

@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#
# http://www.apache.org/licenses/LICENSE-2.0
#
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@ -17,18 +17,16 @@
# specific language governing permissions and limitations
# under the License.
import os
import unittest
from mock import Mock
import airflow
from airflow.exceptions import AirflowException
from airflow.executors.sequential_executor import SequentialExecutor
from airflow.models import DAG, DagBag
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.jobs import BackfillJob
from airflow.exceptions import AirflowException
from airflow.utils.timezone import datetime
DEFAULT_DATE = datetime(2016, 1, 1)
@ -143,3 +141,12 @@ class SubDagOperatorTests(unittest.TestCase):
# now make sure dag picks up the subdag error
self.assertRaises(AirflowException, dag.run, start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
def test_subdag_executor(self):
    """
    A SubDagOperator created without an explicit ``executor`` argument
    must carry an executor whose type is SequentialExecutor.
    """
    parent_dag = DAG('parent', default_args=default_args)
    child_dag = DAG('parent.test', default_args=default_args)

    # No ``executor`` keyword is passed, so the operator's default applies.
    operator = SubDagOperator(
        task_id='test',
        dag=parent_dag,
        subdag=child_dag
    )

    executor_type = type(operator.executor)
    self.assertEqual(executor_type, SequentialExecutor)