airflow.providers.google.cloud.operators.gcs

此模块包含一个 Google Cloud Storage 存储桶操作符。

模块内容

GCSCreateBucketOperator

创建一个新的存储桶。

GCSListObjectsOperator

列出存储桶中所有以给定字符串前缀和分隔符过滤的对象,或者匹配 match_glob 的对象。

GCSDeleteObjectsOperator

从 Google Cloud Storage 存储桶中删除列表中的对象或所有匹配前缀的对象。

GCSBucketCreateAclEntryOperator

在指定的存储桶上创建一个新的 ACL 条目。

GCSObjectCreateAclEntryOperator

在指定对象上创建一个新的 ACL 条目。

GCSFileTransformOperator

将数据从源 GCS 位置复制到本地文件系统上的临时位置。

GCSTimeSpanFileTransformOperator

复制在时间跨度内修改的对象,运行转换,并将结果上传到存储桶。

GCSDeleteBucketOperator

从 Google Cloud Storage 中删除存储桶。

GCSSynchronizeBucketsOperator

同步 Google Cloud Services 中存储桶或存储桶目录的内容。

class airflow.providers.google.cloud.operators.gcs.GCSCreateBucketOperator(*, bucket_name, resource=None, storage_class='MULTI_REGIONAL', location='US', project_id=PROVIDE_PROJECT_ID, labels=None, gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[源代码]

基类:airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

创建一个新的存储桶。

Google Cloud Storage 使用扁平命名空间,因此您无法创建名称已被使用的存储桶。

另请参阅

有关详细信息,请参阅存储桶命名指南:https://cloud.google.com/storage/docs/bucketnaming.html#requirements

参数
  • bucket_name (str) – 存储桶的名称。(已模板化)

  • resource (dict | None) – 一个可选的字典,包含创建存储桶的参数。有关可用参数的信息,请参阅 Cloud Storage API 文档:https://cloud.google.com/storage/docs/json_api/v1/buckets/insert

  • storage_class (str) –

    这定义了存储桶中对象的存储方式,并确定了 SLA 和存储成本。(已模板化)值包括

    • MULTI_REGIONAL

    • REGIONAL

    • STANDARD

    • NEARLINE

    • COLDLINE.

    如果创建存储桶时未指定此值,则默认值为 STANDARD。

  • location (str) –

    存储桶的位置。(已模板化)存储桶中对象的数据位于该区域内的物理存储中。默认为美国。

  • project_id (str) – Google Cloud 项目的 ID。(已模板化)

  • labels (dict | None) – 用户提供的标签,以键/值对形式。

  • gcp_conn_id (str) – (可选)用于连接到 Google Cloud 的连接 ID。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 可选的服务帐户,使用短期凭据进行模拟,或获取列表中最后一个帐户的 access_token 所需的链式帐户列表,该帐户将在请求中被模拟。如果设置为字符串,则该帐户必须授予原始帐户“服务帐户令牌创建者”IAM 角色。如果设置为序列,则列表中的身份必须授予直接前面的身份“服务帐户令牌创建者”IAM 角色,列表中的第一个帐户将此角色授予原始帐户(已模板化)。

以下操作符将使用 MULTI_REGIONAL 存储类在 EU 区域中创建一个新的存储桶 test-bucket

CreateBucket = GCSCreateBucketOperator(
    task_id="CreateNewBucket",
    bucket_name="test-bucket",
    storage_class="MULTI_REGIONAL",
    location="EU",
    labels={"env": "dev", "team": "airflow"},
    gcp_conn_id="airflow-conn-id",
)
template_fields: collections.abc.Sequence[str] = ('bucket_name', 'storage_class', 'location', 'project_id', 'impersonation_chain')[源代码]
ui_color = '#f0eee4'[源代码]
execute(context)[源代码]

在创建操作符时派生。

上下文与渲染 jinja 模板时使用的字典相同。

有关更多上下文,请参阅 get_template_context。

class airflow.providers.google.cloud.operators.gcs.GCSListObjectsOperator(*, bucket, prefix=None, delimiter=None, gcp_conn_id='google_cloud_default', impersonation_chain=None, match_glob=None, **kwargs)[源代码]

基类:airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

列出存储桶中所有以给定字符串前缀和分隔符过滤的对象,或者匹配 match_glob 的对象。

此操作符返回一个 python 列表,其中包含对象的名称,下游任务可以使用 XCom 获取这些名称。

参数
  • bucket (str) – 要查找对象的 Google Cloud Storage 存储桶。(已模板化)

  • prefix (str | list[str] | None) – 字符串或字符串列表,用于过滤名称以其开头/它们开头的对象。(已模板化)

  • delimiter (str | None) – (已弃用)您想要通过其过滤对象的分隔符。(已模板化)例如,要列出 GCS 中目录中的 CSV 文件,您将使用 delimiter=’.csv’。

  • gcp_conn_id (str) – (可选)用于连接到 Google Cloud 的连接 ID。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 可选的服务帐户,使用短期凭据进行模拟,或获取列表中最后一个帐户的 access_token 所需的链式帐户列表,该帐户将在请求中被模拟。如果设置为字符串,则该帐户必须授予原始帐户“服务帐户令牌创建者”IAM 角色。如果设置为序列,则列表中的身份必须授予直接前面的身份“服务帐户令牌创建者”IAM 角色,列表中的第一个帐户将此角色授予原始帐户(已模板化)。

  • match_glob (str | None) – (可选)基于字符串给定的 glob 模式过滤对象(例如,'**/*/.json'

示例:

以下操作符将列出 data 存储桶中 sales/sales-2017 文件夹中的所有 Avro 文件。

GCS_Files = GCSListOperator(
    task_id="GCS_Files",
    bucket="data",
    prefix="sales/sales-2017/",
    match_glob="**/*/.avro",
    gcp_conn_id=google_cloud_conn_id,
)
template_fields: collections.abc.Sequence[str] = ('bucket', 'prefix', 'delimiter', 'impersonation_chain')[源代码]
ui_color = '#f0eee4'[源代码]
execute(context)[源代码]

在创建操作符时派生。

上下文与渲染 jinja 模板时使用的字典相同。

有关更多上下文,请参阅 get_template_context。

class airflow.providers.google.cloud.operators.gcs.GCSDeleteObjectsOperator(*, bucket_name, objects=None, prefix=None, gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[源代码]

基类:airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

从 Google Cloud Storage 存储桶中删除列表中的对象或所有匹配前缀的对象。

参数
  • bucket_name (str) – 要从中删除的 GCS 存储桶的名称。

  • objects (list[str] | None) – 要删除的对象列表。这些应该是存储桶中对象的名称,不包括 gs://bucket/。

  • prefix (str | list[str] | None) – 字符串或字符串列表,用于过滤名称以其开头/它们的的对象。(已模板化)

  • gcp_conn_id (str) – (可选)用于连接到 Google Cloud 的连接 ID。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 可选的服务帐户,使用短期凭据进行模拟,或获取列表中最后一个帐户的 access_token 所需的链式帐户列表,该帐户将在请求中被模拟。如果设置为字符串,则该帐户必须授予原始帐户“服务帐户令牌创建者”IAM 角色。如果设置为序列,则列表中的身份必须授予直接前面的身份“服务帐户令牌创建者”IAM 角色,列表中的第一个帐户将此角色授予原始帐户(已模板化)。

template_fields: collections.abc.Sequence[str] = ('bucket_name', 'prefix', 'objects', 'impersonation_chain')[源代码]
execute(context)[源代码]

在创建操作符时派生。

上下文与渲染 jinja 模板时使用的字典相同。

有关更多上下文,请参阅 get_template_context。

get_openlineage_facets_on_start()[源代码]
class airflow.providers.google.cloud.operators.gcs.GCSBucketCreateAclEntryOperator(*, bucket, entity, role, user_project=None, gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[源代码]

基类:airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

在指定的存储桶上创建一个新的 ACL 条目。

另请参阅

有关如何使用此操作符的更多信息,请查看指南:GCSBucketCreateAclEntryOperator

参数
  • bucket (str) – 存储桶的名称。

  • entity (str) – 持有权限的实体,采用以下形式之一:user-userId、user-email、group-groupId、group-email、domain-domain、project-team-projectId、allUsers、allAuthenticatedUsers。

  • role (str) – 实体的访问权限。可接受的值为:“OWNER”、“READER”、“WRITER”。

  • user_project (str | None) – (可选)为此请求计费的项目。对于请求者付费的存储桶是必需的。

  • gcp_conn_id (str) – (可选)用于连接到 Google Cloud 的连接 ID。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 可选的服务帐户,使用短期凭据进行模拟,或获取列表中最后一个帐户的 access_token 所需的链式帐户列表,该帐户将在请求中被模拟。如果设置为字符串,则该帐户必须授予原始帐户“服务帐户令牌创建者”IAM 角色。如果设置为序列,则列表中的身份必须授予直接前面的身份“服务帐户令牌创建者”IAM 角色,列表中的第一个帐户将此角色授予原始帐户(已模板化)。

template_fields: collections.abc.Sequence[str] = ('bucket', 'entity', 'role', 'user_project', 'impersonation_chain')[源代码]
execute(context)[源代码]

在创建操作符时派生。

上下文与渲染 jinja 模板时使用的字典相同。

有关更多上下文,请参阅 get_template_context。

class airflow.providers.google.cloud.operators.gcs.GCSObjectCreateAclEntryOperator(*, bucket, object_name, entity, role, generation=None, user_project=None, gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[源代码]

基类:airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

在指定对象上创建一个新的 ACL 条目。

另请参阅

有关如何使用此操作符的更多信息,请查看指南:GCSObjectCreateAclEntryOperator

参数
  • bucket (str) – 存储桶的名称。

  • object_name (str) – 对象的名称。有关如何对对象名称进行 URL 编码以使其路径安全的信息,请参阅:https://cloud.google.com/storage/docs/json_api/#encoding

  • entity (str) – 持有权限的实体,采用以下形式之一:user-userId、user-email、group-groupId、group-email、domain-domain、project-team-projectId、allUsers、allAuthenticatedUsers。

  • role (str) – 实体的访问权限。可接受的值为:“OWNER”、“READER”。

  • generation (int | None) – 可选。如果存在,则选择此对象的特定修订版本。

  • user_project (str | None) – (可选)为此请求计费的项目。对于请求者付费的存储桶是必需的。

  • gcp_conn_id (str) – (可选)用于连接到 Google Cloud 的连接 ID。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 可选的服务帐户,使用短期凭据进行模拟,或获取列表中最后一个帐户的 access_token 所需的链式帐户列表,该帐户将在请求中被模拟。如果设置为字符串,则该帐户必须授予原始帐户“服务帐户令牌创建者”IAM 角色。如果设置为序列,则列表中的身份必须授予直接前面的身份“服务帐户令牌创建者”IAM 角色,列表中的第一个帐户将此角色授予原始帐户(已模板化)。

template_fields: collections.abc.Sequence[str] = ('bucket', 'object_name', 'entity', 'generation', 'role', 'user_project', 'impersonation_chain')[source]
execute(context)[source]

在创建操作符时派生。

上下文与渲染 jinja 模板时使用的字典相同。

有关更多上下文,请参阅 get_template_context。

class airflow.providers.google.cloud.operators.gcs.GCSFileTransformOperator(*, source_bucket, source_object, transform_script, destination_bucket=None, destination_object=None, gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[source]

基类:airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

将数据从源 GCS 位置复制到本地文件系统上的临时位置。

根据转换脚本的指定,对该文件执行转换,并将输出上传到目标存储桶。如果未指定输出存储桶,则原始文件将被覆盖。

源文件和目标文件在本地文件系统中的位置作为第一个和第二个参数提供给转换脚本。转换脚本应从源文件读取数据,对其进行转换,并将输出写入本地目标文件。

参数
  • source_bucket (str) – 查找 source_object 的存储桶。(已模板化)

  • source_object (str) – 要从 GCS 检索的键。(已模板化)

  • destination_bucket (str | None) – 转换后上传键的存储桶。如果未提供,则将使用 source_bucket。(已模板化)

  • destination_object (str | None) – 要在 GCS 中写入的键。如果未提供,则将使用 source_object。(已模板化)

  • transform_script (str | list[str]) – 可执行转换脚本的位置,或传递给子进程的参数列表,例如 ['python', 'script.py', 10]。(已模板化)

  • gcp_conn_id (str) – 用于连接到 Google Cloud 的连接 ID。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 可选的服务帐户,使用短期凭据进行模拟,或获取列表中最后一个帐户的 access_token 所需的链式帐户列表,该帐户将在请求中被模拟。如果设置为字符串,则该帐户必须授予原始帐户“服务帐户令牌创建者”IAM 角色。如果设置为序列,则列表中的身份必须授予直接前面的身份“服务帐户令牌创建者”IAM 角色,列表中的第一个帐户将此角色授予原始帐户(已模板化)。

template_fields: collections.abc.Sequence[str] = ('source_bucket', 'source_object', 'destination_bucket', 'destination_object',...[source]
execute(context)[source]

在创建操作符时派生。

上下文与渲染 jinja 模板时使用的字典相同。

有关更多上下文,请参阅 get_template_context。

get_openlineage_facets_on_start()[source]
class airflow.providers.google.cloud.operators.gcs.GCSTimeSpanFileTransformOperator(*, source_bucket, source_prefix, source_gcp_conn_id, destination_bucket, destination_prefix, destination_gcp_conn_id, transform_script, source_impersonation_chain=None, destination_impersonation_chain=None, chunk_size=None, download_continue_on_fail=False, download_num_attempts=1, upload_continue_on_fail=False, upload_num_attempts=1, **kwargs)[source]

基类:airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

复制在时间跨度内修改的对象,运行转换,并将结果上传到存储桶。

确定在特定时间跨度内在 GCS 源位置添加或修改的对象列表,将其复制到本地文件系统上的临时位置,根据转换脚本的指定对该文件运行转换,并将输出上传到目标存储桶。

另请参阅

有关如何使用此运算符的更多信息,请查看指南: GCSTimeSpanFileTransformOperator

本地文件系统中源文件和目标文件的位置作为转换脚本的第一个和第二个参数提供。时间跨度作为第三个和第四个参数以 UTC ISO 8601 字符串传递给转换脚本。

转换脚本应从源文件读取数据,对其进行转换,并将输出写入本地目标文件。

参数
  • source_bucket (str) – 要从中获取数据的存储桶。(已模板化)

  • source_prefix (str) – 前缀字符串,用于过滤名称以此前缀开头的对象。可以插入逻辑日期和时间组件。(已模板化)

  • source_gcp_conn_id (str) – 用于连接到 Google Cloud 以下载要处理的文件的连接 ID。

  • source_impersonation_chain (str | collections.abc.Sequence[str] | None) – 可选的服务帐户,用于使用短期凭据进行模拟(以下载要处理的文件),或用于获取列表中最后一个帐户的 access_token 的链接帐户列表,该帐户将在请求中被模拟。如果设置为字符串,则该帐户必须授予原始帐户“服务帐户令牌创建者”IAM 角色。如果设置为序列,则列表中的身份必须将“服务帐户令牌创建者”IAM 角色授予直接前一个身份,列表中的第一个帐户将此角色授予原始帐户(已模板化)。

  • destination_bucket (str) – 用于写入数据的存储桶。(已模板化)

  • destination_prefix (str) – 上传位置的前缀字符串。可以插值逻辑日期和时间组件。(已模板化)

  • destination_gcp_conn_id (str) – 用于连接 Google Cloud 以上传已处理文件的连接 ID。

  • destination_impersonation_chain (str | collections.abc.Sequence[str] | None) – 可选的服务帐户,用于使用短期凭据进行模拟(以上传已处理的文件),或用于获取列表中最后一个帐户的 access_token 的链接帐户列表,该帐户将在请求中被模拟。如果设置为字符串,则该帐户必须授予原始帐户“服务帐户令牌创建者”IAM 角色。如果设置为序列,则列表中的身份必须将“服务帐户令牌创建者”IAM 角色授予直接前一个身份,列表中的第一个帐户将此角色授予原始帐户(已模板化)。

  • transform_script (str | list[str]) – 可执行转换脚本的位置,或传递给子进程的参数列表,例如 ['python', 'script.py', 10]。(已模板化)

  • chunk_size (int | None) – 下载或上传数据块的大小(以字节为单位)。这必须是 256 KB 的倍数(根据 Google Cloud Storage API 规范)。

  • download_continue_on_fail (bool | None) – 如果将其设置为 true,则在下载失败时,任务不会出错,但仍会继续。

  • upload_chunk_size – 上传数据块的大小(以字节为单位)。这必须是 256 KB 的倍数(根据 Google Cloud Storage API 规范)。

  • upload_continue_on_fail (bool | None) – 如果将其设置为 true,则在上传失败时,任务不会出错,但仍会继续。

  • upload_num_attempts (int) – 尝试上传单个文件的次数。

template_fields: collections.abc.Sequence[str] = ('source_bucket', 'source_prefix', 'destination_bucket', 'destination_prefix',...[source]
static interpolate_prefix(prefix, dt)[source]

使用日期时间插值前缀。

参数
  • prefix (str) – 要插值的前缀

  • dt (datetime.datetime) – 要插值的日期时间

execute(context)[source]

在创建操作符时派生。

上下文与渲染 jinja 模板时使用的字典相同。

有关更多上下文,请参阅 get_template_context。

get_openlineage_facets_on_complete(task_instance)[source]

将 on_complete 实现为 execute() 解析对象前缀。

class airflow.providers.google.cloud.operators.gcs.GCSDeleteBucketOperator(*, bucket_name, force=True, gcp_conn_id='google_cloud_default', impersonation_chain=None, user_project=None, **kwargs)[source]

基类:airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

从 Google Cloud Storage 中删除存储桶。

另请参阅

有关如何使用此运算符的更多信息,请查看指南:删除存储桶

参数
  • bucket_name (str) – 将被删除的存储桶的名称

  • force (bool) – 如果设置为 false,则不允许删除非空存储桶;如果设置为 force=True,则允许删除非空存储桶

  • gcp_conn_id (str) – 用于连接到 Google Cloud 的连接 ID。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 可选的服务帐户,使用短期凭据进行模拟,或获取列表中最后一个帐户的 access_token 所需的链式帐户列表,该帐户将在请求中被模拟。如果设置为字符串,则该帐户必须授予原始帐户“服务帐户令牌创建者”IAM 角色。如果设置为序列,则列表中的身份必须授予直接前面的身份“服务帐户令牌创建者”IAM 角色,列表中的第一个帐户将此角色授予原始帐户(已模板化)。

  • user_project (str | None) – (可选) 用于此请求计费的项目标识符。对于请求者付费存储桶是必需的。

template_fields: collections.abc.Sequence[str] = ('bucket_name', 'gcp_conn_id', 'impersonation_chain', 'user_project')[source]
execute(context)[source]

在创建操作符时派生。

上下文与渲染 jinja 模板时使用的字典相同。

有关更多上下文,请参阅 get_template_context。

class airflow.providers.google.cloud.operators.gcs.GCSSynchronizeBucketsOperator(*, source_bucket, destination_bucket, source_object=None, destination_object=None, recursive=True, delete_extra_files=False, allow_overwrite=False, gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[source]

基类:airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

同步 Google Cloud Services 中存储桶或存储桶目录的内容。

参数 source_objectdestination_object 描述根同步目录。如果未传递它们,则将同步整个存储桶。它们应该指向目录。

注意

不支持同步单个文件。只能同步整个目录。

另请参阅

有关如何使用此运算符的更多信息,请查看指南:GCSSynchronizeBucketsOperator

参数
  • source_bucket (str) – 包含源对象的存储桶的名称。

  • destination_bucket (str) – 包含目标对象的存储桶的名称。

  • source_object (str | None) – 源存储桶中的根同步目录。

  • destination_object (str | None) – 目标存储桶中的根同步目录。

  • recursive (bool) – 如果为 True,将考虑子目录。

  • allow_overwrite (bool) – 如果为 True,则在发现不匹配的文件时将覆盖文件。默认情况下,不允许覆盖文件。

  • delete_extra_files (bool) –

    如果为 True,则从源中删除目标中不存在的额外文件。默认情况下,不删除额外文件。

    注意

    如果您指定了错误的源/目标组合,此选项可能会快速删除数据。

  • gcp_conn_id (str) – (可选)用于连接到 Google Cloud 的连接 ID。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 可选的服务帐户,使用短期凭据进行模拟,或获取列表中最后一个帐户的 access_token 所需的链式帐户列表,该帐户将在请求中被模拟。如果设置为字符串,则该帐户必须授予原始帐户“服务帐户令牌创建者”IAM 角色。如果设置为序列,则列表中的身份必须授予直接前面的身份“服务帐户令牌创建者”IAM 角色,列表中的第一个帐户将此角色授予原始帐户(已模板化)。

template_fields: collections.abc.Sequence[str] = ('source_bucket', 'destination_bucket', 'source_object', 'destination_object', 'recursive',...[source]
execute(context)[source]

在创建操作符时派生。

上下文与渲染 jinja 模板时使用的字典相同。

有关更多上下文,请参阅 get_template_context。

此条目是否对您有帮助?