[AIRFLOW-5710] Optionally raise exception on unused operator arguments. (#6332)
This commit is contained in:
Родитель
6c7c58c5d3
Коммит
e9d65e3d21
|
@ -243,6 +243,7 @@ default_cpus = 1
|
||||||
default_ram = 512
|
default_ram = 512
|
||||||
default_disk = 512
|
default_disk = 512
|
||||||
default_gpus = 0
|
default_gpus = 0
|
||||||
|
allow_illegal_arguments = True
|
||||||
|
|
||||||
[hive]
|
[hive]
|
||||||
# Default mapreduce queue for HiveOperator tasks
|
# Default mapreduce queue for HiveOperator tasks
|
||||||
|
|
|
@ -319,6 +319,12 @@ class BaseOperator(LoggingMixin):
|
||||||
|
|
||||||
if args or kwargs:
|
if args or kwargs:
|
||||||
# TODO remove *args and **kwargs in Airflow 2.0
|
# TODO remove *args and **kwargs in Airflow 2.0
|
||||||
|
if not conf.getboolean('operators', 'ALLOW_ILLEGAL_ARGUMENTS'):
|
||||||
|
raise AirflowException(
|
||||||
|
"Invalid arguments were passed to {c} (task_id: {t}). Invalid "
|
||||||
|
"arguments were:\n*args: {a}\n**kwargs: {k}".format(
|
||||||
|
c=self.__class__.__name__, a=args, k=kwargs, t=task_id),
|
||||||
|
)
|
||||||
warnings.warn(
|
warnings.warn(
|
||||||
'Invalid arguments were passed to {c} (task_id: {t}). '
|
'Invalid arguments were passed to {c} (task_id: {t}). '
|
||||||
'Support for passing such arguments will be dropped in '
|
'Support for passing such arguments will be dropped in '
|
||||||
|
|
|
@ -455,6 +455,23 @@ class TestCore(unittest.TestCase):
|
||||||
'(task_id: test_illegal_args).'),
|
'(task_id: test_illegal_args).'),
|
||||||
w[0].message.args[0])
|
w[0].message.args[0])
|
||||||
|
|
||||||
|
def test_illegal_args_forbidden(self):
|
||||||
|
"""
|
||||||
|
Tests that operators raise exceptions on illegal arguments when
|
||||||
|
illegal arguments are not allowed.
|
||||||
|
"""
|
||||||
|
with conf_vars({('operators', 'allow_illegal_arguments'): 'False'}):
|
||||||
|
with self.assertRaises(AirflowException) as ctx:
|
||||||
|
BashOperator(
|
||||||
|
task_id='test_illegal_args',
|
||||||
|
bash_command='echo success',
|
||||||
|
dag=self.dag,
|
||||||
|
illegal_argument_1234='hello?')
|
||||||
|
self.assertIn(
|
||||||
|
('Invalid arguments were passed to BashOperator '
|
||||||
|
'(task_id: test_illegal_args).'),
|
||||||
|
str(ctx.exception))
|
||||||
|
|
||||||
def test_bash_operator(self):
|
def test_bash_operator(self):
|
||||||
t = BashOperator(
|
t = BashOperator(
|
||||||
task_id='test_bash_operator',
|
task_id='test_bash_operator',
|
||||||
|
|
Загрузка…
Ссылка в новой задаче