airflow.providers.google.cloud.transfers.s3_to_gcs

模块内容

S3ToGCSOperator

将 S3 密钥(可能是一个前缀)与 Google Cloud Storage 目标路径同步。

class airflow.providers.google.cloud.transfers.s3_to_gcs.S3ToGCSOperator(*, bucket, prefix='', apply_gcs_prefix=False, delimiter='', aws_conn_id='aws_default', verify=None, gcp_conn_id='google_cloud_default', dest_gcs=None, replace=False, gzip=False, google_impersonation_chain=None, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), poll_interval=10, **kwargs)[源代码]

基类: airflow.providers.amazon.aws.operators.s3.S3ListOperator

将 S3 密钥(可能是一个前缀)与 Google Cloud Storage 目标路径同步。

另请参阅

有关如何使用此操作符的更多信息,请查看以下指南: 将数据从 Amazon S3 传输到 Google Cloud Storage

参数
  • bucket – 要查找对象的 S3 存储桶。(已模板化)

  • prefix – 前缀字符串,用于筛选名称以此前缀开头的对象。(已模板化)

  • apply_gcs_prefix

    (可选)是否用给定的 GCS 目标路径替换源对象的路径。如果 apply_gcs_prefix 为 False(默认),则来自 S3 的对象将被复制到 GCS 存储桶中的给定 GSC 路径,并且源路径将放置在内部。例如,<s3_bucket><s3_prefix><content> => <gcs_prefix><s3_prefix><content>

    如果 apply_gcs_prefix 为 True,则来自 S3 的对象将被复制到 GCS 存储桶中的给定 GCS 路径,并且源路径将被省略。例如:<s3_bucket><s3_prefix><content> => <gcs_prefix><content>

  • delimiter – 分隔符标记密钥层次结构。(已模板化)

  • aws_conn_id – 源 S3 连接

  • verify

    是否验证 S3 连接的 SSL 证书。默认情况下,会验证 SSL 证书。您可以提供以下值

    • False:不验证 SSL 证书。仍然会使用 SSL

      (除非 use_ssl 为 False),但不会验证 SSL 证书。

    • path/to/cert/bundle.pem:要使用的 CA 证书捆绑包的文件名。

      如果您想使用与 botocore 使用的不同的 CA 证书捆绑包,则可以指定此参数。

  • gcp_conn_id – (可选)用于连接到 Google Cloud 的连接 ID。

  • dest_gcs – 您想要存储文件的目标 Google Cloud Storage 存储桶和前缀。(已模板化)

  • replace – 是否要替换现有的目标文件。

  • gzip – 用于上传的压缩文件选项。在可延迟模式下忽略参数。

  • google_impersonation_chain (str | collections.abc.Sequence[str] | None) – 可选的 Google 服务帐户,用于使用短期凭据模拟,或用于获取列表中最后一个帐户的 access_token 所需的帐户的链式列表,该帐户将在请求中模拟。如果设置为字符串,则帐户必须授予原始帐户“服务帐户令牌创建者”IAM 角色。如果设置为序列,则列表中的标识必须将“服务帐户令牌创建者”IAM 角色授予紧接其前面的标识,列表中的第一个帐户将此角色授予原始帐户(已模板化)。

  • deferrable – 在可延迟模式下运行操作符

  • poll_interval (int) – 轮询作业完成之间的时间间隔(以秒为单位)。仅在可延迟模式下运行时才考虑该值。必须大于 0。

示例:

s3_to_gcs_op = S3ToGCSOperator(
    task_id="s3_to_gcs_example",
    bucket="my-s3-bucket",
    prefix="data/customers-201804",
    gcp_conn_id="google_cloud_default",
    dest_gcs="gs://my.gcs.bucket/some/customers/",
    replace=False,
    gzip=True,
    dag=my_dag,
)

请注意,bucketprefixdelimiterdest_gcs 已模板化,因此您可以根据需要使用其中的变量。

template_fields: collections.abc.Sequence[str] = ('bucket', 'prefix', 'delimiter', 'dest_gcs', 'google_impersonation_chain')[源代码]
ui_color = '#e09411'[源代码]
transfer_job_max_files_number = 1000[源代码]
execute(context)[源代码]

在创建操作符时派生。

上下文与呈现 jinja 模板时使用的字典相同。

有关更多上下文,请参阅 get_template_context。

exclude_existing_objects(s3_objects, gcs_hook)[source]

从列表中排除已存在于 GCS 存储桶中的对象。

s3_to_gcs_object(s3_object)[source]

根据操作符的逻辑将 S3 路径转换为 GCS 路径。

如果 apply_gcs_prefix == True,则 <s3_prefix><content> => <gcs_prefix><content> 如果 apply_gcs_prefix == False,则 <s3_prefix><content> => <gcs_prefix><s3_prefix><content>

gcs_to_s3_object(gcs_object)[source]

根据操作符的逻辑将 GCS 路径转换为 S3 路径。

如果 apply_gcs_prefix == True,则 <gcs_prefix><content> => <s3_prefix><content> 如果 apply_gcs_prefix == False,则 <gcs_prefix><s3_prefix><content> => <s3_prefix><content>

transfer_files(s3_objects, gcs_hook, s3_hook)[source]
transfer_files_async(files, gcs_hook, s3_hook)[source]

提交 Google Cloud Storage Transfer Service 作业,将文件从 AWS S3 复制到 GCS。

submit_transfer_jobs(files, gcs_hook, s3_hook)[source]
execute_complete(context, event)[source]

立即返回,并依赖触发器抛出成功事件。触发器的回调。

依赖触发器抛出异常,否则它会假设执行成功。

get_transfer_hook()[source]
get_openlineage_facets_on_start()[source]

此条目是否有帮助?