在 Google Cloud Storage 中传输数据

Google Cloud Storage (GCS) 用于存储来自各种应用程序的大量数据。 请注意,在 GCS 术语中,文件称为对象,因此本指南中“对象”和“文件”这两个术语可以互换使用。 有几个操作符的目的是作为 Google Cloud Service 的一部分复制数据。 此页面展示了如何使用这些操作符。 另请参阅 Google Cloud Storage 操作符,了解用于管理 Google Cloud Storage 存储桶的操作符。

Cloud Storage Transfer Service

有许多操作符用于管理 Google Cloud Data Transfer 服务。 如果您想创建新的数据传输任务,请使用操作符 CloudDataTransferServiceCreateJobOperator 您也可以使用此服务之前的操作符 - CloudDataTransferServiceGCSToGCSOperator

这些操作符不控制本地的复制过程,而是使用 Google 资源,这使得它们能够更快、更经济地执行此任务。当 Airflow 未托管在 Google Cloud 中时,经济效益尤其显著,因为这些操作符减少了出口流量。

如果启用了指定在传输到接收器后是否应从源删除对象的选项,则这些操作符会修改源对象。

当您使用 Google Cloud Data Transfer 服务时,您可以指定是否允许覆盖接收器中已存在的对象,是否应删除仅存在于接收器中的对象,或者是否应在传输到接收器后从源删除对象。

可以使用包含和排除前缀,以及基于文件修改日期来指定源对象。

如果您需要有关如何使用它的信息,请查看指南:Google Cloud Transfer Service 操作符

Google Cloud Storage 的专用传输操作符

有关进出 Google Cloud Storage 的专用传输操作符列表,请参阅 Google 传输操作符

本地传输

有两个操作符用于复制数据,其中整个过程在本地控制。

在下一节中将对它们进行描述。

先决条件任务

要使用这些操作符,您必须执行以下几项操作

操作符

GCSToGCSOperator

GCSToGCSOperator 允许您在 GCS 中复制一个或多个文件。 文件可以在两个不同的存储桶之间或在一个存储桶内复制。 复制始终在不考虑目标存储桶初始状态的情况下进行。

仅当文件移动选项处于活动状态时,此操作符才会删除源存储桶中的对象。 在两个不同的存储桶之间复制文件时,此操作符永远不会删除目标存储桶中的数据。

当您使用此操作符时,您可以指定是否应在传输到接收器后从源删除对象。 可以使用单个通配符以及基于文件修改日期来指定源对象。

可以使用 match_glob 字段来根据其路径筛选对象。 您应避免使用 delimiter 字段,也不要在源对象路径中使用通配符,因为这两种做法都已弃用。 此外,还可以基于文件的创建日期 (is_older_than) 或修改日期 (last_modified_timemaximum_modified_time) 来进行筛选。

默认情况下,此操作符的工作方式可以与 cp 命令进行比较。 当文件移动选项处于活动状态时,此操作符的功能类似于 mv 命令。

以下是使用 GCSToGCSOperator 复制单个文件、使用通配符复制多个文件、复制多个文件、移动单个文件和移动多个文件的示例。

复制单个文件

以下示例将把单个文件 OBJECT_1BUCKET_1_SRC GCS 存储桶复制到 BUCKET_1_DST 存储桶。 请注意,如果标志 exact_match=False,则 source_object 将被视为在 BUCKET_1_SRC GCS 存储桶中搜索对象的前缀。 这就是为什么如果找到任何对象,它们也将被复制。 为了防止这种情况发生,请使用 exact_match=False

tests/system/google/cloud/gcs/example_gcs_to_gcs.py[源代码]

copy_single_file = GCSToGCSOperator(
    task_id="copy_single_gcs_file",
    source_bucket=BUCKET_NAME_SRC,
    source_object=PREFIX + OBJECT_1,
    destination_bucket=BUCKET_NAME_DST,  # If not supplied the source_bucket value will be used
    destination_object="backup_" + OBJECT_1,  # If not supplied the source_object value will be used
    exact_match=True,
)

复制多个文件

有几种方法可以复制多个文件,下面展示了其中的各种示例。

如前所述,delimiter 字段以及在源对象中使用通配符 (*) 都已弃用。 因此,不建议使用它们,而是使用 match_glob,如下所示

以下示例将把 data/ 文件夹中与 glob 模式匹配的文件从 BUCKET_1_SRC GCS 存储桶复制到 BUCKET_1_DST 存储桶中的 backup/ 文件夹。

tests/system/google/cloud/gcs/example_gcs_to_gcs.py[源代码]

copy_files_with_match_glob = GCSToGCSOperator(
    task_id="copy_files_with_match_glob",
    source_bucket=BUCKET_NAME_SRC,
    source_object="data/",
    destination_bucket=BUCKET_NAME_DST,
    destination_object="backup/",
    match_glob="**/*.txt",
)

以下示例将把 subdir/ 文件夹中的所有文件(即 subdir/a.csv、subdir/b.csv、subdir/c.csv)从 BUCKET_1_SRC GCS 存储桶复制到 BUCKET_1_DST 存储桶中的 backup/ 文件夹。(即 backup/a.csv、backup/b.csv、backup/c.csv)

tests/system/google/cloud/gcs/example_gcs_to_gcs.py[源代码]

copy_files = GCSToGCSOperator(
    task_id="copy_files",
    source_bucket=BUCKET_NAME_SRC,
    source_object=PREFIX + "subdir/",
    destination_bucket=BUCKET_NAME_DST,
    destination_object="backup/",
)

tests/system/google/cloud/gcs/example_gcs_to_gcs.py[源代码]

copy_files_with_list = GCSToGCSOperator(
    task_id="copy_files_with_list",
    source_bucket=BUCKET_NAME_SRC,
    source_objects=[
        PREFIX + OBJECT_1,
        PREFIX + OBJECT_2,
    ],  # Instead of files each element could be a wildcard expression
    destination_bucket=BUCKET_NAME_DST,
    destination_object="backup/",
)

最后,可以通过省略 source_object 参数,而是向 source_objects 参数提供列表来复制文件。 在此示例中,OBJECT_1OBJECT_2 将从 BUCKET_1_SRC 复制到 BUCKET_1_DST。 提供大小为 1 的列表的功能与向 source_object 参数提供值相同。

移动单个文件

move 参数提供 True 会导致操作符在复制完成后删除 source_object。 请注意,如果标志 exact_match=False,则 source_object 将被视为在 BUCKET_1_SRC GCS 存储桶中搜索对象的前缀。 这就是为什么如果找到任何对象,它们也将被复制。 为了防止这种情况发生,请使用 exact_match=False

tests/system/google/cloud/gcs/example_gcs_to_gcs.py[源代码]

move_single_file = GCSToGCSOperator(
    task_id="move_single_file",
    source_bucket=BUCKET_NAME_SRC,
    source_object=PREFIX + OBJECT_1,
    destination_bucket=BUCKET_NAME_DST,
    destination_object="backup_" + OBJECT_1,
    exact_match=True,
    move_object=True,
)

移动多个文件

可以通过向 move 参数提供 True 来移动多个文件。 与复制一样,有关通配符和 delimiter 参数的规则也适用于移动。

tests/system/google/cloud/gcs/example_gcs_to_gcs.py[源代码]

move_files_with_list = GCSToGCSOperator(
    task_id="move_files_with_list",
    source_bucket=BUCKET_NAME_SRC,
    source_objects=[PREFIX + OBJECT_1, PREFIX + OBJECT_2],
    destination_bucket=BUCKET_NAME_DST,
    destination_object="backup/",
)

GCSSynchronizeBucketsOperator

GCSSynchronizeBucketsOperator 操作符检查目标存储桶的初始状态,然后将其与源存储桶进行比较。 基于此,它会创建一个操作计划,该计划描述了应从目标存储桶中删除哪些对象、应覆盖哪些对象以及应复制哪些对象。

此操作符永远不会修改源存储桶中的数据。

当您使用此运算符时,您可以指定是否允许覆盖接收器中已存在的对象、是否应删除仅存在于接收器中的对象、是否要处理子目录或要处理哪个子目录。

此运算符的工作方式可以与 rsync 命令进行比较。

基本同步

以下示例将确保 BUCKET_1_SRC 中的所有文件(包括任何子目录中的文件)也存在于 BUCKET_1_DST 中。如果 BUCKET_1_DST 中已存在同名文件,则不会覆盖它们。不会删除 BUCKET_1_DST 中而不在 BUCKET_1_SRC 中的任何文件。

tests/system/google/cloud/gcs/example_gcs_to_gcs.py[源代码]

sync_bucket = GCSSynchronizeBucketsOperator(
    task_id="sync_bucket", source_bucket=BUCKET_NAME_SRC, destination_bucket=BUCKET_NAME_DST
)

完整存储桶同步

此示例将确保 BUCKET_1_SRC 中的所有文件(包括任何子目录中的文件)也存在于 BUCKET_1_DST 中。如果 BUCKET_1_DST 中已存在同名文件,则会覆盖它们。会删除 BUCKET_1_DST 中而不在 BUCKET_1_SRC 中的任何文件。

tests/system/google/cloud/gcs/example_gcs_to_gcs.py[源代码]

sync_full_bucket = GCSSynchronizeBucketsOperator(
    task_id="sync_full_bucket",
    source_bucket=BUCKET_NAME_SRC,
    destination_bucket=BUCKET_NAME_DST,
    delete_extra_files=True,
    allow_overwrite=True,
)

同步到子目录

以下示例将确保 BUCKET_1_SRC 中的所有文件(包括任何子目录中的文件)也存在于 BUCKET_1_DST 中的 subdir 文件夹中。如果 BUCKET_1_DST/subdir 中已存在同名文件,则不会覆盖它们,也不会删除 BUCKET_1_DST/subdir 中而不在 BUCKET_1_SRC 中的任何文件。

tests/system/google/cloud/gcs/example_gcs_to_gcs.py[源代码]

sync_to_subdirectory = GCSSynchronizeBucketsOperator(
    task_id="sync_to_subdirectory",
    source_bucket=BUCKET_NAME_SRC,
    destination_bucket=BUCKET_NAME_DST,
    destination_object="subdir/",
)

从子目录同步

此示例将确保 BUCKET_1_SRC/subdir 中的所有文件(包括任何子目录中的文件)也存在于 BUCKET_1_DST 中。如果 BUCKET_1_DST 中已存在同名文件,则不会覆盖它们,也不会删除 BUCKET_1_DST 中而不在 BUCKET_1_SRC/subdir 中的任何文件。

tests/system/google/cloud/gcs/example_gcs_to_gcs.py[源代码]

sync_from_subdirectory = GCSSynchronizeBucketsOperator(
    task_id="sync_from_subdirectory",
    source_bucket=BUCKET_NAME_SRC,
    source_object="subdir/",
    destination_bucket=BUCKET_NAME_DST,
)

此条目是否有帮助?