Only allow passing JSON Serializable conf to TriggerDagRunOperator (#13964)
closes https://github.com/apache/airflow/issues/13414
This commit is contained in:
Родитель
862443f6d3
Коммит
b4885b2587
|
@ -17,6 +17,7 @@
|
|||
# under the License.
|
||||
|
||||
import datetime
|
||||
import json
|
||||
import time
|
||||
from typing import Dict, List, Optional, Union
|
||||
|
||||
|
@ -108,6 +109,11 @@ class TriggerDagRunOperator(BaseOperator):
|
|||
|
||||
self.execution_date: Optional[datetime.datetime] = execution_date # type: ignore
|
||||
|
||||
try:
|
||||
json.dumps(self.conf)
|
||||
except TypeError:
|
||||
raise AirflowException("conf parameter should be JSON Serializable")
|
||||
|
||||
def execute(self, context: Dict):
|
||||
if isinstance(self.execution_date, datetime.datetime):
|
||||
execution_date = self.execution_date
|
||||
|
|
|
@ -164,6 +164,17 @@ class TestDagRunOperator(TestCase):
|
|||
assert len(dagruns) == 1
|
||||
assert dagruns[0].conf, {"foo": "bar"}
|
||||
|
||||
def test_trigger_dagrun_operator_templated_invalid_conf(self):
|
||||
"""Test passing a conf that is not JSON Serializable raise error."""
|
||||
|
||||
with pytest.raises(AirflowException, match="^conf parameter should be JSON Serializable$"):
|
||||
TriggerDagRunOperator(
|
||||
task_id="test_trigger_dagrun_with_invalid_conf",
|
||||
trigger_dag_id=TRIGGERED_DAG_ID,
|
||||
conf={"foo": "{{ dag.dag_id }}", "datetime": timezone.utcnow()},
|
||||
dag=self.dag,
|
||||
)
|
||||
|
||||
def test_trigger_dagrun_operator_templated_conf(self):
|
||||
"""Test passing a templated conf to the triggered DagRun."""
|
||||
task = TriggerDagRunOperator(
|
||||
|
|
Загрузка…
Ссылка в новой задаче