Google Cloud Storage 操作符¶
Cloud Storage 允许随时随地存储和检索任意数量的数据。你可以将 Cloud Storage 用于各种场景,包括提供网站内容、存储归档和灾难恢复数据,或通过直接下载向用户分发大型数据对象。
请参阅 Google 传输操作符,了解到和从 Google Cloud Storage 的专门传输操作符列表。
先决任务¶
要使用这些操作符,你必须执行以下操作
使用 Cloud 控制台 选择或创建 Cloud Platform 项目。
为你的项目启用结算,如 Google Cloud 文档 中所述。
启用 API,如 Cloud 控制台文档 中所述。
通过 pip 安装 API 库。
pip install 'apache-airflow[google]'有关详细信息,请参阅 安装。
运算符¶
GCSTimeSpanFileTransformOperator¶
使用 GCSTimeSpanFileTransformOperator
转换在特定时间跨度(数据间隔)内修改的文件。时间跨度由时间跨度的开始和结束时间戳定义。如果 DAG 没有安排下一个 DAG 实例,则时间跨度结束无限,这意味着运算符处理所有早于 data_interval_start
的文件。
gcs_timespan_transform_files_task = GCSTimeSpanFileTransformOperator(
task_id="gcs_timespan_transform_files",
source_bucket=BUCKET_NAME_SRC,
source_prefix=SOURCE_PREFIX,
source_gcp_conn_id=SOURCE_GCP_CONN_ID,
destination_bucket=BUCKET_NAME_DST,
destination_prefix=DESTINATION_PREFIX,
destination_gcp_conn_id=DESTINATION_GCP_CONN_ID,
transform_script=["python", TRANSFORM_SCRIPT_PATH],
)
GCSBucketCreateAclEntryOperator¶
在指定存储分区上创建新的 ACL 条目。
有关参数定义,请参阅 GCSBucketCreateAclEntryOperator
使用运算符¶
gcs_bucket_create_acl_entry_task = GCSBucketCreateAclEntryOperator(
bucket=BUCKET_NAME,
entity=GCS_ACL_ENTITY,
role=GCS_ACL_BUCKET_ROLE,
task_id="gcs_bucket_create_acl_entry_task",
)
模板化¶
template_fields: Sequence[str] = (
"bucket",
"entity",
"role",
"user_project",
"impersonation_chain",
)
更多信息¶
请参阅 Google Cloud Storage 文档以 为存储分区创建新的 ACL 条目。
GCSObjectCreateAclEntryOperator¶
在指定对象上创建新的 ACL 条目。
有关参数定义,请参阅 GCSObjectCreateAclEntryOperator
使用运算符¶
gcs_object_create_acl_entry_task = GCSObjectCreateAclEntryOperator(
bucket=BUCKET_NAME,
object_name=FILE_NAME,
entity=GCS_ACL_ENTITY,
role=GCS_ACL_OBJECT_ROLE,
task_id="gcs_object_create_acl_entry_task",
)
模板¶
template_fields: Sequence[str] = (
"bucket",
"object_name",
"entity",
"generation",
"role",
"user_project",
"impersonation_chain",
)
更多信息¶
请参阅 Google Cloud Storage 插入文档,以创建 ObjectAccess 的 ACL 条目。
删除存储分区¶
删除存储分区允许您从 Google Cloud Storage 中移除存储分区对象。它通过GCSDeleteBucketOperator
操作员执行。
delete_bucket = GCSDeleteBucketOperator(task_id="delete_bucket", bucket_name=BUCKET_NAME)
您可以将Jinja 模板与bucket_name
, gcp_conn_id
, impersonation_chain
, user_project
参数一起使用,这些参数允许您动态确定值。
传感器¶
GCSObjectExistenceSensor¶
使用GCSObjectExistenceSensor
来等待(轮询)Google Cloud Storage 中的文件是否存在。
gcs_object_exists = GCSObjectExistenceSensor(
bucket=DESTINATION_BUCKET_NAME,
object=FILE_NAME,
task_id="gcs_object_exists_task",
)
此外,如果您希望在传感器运行时释放工作程序槽,则可以在此操作员中使用可延迟模式。
gcs_object_exists_defered = GCSObjectExistenceSensor(
bucket=DESTINATION_BUCKET_NAME, object=FILE_NAME, task_id="gcs_object_exists_defered", deferrable=True
)
GCSObjectExistenceAsyncSensor¶
GCSObjectExistenceAsyncSensor
已弃用,将在将来的版本中移除。请使用 GCSObjectExistenceSensor
并使用该操作符中的可延迟模式。
gcs_object_exists_async = GCSObjectExistenceAsyncSensor(
bucket=DESTINATION_BUCKET_NAME,
object=FILE_NAME,
task_id="gcs_object_exists_task_async",
)
GCSObjectsWithPrefixExistenceSensor¶
使用 GCSObjectsWithPrefixExistenceSensor
等待(轮询)Google Cloud Storage 中具有指定前缀的文件的存在。
gcs_object_with_prefix_exists = GCSObjectsWithPrefixExistenceSensor(
bucket=DESTINATION_BUCKET_NAME,
prefix=FILE_NAME[:5],
task_id="gcs_object_with_prefix_exists_task",
)
如果希望此传感器异步运行,从而更有效地利用 Airflow 部署中的资源,则可以将 deferrable
参数设置为 True。但是,此功能需要启用触发器组件才能正常工作。
gcs_object_with_prefix_exists_async = GCSObjectsWithPrefixExistenceSensor(
bucket=DESTINATION_BUCKET_NAME,
prefix=FILE_NAME[:5],
task_id="gcs_object_with_prefix_exists_task_async",
deferrable=True,
)
GCSUploadSessionCompleteSensor¶
使用 GCSUploadSessionCompleteSensor
检查 Google Cloud Storage 中具有指定前缀的文件数量的变化。
gcs_upload_session_complete = GCSUploadSessionCompleteSensor(
bucket=DESTINATION_BUCKET_NAME,
prefix=FILE_NAME,
inactivity_period=15,
min_objects=1,
allow_delete=True,
previous_objects=set(),
task_id="gcs_upload_session_complete_task",
)
如果希望在传感器运行时释放工作程序槽,则可以将参数 deferrable
设置为 True。
gcs_upload_session_async_complete = GCSUploadSessionCompleteSensor(
bucket=DESTINATION_BUCKET_NAME,
prefix=FILE_NAME,
inactivity_period=15,
min_objects=1,
allow_delete=True,
previous_objects=set(),
task_id="gcs_upload_session_async_complete",
deferrable=True,
)
GCSObjectUpdateSensor¶
使用 GCSObjectUpdateSensor
检查 Google Cloud Storage 中的对象是否已更新。
gcs_update_object_exists = GCSObjectUpdateSensor(
bucket=DESTINATION_BUCKET_NAME,
object=FILE_NAME,
task_id="gcs_object_update_sensor_task",
)
如果希望此传感器异步运行,从而有效利用 Airflow 部署中的资源,则可以将 deferrable
参数设置为 True。但是,触发器组件需要启用才能使用此功能。
gcs_update_object_exists_async = GCSObjectUpdateSensor(
bucket=DESTINATION_BUCKET_NAME,
object=FILE_NAME,
task_id="gcs_object_update_sensor_task_async",
deferrable=True,
)
更多信息¶
传感器有不同的模式,这些模式决定了任务执行期间资源的行为。有关使用传感器时的最佳实践,请参阅 Airflow 传感器文档。