airflow.providers.google.cloud.sensors.pubsub

此模块包含 Google PubSub 传感器。

模块内容

PubSubPullSensor

从 PubSub 订阅中拉取消息并通过 XCom 传递它们。

exception airflow.providers.google.cloud.sensors.pubsub.PubSubMessageTransformException[源代码]

基类:airflow.exceptions.AirflowException

当消息无法转换 PubSub 接收的格式时引发。

class airflow.providers.google.cloud.sensors.pubsub.PubSubPullSensor(*, project_id, subscription, max_messages=5, return_immediately=True, ack_messages=False, gcp_conn_id='google_cloud_default', messages_callback=None, impersonation_chain=None, poke_interval=10.0, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), **kwargs)[源代码]

基类:airflow.sensors.base.BaseSensorOperator

从 PubSub 订阅中拉取消息并通过 XCom 传递它们。

始终等待至少从订阅返回一条消息。

另请参阅

有关如何使用此操作符的更多信息,请查看指南:从 PubSub 订阅中拉取消息

另请参阅

如果您不想等待至少一条消息到达,请改用操作符:PubSubPullOperator

此传感器操作符将从指定的 PubSub 订阅中拉取最多 max_messages 条消息。当订阅返回消息时,将满足 poke 方法的标准,并且消息将从操作符返回并通过 XCom 传递给下游任务。

如果 ack_messages 设置为 True,消息将在返回之前立即被确认,否则,下游任务将负责确认它们。

如果您想要一个非阻塞的任务,不需要等待消息,请改用 PubSubPullOperator

project_idsubscription 是模板化的,因此您可以在其中使用变量。

参数
  • project_id (str) – 订阅的 Google Cloud 项目 ID(模板化)

  • subscription (str) – Pub/Sub 订阅名称。不包括完整的订阅路径。

  • max_messages (int) – 每个 PubSub 拉取请求要检索的最大消息数

  • return_immediately (bool) – 如果此字段设置为 true,即使 Pull 响应中没有可返回的消息,系统也会立即响应。否则,系统可能会等待(在有限的时间内),直到至少有一条消息可用,而不是返回没有消息。警告:不鼓励将此字段设置为 true,因为它会对 Pull 操作的性能产生不利影响。我们建议用户不要设置此字段。

  • ack_messages (bool) – 如果为 True,每条消息将立即被确认,而不是由任何下游任务确认

  • gcp_conn_id (str) – 用于连接到 Google Cloud 的连接 ID。

  • messages_callback (Callable[[list[google.cloud.pubsub_v1.types.ReceivedMessage], airflow.utils.context.Context], Any] | None) – (可选)用于处理接收到的消息的回调。其返回值将保存到 XCom。如果您要拉取大型消息,您可能需要提供自定义回调。如果未提供,则默认实现将使用 google.protobuf.json_format.MessageToDict 函数将 ReceivedMessage 对象转换为 JSON 可序列化的字典。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 可选的服务帐户,用于使用短期凭据模拟,或用于获取列表中最后一个帐户的 access_token 所需的链式帐户列表,该帐户将在请求中被模拟。如果设置为字符串,则该帐户必须授予原始帐户“服务帐户令牌创建者”IAM 角色。如果设置为序列,则列表中的身份必须授予直接在前身份“服务帐户令牌创建者”IAM 角色,列表中的第一个帐户将此角色授予原始帐户(模板化)。

  • deferrable (bool) – 在可延迟模式下运行传感器

template_fields: collections.abc.Sequence[str] = ('project_id', 'subscription', 'impersonation_chain')[源代码]
ui_color = '#ff7f50'[源代码]
poke(context)[源代码]

在派生此类时覆盖。

execute(context)[源代码]

如果 deferrable 为 True,Airflow 会在 worker 上运行此方法并使用触发器进行延迟。

execute_complete(context, event)[源代码]

如果提供了 messages_callback,则执行它;否则,立即返回触发事件消息。

此条目是否有帮助?