[AIRFLOW-XXX] Add How-To-Guide to GCP PubSub (#6497)
This commit is contained in:
Родитель
5d3a5db051
Коммит
d985c02d9f
|
@ -37,24 +37,32 @@ MESSAGE = {"data": b"Tool", "attributes": {"name": "wrench", "mass": "1.3kg", "c
|
|||
|
||||
default_args = {"start_date": airflow.utils.dates.days_ago(1)}
|
||||
|
||||
# [START howto_operator_gcp_pubsub_pull_messages_result_cmd]
|
||||
echo_cmd = """
|
||||
{% for m in task_instance.xcom_pull('pull_messages') %}
|
||||
echo "AckID: {{ m.get('ackId') }}, Base64-Encoded: {{ m.get('message') }}"
|
||||
{% endfor %}
|
||||
"""
|
||||
# [END howto_operator_gcp_pubsub_pull_messages_result_cmd]
|
||||
|
||||
with models.DAG(
|
||||
"example_gcp_pubsub",
|
||||
default_args=default_args,
|
||||
schedule_interval=None, # Override to match your needs
|
||||
) as example_dag:
|
||||
# [START howto_operator_gcp_pubsub_create_topic]
|
||||
create_topic = PubSubTopicCreateOperator(
|
||||
task_id="create_topic", topic=TOPIC, project_id=GCP_PROJECT_ID
|
||||
)
|
||||
# [END howto_operator_gcp_pubsub_create_topic]
|
||||
|
||||
# [START howto_operator_gcp_pubsub_create_subscription]
|
||||
subscribe_task = PubSubSubscriptionCreateOperator(
|
||||
task_id="subscribe_task", project_id=GCP_PROJECT_ID, topic=TOPIC
|
||||
)
|
||||
# [END howto_operator_gcp_pubsub_create_subscription]
|
||||
|
||||
# [START howto_operator_gcp_pubsub_pull_message]
|
||||
subscription = "{{ task_instance.xcom_pull('subscribe_task') }}"
|
||||
|
||||
pull_messages = PubSubPullSensor(
|
||||
|
@ -63,26 +71,36 @@ with models.DAG(
|
|||
project_id=GCP_PROJECT_ID,
|
||||
subscription=subscription,
|
||||
)
|
||||
# [END howto_operator_gcp_pubsub_pull_message]
|
||||
|
||||
# [START howto_operator_gcp_pubsub_pull_messages_result]
|
||||
pull_messages_result = BashOperator(
|
||||
task_id="pull_messages_result", bash_command=echo_cmd
|
||||
)
|
||||
# [END howto_operator_gcp_pubsub_pull_messages_result]
|
||||
|
||||
# [START howto_operator_gcp_pubsub_publish]
|
||||
publish_task = PubSubPublishOperator(
|
||||
task_id="publish_task",
|
||||
project_id=GCP_PROJECT_ID,
|
||||
topic=TOPIC,
|
||||
messages=[MESSAGE, MESSAGE, MESSAGE],
|
||||
)
|
||||
# [END howto_operator_gcp_pubsub_publish]
|
||||
|
||||
# [START howto_operator_gcp_pubsub_unsubscribe]
|
||||
unsubscribe_task = PubSubSubscriptionDeleteOperator(
|
||||
task_id="unsubscribe_task",
|
||||
project_id=GCP_PROJECT_ID,
|
||||
subscription="{{ task_instance.xcom_pull('subscribe_task') }}",
|
||||
)
|
||||
# [END howto_operator_gcp_pubsub_unsubscribe]
|
||||
|
||||
# [START howto_operator_gcp_pubsub_delete_topic]
|
||||
delete_topic = PubSubTopicDeleteOperator(
|
||||
task_id="delete_topic", topic=TOPIC, project_id=GCP_PROJECT_ID
|
||||
)
|
||||
# [END howto_operator_gcp_pubsub_delete_topic]
|
||||
|
||||
create_topic >> subscribe_task >> publish_task
|
||||
subscribe_task >> pull_messages >> pull_messages_result >> unsubscribe_task >> delete_topic
|
||||
|
|
|
@ -33,6 +33,10 @@ from airflow.utils.decorators import apply_defaults
|
|||
class PubSubTopicCreateOperator(BaseOperator):
|
||||
"""Create a PubSub topic.
|
||||
|
||||
.. seealso::
|
||||
For more information on how to use this operator, take a look at the guide:
|
||||
:ref:`howto/operator:PubSubTopicCreateOperator`
|
||||
|
||||
By default, if the topic already exists, this operator will
|
||||
not cause the DAG to fail. ::
|
||||
|
||||
|
@ -166,6 +170,10 @@ class PubSubTopicCreateOperator(BaseOperator):
|
|||
class PubSubSubscriptionCreateOperator(BaseOperator):
|
||||
"""Create a PubSub subscription.
|
||||
|
||||
.. seealso::
|
||||
For more information on how to use this operator, take a look at the guide:
|
||||
:ref:`howto/operator:PubSubSubscriptionCreateOperator`
|
||||
|
||||
By default, the subscription will be created in ``topic_project``. If
|
||||
``subscription_project`` is specified and the GCP credentials allow, the
|
||||
Subscription can be created in a different project from its topic.
|
||||
|
@ -354,6 +362,10 @@ class PubSubSubscriptionCreateOperator(BaseOperator):
|
|||
class PubSubTopicDeleteOperator(BaseOperator):
|
||||
"""Delete a PubSub topic.
|
||||
|
||||
.. seealso::
|
||||
For more information on how to use this operator, take a look at the guide:
|
||||
:ref:`howto/operator:PubSubTopicDeleteOperator`
|
||||
|
||||
By default, if the topic does not exist, this operator will
|
||||
not cause the DAG to fail. ::
|
||||
|
||||
|
@ -462,6 +474,10 @@ class PubSubTopicDeleteOperator(BaseOperator):
|
|||
class PubSubSubscriptionDeleteOperator(BaseOperator):
|
||||
"""Delete a PubSub subscription.
|
||||
|
||||
.. seealso::
|
||||
For more information on how to use this operator, take a look at the guide:
|
||||
:ref:`howto/operator:PubSubSubscriptionDeleteOperator`
|
||||
|
||||
By default, if the subscription does not exist, this operator will
|
||||
not cause the DAG to fail. ::
|
||||
|
||||
|
@ -572,6 +588,10 @@ class PubSubSubscriptionDeleteOperator(BaseOperator):
|
|||
class PubSubPublishOperator(BaseOperator):
|
||||
"""Publish messages to a PubSub topic.
|
||||
|
||||
.. seealso::
|
||||
For more information on how to use this operator, take a look at the guide:
|
||||
:ref:`howto/operator:PubSubPublishOperator`
|
||||
|
||||
Each Task publishes all provided messages to the same topic
|
||||
in a single GCP project. If the topic does not exist, this
|
||||
task will fail. ::
|
||||
|
|
|
@ -32,6 +32,10 @@ from airflow.utils.decorators import apply_defaults
|
|||
class PubSubPullSensor(BaseSensorOperator):
|
||||
"""Pulls messages from a PubSub subscription and passes them through XCom.
|
||||
|
||||
.. seealso::
|
||||
For more information on how to use this operator, take a look at the guide:
|
||||
:ref:`howto/operator:PubSubPullSensor`
|
||||
|
||||
This sensor operator will pull up to ``max_messages`` messages from the
|
||||
specified PubSub subscription. When the subscription returns messages,
|
||||
the poke method's criteria will be fulfilled and the messages will be
|
||||
|
|
|
@ -0,0 +1,140 @@
|
|||
.. Licensed to the Apache Software Foundation (ASF) under one
|
||||
or more contributor license agreements. See the NOTICE file
|
||||
distributed with this work for additional information
|
||||
regarding copyright ownership. The ASF licenses this file
|
||||
to you under the Apache License, Version 2.0 (the
|
||||
"License"); you may not use this file except in compliance
|
||||
with the License. You may obtain a copy of the License at
|
||||
|
||||
.. http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
.. Unless required by applicable law or agreed to in writing,
|
||||
software distributed under the License is distributed on an
|
||||
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
KIND, either express or implied. See the License for the
|
||||
specific language governing permissions and limitations
|
||||
under the License.
|
||||
|
||||
|
||||
|
||||
Google Cloud PubSub Operators
|
||||
=============================
|
||||
|
||||
`Google Cloud PubSub <https://cloud.google.com/pubsub/>`__ is a fully-managed real-time
|
||||
messaging service that allows you to send and receive messages between independent applications.
|
||||
You can leverage Cloud Pub/Sub’s flexibility to decouple systems and components hosted
|
||||
on Google Cloud Platform or elsewhere on the Internet.
|
||||
|
||||
Publisher applications can send messages to a topic and other applications can subscribe to that topic to receive the messages.
|
||||
By decoupling senders and receivers, Google Cloud PubSub allows developers to communicate between independently written applications.
|
||||
|
||||
.. contents::
|
||||
:depth: 1
|
||||
:local:
|
||||
|
||||
Prerequisite Tasks
|
||||
^^^^^^^^^^^^^^^^^^
|
||||
|
||||
.. include:: _partials/prerequisite_tasks.rst
|
||||
|
||||
.. _howto/operator:PubSubTopicCreateOperator:
|
||||
|
||||
Creating a PubSub topic
|
||||
^^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
The PubSub topic is a named resource to which messages are sent by publishers.
|
||||
The :class:`~airflow.providers.google.cloud.operators.pubsub.PubSubTopicCreateOperator` operator would create a topic.
|
||||
|
||||
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_pubsub.py
|
||||
:language: python
|
||||
:start-after: [START howto_operator_gcp_pubsub_create_topic]
|
||||
:end-before: [END howto_operator_gcp_pubsub_create_topic]
|
||||
|
||||
|
||||
.. _howto/operator:PubSubSubscriptionCreateOperator:
|
||||
|
||||
Creating a PubSub subscription
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
The subscription is named resource representing the stream of messages from a single, specific topic,
|
||||
to be delivered to the subscribing application.
|
||||
The :class:`~airflow.providers.google.cloud.operators.pubsub.PubSubSubscriptionCreateOperator` operator would create a subscription.
|
||||
|
||||
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_pubsub.py
|
||||
:language: python
|
||||
:start-after: [START howto_operator_gcp_pubsub_create_subscription]
|
||||
:end-before: [END howto_operator_gcp_pubsub_create_subscription]
|
||||
|
||||
|
||||
.. _howto/operator:PubSubPublishOperator:
|
||||
|
||||
Publishing PubSub messages
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
A ``Message`` is the combination of data and (optional) attributes that a publisher sends to a topic and is eventually delivered to subscribers.
|
||||
The :class:`~airflow.providers.google.cloud.operators.pubsub.PubSubPublishOperator` operator would publish messages.
|
||||
|
||||
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_pubsub.py
|
||||
:language: python
|
||||
:start-after: [START howto_operator_gcp_pubsub_publish]
|
||||
:end-before: [END howto_operator_gcp_pubsub_publish]
|
||||
|
||||
|
||||
.. _howto/operator:PubSubPullSensor:
|
||||
|
||||
Pulling messages from a PubSub subscription
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
The :class:`~airflow.providers.google.cloud.sensors.pubsub.PubSubPullSensor` sensor would pull messages from a PubSub subscription
|
||||
and pass them through XCom.
|
||||
|
||||
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_pubsub.py
|
||||
:language: python
|
||||
:start-after: [START howto_operator_gcp_pubsub_pull_message]
|
||||
:end-before: [END howto_operator_gcp_pubsub_pull_message]
|
||||
|
||||
To pull messages from XCom :class:`~airflow.operators.bash_operator.BashOperator` operator would do this.
|
||||
|
||||
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_pubsub.py
|
||||
:language: python
|
||||
:start-after: [START howto_operator_gcp_pubsub_pull_messages_result_cmd]
|
||||
:end-before: [END howto_operator_gcp_pubsub_pull_messages_result_cmd]
|
||||
|
||||
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_pubsub.py
|
||||
:language: python
|
||||
:start-after: [START howto_operator_gcp_pubsub_pull_messages_result]
|
||||
:end-before: [END howto_operator_gcp_pubsub_pull_messages_result]
|
||||
|
||||
|
||||
.. _howto/operator:PubSubSubscriptionDeleteOperator:
|
||||
|
||||
Deleting a PubSub subscription
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
The :class:`~airflow.providers.google.cloud.operators.pubsub.PubSubSubscriptionDeleteOperator` operator would delete a subscription.
|
||||
|
||||
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_pubsub.py
|
||||
:language: python
|
||||
:start-after: [START howto_operator_gcp_pubsub_unsubscribe]
|
||||
:end-before: [END howto_operator_gcp_pubsub_unsubscribe]
|
||||
|
||||
|
||||
.. _howto/operator:PubSubTopicDeleteOperator:
|
||||
|
||||
Deleting a PubSub topic
|
||||
^^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
The :class:`~airflow.providers.google.cloud.operators.pubsub.PubSubTopicDeleteOperator` operator would delete a topic.
|
||||
|
||||
.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_pubsub.py
|
||||
:language: python
|
||||
:start-after: [START howto_operator_gcp_pubsub_delete_topic]
|
||||
:end-before: [END howto_operator_gcp_pubsub_delete_topic]
|
||||
|
||||
|
||||
Reference
|
||||
^^^^^^^^^
|
||||
|
||||
For further information, look at:
|
||||
|
||||
* `Client Library Documentation <https://googleapis.dev/python/pubsub/latest/index.html>`__
|
||||
* `Product Documentation <https://cloud.google.com/pubsub/docs/>`__
|
|
@ -606,10 +606,10 @@ These integrations allow you to perform various operations within the Google Clo
|
|||
-
|
||||
|
||||
* - `Cloud Pub/Sub <https://cloud.google.com/pubsub/>`__
|
||||
-
|
||||
- :mod:`airflow.providers.google.cloud.hooks.pubsub`
|
||||
- :mod:`airflow.providers.google.cloud.operators.pubsub`
|
||||
- :mod:`airflow.providers.google.cloud.sensors.pubsub`
|
||||
- :doc:`How to use <howto/operator/gcp/pubsub>`
|
||||
- :mod:`airflow.providers.google.cloud..hooks.pubsub`
|
||||
- :mod:`airflow.providers.google.cloud..operators.pubsub`
|
||||
- :mod:`airflow.providers.google.cloud..sensors.pubsub`
|
||||
|
||||
* - `Cloud Spanner <https://cloud.google.com/spanner/>`__
|
||||
- :doc:`How to use <howto/operator/gcp/spanner>`
|
||||
|
|
Загрузка…
Ссылка в новой задаче