Revert "AIRFLOW-5701: Don't clear xcom explicitly before execution (#6370)"

This reverts commit 74d2a0d9e7.
This commit is contained in:
Fokko Driesprong 2019-10-27 09:26:50 +01:00
Родитель 4440d5e56d
Коммит 4132f3bc70
4 изменённых файла: 32 добавления и 7 удалений

Просмотреть файл

@ -224,6 +224,7 @@ def upgrade():
if 'xcom' not in tables:
op.create_table(
'xcom',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('key', sa.String(length=512), nullable=True),
sa.Column('value', sa.PickleType(), nullable=True),
sa.Column(
@ -234,7 +235,7 @@ def upgrade():
sa.Column('execution_date', sa.DateTime(), nullable=False),
sa.Column('task_id', sa.String(length=250), nullable=False),
sa.Column('dag_id', sa.String(length=250), nullable=False),
sa.PrimaryKeyConstraint('dag_id', 'task_id', 'execution_date', 'key')
sa.PrimaryKeyConstraint('id')
)

Просмотреть файл

@ -473,6 +473,18 @@ class TaskInstance(Base, LoggingMixin):
else:
self.state = None
@provide_session
def clear_xcom_data(self, session=None):
    """
    Delete every XCom row stored for this task instance.

    Rows are selected by matching this instance's ``dag_id``, ``task_id``
    and ``execution_date``; the deletion is committed on ``session``
    before the method returns.

    :param session: database session used for the delete and the commit
        (presumably injected by ``provide_session`` when omitted -- confirm)
    """
    # Criteria that together pick out the XComs written by this instance.
    owned_by_this_instance = (
        XCom.dag_id == self.dag_id,
        XCom.task_id == self.task_id,
        XCom.execution_date == self.execution_date,
    )
    matching_rows = session.query(XCom).filter(*owned_by_this_instance)
    matching_rows.delete()
    session.commit()
@property
def key(self):
"""
@ -897,6 +909,9 @@ class TaskInstance(Base, LoggingMixin):
raise AirflowException("Task received SIGTERM signal")
signal.signal(signal.SIGTERM, signal_handler)
# Don't clear Xcom until the task is certain to execute
self.clear_xcom_data()
start_time = time.time()
self.render_templates(context=context)

Просмотреть файл

@ -20,7 +20,7 @@
import json
import pickle
from sqlalchemy import Column, Index, LargeBinary, String, and_
from sqlalchemy import Column, Index, Integer, LargeBinary, String, and_
from sqlalchemy.orm import reconstructor
from airflow.configuration import conf
@ -43,14 +43,16 @@ class XCom(Base, LoggingMixin):
"""
__tablename__ = "xcom"
key = Column(String(512), primary_key=True, nullable=False)
id = Column(Integer, primary_key=True)
key = Column(String(512))
value = Column(LargeBinary)
timestamp = Column(UtcDateTime, default=timezone.utcnow, nullable=False)
execution_date = Column(UtcDateTime, primary_key=True, nullable=False)
timestamp = Column(
UtcDateTime, default=timezone.utcnow, nullable=False)
execution_date = Column(UtcDateTime, nullable=False)
# source information
task_id = Column(String(ID_LEN), primary_key=True, nullable=False)
dag_id = Column(String(ID_LEN), primary_key=True, nullable=False)
task_id = Column(String(ID_LEN), nullable=False)
dag_id = Column(String(ID_LEN), nullable=False)
__table_args__ = (
Index('idx_xcom_dag_task_date', dag_id, task_id, execution_date, unique=False),
@ -97,6 +99,8 @@ class XCom(Base, LoggingMixin):
:return: None
"""
session.expunge_all()
value = XCom.serialize_value(value)
# remove any duplicate XComs
@ -106,6 +110,8 @@ class XCom(Base, LoggingMixin):
cls.task_id == task_id,
cls.dag_id == dag_id).delete()
session.commit()
# insert new XCom
session.add(XCom(
key=key,

Просмотреть файл

@ -900,6 +900,9 @@ class TestTaskInstance(unittest.TestCase):
# execute, even if dependencies are ignored
ti.run(ignore_all_deps=True, mark_success=True)
self.assertEqual(ti.xcom_pull(task_ids='test_xcom', key=key), value)
# Xcom IS finally cleared once task has executed
ti.run(ignore_all_deps=True)
self.assertEqual(ti.xcom_pull(task_ids='test_xcom', key=key), None)
def test_xcom_pull_different_execution_date(self):
"""