Google Cloud Storage 操作符¶
Cloud Storage 允许随时随地存储和检索任意数量的数据。您可以使用 Cloud Storage 进行各种场景,包括服务网站内容、存储用于存档和灾难恢复的数据,或者通过直接下载向用户分发大型数据对象。
有关 Google Cloud Storage 之间进行专门传输的操作符列表,请参阅 Google Transfer 操作符。
先决条件任务¶
要使用这些操作符,您必须执行以下几项操作
使用 Cloud Console 选择或创建一个 Cloud Platform 项目。
为您的项目启用结算,如 Google Cloud 文档 中所述。
启用 API,如 Cloud Console 文档 中所述。
通过 pip 安装 API 库。
pip install 'apache-airflow[google]'有关详细信息,请参阅 安装。
操作符¶
GCSTimeSpanFileTransformOperator¶
使用 GCSTimeSpanFileTransformOperator
来转换在特定时间跨度(数据间隔)内修改的文件。时间跨度由时间跨度的开始和结束时间戳定义。如果 DAG 没有计划的下一个 DAG 实例,则时间跨度结束为无限,这意味着操作符处理所有早于 data_interval_start
的文件。
gcs_timespan_transform_files_task = GCSTimeSpanFileTransformOperator(
task_id="gcs_timespan_transform_files",
source_bucket=BUCKET_NAME_SRC,
source_prefix=SOURCE_PREFIX,
source_gcp_conn_id=SOURCE_GCP_CONN_ID,
destination_bucket=BUCKET_NAME_DST,
destination_prefix=DESTINATION_PREFIX,
destination_gcp_conn_id=DESTINATION_GCP_CONN_ID,
transform_script=["python", TRANSFORM_SCRIPT_PATH],
)
GCSBucketCreateAclEntryOperator¶
在指定的存储桶上创建一个新的 ACL 条目。
有关参数定义,请参阅 GCSBucketCreateAclEntryOperator
使用操作符¶
gcs_bucket_create_acl_entry_task = GCSBucketCreateAclEntryOperator(
bucket=BUCKET_NAME,
entity=GCS_ACL_ENTITY,
role=GCS_ACL_BUCKET_ROLE,
task_id="gcs_bucket_create_acl_entry_task",
)
模板化¶
template_fields: Sequence[str] = (
"bucket",
"entity",
"role",
"user_project",
"impersonation_chain",
)
更多信息¶
请参阅 Google Cloud Storage 文档,了解如何为存储桶创建一个新的 ACL 条目。
GCSObjectCreateAclEntryOperator¶
在指定的对象上创建一个新的 ACL 条目。
有关参数定义,请参阅 GCSObjectCreateAclEntryOperator
使用操作符¶
gcs_object_create_acl_entry_task = GCSObjectCreateAclEntryOperator(
bucket=BUCKET_NAME,
object_name=FILE_NAME,
entity=GCS_ACL_ENTITY,
role=GCS_ACL_OBJECT_ROLE,
task_id="gcs_object_create_acl_entry_task",
)
模板化¶
template_fields: Sequence[str] = (
"bucket",
"object_name",
"entity",
"generation",
"role",
"user_project",
"impersonation_chain",
)
更多信息¶
请参阅 Google Cloud Storage 插入文档,了解如何为 ObjectAccess 创建 ACL 条目。
删除存储桶¶
删除存储桶允许您从 Google Cloud Storage 中删除存储桶对象。它通过 GCSDeleteBucketOperator
操作符执行。
delete_bucket = GCSDeleteBucketOperator(task_id="delete_bucket", bucket_name=BUCKET_NAME)
您可以将 Jinja 模板 与 bucket_name
, gcp_conn_id
, impersonation_chain
, user_project
参数一起使用,这允许您动态地确定值。
传感器¶
GCSObjectExistenceSensor¶
使用 GCSObjectExistenceSensor
等待(轮询)Google Cloud Storage 中文件的存在。
gcs_object_exists = GCSObjectExistenceSensor(
bucket=DESTINATION_BUCKET_NAME,
object=FILE_NAME,
task_id="gcs_object_exists_task",
)
如果您希望在传感器运行时释放 worker 插槽,您也可以在此操作符中使用可延迟模式。
gcs_object_exists_defered = GCSObjectExistenceSensor(
bucket=DESTINATION_BUCKET_NAME, object=FILE_NAME, task_id="gcs_object_exists_defered", deferrable=True
)
GCSObjectsWithPrefixExistenceSensor¶
使用 GCSObjectsWithPrefixExistenceSensor
等待(轮询)Google Cloud Storage 中具有指定前缀的文件的存在。
gcs_object_with_prefix_exists = GCSObjectsWithPrefixExistenceSensor(
bucket=DESTINATION_BUCKET_NAME,
prefix=FILE_NAME[:5],
task_id="gcs_object_with_prefix_exists_task",
)
如果您希望此传感器异步运行,从而更有效地利用 Airflow 部署中的资源,则可以将 deferrable
参数设置为 True。但是,需要启用触发器组件才能使此功能工作。
gcs_object_with_prefix_exists_async = GCSObjectsWithPrefixExistenceSensor(
bucket=DESTINATION_BUCKET_NAME,
prefix=FILE_NAME[:5],
task_id="gcs_object_with_prefix_exists_task_async",
deferrable=True,
)
GCSUploadSessionCompleteSensor¶
使用 GCSUploadSessionCompleteSensor
来检查 Google Cloud Storage 中具有指定前缀的文件数量是否发生变化。
gcs_upload_session_complete = GCSUploadSessionCompleteSensor(
bucket=DESTINATION_BUCKET_NAME,
prefix=FILE_NAME,
inactivity_period=15,
min_objects=1,
allow_delete=True,
previous_objects=set(),
task_id="gcs_upload_session_complete_task",
)
如果您希望在传感器运行时释放 worker 插槽,可以将参数 deferrable
设置为 True。
gcs_upload_session_async_complete = GCSUploadSessionCompleteSensor(
bucket=DESTINATION_BUCKET_NAME,
prefix=FILE_NAME,
inactivity_period=15,
min_objects=1,
allow_delete=True,
previous_objects=set(),
task_id="gcs_upload_session_async_complete",
deferrable=True,
)
GCSObjectUpdateSensor¶
使用 GCSObjectUpdateSensor
来检查 Google Cloud Storage 中对象是否已更新。
gcs_update_object_exists = GCSObjectUpdateSensor(
bucket=DESTINATION_BUCKET_NAME,
object=FILE_NAME,
task_id="gcs_object_update_sensor_task",
)
如果您希望此传感器异步运行,从而更有效地利用 Airflow 部署中的资源,则可以将 deferrable
参数设置为 True。但是,需要启用触发器组件才能使此功能工作。
gcs_update_object_exists_async = GCSObjectUpdateSensor(
bucket=DESTINATION_BUCKET_NAME,
object=FILE_NAME,
task_id="gcs_object_update_sensor_task_async",
deferrable=True,
)
更多信息¶
传感器具有不同的模式,这些模式决定了任务执行时资源的行为。有关使用传感器的最佳实践,请参阅 Airflow 传感器文档。