Raises a warning for provide_context instead of killing the task (#11597)

* raises a warning for provide_context instead of killing the task

* Update airflow/operators/python.py

Co-authored-by: Kaxil Naik <kaxilnaik@gmail.com>

* static checks

Co-authored-by: Kaxil Naik <kaxilnaik@gmail.com>
This commit is contained in:
Daniel Imberman 2020-10-16 15:18:55 -07:00 коммит произвёл GitHub
Родитель 4582c808ab
Коммит 00dd7586fb
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
2 изменённых файлов: 30 добавлений и 0 удалений

Просмотреть файл

@ -22,6 +22,7 @@ import pickle
import re
import sys
import types
import warnings
from inspect import signature
from itertools import islice
from tempfile import TemporaryDirectory
@ -85,6 +86,10 @@ class PythonOperator(BaseOperator):
templates_exts: Optional[List[str]] = None,
**kwargs
) -> None:
if kwargs.get("provide_context"):
warnings.warn("provide_context is deprecated as of 2.0 and is no longer required",
DeprecationWarning, stacklevel=2)
kwargs.pop('provide_context', None)
super().__init__(**kwargs)
if not callable(python_callable):
raise AirflowException('`python_callable` param must be callable')

Просмотреть файл

@ -275,6 +275,31 @@ class TestPythonOperator(TestPythonBase):
python_operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
self.assertTrue('dag' in context.exception, "'dag' not found in the exception")
def test_provide_context_does_not_fail(self):
"""
ensures that provide_context doesn't break dags in 2.0
"""
self.dag.create_dagrun(
run_type=DagRunType.MANUAL,
execution_date=DEFAULT_DATE,
start_date=DEFAULT_DATE,
state=State.RUNNING,
external_trigger=False,
)
def func(custom, dag):
self.assertEqual(1, custom, "custom should be 1")
self.assertIsNotNone(dag, "dag should be set")
python_operator = PythonOperator(
task_id='python_operator',
op_kwargs={'custom': 1},
python_callable=func,
provide_context=True,
dag=self.dag
)
python_operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
def test_context_with_conflicting_op_args(self):
self.dag.create_dagrun(
run_type=DagRunType.MANUAL,