Add DataprocCreateWorkflowTemplateOperator (#13338)

* Add DataprocCreateWorkflowTemplateOperator

* fixup! Add DataprocCreateWorkflowTemplateOperator

* fixup! fixup! Add DataprocCreateWorkflowTemplateOperator
Tomek Urbaszek 2020-12-28 15:49:59 +00:00 committed by GitHub
Parent f7d354df1c
Commit 04ec45f045
No key found matching this signature
GPG key ID: 4AEE18F83AFDEB23
5 changed files: 160 additions and 3 deletions

View file

@@ -25,7 +25,9 @@ import os
from airflow import models
from airflow.providers.google.cloud.operators.dataproc import (
DataprocCreateClusterOperator,
DataprocCreateWorkflowTemplateOperator,
DataprocDeleteClusterOperator,
DataprocInstantiateWorkflowTemplateOperator,
DataprocSubmitJobOperator,
DataprocUpdateClusterOperator,
)
@@ -33,7 +35,7 @@ from airflow.providers.google.cloud.sensors.dataproc import DataprocJobSensor
from airflow.utils.dates import days_ago
PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "an-id")
-CLUSTER_NAME = os.environ.get("GCP_DATAPROC_CLUSTER_NAME", "example-project")
+CLUSTER_NAME = os.environ.get("GCP_DATAPROC_CLUSTER_NAME", "example-cluster")
REGION = os.environ.get("GCP_LOCATION", "europe-west1")
ZONE = os.environ.get("GCP_REGION", "europe-west1-b")
BUCKET = os.environ.get("GCP_DATAPROC_BUCKET", "dataproc-system-tests")
@@ -136,6 +138,18 @@ HADOOP_JOB = {
},
}
# [END how_to_cloud_dataproc_hadoop_config]
WORKFLOW_NAME = "airflow-dataproc-test"
WORKFLOW_TEMPLATE = {
"id": WORKFLOW_NAME,
"placement": {
"managed_cluster": {
"cluster_name": CLUSTER_NAME,
"config": CLUSTER_CONFIG,
}
},
"jobs": [{"step_id": "pig_job_1", "pig_job": PIG_JOB["pig_job"]}],
}
with models.DAG("example_gcp_dataproc", start_date=days_ago(1), schedule_interval=None) as dag:
# [START how_to_cloud_dataproc_create_cluster_operator]
@@ -160,6 +174,21 @@ with models.DAG("example_gcp_dataproc", start_date=days_ago(1), schedule_interva
)
# [END how_to_cloud_dataproc_update_cluster_operator]
# [START how_to_cloud_dataproc_create_workflow_template]
create_workflow_template = DataprocCreateWorkflowTemplateOperator(
task_id="create_workflow_template",
template=WORKFLOW_TEMPLATE,
project_id=PROJECT_ID,
location=REGION,
)
# [END how_to_cloud_dataproc_create_workflow_template]
# [START how_to_cloud_dataproc_trigger_workflow_template]
trigger_workflow = DataprocInstantiateWorkflowTemplateOperator(
task_id="trigger_workflow", region=REGION, project_id=PROJECT_ID, template_id=WORKFLOW_NAME
)
# [END how_to_cloud_dataproc_trigger_workflow_template]
pig_task = DataprocSubmitJobOperator(
task_id="pig_task", job=PIG_JOB, location=REGION, project_id=PROJECT_ID
)
@@ -210,6 +239,7 @@ with models.DAG("example_gcp_dataproc", start_date=days_ago(1), schedule_interva
# [END how_to_cloud_dataproc_delete_cluster_operator]
create_cluster >> scale_cluster
scale_cluster >> create_workflow_template >> trigger_workflow >> delete_cluster
scale_cluster >> hive_task >> delete_cluster
scale_cluster >> pig_task >> delete_cluster
scale_cluster >> spark_sql_task >> delete_cluster

View file

@@ -1545,6 +1545,70 @@ class DataprocSubmitPySparkJobOperator(DataprocJobBaseOperator):
super().execute(context)
class DataprocCreateWorkflowTemplateOperator(BaseOperator):
"""
Creates a new workflow template.
:param project_id: Required. The ID of the Google Cloud project the cluster belongs to.
:type project_id: str
:param location: Required. The Cloud Dataproc region in which to handle the request.
:type location: str
:param template: The Dataproc workflow template to create. If a dict is provided,
it must be of the same form as the protobuf message WorkflowTemplate.
:type template: Union[dict, WorkflowTemplate]
:param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
retried.
:type retry: google.api_core.retry.Retry
:param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
``retry`` is specified, the timeout applies to each individual attempt.
:type timeout: float
:param metadata: Additional metadata that is provided to the method.
:type metadata: Sequence[Tuple[str, str]]
"""
template_fields = ("location", "template")
template_fields_renderers = {"template": "json"}
def __init__(
self,
*,
location: str,
template: Dict,
project_id: str,
retry: Optional[Retry] = None,
timeout: Optional[float] = None,
metadata: Optional[Sequence[Tuple[str, str]]] = None,
gcp_conn_id: str = "google_cloud_default",
impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
**kwargs,
):
super().__init__(**kwargs)
self.location = location
self.template = template
self.project_id = project_id
self.retry = retry
self.timeout = timeout
self.metadata = metadata
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain
def execute(self, context):
hook = DataprocHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
self.log.info("Creating template")
try:
workflow = hook.create_workflow_template(
location=self.location,
template=self.template,
project_id=self.project_id,
retry=self.retry,
timeout=self.timeout,
metadata=self.metadata,
)
self.log.info("Workflow %s created", workflow.name)
except AlreadyExists:
self.log.info("Workflow with given id already exists")
class DataprocInstantiateWorkflowTemplateOperator(BaseOperator):
"""
Instantiate a WorkflowTemplate on Google Cloud Dataproc. The operator will wait
@@ -1596,7 +1660,8 @@ class DataprocInstantiateWorkflowTemplateOperator(BaseOperator):
:type impersonation_chain: Union[str, Sequence[str]]
"""
-template_fields = ['template_id', 'impersonation_chain']
+template_fields = ['template_id', 'impersonation_chain', 'request_id', 'parameters']
template_fields_renderers = {"parameters": "json"}
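With request_id and parameters added to template_fields (and parameters rendered as JSON in the UI), both arguments can now carry Jinja expressions. A hypothetical usage sketch; the Jinja values and the parameter key below are made up for illustration:
    trigger_workflow = DataprocInstantiateWorkflowTemplateOperator(
        task_id="trigger_workflow",
        region=REGION,
        project_id=PROJECT_ID,
        template_id=WORKFLOW_NAME,
        request_id="airflow-{{ ds_nodash }}",  # idempotency token, rendered per run
        parameters={"BUCKET": "{{ var.value.dataproc_bucket }}"},  # rendered and shown as JSON in the UI
    )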
@apply_defaults
def __init__( # pylint: disable=too-many-arguments

View file

@@ -180,6 +180,30 @@ Example of the configuration for a SparkR:
:start-after: [START how_to_cloud_dataproc_sparkr_config]
:end-before: [END how_to_cloud_dataproc_sparkr_config]
Working with workflow templates
--------------------------------
Dataproc supports creating workflow templates that can be triggered later on.
A workflow template can be created using:
:class:`~airflow.providers.google.cloud.operators.dataproc.DataprocCreateWorkflowTemplateOperator`.
.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataproc.py
:language: python
:dedent: 4
:start-after: [START how_to_cloud_dataproc_create_workflow_template]
:end-before: [END how_to_cloud_dataproc_create_workflow_template]
Once a workflow template is created, users can trigger it using
:class:`~airflow.providers.google.cloud.operators.dataproc.DataprocInstantiateWorkflowTemplateOperator`:
.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataproc.py
:language: python
:dedent: 4
:start-after: [START how_to_cloud_dataproc_trigger_workflow_template]
:end-before: [END how_to_cloud_dataproc_trigger_workflow_template]
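Put together, a minimal sketch of the two tasks, taken from the example DAG above with the template body defined elsewhere:

.. code-block:: python

    create_workflow_template = DataprocCreateWorkflowTemplateOperator(
        task_id="create_workflow_template",
        template=WORKFLOW_TEMPLATE,
        project_id=PROJECT_ID,
        location=REGION,
    )
    trigger_workflow = DataprocInstantiateWorkflowTemplateOperator(
        task_id="trigger_workflow", region=REGION, project_id=PROJECT_ID, template_id=WORKFLOW_NAME
    )
    create_workflow_template >> trigger_workflow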
References
^^^^^^^^^^
For further information, take a look at:

View file

@@ -173,7 +173,6 @@ class TestGoogleProviderProjectStructure(unittest.TestCase):
# Deprecated operator. Ignore it
'airflow.providers.google.cloud.operators.dataproc.DataprocSubmitHadoopJobOperator',
'airflow.providers.google.cloud.operators.dataproc.DataprocInstantiateInlineWorkflowTemplateOperator',
'airflow.providers.google.cloud.operators.dataproc.DataprocInstantiateWorkflowTemplateOperator',
# Deprecated operator. Ignore it
'airflow.providers.google.cloud.operators.dataproc.DataprocScaleClusterOperator',
# Base operator. Ignore it

View file

@@ -28,6 +28,7 @@ from airflow import AirflowException
from airflow.providers.google.cloud.operators.dataproc import (
ClusterGenerator,
DataprocCreateClusterOperator,
DataprocCreateWorkflowTemplateOperator,
DataprocDeleteClusterOperator,
DataprocInstantiateInlineWorkflowTemplateOperator,
DataprocInstantiateWorkflowTemplateOperator,
@@ -115,6 +116,18 @@ RETRY = mock.MagicMock(Retry)
METADATA = [("key", "value")]
REQUEST_ID = "request_id_uuid"
WORKFLOW_NAME = "airflow-dataproc-test"
WORKFLOW_TEMPLATE = {
"id": WORKFLOW_NAME,
"placement": {
"managed_cluster": {
"cluster_name": CLUSTER_NAME,
"config": CLUSTER,
}
},
"jobs": [{"step_id": "pig_job_1", "pig_job": {}}],
}
def assert_warning(msg: str, warning: Any):
assert any(msg in str(w) for w in warning.warnings)
@@ -914,3 +927,29 @@ class TestDataProcPySparkOperator(unittest.TestCase):
)
job = op.generate_job()
self.assertDictEqual(self.job, job)
class TestDataprocCreateWorkflowTemplateOperator:
@mock.patch(DATAPROC_PATH.format("DataprocHook"))
def test_execute(self, mock_hook):
op = DataprocCreateWorkflowTemplateOperator(
task_id=TASK_ID,
gcp_conn_id=GCP_CONN_ID,
impersonation_chain=IMPERSONATION_CHAIN,
location=GCP_LOCATION,
project_id=GCP_PROJECT,
retry=RETRY,
timeout=TIMEOUT,
metadata=METADATA,
template=WORKFLOW_TEMPLATE,
)
op.execute(context={})
mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID, impersonation_chain=IMPERSONATION_CHAIN)
mock_hook.return_value.create_workflow_template.assert_called_once_with(
location=GCP_LOCATION,
project_id=GCP_PROJECT,
retry=RETRY,
timeout=TIMEOUT,
metadata=METADATA,
template=WORKFLOW_TEMPLATE,
)
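The test above exercises the create path. A hedged sketch of a companion test for the AlreadyExists branch (hypothetical; not part of this commit) could look like:
    @mock.patch(DATAPROC_PATH.format("DataprocHook"))
    def test_execute_template_already_exists(self, mock_hook):
        # Simulate the API reporting that the template already exists; the
        # operator should log the condition and finish without raising.
        from google.api_core.exceptions import AlreadyExists

        mock_hook.return_value.create_workflow_template.side_effect = AlreadyExists("template exists")
        op = DataprocCreateWorkflowTemplateOperator(
            task_id=TASK_ID,
            location=GCP_LOCATION,
            project_id=GCP_PROJECT,
            template=WORKFLOW_TEMPLATE,
        )
        op.execute(context={})  # must not raise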