airflow.providers.amazon.aws.operators.s3

此模块包含 AWS S3 运算符。

模块内容

S3CreateBucketOperator

此运算符创建一个 S3 存储桶。

S3DeleteBucketOperator

此运算符删除一个 S3 存储桶。

S3GetBucketTaggingOperator

此运算符从 S3 存储桶获取标记。

S3PutBucketTaggingOperator

此运算符为 S3 存储桶添加标记。

S3DeleteBucketTaggingOperator

此运算符从 S3 存储桶删除标记。

S3CopyObjectOperator

创建已存储在 S3 中的对象的副本。

S3CreateObjectOperator

data(字符串或字节)创建一个新对象。

S3DeleteObjectsOperator

允许用户使用单个 HTTP 请求从存储桶中删除单个或多个对象。

S3FileTransformOperator

将数据从源 S3 位置复制到本地文件系统上的临时位置。

S3ListOperator

列出存储桶中具有给定字符串前缀的所有对象。

S3ListPrefixesOperator

列出存储桶中具有给定字符串前缀的所有子文件夹。

属性

BUCKET_DOES_NOT_EXIST_MSG

airflow.providers.amazon.aws.operators.s3.BUCKET_DOES_NOT_EXIST_MSG = "名称为: %s 的存储桶不存在"[源代码]
class airflow.providers.amazon.aws.operators.s3.S3CreateBucketOperator(*, bucket_name, aws_conn_id='aws_default', region_name=None, **kwargs)[源代码]

基类:airflow.models.BaseOperator

此运算符创建一个 S3 存储桶。

另请参阅

有关如何使用此运算符的更多信息,请参阅指南:创建 Amazon S3 存储桶

参数
  • bucket_name (str) – 这是您要创建的存储桶名称

  • aws_conn_id (str | None) – 用于 AWS 凭据的 Airflow 连接。如果此项为 None 或空,则使用默认的 boto3 行为。如果在分布式方式下运行 Airflow 且 aws_conn_id 为 None 或空,则将使用默认的 boto3 配置(并且必须在每个工作节点上维护)。

  • region_name (str | None) – AWS region_name。如果未指定,则从连接中获取。

template_fields: collections.abc.Sequence[str] = ('bucket_name',)[源代码]
execute(context)[源代码]

在创建运算符时派生。

Context 是与渲染 jinja 模板时使用的相同字典。

有关更多上下文,请参阅 get_template_context。

class airflow.providers.amazon.aws.operators.s3.S3DeleteBucketOperator(bucket_name, force_delete=False, aws_conn_id='aws_default', **kwargs)[源代码]

基类:airflow.models.BaseOperator

此运算符删除一个 S3 存储桶。

另请参阅

有关如何使用此运算符的更多信息,请参阅指南:删除 Amazon S3 存储桶

参数
  • bucket_name (str) – 这是您要删除的存储桶名称

  • force_delete (bool) – 在删除存储桶之前,强制删除存储桶中的所有对象

  • aws_conn_id (str | None) – 用于 AWS 凭据的 Airflow 连接。如果此项为 None 或空,则使用默认的 boto3 行为。如果在分布式方式下运行 Airflow 且 aws_conn_id 为 None 或空,则将使用默认的 boto3 配置(并且必须在每个工作节点上维护)。

template_fields: collections.abc.Sequence[str] = ('bucket_name',)[源代码]
execute(context)[源代码]

在创建运算符时派生。

Context 是与渲染 jinja 模板时使用的相同字典。

有关更多上下文,请参阅 get_template_context。

class airflow.providers.amazon.aws.operators.s3.S3GetBucketTaggingOperator(bucket_name, aws_conn_id='aws_default', **kwargs)[源代码]

基类:airflow.models.BaseOperator

此运算符从 S3 存储桶获取标记。

另请参阅

有关如何使用此运算符的更多信息,请参阅指南:获取 Amazon S3 存储桶的标签

参数
  • bucket_name (str) – 这是您要引用的存储桶名称

  • aws_conn_id (str | None) – 用于 AWS 凭据的 Airflow 连接。如果此项为 None 或空,则使用默认的 boto3 行为。如果在分布式方式下运行 Airflow 且 aws_conn_id 为 None 或空,则将使用默认的 boto3 配置(并且必须在每个工作节点上维护)。

template_fields: collections.abc.Sequence[str] = ('bucket_name',)[source]
execute(context)[source]

在创建运算符时派生。

Context 是与渲染 jinja 模板时使用的相同字典。

有关更多上下文,请参阅 get_template_context。

class airflow.providers.amazon.aws.operators.s3.S3PutBucketTaggingOperator(bucket_name, key=None, value=None, tag_set=None, aws_conn_id='aws_default', **kwargs)[source]

基类:airflow.models.BaseOperator

此运算符为 S3 存储桶添加标记。

另请参阅

有关如何使用此运算符的更多信息,请查看以下指南: 设置 Amazon S3 存储桶的标签

参数
  • bucket_name (str) – 要添加标签的存储桶的名称。

  • key (str | None) – 要添加的标签的键/值对中的键部分。如果提供了键,则还必须提供值。

  • value (str | None) – 要添加的标签的键/值对中的值部分。如果提供了值,则还必须提供键。

  • tag_set (dict | list[dict[str, str]] | None) – 包含标签的字典或键/值对的列表。

  • aws_conn_id (str | None) – 用于 AWS 凭证的 Airflow 连接。如果此项为 None 或为空,则使用默认的 boto3 行为。如果在分布式模式下运行 Airflow 且 aws_conn_id 为 None 或为空,则将使用默认的 boto3 配置(并且必须在每个工作节点上维护)。

template_fields: collections.abc.Sequence[str] = ('bucket_name',)[source]
template_fields_renderers[source]
execute(context)[source]

在创建运算符时派生。

Context 是与渲染 jinja 模板时使用的相同字典。

有关更多上下文,请参阅 get_template_context。

class airflow.providers.amazon.aws.operators.s3.S3DeleteBucketTaggingOperator(bucket_name, aws_conn_id='aws_default', **kwargs)[source]

基类:airflow.models.BaseOperator

此运算符从 S3 存储桶删除标记。

另请参阅

有关如何使用此运算符的更多信息,请查看以下指南: 删除 Amazon S3 存储桶的标签

参数
  • bucket_name (str) – 这是要删除标签的存储桶的名称。

  • aws_conn_id (str | None) – 用于 AWS 凭据的 Airflow 连接。如果此项为 None 或空,则使用默认的 boto3 行为。如果在分布式方式下运行 Airflow 且 aws_conn_id 为 None 或空,则将使用默认的 boto3 配置(并且必须在每个工作节点上维护)。

template_fields: collections.abc.Sequence[str] = ('bucket_name',)[source]
execute(context)[source]

在创建运算符时派生。

Context 是与渲染 jinja 模板时使用的相同字典。

有关更多上下文,请参阅 get_template_context。

class airflow.providers.amazon.aws.operators.s3.S3CopyObjectOperator(*, source_bucket_key, dest_bucket_key, source_bucket_name=None, dest_bucket_name=None, source_version_id=None, aws_conn_id='aws_default', verify=None, acl_policy=None, meta_data_directive=None, **kwargs)[source]

基类:airflow.models.BaseOperator

创建已存储在 S3 中的对象的副本。

注意:此处使用的 S3 连接需要有权访问源和目标存储桶/密钥。

另请参阅

有关如何使用此运算符的更多信息,请查看以下指南: 复制 Amazon S3 对象

参数
  • source_bucket_key (str) –

    源对象的键。(已模板化)

    它可以是完整的 s3:// 样式 URL 或从根级别起的相对路径。

    当指定为完整的 s3:// URL 时,请省略 source_bucket_name。

  • dest_bucket_key (str) –

    要复制到的对象的键。(已模板化)

    指定 dest_bucket_key 的约定与 source_bucket_key 相同。

  • source_bucket_name (str | None) –

    源对象所在的 S3 存储桶的名称。(已模板化)

    source_bucket_key 作为完整的 s3:// URL 提供时,应将其省略。

  • dest_bucket_name (str | None) –

    要将对象复制到的 S3 存储桶的名称。(已模板化)

    dest_bucket_key 作为完整的 s3:// URL 提供时,应将其省略。

  • source_version_id (str | None) – 源对象的版本 ID(可选)

  • aws_conn_id (str | None) – 要使用的 S3 连接的连接 ID

  • verify (str | bool | None) –

    是否验证 S3 连接的 SSL 证书。默认情况下,SSL 证书会经过验证。

    您可以提供以下值

    • False:不验证 SSL 证书。仍然会使用 SSL,

      但不会验证 SSL 证书。

    • path/to/cert/bundle.pem:要使用的 CA 证书捆绑包的文件名。

      如果要使用与 botocore 使用的 CA 证书捆绑包不同的捆绑包,则可以指定此参数。

  • acl_policy (str | None) – 指定上传到 S3 存储桶的文件的预置 ACL 策略的字符串。

  • meta_data_directive (str | None) – 是否从源对象 COPY 元数据,还是用请求中提供的元数据 REPLACE 元数据。

template_fields: collections.abc.Sequence[str] = ('source_bucket_key', 'dest_bucket_key', 'source_bucket_name', 'dest_bucket_name')[源代码]
execute(context)[源代码]

在创建运算符时派生。

Context 是与渲染 jinja 模板时使用的相同字典。

有关更多上下文,请参阅 get_template_context。

get_openlineage_facets_on_start()[源代码]
class airflow.providers.amazon.aws.operators.s3.S3CreateObjectOperator(*, s3_bucket=None, s3_key, data, replace=False, encrypt=False, acl_policy=None, encoding=None, compression=None, aws_conn_id='aws_default', verify=None, **kwargs)[源代码]

基类:airflow.models.BaseOperator

data(字符串或字节)创建一个新对象。

另请参阅

有关如何使用此运算符的更多信息,请查看指南:创建 Amazon S3 对象

参数
  • s3_bucket (str | None) – 要在其中保存对象的 S3 存储桶的名称。(已模板化)当 s3_key 作为完整的 s3:// url 提供时,应省略它。

  • s3_key (str) – 要创建的对象的键。(已模板化)它可以是完整的 s3:// 样式的 url 或从根级别开始的相对路径。当指定为完整的 s3:// url 时,请省略 s3_bucket

  • data (str | bytes) – 要保存为内容的字符串或字节。

  • replace (bool) – 如果为 True,则如果密钥已存在,它将覆盖该密钥

  • encrypt (bool) – 如果为 True,则该文件将在服务器端由 S3 加密,并以加密形式存储在 S3 中。

  • acl_policy (str | None) – 指定上传到 S3 存储桶的文件的预置 ACL 策略的字符串。

  • encoding (str | None) – 字符串到字节的编码。仅当 data 以字符串形式提供时,才应指定它。

  • compression (str | None) – 要使用的压缩类型,目前仅支持 gzip。仅当 data 以字符串形式提供时,才可指定它。

  • aws_conn_id (str | None) – 要使用的 S3 连接的连接 ID

  • verify (str | bool | None) –

    是否验证 S3 连接的 SSL 证书。默认情况下,SSL 证书会经过验证。

    您可以提供以下值

    • False:不验证 SSL 证书。仍然会使用 SSL,

      但不会验证 SSL 证书。

    • path/to/cert/bundle.pem:要使用的 CA 证书捆绑包的文件名。

      如果要使用与 botocore 使用的 CA 证书捆绑包不同的捆绑包,则可以指定此参数。

template_fields: collections.abc.Sequence[str] = ('s3_bucket', 's3_key', 'data')[源代码]
execute(context)[源代码]

在创建运算符时派生。

Context 是与渲染 jinja 模板时使用的相同字典。

有关更多上下文,请参阅 get_template_context。

get_openlineage_facets_on_start()[源代码]
class airflow.providers.amazon.aws.operators.s3.S3DeleteObjectsOperator(*, bucket, keys=None, prefix=None, from_datetime=None, to_datetime=None, aws_conn_id='aws_default', verify=None, **kwargs)[源代码]

基类:airflow.models.BaseOperator

允许用户使用单个 HTTP 请求从存储桶中删除单个或多个对象。

另请参阅

有关如何使用此运算符的更多信息,请查看指南:删除 Amazon S3 对象

参数
  • bucket (str) – 您将在其中删除对象的存储桶的名称。(已模板化)

  • keys (str | list | None) –

    要从 S3 存储桶中删除的键。(已模板化)

    keys 是字符串时,它应该是要删除的单个对象的键名。

    keys 是列表时,它应该是要删除的键的列表。

  • prefix (str | None) – 要删除的对象的前缀。(已模板化)存储桶中与此前缀匹配的所有对象都将被删除。

  • from_datetime (datetime.datetime | str | None) – 要删除的对象的 LastModified Date 的较大值。(已模板化)所有 LastModified Date 大于存储桶中此日期时间的对象都将被删除。

  • to_datetime (datetime.datetime | str | None) – 要删除的对象的 LastModified Date 的较小值。(已模板化)所有 LastModified Date 小于存储桶中此日期时间的对象都将被删除。

  • aws_conn_id (str | None) – 要使用的 S3 连接的连接 ID

  • verify (str | bool | None) –

    是否验证 S3 连接的 SSL 证书。默认情况下,SSL 证书会经过验证。

    您可以提供以下值

    • False:不验证 SSL 证书。仍将使用 SSL,

      但不会验证 SSL 证书。

    • path/to/cert/bundle.pem:要使用的 CA 证书包的文件名。

      如果要使用与 botocore 使用的 CA 证书捆绑包不同的捆绑包,则可以指定此参数。

template_fields: collections.abc.Sequence[str] = ('keys', 'bucket', 'prefix', 'from_datetime', 'to_datetime')[source]
execute(context)[source]

在创建运算符时派生。

Context 是与渲染 jinja 模板时使用的相同字典。

有关更多上下文,请参阅 get_template_context。

get_openlineage_facets_on_complete(task_instance)[source]

实现 _on_complete,因为对象键在 execute() 中解析。

class airflow.providers.amazon.aws.operators.s3.S3FileTransformOperator(*, source_s3_key, dest_s3_key, transform_script=None, select_expression=None, select_expr_serialization_config=None, script_args=None, source_aws_conn_id='aws_default', source_verify=None, dest_aws_conn_id='aws_default', dest_verify=None, replace=False, **kwargs)[source]

基类:airflow.models.BaseOperator

将数据从源 S3 位置复制到本地文件系统上的临时位置。

按照转换脚本的规定,对此文件运行转换,并将输出上传到目标 S3 位置。

本地文件系统中源文件和目标文件的位置作为转换脚本的第一个和第二个参数提供。转换脚本应从源读取数据,对其进行转换并将输出写入本地目标文件。然后,操作员将接管控制并将本地目标文件上传到 S3。

S3 Select 也可用于筛选源内容。如果指定了 S3 Select 表达式,用户可以省略转换脚本。

另请参阅

有关如何使用此操作符的更多信息,请参阅指南:转换 Amazon S3 对象

参数
  • source_s3_key (str) – 要从 S3 检索的键。(已模板化)

  • dest_s3_key (str) – 要从 S3 写入的键。(已模板化)

  • transform_script (str | None) – 可执行转换脚本的位置

  • select_expression – S3 Select 表达式

  • select_expr_serialization_config (dict[str, dict[str, dict]] | None) – 一个字典,其中包含 S3 Select 的输入和输出序列化配置。

  • script_args (collections.abc.Sequence[str] | None) – 转换脚本的参数(已模板化)

  • source_aws_conn_id (str | None) – 源 S3 连接

  • source_verify (bool | str | None) –

    是否验证 S3 连接的 SSL 证书。默认情况下,会验证 SSL 证书。您可以提供以下值

    • False:不验证 SSL 证书。仍将使用 SSL

      (除非 use_ssl 为 False),但不会验证 SSL 证书。

    • path/to/cert/bundle.pem:要使用的 CA 证书包的文件名。

      如果要使用与 botocore 使用的 CA 证书捆绑包不同的捆绑包,则可以指定此参数。

    这也适用于 dest_verify

  • dest_aws_conn_id (str | None) – 目标 S3 连接

  • dest_verify (bool | str | None) – 是否验证 S3 连接的 SSL 证书。请参阅:source_verify

  • replace (bool) – 如果目标 S3 键已存在,则替换它

template_fields: collections.abc.Sequence[str] = ('source_s3_key', 'dest_s3_key', 'script_args')[source]
template_ext: collections.abc.Sequence[str] = ()[source]
ui_color = '#f9c915'[source]
execute(context)[source]

在创建运算符时派生。

Context 是与渲染 jinja 模板时使用的相同字典。

有关更多上下文,请参阅 get_template_context。

get_openlineage_facets_on_start()[source]
class airflow.providers.amazon.aws.operators.s3.S3ListOperator(*, bucket, prefix='', delimiter='', aws_conn_id='aws_default', verify=None, apply_wildcard=False, **kwargs)[source]

基类:airflow.models.BaseOperator

列出存储桶中具有给定字符串前缀的所有对象。

此操作符返回一个包含对象名称的 Python 列表,该列表可以在下游任务中被 xcom 使用。

另请参阅

有关如何使用此操作符的更多信息,请查看指南:列出 Amazon S3 对象

参数
  • bucket (str) – 查找对象的 S3 存储桶。(可使用模板)

  • prefix (str) – 用于过滤名称以此前缀开头的对象的前缀字符串。(可使用模板)

  • delimiter (str) – 分隔符标记键层次结构。(可使用模板)

  • aws_conn_id (str | None) – 连接到 S3 存储时使用的连接 ID。

  • verify (str | bool | None) –

    是否验证 S3 连接的 SSL 证书。默认情况下,会验证 SSL 证书。您可以提供以下值

    • False:不验证 SSL 证书。仍将使用 SSL

      (除非 use_ssl 为 False),但不会验证 SSL 证书。

    • path/to/cert/bundle.pem:要使用的 CA 证书包的文件名。

      如果要使用与 botocore 使用的 CA 证书捆绑包不同的捆绑包,则可以指定此参数。

  • apply_wildcard (bool) – 是否将 ‘*’ 视为前缀中的通配符或纯符号。

示例:

以下操作符将列出 data 存储桶中 customers/2018/04/ 键下的所有文件(不包括子文件夹)。

s3_file = S3ListOperator(
    task_id="list_3s_files",
    bucket="data",
    prefix="customers/2018/04/",
    delimiter="/",
    aws_conn_id="aws_customers_conn",
)
template_fields: collections.abc.Sequence[str] = ('bucket', 'prefix', 'delimiter')[source]
ui_color = '#ffd700'[source]
execute(context)[source]

在创建运算符时派生。

Context 是与渲染 jinja 模板时使用的相同字典。

有关更多上下文,请参阅 get_template_context。

class airflow.providers.amazon.aws.operators.s3.S3ListPrefixesOperator(*, bucket, prefix, delimiter, aws_conn_id='aws_default', verify=None, **kwargs)[source]

基类:airflow.models.BaseOperator

列出存储桶中具有给定字符串前缀的所有子文件夹。

此操作符返回一个包含所有子文件夹名称的 Python 列表,该列表可以在下游任务中被 xcom 使用。

另请参阅

有关如何使用此操作符的更多信息,请查看指南:列出 Amazon S3 前缀

参数
  • bucket (str) – 查找子文件夹的 S3 存储桶。(可使用模板)

  • prefix (str) – 用于过滤名称以此前缀开头的子文件夹的前缀字符串。(可使用模板)

  • delimiter (str) – 分隔符标记子文件夹层次结构。(可使用模板)

  • aws_conn_id (str | None) – 连接到 S3 存储时使用的连接 ID。

  • verify (str | bool | None) –

    是否验证 S3 连接的 SSL 证书。默认情况下,会验证 SSL 证书。您可以提供以下值

    • False:不验证 SSL 证书。仍将使用 SSL

      (除非 use_ssl 为 False),但不会验证 SSL 证书。

    • path/to/cert/bundle.pem:要使用的 CA 证书包的文件名。

      如果要使用与 botocore 使用的 CA 证书捆绑包不同的捆绑包,则可以指定此参数。

示例:

以下操作符将列出 data 存储桶中 customers/2018/04/ 前缀下的所有子文件夹。

s3_file = S3ListPrefixesOperator(
    task_id="list_s3_prefixes",
    bucket="data",
    prefix="customers/2018/04/",
    delimiter="/",
    aws_conn_id="aws_customers_conn",
)
template_fields: collections.abc.Sequence[str] = ('bucket', 'prefix', 'delimiter')[source]
ui_color = '#ffd700'[source]
execute(context)[source]

在创建运算符时派生。

Context 是与渲染 jinja 模板时使用的相同字典。

有关更多上下文,请参阅 get_template_context。

此条目是否有帮助?