Google Cloud Storage 操作符

Cloud Storage 允许随时随地存储和检索任意数量的数据。你可以将 Cloud Storage 用于各种场景,包括提供网站内容、存储归档和灾难恢复数据,或通过直接下载向用户分发大型数据对象。

请参阅 Google 传输操作符,了解到和从 Google Cloud Storage 的专门传输操作符列表。

先决任务

要使用这些操作符,你必须执行以下操作

运算符

GCSTimeSpanFileTransformOperator

使用 GCSTimeSpanFileTransformOperator 转换在特定时间跨度(数据间隔)内修改的文件。时间跨度由时间跨度的开始和结束时间戳定义。如果 DAG 没有安排下一个 DAG 实例,则时间跨度结束无限,这意味着运算符处理所有早于 data_interval_start 的文件。

tests/system/providers/google/cloud/gcs/example_gcs_transform_timespan.py[源代码]

gcs_timespan_transform_files_task = GCSTimeSpanFileTransformOperator(
    task_id="gcs_timespan_transform_files",
    source_bucket=BUCKET_NAME_SRC,
    source_prefix=SOURCE_PREFIX,
    source_gcp_conn_id=SOURCE_GCP_CONN_ID,
    destination_bucket=BUCKET_NAME_DST,
    destination_prefix=DESTINATION_PREFIX,
    destination_gcp_conn_id=DESTINATION_GCP_CONN_ID,
    transform_script=["python", TRANSFORM_SCRIPT_PATH],
)

GCSBucketCreateAclEntryOperator

在指定存储分区上创建新的 ACL 条目。

有关参数定义,请参阅 GCSBucketCreateAclEntryOperator

使用运算符

tests/system/providers/google/cloud/gcs/example_gcs_acl.py[源代码]

gcs_bucket_create_acl_entry_task = GCSBucketCreateAclEntryOperator(
    bucket=BUCKET_NAME,
    entity=GCS_ACL_ENTITY,
    role=GCS_ACL_BUCKET_ROLE,
    task_id="gcs_bucket_create_acl_entry_task",
)

模板化

template_fields: Sequence[str] = (
    "bucket",
    "entity",
    "role",
    "user_project",
    "impersonation_chain",
)

更多信息

请参阅 Google Cloud Storage 文档以 为存储分区创建新的 ACL 条目

GCSObjectCreateAclEntryOperator

在指定对象上创建新的 ACL 条目。

有关参数定义,请参阅 GCSObjectCreateAclEntryOperator

使用运算符

tests/system/providers/google/cloud/gcs/example_gcs_acl.py[源代码]

gcs_object_create_acl_entry_task = GCSObjectCreateAclEntryOperator(
    bucket=BUCKET_NAME,
    object_name=FILE_NAME,
    entity=GCS_ACL_ENTITY,
    role=GCS_ACL_OBJECT_ROLE,
    task_id="gcs_object_create_acl_entry_task",
)

模板

template_fields: Sequence[str] = (
    "bucket",
    "object_name",
    "entity",
    "generation",
    "role",
    "user_project",
    "impersonation_chain",
)

更多信息

请参阅 Google Cloud Storage 插入文档,以创建 ObjectAccess 的 ACL 条目

删除存储分区

删除存储分区允许您从 Google Cloud Storage 中移除存储分区对象。它通过GCSDeleteBucketOperator操作员执行。

tests/system/providers/google/cloud/gcs/example_gcs_upload_download.py[源代码]

delete_bucket = GCSDeleteBucketOperator(task_id="delete_bucket", bucket_name=BUCKET_NAME)

您可以将Jinja 模板bucket_name, gcp_conn_id, impersonation_chain, user_project参数一起使用,这些参数允许您动态确定值。

参考

有关更多信息,请参阅

传感器

GCSObjectExistenceSensor

使用GCSObjectExistenceSensor来等待(轮询)Google Cloud Storage 中的文件是否存在。

tests/system/providers/google/cloud/gcs/example_gcs_sensor.py[源代码]

gcs_object_exists = GCSObjectExistenceSensor(
    bucket=DESTINATION_BUCKET_NAME,
    object=FILE_NAME,
    task_id="gcs_object_exists_task",
)

此外,如果您希望在传感器运行时释放工作程序槽,则可以在此操作员中使用可延迟模式。

tests/system/providers/google/cloud/gcs/example_gcs_sensor.py[源代码]

gcs_object_exists_defered = GCSObjectExistenceSensor(
    bucket=DESTINATION_BUCKET_NAME, object=FILE_NAME, task_id="gcs_object_exists_defered", deferrable=True
)

GCSObjectExistenceAsyncSensor

GCSObjectExistenceAsyncSensor 已弃用,将在将来的版本中移除。请使用 GCSObjectExistenceSensor 并使用该操作符中的可延迟模式。

tests/system/providers/google/cloud/gcs/example_gcs_sensor.py[源代码]

gcs_object_exists_async = GCSObjectExistenceAsyncSensor(
    bucket=DESTINATION_BUCKET_NAME,
    object=FILE_NAME,
    task_id="gcs_object_exists_task_async",
)

GCSObjectsWithPrefixExistenceSensor

使用 GCSObjectsWithPrefixExistenceSensor 等待(轮询)Google Cloud Storage 中具有指定前缀的文件的存在。

tests/system/providers/google/cloud/gcs/example_gcs_sensor.py[源代码]

gcs_object_with_prefix_exists = GCSObjectsWithPrefixExistenceSensor(
    bucket=DESTINATION_BUCKET_NAME,
    prefix=FILE_NAME[:5],
    task_id="gcs_object_with_prefix_exists_task",
)

如果希望此传感器异步运行,从而更有效地利用 Airflow 部署中的资源,则可以将 deferrable 参数设置为 True。但是,此功能需要启用触发器组件才能正常工作。

tests/system/providers/google/cloud/gcs/example_gcs_sensor.py[源代码]

gcs_object_with_prefix_exists_async = GCSObjectsWithPrefixExistenceSensor(
    bucket=DESTINATION_BUCKET_NAME,
    prefix=FILE_NAME[:5],
    task_id="gcs_object_with_prefix_exists_task_async",
    deferrable=True,
)

GCSUploadSessionCompleteSensor

使用 GCSUploadSessionCompleteSensor 检查 Google Cloud Storage 中具有指定前缀的文件数量的变化。

tests/system/providers/google/cloud/gcs/example_gcs_sensor.py[源代码]

gcs_upload_session_complete = GCSUploadSessionCompleteSensor(
    bucket=DESTINATION_BUCKET_NAME,
    prefix=FILE_NAME,
    inactivity_period=15,
    min_objects=1,
    allow_delete=True,
    previous_objects=set(),
    task_id="gcs_upload_session_complete_task",
)

如果希望在传感器运行时释放工作程序槽,则可以将参数 deferrable 设置为 True。

tests/system/providers/google/cloud/gcs/example_gcs_sensor.py[源代码]

gcs_upload_session_async_complete = GCSUploadSessionCompleteSensor(
    bucket=DESTINATION_BUCKET_NAME,
    prefix=FILE_NAME,
    inactivity_period=15,
    min_objects=1,
    allow_delete=True,
    previous_objects=set(),
    task_id="gcs_upload_session_async_complete",
    deferrable=True,
)

GCSObjectUpdateSensor

使用 GCSObjectUpdateSensor 检查 Google Cloud Storage 中的对象是否已更新。

tests/system/providers/google/cloud/gcs/example_gcs_sensor.py[源代码]

gcs_update_object_exists = GCSObjectUpdateSensor(
    bucket=DESTINATION_BUCKET_NAME,
    object=FILE_NAME,
    task_id="gcs_object_update_sensor_task",
)

如果希望此传感器异步运行,从而有效利用 Airflow 部署中的资源,则可以将 deferrable 参数设置为 True。但是,触发器组件需要启用才能使用此功能。

tests/system/providers/google/cloud/gcs/example_gcs_sensor.py[源代码]

gcs_update_object_exists_async = GCSObjectUpdateSensor(
    bucket=DESTINATION_BUCKET_NAME,
    object=FILE_NAME,
    task_id="gcs_object_update_sensor_task_async",
    deferrable=True,
)

更多信息

传感器有不同的模式,这些模式决定了任务执行期间资源的行为。有关使用传感器时的最佳实践,请参阅 Airflow 传感器文档

参考

有关更多信息,请参阅

此条目是否有用?