diff --git a/airflow/providers/google/cloud/example_dags/example_pubsub.py b/airflow/providers/google/cloud/example_dags/example_pubsub.py index 2359f60e2b..7f7bd5eb23 100644 --- a/airflow/providers/google/cloud/example_dags/example_pubsub.py +++ b/airflow/providers/google/cloud/example_dags/example_pubsub.py @@ -37,24 +37,32 @@ MESSAGE = {"data": b"Tool", "attributes": {"name": "wrench", "mass": "1.3kg", "c default_args = {"start_date": airflow.utils.dates.days_ago(1)} +# [START howto_operator_gcp_pubsub_pull_messages_result_cmd] echo_cmd = """ {% for m in task_instance.xcom_pull('pull_messages') %} echo "AckID: {{ m.get('ackId') }}, Base64-Encoded: {{ m.get('message') }}" {% endfor %} """ +# [END howto_operator_gcp_pubsub_pull_messages_result_cmd] with models.DAG( "example_gcp_pubsub", default_args=default_args, schedule_interval=None, # Override to match your needs ) as example_dag: + # [START howto_operator_gcp_pubsub_create_topic] create_topic = PubSubTopicCreateOperator( task_id="create_topic", topic=TOPIC, project_id=GCP_PROJECT_ID ) + # [END howto_operator_gcp_pubsub_create_topic] + # [START howto_operator_gcp_pubsub_create_subscription] subscribe_task = PubSubSubscriptionCreateOperator( task_id="subscribe_task", project_id=GCP_PROJECT_ID, topic=TOPIC ) + # [END howto_operator_gcp_pubsub_create_subscription] + + # [START howto_operator_gcp_pubsub_pull_message] subscription = "{{ task_instance.xcom_pull('subscribe_task') }}" pull_messages = PubSubPullSensor( @@ -63,26 +71,36 @@ with models.DAG( project_id=GCP_PROJECT_ID, subscription=subscription, ) + # [END howto_operator_gcp_pubsub_pull_message] + # [START howto_operator_gcp_pubsub_pull_messages_result] pull_messages_result = BashOperator( task_id="pull_messages_result", bash_command=echo_cmd ) + # [END howto_operator_gcp_pubsub_pull_messages_result] + # [START howto_operator_gcp_pubsub_publish] publish_task = PubSubPublishOperator( task_id="publish_task", project_id=GCP_PROJECT_ID, topic=TOPIC, messages=[MESSAGE, MESSAGE, MESSAGE], ) + # [END howto_operator_gcp_pubsub_publish] + + # [START howto_operator_gcp_pubsub_unsubscribe] unsubscribe_task = PubSubSubscriptionDeleteOperator( task_id="unsubscribe_task", project_id=GCP_PROJECT_ID, subscription="{{ task_instance.xcom_pull('subscribe_task') }}", ) + # [END howto_operator_gcp_pubsub_unsubscribe] + # [START howto_operator_gcp_pubsub_delete_topic] delete_topic = PubSubTopicDeleteOperator( task_id="delete_topic", topic=TOPIC, project_id=GCP_PROJECT_ID ) + # [END howto_operator_gcp_pubsub_delete_topic] create_topic >> subscribe_task >> publish_task subscribe_task >> pull_messages >> pull_messages_result >> unsubscribe_task >> delete_topic diff --git a/airflow/providers/google/cloud/operators/pubsub.py b/airflow/providers/google/cloud/operators/pubsub.py index d89a6cb766..afa6e634e1 100644 --- a/airflow/providers/google/cloud/operators/pubsub.py +++ b/airflow/providers/google/cloud/operators/pubsub.py @@ -33,6 +33,10 @@ from airflow.utils.decorators import apply_defaults class PubSubTopicCreateOperator(BaseOperator): """Create a PubSub topic. + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:PubSubTopicCreateOperator` + By default, if the topic already exists, this operator will not cause the DAG to fail. :: @@ -166,6 +170,10 @@ class PubSubTopicCreateOperator(BaseOperator): class PubSubSubscriptionCreateOperator(BaseOperator): """Create a PubSub subscription. + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:PubSubSubscriptionCreateOperator` + By default, the subscription will be created in ``topic_project``. If ``subscription_project`` is specified and the GCP credentials allow, the Subscription can be created in a different project from its topic. @@ -354,6 +362,10 @@ class PubSubSubscriptionCreateOperator(BaseOperator): class PubSubTopicDeleteOperator(BaseOperator): """Delete a PubSub topic. + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:PubSubTopicDeleteOperator` + By default, if the topic does not exist, this operator will not cause the DAG to fail. :: @@ -462,6 +474,10 @@ class PubSubTopicDeleteOperator(BaseOperator): class PubSubSubscriptionDeleteOperator(BaseOperator): """Delete a PubSub subscription. + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:PubSubSubscriptionDeleteOperator` + By default, if the subscription does not exist, this operator will not cause the DAG to fail. :: @@ -572,6 +588,10 @@ class PubSubSubscriptionDeleteOperator(BaseOperator): class PubSubPublishOperator(BaseOperator): """Publish messages to a PubSub topic. + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:PubSubPublishOperator` + Each Task publishes all provided messages to the same topic in a single GCP project. If the topic does not exist, this task will fail. :: diff --git a/airflow/providers/google/cloud/sensors/pubsub.py b/airflow/providers/google/cloud/sensors/pubsub.py index 6f01954547..424147e7d9 100644 --- a/airflow/providers/google/cloud/sensors/pubsub.py +++ b/airflow/providers/google/cloud/sensors/pubsub.py @@ -32,6 +32,10 @@ from airflow.utils.decorators import apply_defaults class PubSubPullSensor(BaseSensorOperator): """Pulls messages from a PubSub subscription and passes them through XCom. + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:PubSubPullSensor` + This sensor operator will pull up to ``max_messages`` messages from the specified PubSub subscription. When the subscription returns messages, the poke method's criteria will be fulfilled and the messages will be diff --git a/docs/howto/operator/gcp/pubsub.rst b/docs/howto/operator/gcp/pubsub.rst new file mode 100644 index 0000000000..4ff6744e68 --- /dev/null +++ b/docs/howto/operator/gcp/pubsub.rst @@ -0,0 +1,140 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + + + +Google Cloud PubSub Operators +============================= + +`Google Cloud PubSub `__ is a fully-managed real-time +messaging service that allows you to send and receive messages between independent applications. +You can leverage Cloud Pub/Sub’s flexibility to decouple systems and components hosted +on Google Cloud Platform or elsewhere on the Internet. + +Publisher applications can send messages to a topic and other applications can subscribe to that topic to receive the messages. +By decoupling senders and receivers, Google Cloud PubSub allows developers to communicate between independently written applications. + +.. contents:: + :depth: 1 + :local: + +Prerequisite Tasks +^^^^^^^^^^^^^^^^^^ + +.. include:: _partials/prerequisite_tasks.rst + +.. _howto/operator:PubSubTopicCreateOperator: + +Creating a PubSub topic +^^^^^^^^^^^^^^^^^^^^^^^ + +The PubSub topic is a named resource to which messages are sent by publishers. +The :class:`~airflow.providers.google.cloud.operators.pubsub.PubSubTopicCreateOperator` operator would create a topic. + +.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_pubsub.py + :language: python + :start-after: [START howto_operator_gcp_pubsub_create_topic] + :end-before: [END howto_operator_gcp_pubsub_create_topic] + + +.. _howto/operator:PubSubSubscriptionCreateOperator: + +Creating a PubSub subscription +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +The subscription is named resource representing the stream of messages from a single, specific topic, +to be delivered to the subscribing application. +The :class:`~airflow.providers.google.cloud.operators.pubsub.PubSubSubscriptionCreateOperator` operator would create a subscription. + +.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_pubsub.py + :language: python + :start-after: [START howto_operator_gcp_pubsub_create_subscription] + :end-before: [END howto_operator_gcp_pubsub_create_subscription] + + +.. _howto/operator:PubSubPublishOperator: + +Publishing PubSub messages +^^^^^^^^^^^^^^^^^^^^^^^^^^ + +A ``Message`` is the combination of data and (optional) attributes that a publisher sends to a topic and is eventually delivered to subscribers. +The :class:`~airflow.providers.google.cloud.operators.pubsub.PubSubPublishOperator` operator would publish messages. + +.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_pubsub.py + :language: python + :start-after: [START howto_operator_gcp_pubsub_publish] + :end-before: [END howto_operator_gcp_pubsub_publish] + + +.. _howto/operator:PubSubPullSensor: + +Pulling messages from a PubSub subscription +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +The :class:`~airflow.providers.google.cloud.sensors.pubsub.PubSubPullSensor` sensor would pull messages from a PubSub subscription +and pass them through XCom. + +.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_pubsub.py + :language: python + :start-after: [START howto_operator_gcp_pubsub_pull_message] + :end-before: [END howto_operator_gcp_pubsub_pull_message] + +To pull messages from XCom :class:`~airflow.operators.bash_operator.BashOperator` operator would do this. + +.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_pubsub.py + :language: python + :start-after: [START howto_operator_gcp_pubsub_pull_messages_result_cmd] + :end-before: [END howto_operator_gcp_pubsub_pull_messages_result_cmd] + +.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_pubsub.py + :language: python + :start-after: [START howto_operator_gcp_pubsub_pull_messages_result] + :end-before: [END howto_operator_gcp_pubsub_pull_messages_result] + + +.. _howto/operator:PubSubSubscriptionDeleteOperator: + +Deleting a PubSub subscription +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +The :class:`~airflow.providers.google.cloud.operators.pubsub.PubSubSubscriptionDeleteOperator` operator would delete a subscription. + +.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_pubsub.py + :language: python + :start-after: [START howto_operator_gcp_pubsub_unsubscribe] + :end-before: [END howto_operator_gcp_pubsub_unsubscribe] + + +.. _howto/operator:PubSubTopicDeleteOperator: + +Deleting a PubSub topic +^^^^^^^^^^^^^^^^^^^^^^^ + +The :class:`~airflow.providers.google.cloud.operators.pubsub.PubSubTopicDeleteOperator` operator would delete a topic. + +.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_pubsub.py + :language: python + :start-after: [START howto_operator_gcp_pubsub_delete_topic] + :end-before: [END howto_operator_gcp_pubsub_delete_topic] + + +Reference +^^^^^^^^^ + +For further information, look at: + +* `Client Library Documentation `__ +* `Product Documentation `__ diff --git a/docs/operators-and-hooks-ref.rst b/docs/operators-and-hooks-ref.rst index 9422409e37..05f11fa8ac 100644 --- a/docs/operators-and-hooks-ref.rst +++ b/docs/operators-and-hooks-ref.rst @@ -606,10 +606,10 @@ These integrations allow you to perform various operations within the Google Clo - * - `Cloud Pub/Sub `__ - - - - :mod:`airflow.providers.google.cloud.hooks.pubsub` - - :mod:`airflow.providers.google.cloud.operators.pubsub` - - :mod:`airflow.providers.google.cloud.sensors.pubsub` + - :doc:`How to use ` + - :mod:`airflow.providers.google.cloud..hooks.pubsub` + - :mod:`airflow.providers.google.cloud..operators.pubsub` + - :mod:`airflow.providers.google.cloud..sensors.pubsub` * - `Cloud Spanner `__ - :doc:`How to use `