airflow.providers.google.cloud.sensors.gcs

此模块包含 Google Cloud Storage 传感器。

GCSObjectExistenceSensor

检查 Google Cloud Storage 中是否存在文件。

GCSObjectUpdateSensor

检查 Google Cloud Storage 中的对象是否已更新。

GCSObjectsWithPrefixExistenceSensor

检查给定前缀是否存在 GCS 对象,并通过 XCom 传递匹配项。

GCSUploadSessionCompleteSensor

如果非活动期已过,且存储桶中的对象数量未增加,则返回 True。

函数

ts_function(context)

作为 GoogleCloudStorageObjectUpdatedSensor 的默认回调函数。

get_time()

作为 datetime.datetime.now 的包装器,以便在单元测试中简化模拟。

模块内容

class airflow.providers.google.cloud.sensors.gcs.GCSObjectExistenceSensor(*, bucket, object, use_glob=False, google_cloud_conn_id='google_cloud_default', impersonation_chain=None, retry=DEFAULT_RETRY, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), **kwargs)[source]

基类:airflow.sensors.base.BaseSensorOperator

检查 Google Cloud Storage 中是否存在文件。

参数:
  • bucket (str) – 对象所在的 Google Cloud Storage 存储桶。

  • object (str) – 要在 Google Cloud Storage 存储桶中检查的对象的名称。

  • use_glob (bool) – 设置为 True 时,将对象参数解释为 glob

  • google_cloud_conn_id (str) – 连接到 Google Cloud Storage 时使用的连接 ID。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 可选的服务账号,用于使用短期凭据进行模拟,或者链式账号列表,获取列表中最后一个账号的 access_token,该账号将在请求中被模拟。如果设置为字符串,则原始账号必须向该账号授予 Service Account Token Creator IAM 角色。如果设置为序列,则列表中的身份必须向直接前一个身份授予 Service Account Token Creator IAM 角色,列表中的第一个账号向原始账号授予此角色(模板化)。

  • retry (google.api_core.retry.Retry) – (可选) 如何重试 RPC

template_fields: collections.abc.Sequence[str] = ('bucket', 'object', 'impersonation_chain')[source]
ui_color = '#f0eee4'[source]
bucket[source]
object[source]
use_glob = False[source]
google_cloud_conn_id = 'google_cloud_default'[source]
impersonation_chain = None[source]
retry[source]
deferrable = True[source]
poke(context)[source]

在派生此类时重写。

execute(context)[source]

Airflow 在 worker 上运行此方法,并使用触发器推迟执行。

execute_complete(context, event)[source]

作为触发器触发时的回调函数 - 立即返回。

依赖于触发器抛出异常,否则假定执行成功。

airflow.providers.google.cloud.sensors.gcs.ts_function(context)[source]

作为 GoogleCloudStorageObjectUpdatedSensor 的默认回调函数。

默认行为是检查对象在数据间隔结束之后是否已更新。

class airflow.providers.google.cloud.sensors.gcs.GCSObjectUpdateSensor(bucket, object, ts_func=ts_function, google_cloud_conn_id='google_cloud_default', impersonation_chain=None, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), **kwargs)[source]

基类:airflow.sensors.base.BaseSensorOperator

检查 Google Cloud Storage 中的对象是否已更新。

参数:
  • bucket (str) – 对象所在的 Google Cloud Storage 存储桶。

  • object (str) – 要在 Google Cloud Storage 存储桶中下载的对象的名称。

  • ts_func (Callable) – 用于定义更新条件的回调函数。默认回调函数返回 logical_date + schedule_interval。回调函数接受 context 作为参数。

  • google_cloud_conn_id (str) – 连接到 Google Cloud Storage 时使用的连接 ID。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 可选的服务账号,用于使用短期凭据进行模拟,或者链式账号列表,获取列表中最后一个账号的 access_token,该账号将在请求中被模拟。如果设置为字符串,则原始账号必须向该账号授予 Service Account Token Creator IAM 角色。如果设置为序列,则列表中的身份必须向直接前一个身份授予 Service Account Token Creator IAM 角色,列表中的第一个账号向原始账号授予此角色(模板化)。

  • deferrable (bool) – 在可推迟模式下运行传感器。

template_fields: collections.abc.Sequence[str] = ('bucket', 'object', 'impersonation_chain')[source]
ui_color = '#f0eee4'[source]
bucket[source]
object[source]
ts_func[source]
google_cloud_conn_id = 'google_cloud_default'[source]
impersonation_chain = None[source]
deferrable = True[source]
poke(context)[source]

在派生此类时重写。

execute(context)[source]

Airflow 在 worker 上运行此方法,并使用触发器推迟执行。

execute_complete(context, event=None)[source]

立即返回并依赖触发器抛出成功事件。触发器的回调函数。

class airflow.providers.google.cloud.sensors.gcs.GCSObjectsWithPrefixExistenceSensor(bucket, prefix, google_cloud_conn_id='google_cloud_default', impersonation_chain=None, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), **kwargs)[source]

基类:airflow.sensors.base.BaseSensorOperator

检查给定前缀是否存在 GCS 对象,并通过 XCom 传递匹配项。

当找到匹配给定前缀的文件时,poke 方法的条件将得到满足,匹配的对象将从操作器返回,并通过 XCom 传递给下游任务。

参数:
  • bucket (str) – 对象所在的 Google Cloud Storage 存储桶。

  • prefix (str) – 要在 Google Cloud Storage 存储桶中检查的前缀名称。

  • google_cloud_conn_id (str) – 连接到 Google Cloud Storage 时使用的连接 ID。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 可选的服务账号,用于使用短期凭据进行模拟,或者链式账号列表,获取列表中最后一个账号的 access_token,该账号将在请求中被模拟。如果设置为字符串,则原始账号必须向该账号授予 Service Account Token Creator IAM 角色。如果设置为序列,则列表中的身份必须向直接前一个身份授予 Service Account Token Creator IAM 角色,列表中的第一个账号向原始账号授予此角色(模板化)。

  • deferrable (bool) – 在可推迟模式下运行传感器。

template_fields: collections.abc.Sequence[str] = ('bucket', 'prefix', 'impersonation_chain')[source]
ui_color = '#f0eee4'[source]
bucket[source]
prefix[source]
google_cloud_conn_id = 'google_cloud_default'[source]
impersonation_chain = None[source]
deferrable = True[source]
poke(context)[source]

在派生此类时重写。

execute(context)[source]

重写以允许传递匹配项。

execute_complete(context, event)[source]

立即返回并依赖触发器抛出成功事件。触发器的回调函数。

airflow.providers.google.cloud.sensors.gcs.get_time()[source]

作为 datetime.datetime.now 的包装器,以便在单元测试中简化模拟。

class airflow.providers.google.cloud.sensors.gcs.GCSUploadSessionCompleteSensor(bucket, prefix, inactivity_period=60 * 60, min_objects=1, previous_objects=None, allow_delete=True, google_cloud_conn_id='google_cloud_default', impersonation_chain=None, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), **kwargs)[source]

基类:airflow.sensors.base.BaseSensorOperator

如果非活动期已过,且存储桶中的对象数量未增加,则返回 True。

检查 Google Cloud Storage 存储桶中前缀下对象数量的变化,如果非活动期已过且对象数量没有增加,则返回 True。请注意,此传感器在重新调度模式下表现不正确,因为 GCS 存储桶中列出的对象状态会在重新调度调用之间丢失。

参数:
  • bucket (str) – 预期对象所在的 Google Cloud Storage 存储桶。

  • prefix (str) – 要在 Google Cloud Storage 存储桶中检查的前缀名称。

  • inactivity_period (float) – 用于确定上传会话结束的总非活动秒数。请注意,此机制不是实时的,此操作器可能要等到此周期过去且没有感知到其他对象后才会返回(延迟一个 poke_interval)。

  • min_objects (int) – 上传会话被视为有效所需的最小对象数量。

  • previous_objects (set[str] | None) – 上次 poke 期间找到的对象 ID 集合。

  • allow_delete (bool) – 此传感器是否应将两次 poke 之间对象被删除视为有效行为。如果为 True,当发生这种情况时会记录警告消息。如果为 False,则会引发错误。

  • google_cloud_conn_id (str) – 连接到 Google Cloud Storage 时使用的连接 ID。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 可选的服务账号,用于使用短期凭据进行模拟,或者链式账号列表,获取列表中最后一个账号的 access_token,该账号将在请求中被模拟。如果设置为字符串,则原始账号必须向该账号授予 Service Account Token Creator IAM 角色。如果设置为序列,则列表中的身份必须向直接前一个身份授予 Service Account Token Creator IAM 角色,列表中的第一个账号向原始账号授予此角色(模板化)。

  • deferrable (bool) – 在可推迟模式下运行传感器。

template_fields: collections.abc.Sequence[str] = ('bucket', 'prefix', 'impersonation_chain')[source]
ui_color = '#f0eee4'[source]
bucket[source]
prefix[source]
inactivity_period = 3600[source]
min_objects = 1[source]
previous_objects[source]
inactivity_seconds = 0[source]
allow_delete = True[source]
google_cloud_conn_id = 'google_cloud_default'[source]
last_activity_time = None[source]
impersonation_chain = None[source]
hook: airflow.providers.google.cloud.hooks.gcs.GCSHook | None = None[source]
deferrable = True[source]
is_bucket_updated(current_objects)[source]

检查是否添加了新对象且非活动周期已过,并更新状态。

参数:

current_objects (set[str]) – 上次探测时桶中的对象ID集合。

poke(context)[source]

在派生此类时重写。

execute(context)[source]

Airflow 在 worker 上运行此方法,并使用触发器推迟执行。

execute_complete(context, event=None)[source]

依赖触发器抛出异常,否则认为执行成功。

触发器触发时的回调 - 立即返回。

这条目有帮助吗?