airflow.providers.amazon.aws.hooks.s3

使用 boto3 库与 AWS S3 进行交互。

模块内容

S3Hook

与 Amazon Simple Storage Service (S3) 交互。

函数

provide_bucket_name(func)

如果未将存储桶名称传递给函数,则提供从连接中获取的存储桶名称。

unify_bucket_name_and_key(func)

如果没有传递存储桶名称,并且至少传递了一个键,则统一存储桶名称和键。

属性

logger

NO_ACL

airflow.providers.amazon.aws.hooks.s3.logger[source]
airflow.providers.amazon.aws.hooks.s3.NO_ACL = 'no-acl'[source]
airflow.providers.amazon.aws.hooks.s3.provide_bucket_name(func)[source]

如果未将存储桶名称传递给函数,则提供从连接中获取的存储桶名称。

airflow.providers.amazon.aws.hooks.s3.unify_bucket_name_and_key(func)[source]

如果没有传递存储桶名称,并且至少传递了一个键,则统一存储桶名称和键。

class airflow.providers.amazon.aws.hooks.s3.S3Hook(aws_conn_id=AwsBaseHook.default_conn_name, transfer_config_args=None, extra_args=None, *args, **kwargs)[source]

基类:airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook

与 Amazon Simple Storage Service (S3) 交互。

boto3.client("s3")boto3.resource("s3") 提供厚包装。

参数
  • transfer_config_args (dict | None) – 用于托管 S3 传输的配置对象。

  • extra_args (dict | None) – 可以传递给下载/上传操作的额外参数。

另请参阅

https://boto3.amazonaws.com/v1/documentation/api/latest/reference/customizations/s3.html#s3-transfers

  • 有关允许的上传额外参数,请参阅 boto3.s3.transfer.S3Transfer.ALLOWED_UPLOAD_ARGS

  • 有关允许的下载额外参数,请参阅 boto3.s3.transfer.S3Transfer.ALLOWED_DOWNLOAD_ARGS

可以指定其他参数(例如 aws_conn_id),并且会将这些参数传递给底层的 AwsBaseHook。

property extra_args[source]

返回 hook 的额外参数(不可变)。

resource()[source]
static parse_s3_url(s3url)[source]

将 S3 Url 解析为存储桶名称和键。

有关有效 URL 格式,请参阅 https://docs.aws.amazon.com/AmazonS3/latest/userguide/access-bucket-intro.html

参数

s3url (str) – 要解析的 S3 Url。

返回

解析后的存储桶名称和键

返回类型

tuple[str, str]

static get_s3_bucket_key(bucket, key, bucket_param_name, key_param_name)[source]

获取 S3 存储桶名称和键。

来自以下任意一项:- 存储桶名称和键。在检查 key 是否为相对路径后,按原样返回信息。- 键。必须是完整的 s3:// URL。

参数
  • bucket (str | None) – S3 存储桶名称

  • key (str) – S3 键

  • bucket_param_name (str) – 包含存储桶名称的参数名称

  • key_param_name (str) – 包含键名称的参数名称

返回

解析后的存储桶名称和键

返回类型

tuple[str, str]

check_for_bucket(bucket_name=None)[source]

检查 bucket_name 是否存在。

参数

bucket_name (str | None) – 存储桶的名称

返回

如果存在则为 True,否则为 False。

返回类型

bool

get_bucket(bucket_name=None)[source]

返回 S3.Bucket 对象。

参数

bucket_name (str | None) – 存储桶的名称

返回

存储桶名称的存储桶对象。

返回类型

mypy_boto3_s3.service_resource.Bucket

create_bucket(bucket_name=None, region_name=None)[source]

创建 Amazon S3 存储桶。

参数
  • bucket_name (str | None) – 存储桶的名称

  • region_name (str | None) – 要在其中创建存储桶的 AWS 区域的名称。

check_for_prefix(prefix, delimiter, bucket_name=None)[source]

检查存储桶中是否存在指定前缀。

参数
  • bucket_name (str | None) – 存储桶的名称

  • prefix (str) – 键前缀

  • delimiter (str) – 分隔符,用于标记键的层级结构。

返回

如果前缀在存储桶中不存在,则返回 False;如果存在,则返回 True。

返回类型

bool

list_prefixes(bucket_name=None, prefix=None, delimiter=None, page_size=None, max_items=None)[source]

列出存储桶中指定前缀下的所有前缀。

参数
  • bucket_name (str | None) – 存储桶的名称

  • prefix (str | None) – 键前缀

  • delimiter (str | None) – 分隔符,用于标记键的层级结构。

  • page_size (int | None) – 分页大小

  • max_items (int | None) – 要返回的最大条目数

返回

匹配的前缀列表

返回类型

list

async get_head_object_async(client, key, bucket_name=None)[source]

检索对象的元数据。

参数
  • client (aiobotocore.client.AioBaseClient) – aiobotocore 客户端

  • bucket_name (str | None) – 存储文件的存储桶的名称

  • key (str) – 指向文件的 S3 键

async list_prefixes_async(client, bucket_name=None, prefix=None, delimiter=None, page_size=None, max_items=None)[source]

列出存储桶中指定前缀下的所有前缀。

参数
  • client (aiobotocore.client.AioBaseClient) – ClientCreatorContext

  • bucket_name (str | None) – 存储桶的名称

  • prefix (str | None) – 键前缀

  • delimiter (str | None) – 分隔符,用于标记键的层级结构。

  • page_size (int | None) – 分页大小

  • max_items (int | None) – 要返回的最大条目数

返回

匹配的前缀列表

返回类型

list[Any]

async get_file_metadata_async(client, bucket_name, key=None)[source]

异步获取与通配符表达式匹配的键在存储桶中存在的文件列表。

参数
  • client (aiobotocore.client.AioBaseClient) – aiobotocore 客户端

  • bucket_name (str) – 存储桶的名称

  • key (str | None) – 键的路径

async check_key_async(client, bucket, bucket_keys, wildcard_match, use_regex=False)[source]

获取与通配符表达式匹配的键的文件列表,或者获取 head 对象。

如果 wildcard_match 为 True,则异步获取存储桶中与通配符表达式匹配的键存在的文件列表,并返回布尔值。如果 wildcard_match 为 False,则从存储桶获取 head 对象并返回布尔值。

参数
  • client (aiobotocore.client.AioBaseClient) – aiobotocore 客户端

  • bucket (str) – 存储桶的名称

  • bucket_keys (str | list[str]) – 指向文件的 S3 键

  • wildcard_match (bool) – 键的路径

  • use_regex (bool) – 是否使用正则表达式来检查存储桶

async check_for_prefix_async(client, prefix, delimiter, bucket_name=None)[source]

检查存储桶中是否存在指定前缀。

参数
  • bucket_name (str | None) – 存储桶的名称

  • prefix (str) – 键前缀

  • delimiter (str) – 分隔符,用于标记键的层级结构。

返回

如果前缀在存储桶中不存在,则返回 False;如果存在,则返回 True。

返回类型

bool

async get_files_async(client, bucket, bucket_keys, wildcard_match, delimiter='/')[source]

获取存储桶中的文件列表。

async is_keys_unchanged_async(client, bucket_name, prefix, inactivity_period=60 * 60, min_objects=1, previous_objects=None, inactivity_seconds=0, allow_delete=True, last_activity_time=None)[source]

检查是否已上传新对象,并且该时间段已过去;相应地更新传感器状态。

参数
  • client (aiobotocore.client.AioBaseClient) – aiobotocore 客户端

  • bucket_name (str) – 存储桶的名称

  • prefix (str) – 键前缀

  • inactivity_period (float) – 指定键未更改的总不活动秒数。请注意,此机制不是实时的,并且此运算符可能在经过此时间段且没有感知到其他对象后,经过 poke_interval 后才会返回。

  • min_objects (int) – 键未更改传感器被视为有效所需的最少对象数。

  • previous_objects (set[str] | None) – 上次轮询期间找到的对象 ID 集。

  • inactivity_seconds (int) – 不活动的秒数

  • allow_delete (bool) – 此传感器是否应考虑在两次轮询之间被删除的对象是有效的行为。如果为 true,则发生这种情况时将记录一条警告消息。如果为 false,则会引发错误。

  • last_activity_time (datetime.datetime | None) – 上次活动时间。

list_keys(bucket_name=None, prefix=None, delimiter=None, page_size=None, max_items=None, start_after_key=None, from_datetime=None, to_datetime=None, object_filter=None, apply_wildcard=False)[source]

列出桶中指定前缀且不包含分隔符的键。

参数
  • bucket_name (str | None) – 存储桶的名称

  • prefix (str | None) – 键前缀

  • delimiter (str | None) – 分隔符,用于标记键的层级结构。

  • page_size (int | None) – 分页大小

  • max_items (int | None) – 要返回的最大条目数

  • start_after_key (str | None) – 应该只返回大于此键的键

  • from_datetime (datetime.datetime | None) – 应该只返回 LastModified 属性大于或等于 from_datetime 的键

  • to_datetime (datetime.datetime | None) – 应该只返回 LastModified 属性小于 to_datetime 的键

  • object_filter (Callable[Ellipsis, list] | None) – 接收 S3 对象列表、from_datetime 和 to_datetime,并返回匹配键列表的函数。

  • apply_wildcard (bool) – 是否将 ‘*’ 视为通配符,还是前缀中的普通符号。

示例:返回 LastModified 属性大于 from_datetime 并且

小于 to_datetime 的 S3 对象列表

def object_filter(
    keys: list,
    from_datetime: datetime | None = None,
    to_datetime: datetime | None = None,
) -> list:
    def _is_in_period(input_date: datetime) -> bool:
        if from_datetime is not None and input_date < from_datetime:
            return False

        if to_datetime is not None and input_date > to_datetime:
            return False
        return True

    return [k["Key"] for k in keys if _is_in_period(k["LastModified"])]
返回

一个匹配键的列表

返回类型

list

get_file_metadata(prefix, bucket_name=None, page_size=None, max_items=None)[source]

列出桶中指定前缀下的元数据对象。

参数
  • prefix (str) – 键前缀

  • bucket_name (str | None) – 存储桶的名称

  • page_size (int | None) – 分页大小

  • max_items (int | None) – 要返回的最大条目数

返回

对象的元数据列表

返回类型

list

head_object(key, bucket_name=None)[source]

检索对象的元数据。

参数
  • key (str) – 指向文件的 S3 键

  • bucket_name (str | None) – 存储文件的存储桶的名称

返回

对象的元数据

返回类型

dict | None

check_for_key(key, bucket_name=None)[source]

检查桶中是否存在键。

参数
  • key (str) – 指向文件的 S3 键

  • bucket_name (str | None) – 存储文件的存储桶的名称

返回

如果键存在,则返回 True,否则返回 False。

返回类型

bool

get_key(key, bucket_name=None)[source]

返回一个 S3.Object

参数
  • key (str) – 键的路径

  • bucket_name (str | None) – 存储桶的名称

返回

桶中的键对象

返回类型

mypy_boto3_s3.service_resource.Object

read_key(key, bucket_name=None)[source]

从 S3 读取一个键。

另请参阅

参数
  • key (str) – 指向文件的 S3 键

  • bucket_name (str | None) – 存储文件的存储桶的名称

返回

键的内容

返回类型

str

select_key(key, bucket_name=None, expression=None, expression_type=None, input_serialization=None, output_serialization=None)[source]

使用 S3 Select 读取一个键。

参数
  • key (str) – 指向文件的 S3 键

  • bucket_name (str | None) – 存储文件的存储桶的名称

  • expression (str | None) – S3 Select 表达式

  • expression_type (str | None) – S3 Select 表达式类型

  • input_serialization (dict[str, Any] | None) – S3 Select 输入数据序列化格式

  • output_serialization (dict[str, Any] | None) – S3 Select 输出数据序列化格式

返回

S3 Select 检索的原始数据子集

返回类型

str

check_for_wildcard_key(wildcard_key, bucket_name=None, delimiter='')[source]

检查桶中是否存在与通配符表达式匹配的键。

参数
  • wildcard_key (str) – 键的路径

  • bucket_name (str | None) – 存储桶的名称

  • delimiter (str) – 分隔符标记键的层次结构

返回

如果键存在,则返回 True,否则返回 False。

返回类型

bool

get_wildcard_key(wildcard_key, bucket_name=None, delimiter='')[source]

返回与通配符表达式匹配的 boto3.s3.Object 对象。

参数
  • wildcard_key (str) – 键的路径

  • bucket_name (str | None) – 存储桶的名称

  • delimiter (str) – 分隔符标记键的层次结构

返回

桶中的键对象,如果没有找到,则返回 None。

返回类型

mypy_boto3_s3.service_resource.Object | None

load_file(filename, key, bucket_name=None, replace=False, encrypt=False, gzip=False, acl_policy=None)[source]

将本地文件加载到 S3。

参数
  • filename (pathlib.Path | str) – 要加载的文件路径。

  • key (str) – 指向文件的 S3 键

  • bucket_name (str | None) – 存储文件的存储桶名称。

  • replace (bool) – 一个标志,用于决定是否覆盖已存在的键。如果 replace 为 False 且键已存在,则会引发错误。

  • encrypt (bool) – 如果为 True,则该文件将在服务器端由 S3 加密,并且在 S3 中静态存储时将以加密形式存储。

  • gzip (bool) – 如果为 True,则该文件将在本地压缩。

  • acl_policy (str | None) – 指定上传到 S3 存储桶的文件的预设 ACL 策略的字符串。

load_string(string_data, key, bucket_name=None, replace=False, encrypt=False, encoding=None, acl_policy=None, compression=None)[source]

将字符串加载到 S3。

这提供了一个方便的方法,可以将字符串放入 S3。它使用 boto 基础结构将文件发送到 s3。

参数
  • string_data (str) – 要设置为键内容的字符串。

  • key (str) – 指向文件的 S3 键

  • bucket_name (str | None) – 存储文件的存储桶名称。

  • replace (bool) – 一个标志,用于决定是否覆盖已存在的键。

  • encrypt (bool) – 如果为 True,则该文件将在服务器端由 S3 加密,并且在 S3 中静态存储时将以加密形式存储。

  • encoding (str | None) – 字符串到字节的编码。

  • acl_policy (str | None) – 用于指定要上传的对象的预设 ACL 策略的字符串。

  • compression (str | None) – 要使用的压缩类型,目前仅支持 gzip。

load_bytes(bytes_data, key, bucket_name=None, replace=False, encrypt=False, acl_policy=None)[source]

将字节加载到 S3。

这提供了一种方便的方法,可以将字节数据放入 S3。它使用 boto 基础结构将文件发送到 s3。

参数
  • bytes_data (bytes) – 要设置为键内容的字节。

  • key (str) – 指向文件的 S3 键

  • bucket_name (str | None) – 存储文件的存储桶名称。

  • replace (bool) – 一个标志,用于决定是否覆盖已存在的键。

  • encrypt (bool) – 如果为 True,则该文件将在服务器端由 S3 加密,并且在 S3 中静态存储时将以加密形式存储。

  • acl_policy (str | None) – 用于指定要上传的对象的预设 ACL 策略的字符串。

load_file_obj(file_obj, key, bucket_name=None, replace=False, encrypt=False, acl_policy=None)[source]

将文件对象加载到 S3。

参数
  • file_obj (io.BytesIO) – 要设置为 S3 键内容的文件类对象。

  • key (str) – 指向文件的 S3 键

  • bucket_name (str | None) – 存储文件的存储桶名称。

  • replace (bool) – 一个标志,指示是否覆盖已存在的键。

  • encrypt (bool) – 如果为 True,S3 会在服务器上加密文件,并且该文件以加密形式静态存储在 S3 中。

  • acl_policy (str | None) – 用于指定要上传的对象的预设 ACL 策略的字符串。

copy_object(source_bucket_key, dest_bucket_key, source_bucket_name=None, dest_bucket_name=None, source_version_id=None, acl_policy=None, meta_data_directive=None, **kwargs)[source]

创建已存储在 S3 中的对象的副本。

注意:此处使用的 S3 连接需要同时具有对源存储桶/键和目标存储桶/键的访问权限。

参数
  • source_bucket_key (str) –

    源对象的键。

    它可以是完整的 s3:// 样式 URL 或从根级别的相对路径。

    当它指定为完整的 s3:// URL 时,请省略 source_bucket_name。

  • dest_bucket_key (str) –

    要复制到的对象的键。

    指定 dest_bucket_key 的约定与 source_bucket_key 相同。

  • source_bucket_name (str | None) –

    源对象所在的 S3 存储桶的名称。

    source_bucket_key 以完整的 s3:// URL 提供时,应省略此项。

  • dest_bucket_name (str | None) –

    将对象复制到的 S3 存储桶的名称。

    dest_bucket_key 以完整的 s3:// URL 提供时,应省略此项。

  • source_version_id (str | None) – 源对象的版本 ID(可选)

  • acl_policy (str | None) – 用于指定要复制的对象的预设 ACL 策略的字符串,默认为私有。

  • meta_data_directive (str | None) – 是否从源对象 COPY 元数据,或者用请求中提供的元数据 REPLACE 元数据。

delete_bucket(bucket_name, force_delete=False, max_retries=5)[source]

要删除 S3 存储桶,先删除所有 S3 存储桶对象,然后再删除存储桶。

参数
  • bucket_name (str) – 存储桶名称

  • force_delete (bool) – 启用此选项,即使存储桶不为空也删除存储桶

  • max_retries (int) – 存储桶必须为空才能被删除。如果 force_delete 为 true,则重试可能有助于防止删除存储桶中的对象和尝试删除存储桶之间的竞争条件。

返回

None

返回类型

None

delete_objects(bucket, keys)[source]

从存储桶中删除键。

参数
  • bucket (str) – 你要从中删除对象的存储桶的名称

  • keys (str | list) –

    要从 S3 存储桶中删除的键。

    keys 是字符串时,它应该是要删除的单个对象的键名。

    keys 是列表时,它应该是要删除的键的列表。

download_file(key, bucket_name=None, local_path=None, preserve_file_name=False, use_autogenerated_subdir=True)[source]

将文件从 S3 位置下载到本地文件系统。

注意

此函数会覆盖 S3 API 的 ‘download_file’ 方法,但它并不相同。如果要使用 S3 API 的原始方法,请使用 ‘S3Hook.get_conn().download_file()’

参数
  • key (str) – S3 中的键路径。

  • bucket_name (str | None) – 要使用的特定存储桶。

  • local_path (str | None) – 下载文件的本地路径。如果未提供路径,将使用系统的临时目录。

  • preserve_file_name (bool) – 如果你希望下载的文件名与 S3 中的文件名相同,请将此参数设置为 True。当设置为 False 时,将生成一个随机文件名。默认值:False。

  • use_autogenerated_subdir (bool) – 与 ‘preserve_file_name = True’ 配对,将文件下载到 ‘local_path’ 内的随机生成文件夹中,这对于避免多个可能下载相同文件名的任务之间的冲突非常有用。如果你不需要它,并且想要一个可预测的路径,请将其设置为 ‘False’。默认值:True。

返回

文件名。

返回类型

str

generate_presigned_url(client_method, params=None, expires_in=3600, http_method=None)[source]

给定客户端、其方法和参数,生成预签名 URL。

参数
  • client_method (str) – 要预签名的客户端方法。

  • params (dict | None) – 通常传递给 ClientMethod 的参数。

  • expires_in (int) – 预签名 URL 有效的秒数。默认情况下,它在一小时(3600 秒)后过期。

  • http_method (str | None) – 在生成的 URL 上使用的 http 方法。默认情况下,http 方法是方法模型中使用的任何方法。

返回

预签名 URL。

返回类型

str | None

get_bucket_tagging(bucket_name=None)[source]

从存储桶获取标签列表。

参数

bucket_name (str | None) – 存储桶的名称。

返回

包含标签的键/值对的列表

返回类型

list[dict[str, str]] | None

put_bucket_tagging(tag_set=None, key=None, value=None, bucket_name=None)[source]

用提供的标签覆盖现有的 TagSet;必须提供 TagSet、键/值对或两者都提供。

参数
  • tag_set (dict[str, str] | list[dict[str, str]] | None) – 包含标签的键/值对的字典,或已为 API 格式化的列表

  • key (str | None) – 新 TagSet 条目的键。

  • value (str | None) – 新 TagSet 条目的值。

  • bucket_name (str | None) – 存储桶的名称。

返回

None

返回类型

None

delete_bucket_tagging(bucket_name=None)[source]

删除存储桶中的所有标签。

参数

bucket_name (str | None) – 存储桶的名称。

返回

None

返回类型

None

这个条目有用吗?