From 37a8f6a91075207f56bb856661fb75b1c814f2de Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Kamil=20Bregu=C5=82a?=
Date: Thu, 27 Feb 2020 16:04:06 +0100
Subject: [PATCH] [AIRFLOW-6939] Executor configuration via import path (#7563)

---
 UPDATING.md                             | 23 ++++
 airflow/executors/executor_loader.py    | 65 ++++++++++++----------
 docs/executor/index.rst                 |  4 +-
 docs/plugins.rst                        | 10 +---
 tests/executors/test_executor_loader.py | 73 +++++++++++++++++++++++++
 5 files changed, 137 insertions(+), 38 deletions(-)
 create mode 100644 tests/executors/test_executor_loader.py

diff --git a/UPDATING.md b/UPDATING.md
index b72617e84a..9cfe415347 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -61,6 +61,29 @@ https://developers.google.com/style/inclusive-documentation
 -->
 
+### Custom executors are loaded using the full import path
+
+In previous versions of Airflow it was possible to use plugins to load custom executors. This is still
+possible, but the configuration has changed. You no longer have to create a plugin to configure a
+custom executor; instead, provide the full import path to the class in the `executor` option
+in the `core` section. The purpose of this change is to simplify the plugin mechanism and make
+it easier to configure the executor.
+
+If your executor class was at the path `my_acme_company.executors.MyCustomExecutor` and the plugin was
+called `my_plugin`, then your configuration looked like this:
+
+```ini
+[core]
+executor = my_plugin.MyCustomExecutor
+```
+Now it should look like this:
+```ini
+[core]
+executor = my_acme_company.executors.MyCustomExecutor
+```
+
+The old configuration still works but may be dropped at any time.
+
 ### Removed sub-package imports from `airflow/__init__.py`
 
 The imports `LoggingMixin`, `conf`, and `AirflowException` have been removed from
 `airflow/__init__.py`.
diff --git a/airflow/executors/executor_loader.py b/airflow/executors/executor_loader.py
index c40b41d1f1..5945960fdd 100644
--- a/airflow/executors/executor_loader.py
+++ b/airflow/executors/executor_loader.py
@@ -15,11 +15,12 @@
 # specific language governing permissions and limitations
 # under the License.
"""All executors.""" -import importlib import logging +from contextlib import suppress from typing import Optional from airflow.executors.base_executor import BaseExecutor +from airflow.utils.module_loading import import_string log = logging.getLogger(__name__) @@ -38,12 +39,12 @@ class ExecutorLoader: _default_executor: Optional[BaseExecutor] = None executors = { - LOCAL_EXECUTOR: 'airflow.executors.local_executor', - SEQUENTIAL_EXECUTOR: 'airflow.executors.sequential_executor', - CELERY_EXECUTOR: 'airflow.executors.celery_executor', - DASK_EXECUTOR: 'airflow.executors.dask_executor', - KUBERNETES_EXECUTOR: 'airflow.executors.kubernetes_executor', - DEBUG_EXECUTOR: 'airflow.executors.debug_executor' + LOCAL_EXECUTOR: 'airflow.executors.local_executor.LocalExecutor', + SEQUENTIAL_EXECUTOR: 'airflow.executors.sequential_executor.SequentialExecutor', + CELERY_EXECUTOR: 'airflow.executors.celery_executor.CeleryExecutor', + DASK_EXECUTOR: 'airflow.executors.dask_executor.DaskExecutor', + KUBERNETES_EXECUTOR: 'airflow.executors.kubernetes_executor.KubernetesExecutor', + DEBUG_EXECUTOR: 'airflow.executors.debug_executor.DebugExecutor' } @classmethod @@ -55,31 +56,39 @@ class ExecutorLoader: from airflow.configuration import conf executor_name = conf.get('core', 'EXECUTOR') - cls._default_executor = ExecutorLoader._get_executor(executor_name) - - log.info("Using executor %s", executor_name) + cls._default_executor = ExecutorLoader._load_executor(executor_name) return cls._default_executor @classmethod - def _get_executor(cls, executor_name: str) -> BaseExecutor: + def _load_executor(cls, executor_name: str) -> BaseExecutor: """ - Creates a new instance of the named executor. - In case the executor name is unknown in airflow, - look for it in the plugins + Loads the executor. + + This supports the following following formats: + * by executor name for core executor + * by ``{plugin_name}.{class_name}`` for executor from plugins + * by import path. """ if executor_name in cls.executors: - executor_module = importlib.import_module(cls.executors[executor_name]) - executor = getattr(executor_module, executor_name) - return executor() - else: - # Load plugins here for executors as at that time the plugins might not have been initialized yet - from airflow import plugins_manager - plugins_manager.integrate_executor_plugins() - executor_path = executor_name.split('.') - if len(executor_path) != 2: - raise ValueError(f"Executor {executor_name} not supported: " - f"please specify in format plugin_module.executor") - if executor_path[0] not in globals(): - raise ValueError(f"Executor {executor_name} not supported") - return globals()[executor_path[0]].__dict__[executor_path[1]]() + log.debug("Loading core executor: %s", executor_name) + return import_string(cls.executors[executor_name])() + # If the executor name looks like "plugin executor path" then try to load plugins. + if executor_name.count(".") == 1: + log.debug( + "The executor name looks like the plugin path (executor_name=%s). 
+                "executor from a plugin", executor_name
+            )
+            with suppress(ImportError), suppress(AttributeError):
+                # Load plugins here for executors because at this point the plugins
+                # might not have been initialized yet
+                from airflow import plugins_manager
+                plugins_manager.integrate_executor_plugins()
+                return import_string(f"airflow.executors.{executor_name}")()
+
+        log.debug("Loading executor from custom path: %s", executor_name)
+        executor = import_string(executor_name)()
+
+        log.info("Loaded executor: %s", executor_name)
+
+        return executor
diff --git a/docs/executor/index.rst b/docs/executor/index.rst
index f38dee9672..d0163de790 100644
--- a/docs/executor/index.rst
+++ b/docs/executor/index.rst
@@ -21,7 +21,9 @@ Executor
 
 Executors are the mechanism by which task instances get run. Airflow has support for
 various executors. Current used is determined by the ``executor`` option in the ``core``
-section of the configuration file.
+section of the configuration file. This option should contain the name of the executor, e.g. ``KubernetesExecutor``,
+if it is a core executor. If you want to load your own executor, then you should specify the
+full import path to the class, e.g. ``my_acme_company.executors.MyCustomExecutor``.
 
 .. note::
     For more information on setting the configuration, see :doc:`../howto/set-config`.
diff --git a/docs/plugins.rst b/docs/plugins.rst
index fd7867691e..0c949b0cf2 100644
--- a/docs/plugins.rst
+++ b/docs/plugins.rst
@@ -25,7 +25,7 @@ features to its core by simply dropping files in your
 ``$AIRFLOW_HOME/plugins`` folder.
 
 The python modules in the ``plugins`` folder get imported,
-and **hooks**, **operators**, **sensors**, **macros**, **executors** and web **views**
+and **hooks**, **operators**, **sensors**, **macros** and web **views**
 get integrated to Airflow's main collections and become available for use.
 
 What for?
@@ -84,8 +84,6 @@ looks like:
     sensors = []
     # A list of class(es) derived from BaseHook
     hooks = []
-    # A list of class(es) derived from BaseExecutor
-    executors = []
     # A list of references to inject into the macros namespace
     macros = []
     # A list of Blueprint object created from flask.Blueprint. For use with the flask_appbuilder based GUI
@@ -161,7 +159,6 @@ definitions in Airflow.
     from airflow.models.baseoperator import BaseOperatorLink
     from airflow.providers.amazon.aws.operators.gcs_to_s3 import GCSToS3Operator
     from airflow.sensors.base_sensor_operator import BaseSensorOperator
-    from airflow.executors.base_executor import BaseExecutor
 
     # Will show up under airflow.hooks.test_plugin.PluginHook
     class PluginHook(BaseHook):
@@ -175,10 +172,6 @@ definitions in Airflow.
     class PluginSensorOperator(BaseSensorOperator):
         pass
 
-    # Will show up under airflow.executors.test_plugin.PluginExecutor
-    class PluginExecutor(BaseExecutor):
-        pass
-
     # Will show up under airflow.macros.test_plugin.plugin_macro
     # and in templates through {{ macros.test_plugin.plugin_macro }}
     def plugin_macro():
@@ -240,7 +233,6 @@ definitions in Airflow.
     operators = [PluginOperator]
     sensors = [PluginSensorOperator]
     hooks = [PluginHook]
-    executors = [PluginExecutor]
     macros = [plugin_macro]
     flask_blueprints = [bp]
     appbuilder_views = [v_appbuilder_package]
diff --git a/tests/executors/test_executor_loader.py b/tests/executors/test_executor_loader.py
new file mode 100644
index 0000000000..bd530d182e
--- /dev/null
+++ b/tests/executors/test_executor_loader.py
@@ -0,0 +1,73 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+
+from parameterized import parameterized
+
+from airflow.executors.executor_loader import ExecutorLoader
+from airflow.plugins_manager import executors_modules, make_module
+from tests.test_utils.config import conf_vars
+
+# The plugins manager creates new modules, which are difficult to mock, so we isolate the test by using a unique name.
+TEST_PLUGIN_NAME = "unique_plugin_name_to_avoid_collision_i_love_kitties"
+
+
+class FakeExecutor:
+    pass
+
+
+class TestExecutorLoader(unittest.TestCase):
+
+    def setUp(self) -> None:
+        ExecutorLoader._default_executor = None
+
+    def tearDown(self) -> None:
+        ExecutorLoader._default_executor = None
+
+    @parameterized.expand([
+        ("LocalExecutor", ),
+        ("DebugExecutor", ),
+    ])
+    def test_should_support_executor_from_core(self, executor_name):
+        with conf_vars({
+            ("core", "executor"): executor_name
+        }):
+            executor = ExecutorLoader.get_default_executor()
+            self.assertIsNotNone(executor)
+            self.assertIn(executor_name, executor.__class__.__name__)
+
+    def test_should_support_plugin(self):
+        executors_modules.append(make_module('airflow.executors.' + TEST_PLUGIN_NAME, [FakeExecutor]))
+        self.addCleanup(self.remove_executor_module)
+        with conf_vars({
+            ("core", "executor"): f"{TEST_PLUGIN_NAME}.FakeExecutor"
+        }):
+            executor = ExecutorLoader.get_default_executor()
+            self.assertIsNotNone(executor)
+            self.assertIn("FakeExecutor", executor.__class__.__name__)
+
+    def remove_executor_module(self):
+        executors_modules.pop()
+
+    def test_should_support_custom_path(self):
+        with conf_vars({
+            ("core", "executor"): "tests.executors.test_executor_loader.FakeExecutor"
+        }):
+            executor = ExecutorLoader.get_default_executor()
+            self.assertIsNotNone(executor)
+            self.assertIn("FakeExecutor", executor.__class__.__name__)
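For illustration only (not part of the patch): a minimal sketch of a custom executor that the new `[core] executor` import-path setting could point at. The module path and class name reuse the `my_acme_company.executors.MyCustomExecutor` example from `UPDATING.md`; the overridden hooks (`execute_async`, `sync`, `end`) and the `self.success()` / `self.fail()` / `self.heartbeat()` calls are assumptions about the `BaseExecutor` interface of this era, so treat this as a sketch rather than a reference implementation.

```python
# my_acme_company/executors.py -- hypothetical module matching the UPDATING.md example.
import subprocess

from airflow.executors.base_executor import BaseExecutor


class MyCustomExecutor(BaseExecutor):
    """Toy executor that runs each queued task command as a local subprocess."""

    def execute_async(self, key, command, queue=None, executor_config=None):
        # The scheduler hands over one task instance at a time. A real executor
        # would submit `command` to its backing system (queue, cluster, API)
        # instead of blocking on a local subprocess as this sketch does.
        self.log.info("Running %s", command)
        returncode = subprocess.Popen(command).wait()
        if returncode == 0:
            self.success(key)   # assumed BaseExecutor helper: mark task successful
        else:
            self.fail(key)      # assumed BaseExecutor helper: mark task failed

    def sync(self):
        # Called on every heartbeat to refresh task state; nothing to poll here
        # because execute_async() already waited for the command to finish.
        pass

    def end(self):
        # Flush any outstanding work before shutdown.
        self.heartbeat()
```

With such a module importable from the scheduler's environment, the `executor = my_acme_company.executors.MyCustomExecutor` setting from `UPDATING.md` is all that is needed: the name contains more than one dot, so `ExecutorLoader._load_executor()` skips the core and plugin branches and loads the class directly via `import_string()`.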