[AIRFLOW-5275] Add support for template parameters in DataprocWorkflowTemplateInstantiateOperator (#5877)

This commit is contained in:
Michal Brys 2019-08-28 21:06:30 +02:00 коммит произвёл Kamil Breguła
Родитель 3e0d703933
Коммит fb0910e847
2 изменённых файлов: 12 добавлений и 3 удалений

Просмотреть файл

@ -1099,18 +1099,25 @@ class DataprocWorkflowTemplateInstantiateOperator(DataprocOperationBaseOperator)
For this to work, the service account making the request must have domain-wide
delegation enabled.
:type delegate_to: str
:param parameters: a map of parameters for Dataproc Template in key-value format:
map (key: string, value: string)
Example: { "date_from": "2019-08-01", "date_to": "2019-08-02"}.
Values may not exceed 100 characters. Please refer to:
https://cloud.google.com/dataproc/docs/concepts/workflows/workflow-parameters
:type parameters: Dict[str, str]
"""
template_fields = ['template_id']
@apply_defaults
def __init__(self, template_id, *args, **kwargs):
def __init__(self, template_id, parameters, *args, **kwargs):
super().__init__(*args, **kwargs)
self.template_id = template_id
self.parameters = parameters
def start(self):
"""
Instantiate a WorkflowTemplate on Google Cloud Dataproc.
Instantiate a WorkflowTemplate on Google Cloud Dataproc with given parameters.
"""
self.log.info('Instantiating Template: %s', self.template_id)
return (
@ -1118,7 +1125,7 @@ class DataprocWorkflowTemplateInstantiateOperator(DataprocOperationBaseOperator)
.instantiate(
name=('projects/%s/regions/%s/workflowTemplates/%s' %
(self.project_id, self.region, self.template_id)),
body={'requestId': str(uuid.uuid4())})
body={'requestId': str(uuid.uuid4()), 'parameters': self.parameters})
.execute())

Просмотреть файл

@ -87,6 +87,7 @@ GCP_REGION = 'us-central1'
GCP_REGION_TEMPLATED = "{{ 'US-CENTRAL1' | lower }}"
MAIN_URI = 'test-uri'
TEMPLATE_ID = 'template-id'
WORKFLOW_PARAMETERS = '{"parameter": "value"}'
LABELS = {
'label_a': 'value_a',
@ -1235,6 +1236,7 @@ class TestDataprocWorkflowTemplateInstantiateOperator(unittest.TestCase):
project_id=GCP_PROJECT_ID,
region=GCP_REGION,
template_id=TEMPLATE_ID,
parameters=WORKFLOW_PARAMETERS,
dag=self.dag
)