Google Cloud PubSub 运算符

Google Cloud PubSub 是一项完全托管的实时消息传递服务,允许您在独立的应用程序之间发送和接收消息。您可以利用 Cloud Pub/Sub 的灵活性来解耦托管在 Google Cloud 或互联网其他位置的系统和组件。

发布者应用程序可以向主题发送消息,其他应用程序可以订阅该主题以接收消息。通过解耦发送者和接收者,Google Cloud PubSub 允许开发人员在独立编写的应用程序之间进行通信。

先决条件任务

要使用这些运算符,您必须执行一些操作

创建 PubSub 主题

PubSub 主题是一个已命名的资源,发布者向其发送消息。PubSubCreateTopicOperator 运算符会创建一个主题。

tests/system/providers/google/cloud/pubsub/example_pubsub.py[源代码]

    create_topic = PubSubCreateTopicOperator(
        task_id="create_topic", topic=TOPIC_ID, project_id=PROJECT_ID, fail_if_exists=False
    )

创建 PubSub 订阅

Subscription 是一个已命名的资源,表示来自单个特定主题的消息流,该消息流将传递给订阅应用程序。PubSubCreateSubscriptionOperator 运算符会创建订阅。

tests/system/providers/google/cloud/pubsub/example_pubsub.py[源代码]

    subscribe_task = PubSubCreateSubscriptionOperator(
        task_id="subscribe_task", project_id=PROJECT_ID, topic=TOPIC_ID
    )

发布 PubSub 消息

Message 是数据和(可选)属性的组合,发布者将其发送到主题,并最终传递给订阅者。PubSubPublishMessageOperator 运算符会发布消息。

tests/system/providers/google/cloud/pubsub/example_pubsub.py[源代码]

    publish_task = PubSubPublishMessageOperator(
        task_id="publish_task",
        project_id=PROJECT_ID,
        topic=TOPIC_ID,
        messages=[MESSAGE, MESSAGE],
    )

从 PubSub 订阅中提取消息

PubSubPullSensor 传感器从 PubSub 订阅中提取消息,并通过 XCom 传递这些消息。

tests/system/providers/google/cloud/pubsub/example_pubsub.py[源代码]

    subscription = subscribe_task.output

    pull_messages = PubSubPullSensor(
        task_id="pull_messages",
        ack_messages=True,
        project_id=PROJECT_ID,
        subscription=subscription,
    )

对于此操作,您还可以在可延迟模式下使用传感器

tests/system/providers/google/cloud/pubsub/example_pubsub_deferrable.py[源代码]

pull_messages_async = PubSubPullSensor(
    task_id="pull_messages_async",
    ack_messages=True,
    project_id=PROJECT_ID,
    subscription=subscription,
    deferrable=True,
)

tests/system/providers/google/cloud/pubsub/example_pubsub.py[源代码]


    pull_messages_operator = PubSubPullOperator(
        task_id="pull_messages_operator",
        ack_messages=True,
        project_id=PROJECT_ID,
        subscription=subscription,
    )

要从 XCom 拉取消息,请使用 BashOperator

tests/system/providers/google/cloud/pubsub/example_pubsub.py[源代码]

echo_cmd = """
{% for m in task_instance.xcom_pull('pull_messages') %}
    echo "AckID: {{ m.get('ackId') }}, Base64-Encoded: {{ m.get('message') }}"
{% endfor %}
"""

tests/system/providers/google/cloud/pubsub/example_pubsub.py[源代码]

    pull_messages_result = BashOperator(task_id="pull_messages_result", bash_command=echo_cmd)

删除 PubSub 订阅

PubSubDeleteSubscriptionOperator 运算符删除订阅。

tests/system/providers/google/cloud/pubsub/example_pubsub.py[源代码]

    unsubscribe_task = PubSubDeleteSubscriptionOperator(
        task_id="unsubscribe_task",
        project_id=PROJECT_ID,
        subscription=subscription,
    )

删除 PubSub 主题

PubSubDeleteTopicOperator 运算符删除主题。

tests/system/providers/google/cloud/pubsub/example_pubsub.py[源代码]

    delete_topic = PubSubDeleteTopicOperator(task_id="delete_topic", topic=TOPIC_ID, project_id=PROJECT_ID)

参考

有关更多信息,请查看

此条目有帮助吗?