diff --git a/airflow/contrib/operators/dataproc_operator.py b/airflow/contrib/operators/dataproc_operator.py index 11d47b81cb..fc3e9fd06a 100644 --- a/airflow/contrib/operators/dataproc_operator.py +++ b/airflow/contrib/operators/dataproc_operator.py @@ -1099,18 +1099,25 @@ class DataprocWorkflowTemplateInstantiateOperator(DataprocOperationBaseOperator) For this to work, the service account making the request must have domain-wide delegation enabled. :type delegate_to: str + :param parameters: a map of parameters for Dataproc Template in key-value format: + map (key: string, value: string) + Example: { "date_from": "2019-08-01", "date_to": "2019-08-02"}. + Values may not exceed 100 characters. Please refer to: + https://cloud.google.com/dataproc/docs/concepts/workflows/workflow-parameters + :type parameters: Dict[str, str] """ template_fields = ['template_id'] @apply_defaults - def __init__(self, template_id, *args, **kwargs): + def __init__(self, template_id, parameters, *args, **kwargs): super().__init__(*args, **kwargs) self.template_id = template_id + self.parameters = parameters def start(self): """ - Instantiate a WorkflowTemplate on Google Cloud Dataproc. + Instantiate a WorkflowTemplate on Google Cloud Dataproc with given parameters. """ self.log.info('Instantiating Template: %s', self.template_id) return ( @@ -1118,7 +1125,7 @@ class DataprocWorkflowTemplateInstantiateOperator(DataprocOperationBaseOperator) .instantiate( name=('projects/%s/regions/%s/workflowTemplates/%s' % (self.project_id, self.region, self.template_id)), - body={'requestId': str(uuid.uuid4())}) + body={'requestId': str(uuid.uuid4()), 'parameters': self.parameters}) .execute()) diff --git a/tests/contrib/operators/test_dataproc_operator.py b/tests/contrib/operators/test_dataproc_operator.py index 41f5cd4dc3..efa025beac 100644 --- a/tests/contrib/operators/test_dataproc_operator.py +++ b/tests/contrib/operators/test_dataproc_operator.py @@ -87,6 +87,7 @@ GCP_REGION = 'us-central1' GCP_REGION_TEMPLATED = "{{ 'US-CENTRAL1' | lower }}" MAIN_URI = 'test-uri' TEMPLATE_ID = 'template-id' +WORKFLOW_PARAMETERS = '{"parameter": "value"}' LABELS = { 'label_a': 'value_a', @@ -1235,6 +1236,7 @@ class TestDataprocWorkflowTemplateInstantiateOperator(unittest.TestCase): project_id=GCP_PROJECT_ID, region=GCP_REGION, template_id=TEMPLATE_ID, + parameters=WORKFLOW_PARAMETERS, dag=self.dag )