airflow.providers.amazon.aws.hooks.s3
¶
使用 boto3 库与 AWS S3 进行交互。
模块内容¶
函数¶
|
如果未将存储桶名称传递给函数,则提供从连接中获取的存储桶名称。 |
如果没有传递存储桶名称,并且至少传递了一个键,则统一存储桶名称和键。 |
属性¶
- airflow.providers.amazon.aws.hooks.s3.provide_bucket_name(func)[source]¶
如果未将存储桶名称传递给函数,则提供从连接中获取的存储桶名称。
- airflow.providers.amazon.aws.hooks.s3.unify_bucket_name_and_key(func)[source]¶
如果没有传递存储桶名称,并且至少传递了一个键,则统一存储桶名称和键。
- class airflow.providers.amazon.aws.hooks.s3.S3Hook(aws_conn_id=AwsBaseHook.default_conn_name, transfer_config_args=None, extra_args=None, *args, **kwargs)[source]¶
基类:
airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook
与 Amazon Simple Storage Service (S3) 交互。
为
boto3.client("s3")
和boto3.resource("s3")
提供厚包装。- 参数
另请参阅
有关允许的上传额外参数,请参阅
boto3.s3.transfer.S3Transfer.ALLOWED_UPLOAD_ARGS
。有关允许的下载额外参数,请参阅
boto3.s3.transfer.S3Transfer.ALLOWED_DOWNLOAD_ARGS
。
可以指定其他参数(例如
aws_conn_id
),并且会将这些参数传递给底层的 AwsBaseHook。- static parse_s3_url(s3url)[source]¶
将 S3 Url 解析为存储桶名称和键。
有关有效 URL 格式,请参阅 https://docs.aws.amazon.com/AmazonS3/latest/userguide/access-bucket-intro.html。
- static get_s3_bucket_key(bucket, key, bucket_param_name, key_param_name)[source]¶
获取 S3 存储桶名称和键。
来自以下任意一项:- 存储桶名称和键。在检查 key 是否为相对路径后,按原样返回信息。- 键。必须是完整的 s3:// URL。
- get_bucket(bucket_name=None)[source]¶
返回
S3.Bucket
对象。- 参数
bucket_name (str | None) – 存储桶的名称
- 返回
存储桶名称的存储桶对象。
- 返回类型
mypy_boto3_s3.service_resource.Bucket
- list_prefixes(bucket_name=None, prefix=None, delimiter=None, page_size=None, max_items=None)[source]¶
列出存储桶中指定前缀下的所有前缀。
- async list_prefixes_async(client, bucket_name=None, prefix=None, delimiter=None, page_size=None, max_items=None)[source]¶
列出存储桶中指定前缀下的所有前缀。
- async check_key_async(client, bucket, bucket_keys, wildcard_match, use_regex=False)[source]¶
获取与通配符表达式匹配的键的文件列表,或者获取 head 对象。
如果 wildcard_match 为 True,则异步获取存储桶中与通配符表达式匹配的键存在的文件列表,并返回布尔值。如果 wildcard_match 为 False,则从存储桶获取 head 对象并返回布尔值。
- async get_files_async(client, bucket, bucket_keys, wildcard_match, delimiter='/')[source]¶
获取存储桶中的文件列表。
- async is_keys_unchanged_async(client, bucket_name, prefix, inactivity_period=60 * 60, min_objects=1, previous_objects=None, inactivity_seconds=0, allow_delete=True, last_activity_time=None)[source]¶
检查是否已上传新对象,并且该时间段已过去;相应地更新传感器状态。
- 参数
client (aiobotocore.client.AioBaseClient) – aiobotocore 客户端
bucket_name (str) – 存储桶的名称
prefix (str) – 键前缀
inactivity_period (float) – 指定键未更改的总不活动秒数。请注意,此机制不是实时的,并且此运算符可能在经过此时间段且没有感知到其他对象后,经过 poke_interval 后才会返回。
min_objects (int) – 键未更改传感器被视为有效所需的最少对象数。
inactivity_seconds (int) – 不活动的秒数
allow_delete (bool) – 此传感器是否应考虑在两次轮询之间被删除的对象是有效的行为。如果为 true,则发生这种情况时将记录一条警告消息。如果为 false,则会引发错误。
last_activity_time (datetime.datetime | None) – 上次活动时间。
- list_keys(bucket_name=None, prefix=None, delimiter=None, page_size=None, max_items=None, start_after_key=None, from_datetime=None, to_datetime=None, object_filter=None, apply_wildcard=False)[source]¶
列出桶中指定前缀且不包含分隔符的键。
- 参数
bucket_name (str | None) – 存储桶的名称
prefix (str | None) – 键前缀
delimiter (str | None) – 分隔符,用于标记键的层级结构。
page_size (int | None) – 分页大小
max_items (int | None) – 要返回的最大条目数
start_after_key (str | None) – 应该只返回大于此键的键
from_datetime (datetime.datetime | None) – 应该只返回 LastModified 属性大于或等于 from_datetime 的键
to_datetime (datetime.datetime | None) – 应该只返回 LastModified 属性小于 to_datetime 的键
object_filter (Callable[Ellipsis, list] | None) – 接收 S3 对象列表、from_datetime 和 to_datetime,并返回匹配键列表的函数。
apply_wildcard (bool) – 是否将 ‘*’ 视为通配符,还是前缀中的普通符号。
- 示例:返回 LastModified 属性大于 from_datetime 并且
小于 to_datetime 的 S3 对象列表
def object_filter( keys: list, from_datetime: datetime | None = None, to_datetime: datetime | None = None, ) -> list: def _is_in_period(input_date: datetime) -> bool: if from_datetime is not None and input_date < from_datetime: return False if to_datetime is not None and input_date > to_datetime: return False return True return [k["Key"] for k in keys if _is_in_period(k["LastModified"])]
- 返回
一个匹配键的列表
- 返回类型
- get_file_metadata(prefix, bucket_name=None, page_size=None, max_items=None)[source]¶
列出桶中指定前缀下的元数据对象。
- get_key(key, bucket_name=None)[source]¶
返回一个
S3.Object
。- 参数
key (str) – 键的路径
bucket_name (str | None) – 存储桶的名称
- 返回
桶中的键对象
- 返回类型
mypy_boto3_s3.service_resource.Object
- select_key(key, bucket_name=None, expression=None, expression_type=None, input_serialization=None, output_serialization=None)[source]¶
使用 S3 Select 读取一个键。
- 参数
- 返回
S3 Select 检索的原始数据子集
- 返回类型
- get_wildcard_key(wildcard_key, bucket_name=None, delimiter='')[source]¶
返回与通配符表达式匹配的 boto3.s3.Object 对象。
- 参数
wildcard_key (str) – 键的路径
bucket_name (str | None) – 存储桶的名称
delimiter (str) – 分隔符标记键的层次结构
- 返回
桶中的键对象,如果没有找到,则返回 None。
- 返回类型
mypy_boto3_s3.service_resource.Object | None
- load_file(filename, key, bucket_name=None, replace=False, encrypt=False, gzip=False, acl_policy=None)[source]¶
将本地文件加载到 S3。
- 参数
filename (pathlib.Path | str) – 要加载的文件路径。
key (str) – 指向文件的 S3 键
bucket_name (str | None) – 存储文件的存储桶名称。
replace (bool) – 一个标志,用于决定是否覆盖已存在的键。如果 replace 为 False 且键已存在,则会引发错误。
encrypt (bool) – 如果为 True,则该文件将在服务器端由 S3 加密,并且在 S3 中静态存储时将以加密形式存储。
gzip (bool) – 如果为 True,则该文件将在本地压缩。
acl_policy (str | None) – 指定上传到 S3 存储桶的文件的预设 ACL 策略的字符串。
- load_string(string_data, key, bucket_name=None, replace=False, encrypt=False, encoding=None, acl_policy=None, compression=None)[source]¶
将字符串加载到 S3。
这提供了一个方便的方法,可以将字符串放入 S3。它使用 boto 基础结构将文件发送到 s3。
- 参数
string_data (str) – 要设置为键内容的字符串。
key (str) – 指向文件的 S3 键
bucket_name (str | None) – 存储文件的存储桶名称。
replace (bool) – 一个标志,用于决定是否覆盖已存在的键。
encrypt (bool) – 如果为 True,则该文件将在服务器端由 S3 加密,并且在 S3 中静态存储时将以加密形式存储。
encoding (str | None) – 字符串到字节的编码。
acl_policy (str | None) – 用于指定要上传的对象的预设 ACL 策略的字符串。
compression (str | None) – 要使用的压缩类型,目前仅支持 gzip。
- load_bytes(bytes_data, key, bucket_name=None, replace=False, encrypt=False, acl_policy=None)[source]¶
将字节加载到 S3。
这提供了一种方便的方法,可以将字节数据放入 S3。它使用 boto 基础结构将文件发送到 s3。
- 参数
bytes_data (bytes) – 要设置为键内容的字节。
key (str) – 指向文件的 S3 键
bucket_name (str | None) – 存储文件的存储桶名称。
replace (bool) – 一个标志,用于决定是否覆盖已存在的键。
encrypt (bool) – 如果为 True,则该文件将在服务器端由 S3 加密,并且在 S3 中静态存储时将以加密形式存储。
acl_policy (str | None) – 用于指定要上传的对象的预设 ACL 策略的字符串。
- load_file_obj(file_obj, key, bucket_name=None, replace=False, encrypt=False, acl_policy=None)[source]¶
将文件对象加载到 S3。
- 参数
file_obj (io.BytesIO) – 要设置为 S3 键内容的文件类对象。
key (str) – 指向文件的 S3 键
bucket_name (str | None) – 存储文件的存储桶名称。
replace (bool) – 一个标志,指示是否覆盖已存在的键。
encrypt (bool) – 如果为 True,S3 会在服务器上加密文件,并且该文件以加密形式静态存储在 S3 中。
acl_policy (str | None) – 用于指定要上传的对象的预设 ACL 策略的字符串。
- copy_object(source_bucket_key, dest_bucket_key, source_bucket_name=None, dest_bucket_name=None, source_version_id=None, acl_policy=None, meta_data_directive=None, **kwargs)[source]¶
创建已存储在 S3 中的对象的副本。
注意:此处使用的 S3 连接需要同时具有对源存储桶/键和目标存储桶/键的访问权限。
- 参数
source_bucket_key (str) –
源对象的键。
它可以是完整的 s3:// 样式 URL 或从根级别的相对路径。
当它指定为完整的 s3:// URL 时,请省略 source_bucket_name。
dest_bucket_key (str) –
要复制到的对象的键。
指定 dest_bucket_key 的约定与 source_bucket_key 相同。
source_bucket_name (str | None) –
源对象所在的 S3 存储桶的名称。
当 source_bucket_key 以完整的 s3:// URL 提供时,应省略此项。
dest_bucket_name (str | None) –
将对象复制到的 S3 存储桶的名称。
当 dest_bucket_key 以完整的 s3:// URL 提供时,应省略此项。
source_version_id (str | None) – 源对象的版本 ID(可选)
acl_policy (str | None) – 用于指定要复制的对象的预设 ACL 策略的字符串,默认为私有。
meta_data_directive (str | None) – 是否从源对象 COPY 元数据,或者用请求中提供的元数据 REPLACE 元数据。
- delete_bucket(bucket_name, force_delete=False, max_retries=5)[source]¶
要删除 S3 存储桶,先删除所有 S3 存储桶对象,然后再删除存储桶。
- 参数
bucket_name (str) – 存储桶名称
force_delete (bool) – 启用此选项,即使存储桶不为空也删除存储桶
max_retries (int) – 存储桶必须为空才能被删除。如果 force_delete 为 true,则重试可能有助于防止删除存储桶中的对象和尝试删除存储桶之间的竞争条件。
- 返回
None
- 返回类型
None
- delete_objects(bucket, keys)[source]¶
从存储桶中删除键。
- 参数
bucket (str) – 你要从中删除对象的存储桶的名称
keys (str | list) –
要从 S3 存储桶中删除的键。
当
keys
是字符串时,它应该是要删除的单个对象的键名。当
keys
是列表时,它应该是要删除的键的列表。
- download_file(key, bucket_name=None, local_path=None, preserve_file_name=False, use_autogenerated_subdir=True)[source]¶
将文件从 S3 位置下载到本地文件系统。
- 注意
此函数会覆盖 S3 API 的 ‘download_file’ 方法,但它并不相同。如果要使用 S3 API 的原始方法,请使用 ‘S3Hook.get_conn().download_file()’
- 参数
key (str) – S3 中的键路径。
bucket_name (str | None) – 要使用的特定存储桶。
local_path (str | None) – 下载文件的本地路径。如果未提供路径,将使用系统的临时目录。
preserve_file_name (bool) – 如果你希望下载的文件名与 S3 中的文件名相同,请将此参数设置为 True。当设置为 False 时,将生成一个随机文件名。默认值:False。
use_autogenerated_subdir (bool) – 与 ‘preserve_file_name = True’ 配对,将文件下载到 ‘local_path’ 内的随机生成文件夹中,这对于避免多个可能下载相同文件名的任务之间的冲突非常有用。如果你不需要它,并且想要一个可预测的路径,请将其设置为 ‘False’。默认值:True。
- 返回
文件名。
- 返回类型
- generate_presigned_url(client_method, params=None, expires_in=3600, http_method=None)[source]¶
给定客户端、其方法和参数,生成预签名 URL。
- 参数
client_method (str) – 要预签名的客户端方法。
params (dict | None) – 通常传递给 ClientMethod 的参数。
expires_in (int) – 预签名 URL 有效的秒数。默认情况下,它在一小时(3600 秒)后过期。
http_method (str | None) – 在生成的 URL 上使用的 http 方法。默认情况下,http 方法是方法模型中使用的任何方法。
- 返回
预签名 URL。
- 返回类型
str | None
- get_bucket_tagging(bucket_name=None)[source]¶
从存储桶获取标签列表。
- 参数
bucket_name (str | None) – 存储桶的名称。
- 返回
包含标签的键/值对的列表
- 返回类型
list[dict[str, str]] | None
- put_bucket_tagging(tag_set=None, key=None, value=None, bucket_name=None)[source]¶
用提供的标签覆盖现有的 TagSet;必须提供 TagSet、键/值对或两者都提供。
- 参数
tag_set (dict[str, str] | list[dict[str, str]] | None) – 包含标签的键/值对的字典,或已为 API 格式化的列表
key (str | None) – 新 TagSet 条目的键。
value (str | None) – 新 TagSet 条目的值。
bucket_name (str | None) – 存储桶的名称。
- 返回
None
- 返回类型
None