Google Cloud PubSub 操作符

Google Cloud PubSub 是一种完全托管的实时消息服务,允许您在独立的应用程序之间发送和接收消息。您可以利用 Cloud Pub/Sub 的灵活性来解耦托管在 Google Cloud 或 Internet 其他位置的系统和组件。

发布者应用程序可以将消息发送到主题,其他应用程序可以订阅该主题以接收消息。通过解耦发送者和接收者,Google Cloud PubSub 允许开发人员在独立编写的应用程序之间进行通信。

先决条件任务

要使用这些操作符,您必须执行以下几项操作

创建 PubSub 主题

PubSub 主题是一个命名资源,发布者将消息发送到该资源。PubSubCreateTopicOperator 操作符用于创建主题。

tests/system/google/cloud/pubsub/example_pubsub.py[源代码]

    create_topic = PubSubCreateTopicOperator(
        task_id="create_topic", topic=TOPIC_ID, project_id=PROJECT_ID, fail_if_exists=False
    )

创建 PubSub 订阅

Subscription 是一个命名资源,表示来自单个特定主题的消息流,这些消息将传递到订阅应用程序。PubSubCreateSubscriptionOperator 操作符用于创建订阅。

tests/system/google/cloud/pubsub/example_pubsub.py[源代码]

    subscribe_task = PubSubCreateSubscriptionOperator(
        task_id="subscribe_task", project_id=PROJECT_ID, topic=TOPIC_ID
    )

发布 PubSub 消息

Message 是发布者发送到主题并最终传递给订阅者的数据和(可选)属性的组合。PubSubPublishMessageOperator 操作符用于发布消息。

tests/system/google/cloud/pubsub/example_pubsub.py[源代码]

    publish_task = PubSubPublishMessageOperator(
        task_id="publish_task",
        project_id=PROJECT_ID,
        topic=TOPIC_ID,
        messages=[MESSAGE, MESSAGE],
    )

从 PubSub 订阅拉取消息

PubSubPullSensor 传感器从 PubSub 订阅拉取消息,并通过 XCom 传递它们。

tests/system/google/cloud/pubsub/example_pubsub.py[源代码]

    subscription = subscribe_task.output

    pull_messages = PubSubPullSensor(
        task_id="pull_messages",
        ack_messages=True,
        project_id=PROJECT_ID,
        subscription=subscription,
    )

此外,对于此操作,您可以使用可延迟模式的传感器

tests/system/google/cloud/pubsub/example_pubsub_deferrable.py[源代码]

pull_messages_async = PubSubPullSensor(
    task_id="pull_messages_async",
    ack_messages=True,
    project_id=PROJECT_ID,
    subscription=subscription,
    deferrable=True,
)

tests/system/google/cloud/pubsub/example_pubsub.py[源代码]


    pull_messages_operator = PubSubPullOperator(
        task_id="pull_messages_operator",
        ack_messages=True,
        project_id=PROJECT_ID,
        subscription=subscription,
    )

要从 XCom 拉取消息,请使用 BashOperator

tests/system/google/cloud/pubsub/example_pubsub.py[源代码]

echo_cmd = """
{% for m in task_instance.xcom_pull('pull_messages') %}
    echo "AckID: {{ m.get('ackId') }}, Base64-Encoded: {{ m.get('message') }}"
{% endfor %}
"""

tests/system/google/cloud/pubsub/example_pubsub.py[源代码]

    pull_messages_result = BashOperator(task_id="pull_messages_result", bash_command=echo_cmd)

删除 PubSub 订阅

PubSubDeleteSubscriptionOperator 操作符用于删除订阅。

tests/system/google/cloud/pubsub/example_pubsub.py[源代码]

    unsubscribe_task = PubSubDeleteSubscriptionOperator(
        task_id="unsubscribe_task",
        project_id=PROJECT_ID,
        subscription=subscription,
    )

删除 PubSub 主题

PubSubDeleteTopicOperator 操作符用于删除主题。

tests/system/google/cloud/pubsub/example_pubsub.py[源代码]

    delete_topic = PubSubDeleteTopicOperator(task_id="delete_topic", topic=TOPIC_ID, project_id=PROJECT_ID)

参考

有关更多信息,请查看

此条目是否有帮助?