Google Cloud PubSub 运算符¶
Google Cloud PubSub 是一项完全托管的实时消息传递服务,允许您在独立的应用程序之间发送和接收消息。您可以利用 Cloud Pub/Sub 的灵活性来解耦托管在 Google Cloud 或互联网其他位置的系统和组件。
发布者应用程序可以向主题发送消息,其他应用程序可以订阅该主题以接收消息。通过解耦发送者和接收者,Google Cloud PubSub 允许开发人员在独立编写的应用程序之间进行通信。
先决条件任务¶
要使用这些运算符,您必须执行一些操作
使用 Cloud 控制台 选择或创建 Cloud Platform 项目。
为您的项目启用结算,如 Google Cloud 文档 中所述。
启用 API,如 Cloud 控制台文档 中所述。
通过 pip 安装 API 库。
pip install 'apache-airflow[google]'有关 安装 的详细信息,请参阅。
创建 PubSub 主题¶
PubSub 主题是一个已命名的资源,发布者向其发送消息。PubSubCreateTopicOperator
运算符会创建一个主题。
create_topic = PubSubCreateTopicOperator(
task_id="create_topic", topic=TOPIC_ID, project_id=PROJECT_ID, fail_if_exists=False
)
创建 PubSub 订阅¶
Subscription
是一个已命名的资源,表示来自单个特定主题的消息流,该消息流将传递给订阅应用程序。PubSubCreateSubscriptionOperator
运算符会创建订阅。
subscribe_task = PubSubCreateSubscriptionOperator(
task_id="subscribe_task", project_id=PROJECT_ID, topic=TOPIC_ID
)
发布 PubSub 消息¶
Message
是数据和(可选)属性的组合,发布者将其发送到主题,并最终传递给订阅者。PubSubPublishMessageOperator
运算符会发布消息。
publish_task = PubSubPublishMessageOperator(
task_id="publish_task",
project_id=PROJECT_ID,
topic=TOPIC_ID,
messages=[MESSAGE, MESSAGE],
)
从 PubSub 订阅中提取消息¶
PubSubPullSensor
传感器从 PubSub 订阅中提取消息,并通过 XCom 传递这些消息。
subscription = subscribe_task.output
pull_messages = PubSubPullSensor(
task_id="pull_messages",
ack_messages=True,
project_id=PROJECT_ID,
subscription=subscription,
)
对于此操作,您还可以在可延迟模式下使用传感器
pull_messages_async = PubSubPullSensor(
task_id="pull_messages_async",
ack_messages=True,
project_id=PROJECT_ID,
subscription=subscription,
deferrable=True,
)
pull_messages_operator = PubSubPullOperator(
task_id="pull_messages_operator",
ack_messages=True,
project_id=PROJECT_ID,
subscription=subscription,
)
要从 XCom 拉取消息,请使用 BashOperator
。
echo_cmd = """
{% for m in task_instance.xcom_pull('pull_messages') %}
echo "AckID: {{ m.get('ackId') }}, Base64-Encoded: {{ m.get('message') }}"
{% endfor %}
"""
pull_messages_result = BashOperator(task_id="pull_messages_result", bash_command=echo_cmd)
删除 PubSub 订阅¶
PubSubDeleteSubscriptionOperator
运算符删除订阅。
unsubscribe_task = PubSubDeleteSubscriptionOperator(
task_id="unsubscribe_task",
project_id=PROJECT_ID,
subscription=subscription,
)
删除 PubSub 主题¶
PubSubDeleteTopicOperator
运算符删除主题。
delete_topic = PubSubDeleteTopicOperator(task_id="delete_topic", topic=TOPIC_ID, project_id=PROJECT_ID)