在 Google Cloud Storage 中传输数据¶
Google Cloud Storage (GCS) 用于存储来自各种应用程序的大数据。请注意,在 GCS 术语中,文件被称为对象,因此本指南中“对象”和“文件”这两个术语可以互换使用。有几个操作符的目的是复制数据作为 Google Cloud 服务的一部分。此页面展示了如何使用这些操作符。另请参阅 Google Cloud Storage 操作符,了解用于管理 Google Cloud Storage 存储分区的操作符。
Cloud Storage Transfer Service¶
有许多操作员管理 Google Cloud 数据传输服务。如果你想创建一个新的数据传输任务,请使用操作员 CloudDataTransferServiceCreateJobOperator
你还可以为此服务使用之前的操作员 - CloudDataTransferServiceGCSToGCSOperator
这些操作员不会在本地控制复制过程,而是使用 Google 资源,这使他们能够更快、更经济地执行此任务。当 Airflow 未托管在 Google Cloud 中时,经济效益尤为突出,因为这些操作员减少了出站流量。
如果启用了指定在将对象传输到接收器后是否应从源中删除对象的选项,则这些操作员会修改源对象。
当你使用 Google Cloud 数据传输服务时,你可以指定是否允许覆盖接收器中已存在的对象,是否应删除仅存在于接收器中的对象,或者是否应在将对象传输到接收器后从源中删除对象。
可以使用包含和排除前缀以及基于文件修改日期来指定源对象。
如果你需要有关如何使用它的信息,请查看指南:Google Cloud 传输服务操作员
适用于 Google Cloud Storage 的专用传输操作员¶
请参阅 Google 传输操作员,了解到和来自 Google Cloud Storage 的专用传输操作员列表。
本地传输¶
有两个用于复制数据的运算符,整个过程由本地控制。
在下一节中将对其进行描述。
先决条件任务¶
要使用这些运算符,您必须执行以下几项操作
使用Cloud 控制台选择或创建 Cloud Platform 项目。
为您的项目启用帐单,如Google Cloud 文档中所述。
启用 API,如Cloud 控制台文档中所述。
通过pip安装 API 库。
pip install 'apache-airflow[google]'有关安装的详细信息,请访问此处。
运算符¶
GCSToGCSOperator¶
GCSToGCSOperator
允许您在 GCS 中复制一个或多个文件。这些文件可以在两个不同的存储分区之间或在一个存储分区内进行复制。复制始终在不考虑目标存储分区初始状态的情况下进行。
此运算符仅在文件移动选项处于活动状态时才删除源存储分区中的对象。在两个不同的存储分区之间复制文件时,此运算符绝不会删除目标存储分区中的数据。
使用此运算符时,您可以指定是否应在将对象传输到接收器后从源中删除对象。源对象可以使用单个通配符以及基于文件修改日期来指定。
可以通过使用 match_glob 字段 根据其路径筛选对象。应避免在源对象路径中使用 delimiter
字段或通配符,因为这两种做法均已弃用。此外,还可以根据文件的创建日期 (is_older_than
) 或修改日期 (last_modified_time
和 maximum_modified_time
) 来筛选。
此运算符的默认工作方式可与 cp
命令相比较。当文件移动选项处于活动状态时,此运算符的功能类似于 mv
命令。
以下是使用 GCSToGCSOperator 复制单个文件、使用通配符复制多个文件、复制多个文件、移动单个文件和移动多个文件的示例。
复制单个文件¶
以下示例将从 BUCKET_1_SRC
GCS 存储分区复制单个文件 OBJECT_1
到 BUCKET_1_DST
存储分区。请注意,如果标志 exact_match=False
,则 source_object
将被视为 BUCKET_1_SRC
GCS 存储分区中搜索对象的词缀。这就是为什么如果找到任何对象,它们也将被复制。为防止这种情况发生,请使用 exact_match=False
。
copy_single_file = GCSToGCSOperator(
task_id="copy_single_gcs_file",
source_bucket=BUCKET_NAME_SRC,
source_object=OBJECT_1,
destination_bucket=BUCKET_NAME_DST, # If not supplied the source_bucket value will be used
destination_object="backup_" + OBJECT_1, # If not supplied the source_object value will be used
exact_match=True,
)
复制多个文件¶
有多种复制多个文件的方法,以下列出了各种示例。
如前所述,delimiter
字段以及在源对象中使用通配符 (*
) 已弃用。因此,不建议使用它们,而应使用 match_glob
,如下所示
以下示例将复制与 data/
文件夹中的 glob 模式匹配的文件,从 BUCKET_1_SRC
GCS 存储分区复制到 backup/
文件夹中的 BUCKET_1_DST
存储分区。
copy_files_with_match_glob = GCSToGCSOperator(
task_id="copy_files_with_match_glob",
source_bucket=BUCKET_NAME_SRC,
source_object="data/",
destination_bucket=BUCKET_NAME_DST,
destination_object="backup/",
match_glob="**/*.txt",
)
以下示例将复制 subdir/
文件夹中的所有文件(即 subdir/a.csv、subdir/b.csv、subdir/c.csv),从 BUCKET_1_SRC
GCS 存储分区复制到 backup/
文件夹中的 BUCKET_1_DST
存储分区。(即 backup/a.csv、backup/b.csv、backup/c.csv)
copy_files = GCSToGCSOperator(
task_id="copy_files",
source_bucket=BUCKET_NAME_SRC,
source_object="subdir/",
destination_bucket=BUCKET_NAME_DST,
destination_object="backup/",
)
copy_files_with_list = GCSToGCSOperator(
task_id="copy_files_with_list",
source_bucket=BUCKET_NAME_SRC,
source_objects=[OBJECT_1, OBJECT_2], # Instead of files each element could be a wildcard expression
destination_bucket=BUCKET_NAME_DST,
destination_object="backup/",
)
最后,可以通过省略 source_object
参数并向 source_objects
参数提供一个列表来复制文件。在此示例中,OBJECT_1
和 OBJECT_2
将从 BUCKET_1_SRC
复制到 BUCKET_1_DST
。提供大小为 1 的列表与向 source_object
参数提供一个值的作用相同。
移动单个文件¶
向 move
参数提供 True
会导致操作员在复制完成后删除 source_object
。请注意,如果标志 exact_match=False
,则 source_object
将被视为 BUCKET_1_SRC
GCS 存储分区中搜索对象的限定符。这就是为什么如果找到任何对象,它们也将被复制。要防止这种情况发生,请使用 exact_match=False
。
move_single_file = GCSToGCSOperator(
task_id="move_single_file",
source_bucket=BUCKET_NAME_SRC,
source_object=OBJECT_1,
destination_bucket=BUCKET_NAME_DST,
destination_object="backup_" + OBJECT_1,
exact_match=True,
move_object=True,
)
移动多个文件¶
可以通过向 move
参数提供 True
来移动多个文件。关于通配符和 delimiter
参数的规则同样适用于移动和复制。
move_files_with_list = GCSToGCSOperator(
task_id="move_files_with_list",
source_bucket=BUCKET_NAME_SRC,
source_objects=[OBJECT_1, OBJECT_2],
destination_bucket=BUCKET_NAME_DST,
destination_object="backup/",
)
GCSSynchronizeBucketsOperator¶
GCSSynchronizeBucketsOperator
运算符检查目标存储区的初始状态,然后将其与源存储区进行比较。在此基础上,它创建一个操作计划,描述应从目标存储区中删除哪些对象,应覆盖哪些对象,以及应复制哪些对象。
此运算符绝不会修改源存储区中的数据。
使用此运算符时,可以指定是否允许覆盖已存在于接收器中的对象,是否应删除仅存在于接收器中的对象,是否处理子目录或处理哪个子目录。
此运算符的工作方式可与 rsync
命令进行比较。
基本同步¶
以下示例将确保 BUCKET_1_SRC
中的所有文件(包括子目录中的任何文件)也位于 BUCKET_1_DST
中。如果 BUCKET_1_DST
中已存在同名文件,则不会覆盖它们。它不会删除 BUCKET_1_DST
中不存在于 BUCKET_1_SRC
中的任何文件。
sync_bucket = GCSSynchronizeBucketsOperator(
task_id="sync_bucket", source_bucket=BUCKET_NAME_SRC, destination_bucket=BUCKET_NAME_DST
)
完整存储区同步¶
此示例将确保 BUCKET_1_SRC
中的所有文件(包括子目录中的任何文件)也位于 BUCKET_1_DST
中。如果 BUCKET_1_DST
中已存在同名文件,则会覆盖它们。它将删除 BUCKET_1_DST
中不存在于 BUCKET_1_SRC
中的任何文件。
sync_full_bucket = GCSSynchronizeBucketsOperator(
task_id="sync_full_bucket",
source_bucket=BUCKET_NAME_SRC,
destination_bucket=BUCKET_NAME_DST,
delete_extra_files=True,
allow_overwrite=True,
)
同步到子目录¶
以下示例将确保 BUCKET_1_SRC
中的所有文件(包括子目录中的文件)也位于 BUCKET_1_DST
中的 subdir
文件夹中。如果 BUCKET_1_DST/subdir
中已存在同名文件,它不会覆盖这些文件;如果 BUCKET_1_DST/subdir
中存在不在 BUCKET_1_SRC
中的文件,它也不会删除这些文件。
sync_to_subdirectory = GCSSynchronizeBucketsOperator(
task_id="sync_to_subdirectory",
source_bucket=BUCKET_NAME_SRC,
destination_bucket=BUCKET_NAME_DST,
destination_object="subdir/",
)
从子目录同步¶
此示例将确保 BUCKET_1_SRC/subdir
中的所有文件(包括子目录中的文件)也位于 BUCKET_1_DST
中。如果 BUCKET_1_DST
中已存在同名文件,它不会覆盖这些文件;如果 BUCKET_1_DST
中存在不在 BUCKET_1_SRC/subdir
中的文件,它也不会删除这些文件。
sync_from_subdirectory = GCSSynchronizeBucketsOperator(
task_id="sync_from_subdirectory",
source_bucket=BUCKET_NAME_SRC,
source_object="subdir/",
destination_bucket=BUCKET_NAME_DST,
)