Allow to define custom XCom class (#8560)

* Allow to define custom XCom class

closes: #8059
This commit is contained in:
Tomek Urbaszek 2020-04-28 16:55:05 +02:00 коммит произвёл GitHub
Родитель 992a24ce41
Коммит 6c6d6611d2
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
5 изменённых файлов: 86 добавлений и 1 удалений

Просмотреть файл

@ -359,6 +359,13 @@
type: string
example: ~
default: "True"
- name: xcom_backend
description: |
Path to custom XCom class that will be used to store and resolve operators results
version_added: 2.0.0
type: string
example: "path.to.CustomXCom"
default: "airflow.models.xcom.BaseXCom"
- name: logging
description: ~

Просмотреть файл

@ -208,6 +208,10 @@ max_num_rendered_ti_fields_per_task = 30
# On each dagrun check against defined SLAs
check_slas = True
# Path to custom XCom class that will be used to store and resolve operators results
# Example: xcom_backend = path.to.CustomXCom
xcom_backend = airflow.models.xcom.BaseXCom
[logging]
# The folder where airflow should store its log files
# This path must be absolute

Просмотреть файл

@ -41,7 +41,7 @@ MAX_XCOM_SIZE = 49344
XCOM_RETURN_KEY = 'return_value'
class XCom(Base, LoggingMixin):
class BaseXCom(Base, LoggingMixin):
"""
Base class for XCom objects.
"""
@ -229,3 +229,18 @@ class XCom(Base, LoggingMixin):
"for XCOM, then you need to enable pickle "
"support for XCOM in your airflow config.")
raise
def resolve_xcom_backend():
"""Resolves custom XCom class"""
clazz = conf.getimport("core", "xcom_backend", fallback=f"airflow.models.xcom.{BaseXCom.__name__}")
if clazz:
if not issubclass(clazz, BaseXCom):
raise TypeError(
f"Your custom XCom class `{clazz.__name__}` is not a subclass of `{BaseXCom.__name__}`."
)
return clazz
return BaseXCom
XCom = resolve_xcom_backend()

Просмотреть файл

@ -649,6 +649,15 @@ of what this may look like:
Note that XComs are similar to `Variables`_, but are specifically designed
for inter-task communication rather than global settings.
Custom XCom backend
'''''''''''''''''''
It is possible to change ``XCom`` behaviour os serialization and deserialization of tasks' result.
To do this one have to change ``xcom_backend`` parameter in Airflow config. Provided value should point
to a class that is subclass of :class:`~airflow.models.xcom.BaseXCom`. To alter the serialaization /
deserialization mechanism the custom class should override ``serialize_value`` and ``deserialize_value``
methods.
.. _concepts:variables:
Variables

50
tests/models/test_xcom.py Normal file
Просмотреть файл

@ -0,0 +1,50 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from airflow.configuration import conf
from airflow.models.xcom import BaseXCom, resolve_xcom_backend
from tests.test_utils.config import conf_vars
class CustomXCom(BaseXCom):
@staticmethod
def serialize_value(_):
return "custom_value"
class TestXCom:
@conf_vars({("core", "xcom_backend"): "tests.models.test_xcom.CustomXCom"})
def test_resolve_xcom_class(self):
cls = resolve_xcom_backend()
assert issubclass(cls, CustomXCom)
assert cls().serialize_value(None) == "custom_value"
@conf_vars(
{("core", "xcom_backend"): "", ("core", "enable_xcom_pickling"): "False"}
)
def test_resolve_xcom_class_fallback_to_basexcom(self):
cls = resolve_xcom_backend()
assert issubclass(cls, BaseXCom)
assert cls().serialize_value([1]) == b"[1]"
@conf_vars({("core", "enable_xcom_pickling"): "False"})
def test_resolve_xcom_class_fallback_to_basexcom_no_config(self):
init = conf.get("core", "xcom_backend")
conf.remove_option("core", "xcom_backend")
cls = resolve_xcom_backend()
assert issubclass(cls, BaseXCom)
assert cls().serialize_value([1]) == b"[1]"
conf.set("core", "xcom_backend", init)