在 Google Cloud Storage 中传输数据¶
Google Cloud Storage (GCS) 用于存储来自各种应用程序的大量数据。 请注意,在 GCS 术语中,文件称为对象,因此本指南中“对象”和“文件”这两个术语可以互换使用。 有几个操作符的目的是作为 Google Cloud Service 的一部分复制数据。 此页面展示了如何使用这些操作符。 另请参阅 Google Cloud Storage 操作符,了解用于管理 Google Cloud Storage 存储桶的操作符。
Cloud Storage Transfer Service¶
有许多操作符用于管理 Google Cloud Data Transfer 服务。 如果您想创建新的数据传输任务,请使用操作符 CloudDataTransferServiceCreateJobOperator
您也可以使用此服务之前的操作符 - CloudDataTransferServiceGCSToGCSOperator
这些操作符不控制本地的复制过程,而是使用 Google 资源,这使得它们能够更快、更经济地执行此任务。当 Airflow 未托管在 Google Cloud 中时,经济效益尤其显著,因为这些操作符减少了出口流量。
如果启用了指定在传输到接收器后是否应从源删除对象的选项,则这些操作符会修改源对象。
当您使用 Google Cloud Data Transfer 服务时,您可以指定是否允许覆盖接收器中已存在的对象,是否应删除仅存在于接收器中的对象,或者是否应在传输到接收器后从源删除对象。
可以使用包含和排除前缀,以及基于文件修改日期来指定源对象。
如果您需要有关如何使用它的信息,请查看指南:Google Cloud Transfer Service 操作符
Google Cloud Storage 的专用传输操作符¶
有关进出 Google Cloud Storage 的专用传输操作符列表,请参阅 Google 传输操作符。
本地传输¶
有两个操作符用于复制数据,其中整个过程在本地控制。
在下一节中将对它们进行描述。
先决条件任务¶
要使用这些操作符,您必须执行以下几项操作
使用 Cloud Console 选择或创建 Cloud Platform 项目。
按照 Google Cloud 文档中的说明,为您的项目启用结算。
按照 Cloud Console 文档中的说明,启用 API。
通过 pip 安装 API 库。
pip install 'apache-airflow[google]'有关详细信息,请访问 安装。
操作符¶
GCSToGCSOperator¶
GCSToGCSOperator
允许您在 GCS 中复制一个或多个文件。 文件可以在两个不同的存储桶之间或在一个存储桶内复制。 复制始终在不考虑目标存储桶初始状态的情况下进行。
仅当文件移动选项处于活动状态时,此操作符才会删除源存储桶中的对象。 在两个不同的存储桶之间复制文件时,此操作符永远不会删除目标存储桶中的数据。
当您使用此操作符时,您可以指定是否应在传输到接收器后从源删除对象。 可以使用单个通配符以及基于文件修改日期来指定源对象。
可以使用 match_glob 字段来根据其路径筛选对象。 您应避免使用 delimiter
字段,也不要在源对象路径中使用通配符,因为这两种做法都已弃用。 此外,还可以基于文件的创建日期 (is_older_than
) 或修改日期 (last_modified_time
和 maximum_modified_time
) 来进行筛选。
默认情况下,此操作符的工作方式可以与 cp
命令进行比较。 当文件移动选项处于活动状态时,此操作符的功能类似于 mv
命令。
以下是使用 GCSToGCSOperator 复制单个文件、使用通配符复制多个文件、复制多个文件、移动单个文件和移动多个文件的示例。
复制单个文件¶
以下示例将把单个文件 OBJECT_1
从 BUCKET_1_SRC
GCS 存储桶复制到 BUCKET_1_DST
存储桶。 请注意,如果标志 exact_match=False
,则 source_object
将被视为在 BUCKET_1_SRC
GCS 存储桶中搜索对象的前缀。 这就是为什么如果找到任何对象,它们也将被复制。 为了防止这种情况发生,请使用 exact_match=False
。
copy_single_file = GCSToGCSOperator(
task_id="copy_single_gcs_file",
source_bucket=BUCKET_NAME_SRC,
source_object=PREFIX + OBJECT_1,
destination_bucket=BUCKET_NAME_DST, # If not supplied the source_bucket value will be used
destination_object="backup_" + OBJECT_1, # If not supplied the source_object value will be used
exact_match=True,
)
复制多个文件¶
有几种方法可以复制多个文件,下面展示了其中的各种示例。
如前所述,delimiter
字段以及在源对象中使用通配符 (*
) 都已弃用。 因此,不建议使用它们,而是使用 match_glob
,如下所示
以下示例将把 data/
文件夹中与 glob 模式匹配的文件从 BUCKET_1_SRC
GCS 存储桶复制到 BUCKET_1_DST
存储桶中的 backup/
文件夹。
copy_files_with_match_glob = GCSToGCSOperator(
task_id="copy_files_with_match_glob",
source_bucket=BUCKET_NAME_SRC,
source_object="data/",
destination_bucket=BUCKET_NAME_DST,
destination_object="backup/",
match_glob="**/*.txt",
)
以下示例将把 subdir/
文件夹中的所有文件(即 subdir/a.csv、subdir/b.csv、subdir/c.csv)从 BUCKET_1_SRC
GCS 存储桶复制到 BUCKET_1_DST
存储桶中的 backup/
文件夹。(即 backup/a.csv、backup/b.csv、backup/c.csv)
copy_files = GCSToGCSOperator(
task_id="copy_files",
source_bucket=BUCKET_NAME_SRC,
source_object=PREFIX + "subdir/",
destination_bucket=BUCKET_NAME_DST,
destination_object="backup/",
)
copy_files_with_list = GCSToGCSOperator(
task_id="copy_files_with_list",
source_bucket=BUCKET_NAME_SRC,
source_objects=[
PREFIX + OBJECT_1,
PREFIX + OBJECT_2,
], # Instead of files each element could be a wildcard expression
destination_bucket=BUCKET_NAME_DST,
destination_object="backup/",
)
最后,可以通过省略 source_object
参数,而是向 source_objects
参数提供列表来复制文件。 在此示例中,OBJECT_1
和 OBJECT_2
将从 BUCKET_1_SRC
复制到 BUCKET_1_DST
。 提供大小为 1 的列表的功能与向 source_object
参数提供值相同。
移动单个文件¶
向 move
参数提供 True
会导致操作符在复制完成后删除 source_object
。 请注意,如果标志 exact_match=False
,则 source_object
将被视为在 BUCKET_1_SRC
GCS 存储桶中搜索对象的前缀。 这就是为什么如果找到任何对象,它们也将被复制。 为了防止这种情况发生,请使用 exact_match=False
。
move_single_file = GCSToGCSOperator(
task_id="move_single_file",
source_bucket=BUCKET_NAME_SRC,
source_object=PREFIX + OBJECT_1,
destination_bucket=BUCKET_NAME_DST,
destination_object="backup_" + OBJECT_1,
exact_match=True,
move_object=True,
)
移动多个文件¶
可以通过向 move
参数提供 True
来移动多个文件。 与复制一样,有关通配符和 delimiter
参数的规则也适用于移动。
move_files_with_list = GCSToGCSOperator(
task_id="move_files_with_list",
source_bucket=BUCKET_NAME_SRC,
source_objects=[PREFIX + OBJECT_1, PREFIX + OBJECT_2],
destination_bucket=BUCKET_NAME_DST,
destination_object="backup/",
)
GCSSynchronizeBucketsOperator¶
GCSSynchronizeBucketsOperator
操作符检查目标存储桶的初始状态,然后将其与源存储桶进行比较。 基于此,它会创建一个操作计划,该计划描述了应从目标存储桶中删除哪些对象、应覆盖哪些对象以及应复制哪些对象。
此操作符永远不会修改源存储桶中的数据。
当您使用此运算符时,您可以指定是否允许覆盖接收器中已存在的对象、是否应删除仅存在于接收器中的对象、是否要处理子目录或要处理哪个子目录。
此运算符的工作方式可以与 rsync
命令进行比较。
基本同步¶
以下示例将确保 BUCKET_1_SRC
中的所有文件(包括任何子目录中的文件)也存在于 BUCKET_1_DST
中。如果 BUCKET_1_DST
中已存在同名文件,则不会覆盖它们。不会删除 BUCKET_1_DST
中而不在 BUCKET_1_SRC
中的任何文件。
sync_bucket = GCSSynchronizeBucketsOperator(
task_id="sync_bucket", source_bucket=BUCKET_NAME_SRC, destination_bucket=BUCKET_NAME_DST
)
完整存储桶同步¶
此示例将确保 BUCKET_1_SRC
中的所有文件(包括任何子目录中的文件)也存在于 BUCKET_1_DST
中。如果 BUCKET_1_DST
中已存在同名文件,则会覆盖它们。会删除 BUCKET_1_DST
中而不在 BUCKET_1_SRC
中的任何文件。
sync_full_bucket = GCSSynchronizeBucketsOperator(
task_id="sync_full_bucket",
source_bucket=BUCKET_NAME_SRC,
destination_bucket=BUCKET_NAME_DST,
delete_extra_files=True,
allow_overwrite=True,
)
同步到子目录¶
以下示例将确保 BUCKET_1_SRC
中的所有文件(包括任何子目录中的文件)也存在于 BUCKET_1_DST
中的 subdir
文件夹中。如果 BUCKET_1_DST/subdir
中已存在同名文件,则不会覆盖它们,也不会删除 BUCKET_1_DST/subdir
中而不在 BUCKET_1_SRC
中的任何文件。
sync_to_subdirectory = GCSSynchronizeBucketsOperator(
task_id="sync_to_subdirectory",
source_bucket=BUCKET_NAME_SRC,
destination_bucket=BUCKET_NAME_DST,
destination_object="subdir/",
)
从子目录同步¶
此示例将确保 BUCKET_1_SRC/subdir
中的所有文件(包括任何子目录中的文件)也存在于 BUCKET_1_DST
中。如果 BUCKET_1_DST
中已存在同名文件,则不会覆盖它们,也不会删除 BUCKET_1_DST
中而不在 BUCKET_1_SRC/subdir
中的任何文件。
sync_from_subdirectory = GCSSynchronizeBucketsOperator(
task_id="sync_from_subdirectory",
source_bucket=BUCKET_NAME_SRC,
source_object="subdir/",
destination_bucket=BUCKET_NAME_DST,
)