在 Google Cloud Storage 中传输数据

Google Cloud Storage (GCS) 用于存储来自各种应用程序的大数据。请注意,在 GCS 术语中,文件被称为对象,因此本指南中“对象”和“文件”这两个术语可以互换使用。有几个操作符的目的是复制数据作为 Google Cloud 服务的一部分。此页面展示了如何使用这些操作符。另请参阅 Google Cloud Storage 操作符,了解用于管理 Google Cloud Storage 存储分区的操作符。

Cloud Storage Transfer Service

有许多操作员管理 Google Cloud 数据传输服务。如果你想创建一个新的数据传输任务,请使用操作员 CloudDataTransferServiceCreateJobOperator 你还可以为此服务使用之前的操作员 - CloudDataTransferServiceGCSToGCSOperator

这些操作员不会在本地控制复制过程,而是使用 Google 资源,这使他们能够更快、更经济地执行此任务。当 Airflow 未托管在 Google Cloud 中时,经济效益尤为突出,因为这些操作员减少了出站流量。

如果启用了指定在将对象传输到接收器后是否应从源中删除对象的选项,则这些操作员会修改源对象。

当你使用 Google Cloud 数据传输服务时,你可以指定是否允许覆盖接收器中已存在的对象,是否应删除仅存在于接收器中的对象,或者是否应在将对象传输到接收器后从源中删除对象。

可以使用包含和排除前缀以及基于文件修改日期来指定源对象。

如果你需要有关如何使用它的信息,请查看指南:Google Cloud 传输服务操作员

适用于 Google Cloud Storage 的专用传输操作员

请参阅 Google 传输操作员,了解到和来自 Google Cloud Storage 的专用传输操作员列表。

本地传输

有两个用于复制数据的运算符,整个过程由本地控制。

在下一节中将对其进行描述。

先决条件任务

要使用这些运算符,您必须执行以下几项操作

运算符

GCSToGCSOperator

GCSToGCSOperator允许您在 GCS 中复制一个或多个文件。这些文件可以在两个不同的存储分区之间或在一个存储分区内进行复制。复制始终在不考虑目标存储分区初始状态的情况下进行。

此运算符仅在文件移动选项处于活动状态时才删除源存储分区中的对象。在两个不同的存储分区之间复制文件时,此运算符绝不会删除目标存储分区中的数据。

使用此运算符时,您可以指定是否应在将对象传输到接收器后从源中删除对象。源对象可以使用单个通配符以及基于文件修改日期来指定。

可以通过使用 match_glob 字段 根据其路径筛选对象。应避免在源对象路径中使用 delimiter 字段或通配符,因为这两种做法均已弃用。此外,还可以根据文件的创建日期 (is_older_than) 或修改日期 (last_modified_timemaximum_modified_time) 来筛选。

此运算符的默认工作方式可与 cp 命令相比较。当文件移动选项处于活动状态时,此运算符的功能类似于 mv 命令。

以下是使用 GCSToGCSOperator 复制单个文件、使用通配符复制多个文件、复制多个文件、移动单个文件和移动多个文件的示例。

复制单个文件

以下示例将从 BUCKET_1_SRC GCS 存储分区复制单个文件 OBJECT_1BUCKET_1_DST 存储分区。请注意,如果标志 exact_match=False,则 source_object 将被视为 BUCKET_1_SRC GCS 存储分区中搜索对象的词缀。这就是为什么如果找到任何对象,它们也将被复制。为防止这种情况发生,请使用 exact_match=False

tests/system/providers/google/cloud/gcs/example_gcs_to_gcs.py[source]

copy_single_file = GCSToGCSOperator(
    task_id="copy_single_gcs_file",
    source_bucket=BUCKET_NAME_SRC,
    source_object=OBJECT_1,
    destination_bucket=BUCKET_NAME_DST,  # If not supplied the source_bucket value will be used
    destination_object="backup_" + OBJECT_1,  # If not supplied the source_object value will be used
    exact_match=True,
)

复制多个文件

有多种复制多个文件的方法,以下列出了各种示例。

如前所述,delimiter 字段以及在源对象中使用通配符 (*) 已弃用。因此,不建议使用它们,而应使用 match_glob,如下所示

以下示例将复制与 data/ 文件夹中的 glob 模式匹配的文件,从 BUCKET_1_SRC GCS 存储分区复制到 backup/ 文件夹中的 BUCKET_1_DST 存储分区。

tests/system/providers/google/cloud/gcs/example_gcs_to_gcs.py[source]

copy_files_with_match_glob = GCSToGCSOperator(
    task_id="copy_files_with_match_glob",
    source_bucket=BUCKET_NAME_SRC,
    source_object="data/",
    destination_bucket=BUCKET_NAME_DST,
    destination_object="backup/",
    match_glob="**/*.txt",
)

以下示例将复制 subdir/ 文件夹中的所有文件(即 subdir/a.csv、subdir/b.csv、subdir/c.csv),从 BUCKET_1_SRC GCS 存储分区复制到 backup/ 文件夹中的 BUCKET_1_DST 存储分区。(即 backup/a.csv、backup/b.csv、backup/c.csv)

tests/system/providers/google/cloud/gcs/example_gcs_to_gcs.py[source]

copy_files = GCSToGCSOperator(
    task_id="copy_files",
    source_bucket=BUCKET_NAME_SRC,
    source_object="subdir/",
    destination_bucket=BUCKET_NAME_DST,
    destination_object="backup/",
)

tests/system/providers/google/cloud/gcs/example_gcs_to_gcs.py[source]

copy_files_with_list = GCSToGCSOperator(
    task_id="copy_files_with_list",
    source_bucket=BUCKET_NAME_SRC,
    source_objects=[OBJECT_1, OBJECT_2],  # Instead of files each element could be a wildcard expression
    destination_bucket=BUCKET_NAME_DST,
    destination_object="backup/",
)

最后,可以通过省略 source_object 参数并向 source_objects 参数提供一个列表来复制文件。在此示例中,OBJECT_1OBJECT_2 将从 BUCKET_1_SRC 复制到 BUCKET_1_DST。提供大小为 1 的列表与向 source_object 参数提供一个值的作用相同。

移动单个文件

move 参数提供 True 会导致操作员在复制完成后删除 source_object。请注意,如果标志 exact_match=False,则 source_object 将被视为 BUCKET_1_SRC GCS 存储分区中搜索对象的限定符。这就是为什么如果找到任何对象,它们也将被复制。要防止这种情况发生,请使用 exact_match=False

tests/system/providers/google/cloud/gcs/example_gcs_to_gcs.py[source]

move_single_file = GCSToGCSOperator(
    task_id="move_single_file",
    source_bucket=BUCKET_NAME_SRC,
    source_object=OBJECT_1,
    destination_bucket=BUCKET_NAME_DST,
    destination_object="backup_" + OBJECT_1,
    exact_match=True,
    move_object=True,
)

移动多个文件

可以通过向 move 参数提供 True 来移动多个文件。关于通配符和 delimiter 参数的规则同样适用于移动和复制。

tests/system/providers/google/cloud/gcs/example_gcs_to_gcs.py[source]

move_files_with_list = GCSToGCSOperator(
    task_id="move_files_with_list",
    source_bucket=BUCKET_NAME_SRC,
    source_objects=[OBJECT_1, OBJECT_2],
    destination_bucket=BUCKET_NAME_DST,
    destination_object="backup/",
)

GCSSynchronizeBucketsOperator

GCSSynchronizeBucketsOperator 运算符检查目标存储区的初始状态,然后将其与源存储区进行比较。在此基础上,它创建一个操作计划,描述应从目标存储区中删除哪些对象,应覆盖哪些对象,以及应复制哪些对象。

此运算符绝不会修改源存储区中的数据。

使用此运算符时,可以指定是否允许覆盖已存在于接收器中的对象,是否应删除仅存在于接收器中的对象,是否处理子目录或处理哪个子目录。

此运算符的工作方式可与 rsync 命令进行比较。

基本同步

以下示例将确保 BUCKET_1_SRC 中的所有文件(包括子目录中的任何文件)也位于 BUCKET_1_DST 中。如果 BUCKET_1_DST 中已存在同名文件,则不会覆盖它们。它不会删除 BUCKET_1_DST 中不存在于 BUCKET_1_SRC 中的任何文件。

tests/system/providers/google/cloud/gcs/example_gcs_to_gcs.py[source]

sync_bucket = GCSSynchronizeBucketsOperator(
    task_id="sync_bucket", source_bucket=BUCKET_NAME_SRC, destination_bucket=BUCKET_NAME_DST
)

完整存储区同步

此示例将确保 BUCKET_1_SRC 中的所有文件(包括子目录中的任何文件)也位于 BUCKET_1_DST 中。如果 BUCKET_1_DST 中已存在同名文件,则会覆盖它们。它将删除 BUCKET_1_DST 中不存在于 BUCKET_1_SRC 中的任何文件。

tests/system/providers/google/cloud/gcs/example_gcs_to_gcs.py[source]

sync_full_bucket = GCSSynchronizeBucketsOperator(
    task_id="sync_full_bucket",
    source_bucket=BUCKET_NAME_SRC,
    destination_bucket=BUCKET_NAME_DST,
    delete_extra_files=True,
    allow_overwrite=True,
)

同步到子目录

以下示例将确保 BUCKET_1_SRC 中的所有文件(包括子目录中的文件)也位于 BUCKET_1_DST 中的 subdir 文件夹中。如果 BUCKET_1_DST/subdir 中已存在同名文件,它不会覆盖这些文件;如果 BUCKET_1_DST/subdir 中存在不在 BUCKET_1_SRC 中的文件,它也不会删除这些文件。

tests/system/providers/google/cloud/gcs/example_gcs_to_gcs.py[source]

sync_to_subdirectory = GCSSynchronizeBucketsOperator(
    task_id="sync_to_subdirectory",
    source_bucket=BUCKET_NAME_SRC,
    destination_bucket=BUCKET_NAME_DST,
    destination_object="subdir/",
)

从子目录同步

此示例将确保 BUCKET_1_SRC/subdir 中的所有文件(包括子目录中的文件)也位于 BUCKET_1_DST 中。如果 BUCKET_1_DST 中已存在同名文件,它不会覆盖这些文件;如果 BUCKET_1_DST 中存在不在 BUCKET_1_SRC/subdir 中的文件,它也不会删除这些文件。

tests/system/providers/google/cloud/gcs/example_gcs_to_gcs.py[source]

sync_from_subdirectory = GCSSynchronizeBucketsOperator(
    task_id="sync_from_subdirectory",
    source_bucket=BUCKET_NAME_SRC,
    source_object="subdir/",
    destination_bucket=BUCKET_NAME_DST,
)

此条目是否有用?