[AIRFLOW-6939] Executor configuration via import path (#7563)

This commit is contained in:
Kamil Breguła 2020-02-27 16:04:06 +01:00 коммит произвёл GitHub
Родитель 2cc8d20fcf
Коммит 37a8f6a910
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
5 изменённых файлов: 137 добавлений и 38 удалений

Просмотреть файл

@ -61,6 +61,29 @@ https://developers.google.com/style/inclusive-documentation
-->
### Custom executors are loaded using full import path
In previous versions of Airflow it was possible to use plugins to load custom executors. It is still
possible, but the configuration has changed. Now you don't have to create a plugin to configure a
custom executor, but you need to provide the full path to the module in the `executor` option
in the `core` section. The purpose of this change is to simplify the plugin mechanism and make
it easier to configure an executor.
If your module was in the path `my_acme_company.executors.MyCustomExecutor` and the plugin was
called `my_plugin` then your configuration looks like this
```ini
[core]
executor = my_plugin.MyCustomExecutor
```
And now it should look like this:
```ini
[core]
executor = my_acme_company.executors.MyCustomExecutor
```
The old configuration still works but can be abandoned at any time.
### Removed sub-package imports from `airflow/__init__.py`
The imports `LoggingMixin`, `conf`, and `AirflowException` have been removed from `airflow/__init__.py`.

Просмотреть файл

@ -15,11 +15,12 @@
# specific language governing permissions and limitations
# under the License.
"""All executors."""
import importlib
import logging
from contextlib import suppress
from typing import Optional
from airflow.executors.base_executor import BaseExecutor
from airflow.utils.module_loading import import_string
log = logging.getLogger(__name__)
@ -38,12 +39,12 @@ class ExecutorLoader:
_default_executor: Optional[BaseExecutor] = None
executors = {
LOCAL_EXECUTOR: 'airflow.executors.local_executor',
SEQUENTIAL_EXECUTOR: 'airflow.executors.sequential_executor',
CELERY_EXECUTOR: 'airflow.executors.celery_executor',
DASK_EXECUTOR: 'airflow.executors.dask_executor',
KUBERNETES_EXECUTOR: 'airflow.executors.kubernetes_executor',
DEBUG_EXECUTOR: 'airflow.executors.debug_executor'
LOCAL_EXECUTOR: 'airflow.executors.local_executor.LocalExecutor',
SEQUENTIAL_EXECUTOR: 'airflow.executors.sequential_executor.SequentialExecutor',
CELERY_EXECUTOR: 'airflow.executors.celery_executor.CeleryExecutor',
DASK_EXECUTOR: 'airflow.executors.dask_executor.DaskExecutor',
KUBERNETES_EXECUTOR: 'airflow.executors.kubernetes_executor.KubernetesExecutor',
DEBUG_EXECUTOR: 'airflow.executors.debug_executor.DebugExecutor'
}
@classmethod
@ -55,31 +56,39 @@ class ExecutorLoader:
from airflow.configuration import conf
executor_name = conf.get('core', 'EXECUTOR')
cls._default_executor = ExecutorLoader._get_executor(executor_name)
log.info("Using executor %s", executor_name)
cls._default_executor = ExecutorLoader._load_executor(executor_name)
return cls._default_executor
@classmethod
def _get_executor(cls, executor_name: str) -> BaseExecutor:
def _load_executor(cls, executor_name: str) -> BaseExecutor:
"""
Creates a new instance of the named executor.
In case the executor name is unknown in airflow,
look for it in the plugins
Loads the executor.
This supports the following formats:
* by executor name for core executor
* by ``{plugin_name}.{class_name}`` for executor from plugins
* by import path.
"""
if executor_name in cls.executors:
executor_module = importlib.import_module(cls.executors[executor_name])
executor = getattr(executor_module, executor_name)
return executor()
else:
# Load plugins here for executors as at that time the plugins might not have been initialized yet
from airflow import plugins_manager
plugins_manager.integrate_executor_plugins()
executor_path = executor_name.split('.')
if len(executor_path) != 2:
raise ValueError(f"Executor {executor_name} not supported: "
f"please specify in format plugin_module.executor")
if executor_path[0] not in globals():
raise ValueError(f"Executor {executor_name} not supported")
return globals()[executor_path[0]].__dict__[executor_path[1]]()
log.debug("Loading core executor: %s", executor_name)
return import_string(cls.executors[executor_name])()
# If the executor name looks like "plugin executor path" then try to load plugins.
if executor_name.count(".") == 1:
log.debug(
"The executor name looks like the plugin path (executor_name=%s). Trying to load a "
"executor from a plugin", executor_name
)
with suppress(ImportError), suppress(AttributeError):
# Load plugins here for executors as at that time the plugins might not have been
# initialized yet
from airflow import plugins_manager
plugins_manager.integrate_executor_plugins()
return import_string(f"airflow.executors.{executor_name}")()
log.debug("Loading executor from custom path: %s", executor_name)
executor = import_string(executor_name)()
log.info("Loaded executor: %s", executor_name)
return executor

Просмотреть файл

@ -21,7 +21,9 @@ Executor
Executors are the mechanism by which task instances get run.
Airflow has support for various executors. Current used is determined by the ``executor`` option in the ``core``
section of the configuration file.
section of the configuration file. This option should contain the name of the executor, e.g. ``KubernetesExecutor``,
if it is a core executor. If you want to load your own executor, then you should specify the
full path to the module, e.g. ``my_acme_company.executors.MyCustomExecutor``.
.. note::
For more information on setting the configuration, see :doc:`../howto/set-config`.

Просмотреть файл

@ -25,7 +25,7 @@ features to its core by simply dropping files in your
``$AIRFLOW_HOME/plugins`` folder.
The python modules in the ``plugins`` folder get imported,
and **hooks**, **operators**, **sensors**, **macros**, **executors** and web **views**
and **hooks**, **operators**, **sensors**, **macros** and web **views**
get integrated to Airflow's main collections and become available for use.
What for?
@ -84,8 +84,6 @@ looks like:
sensors = []
# A list of class(es) derived from BaseHook
hooks = []
# A list of class(es) derived from BaseExecutor
executors = []
# A list of references to inject into the macros namespace
macros = []
# A list of Blueprint object created from flask.Blueprint. For use with the flask_appbuilder based GUI
@ -161,7 +159,6 @@ definitions in Airflow.
from airflow.models.baseoperator import BaseOperatorLink
from airflow.providers.amazon.aws.operators.gcs_to_s3 import GCSToS3Operator
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.executors.base_executor import BaseExecutor
# Will show up under airflow.hooks.test_plugin.PluginHook
class PluginHook(BaseHook):
@ -175,10 +172,6 @@ definitions in Airflow.
class PluginSensorOperator(BaseSensorOperator):
pass
# Will show up under airflow.executors.test_plugin.PluginExecutor
class PluginExecutor(BaseExecutor):
pass
# Will show up under airflow.macros.test_plugin.plugin_macro
# and in templates through {{ macros.test_plugin.plugin_macro }}
def plugin_macro():
@ -240,7 +233,6 @@ definitions in Airflow.
operators = [PluginOperator]
sensors = [PluginSensorOperator]
hooks = [PluginHook]
executors = [PluginExecutor]
macros = [plugin_macro]
flask_blueprints = [bp]
appbuilder_views = [v_appbuilder_package]

Просмотреть файл

@ -0,0 +1,73 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import unittest
from parameterized import parameterized
from airflow.executors.executor_loader import ExecutorLoader
from airflow.plugins_manager import executors_modules, make_module
from tests.test_utils.config import conf_vars
# The plugins manager creates real Python modules (hard to mock), so tests are
# isolated by registering a plugin under a name no other test could collide with.
TEST_PLUGIN_NAME = "unique_plugin_name_to_avoid_collision_i_love_kitties"
class FakeExecutor:
    """Minimal stand-in executor used to exercise plugin and import-path loading."""
class TestExecutorLoader(unittest.TestCase):
    """Tests for ``ExecutorLoader`` covering the three supported formats of the
    ``[core] executor`` option: core executor name, ``{plugin}.{class}`` path,
    and full dotted import path.
    """

    def setUp(self) -> None:
        # Reset the cached executor so each test resolves the config fresh.
        ExecutorLoader._default_executor = None

    def tearDown(self) -> None:
        # Do not leak the executor loaded by a test into other test modules.
        ExecutorLoader._default_executor = None

    @parameterized.expand([
        ("LocalExecutor",),
        ("DebugExecutor",),
    ])
    def test_should_support_executor_from_core(self, executor_name):
        """Core executors are resolvable by their bare class name."""
        with conf_vars({
            ("core", "executor"): executor_name,
        }):
            executor = ExecutorLoader.get_default_executor()
            self.assertIsNotNone(executor)
            self.assertIn(executor_name, executor.__class__.__name__)

    def test_should_support_plugin(self):
        """Executors exposed by plugins load via ``{plugin_name}.{class_name}``."""
        executors_modules.append(
            make_module('airflow.executors.' + TEST_PLUGIN_NAME, [FakeExecutor])
        )
        self.addCleanup(self.remove_executor_module)
        with conf_vars({
            ("core", "executor"): f"{TEST_PLUGIN_NAME}.FakeExecutor",
        }):
            executor = ExecutorLoader.get_default_executor()
            self.assertIsNotNone(executor)
            self.assertIn("FakeExecutor", executor.__class__.__name__)

    def remove_executor_module(self):
        # Undo the plugin module registered in test_should_support_plugin.
        executors_modules.pop()

    def test_should_support_custom_path(self):
        """Arbitrary executors load via their full dotted import path."""
        with conf_vars({
            # Plain string: the original used an f-string with no placeholders.
            ("core", "executor"): "tests.executors.test_executor_loader.FakeExecutor",
        }):
            executor = ExecutorLoader.get_default_executor()
            self.assertIsNotNone(executor)
            self.assertIn("FakeExecutor", executor.__class__.__name__)