Amazon S3¶
Amazon Simple Storage Service (Amazon S3) 是互联网存储服务。 您可以使用 Amazon S3 随时随地从 Web 存储和检索任意数量的数据。
先决条件任务¶
要使用这些操作符,您必须执行以下几项操作
通过 pip 安装 API 库。
pip install 'apache-airflow[amazon]'详细信息请参阅 Airflow® 的安装
设置连接.
操作符¶
创建一个 Amazon S3 存储桶¶
要创建 Amazon S3 存储桶,可以使用 S3CreateBucketOperator
。
tests/system/amazon/aws/example_s3.py
create_bucket = S3CreateBucketOperator(
task_id="create_bucket",
bucket_name=bucket_name,
)
删除一个 Amazon S3 存储桶¶
要删除 Amazon S3 存储桶,可以使用 S3DeleteBucketOperator
。
tests/system/amazon/aws/example_s3.py
delete_bucket = S3DeleteBucketOperator(
task_id="delete_bucket",
bucket_name=bucket_name,
force_delete=True,
)
设置 Amazon S3 存储桶的标签¶
要设置 Amazon S3 存储桶的标签,可以使用 S3PutBucketTaggingOperator
。
tests/system/amazon/aws/example_s3.py
put_tagging = S3PutBucketTaggingOperator(
task_id="put_tagging",
bucket_name=bucket_name,
key=TAG_KEY,
value=TAG_VALUE,
)
获取 Amazon S3 存储桶的标签¶
要获取与 Amazon S3 存储桶关联的标签集,可以使用 S3GetBucketTaggingOperator
。
tests/system/amazon/aws/example_s3.py
get_tagging = S3GetBucketTaggingOperator(
task_id="get_tagging",
bucket_name=bucket_name,
)
删除 Amazon S3 存储桶的标签¶
要删除 Amazon S3 存储桶的标签,可以使用 S3DeleteBucketTaggingOperator
。
tests/system/amazon/aws/example_s3.py
delete_tagging = S3DeleteBucketTaggingOperator(
task_id="delete_tagging",
bucket_name=bucket_name,
)
创建一个 Amazon S3 对象¶
要创建新的(或替换)Amazon S3 对象,可以使用 S3CreateObjectOperator
。
tests/system/amazon/aws/example_s3.py
create_object = S3CreateObjectOperator(
task_id="create_object",
s3_bucket=bucket_name,
s3_key=key,
data=DATA,
replace=True,
)
复制一个 Amazon S3 对象¶
要将 Amazon S3 对象从一个存储桶复制到另一个存储桶,可以使用 S3CopyObjectOperator
。此处使用的 Amazon S3 连接需要有权访问源存储桶/键和目标存储桶/键。
tests/system/amazon/aws/example_s3.py
copy_object = S3CopyObjectOperator(
task_id="copy_object",
source_bucket_name=bucket_name,
dest_bucket_name=bucket_name_2,
source_bucket_key=key,
dest_bucket_key=key_2,
)
删除 Amazon S3 对象¶
要删除一个或多个 Amazon S3 对象,可以使用 S3DeleteObjectsOperator
。
tests/system/amazon/aws/example_s3.py
delete_objects = S3DeleteObjectsOperator(
task_id="delete_objects",
bucket=bucket_name_2,
keys=key_2,
)
转换一个 Amazon S3 对象¶
要转换来自一个 Amazon S3 对象的数据并将其保存到另一个对象,可以使用 S3FileTransformOperator
。您还可以应用可选的 Amazon S3 Select 表达式,使用 select_expression
从 source_s3_key
中选择要检索的数据。
tests/system/amazon/aws/example_s3.py
file_transform = S3FileTransformOperator(
task_id="file_transform",
source_s3_key=f"s3://{bucket_name}/{key}",
dest_s3_key=f"s3://{bucket_name_2}/{key_2}",
# Use `cp` command as transform script as an example
transform_script="cp",
replace=True,
)
列出 Amazon S3 前缀¶
要列出 Amazon S3 存储桶中的所有 Amazon S3 前缀,可以使用 S3ListPrefixesOperator
。有关 Amazon S3 前缀的更多信息,请参阅此处。
tests/system/amazon/aws/example_s3.py
list_prefixes = S3ListPrefixesOperator(
task_id="list_prefixes",
bucket=bucket_name,
prefix=PREFIX,
delimiter=DELIMITER,
)
列出 Amazon S3 对象¶
要列出 Amazon S3 存储桶中的所有 Amazon S3 对象,可以使用 S3ListOperator
。您可以指定一个 prefix
来筛选名称以此前缀开头的对象。
tests/system/amazon/aws/example_s3.py
list_keys = S3ListOperator(
task_id="list_keys",
bucket=bucket_name,
prefix=PREFIX,
)
传感器¶
等待 Amazon S3 键¶
要等待一个或多个键出现在 Amazon S3 存储桶中,可以使用 S3KeySensor
。对于每个键,它调用 head_object API(如果 wildcard_match
为 True
,则调用 list_objects_v2 API),以检查它是否存在。请记住,尤其是在用于检查大量键时,它会为每个键进行一次 API 调用。
检查一个文件
tests/system/amazon/aws/example_s3.py
# Check if a file exists
sensor_one_key = S3KeySensor(
task_id="sensor_one_key",
bucket_name=bucket_name,
bucket_key=key,
)
检查多个文件
tests/system/amazon/aws/example_s3.py
# Check if both files exist
sensor_two_keys = S3KeySensor(
task_id="sensor_two_keys",
bucket_name=bucket_name,
bucket_key=[key, key_2],
)
使用正则表达式检查文件
tests/system/amazon/aws/example_s3.py
# Check if a file exists and match a certain regular expression pattern
sensor_key_with_regex = S3KeySensor(
task_id="sensor_key_with_regex", bucket_name=bucket_name, bucket_key=key_regex_pattern, use_regex=True
)
要使用其他自定义检查,您可以定义一个接收匹配的 S3 对象属性列表并返回布尔值的函数
True
:满足特定条件False
:不满足条件
此函数会为作为参数在 bucket_key
中传递的每个键调用。此函数的参数是对象列表的原因是,当 wildcard_match
为 True
时,多个文件可以匹配一个键。匹配的 S3 对象属性列表仅包含大小,格式如下
[{"Size": int}]
tests/system/amazon/aws/example_s3.py
def check_fn(files: list, **kwargs) -> bool:
"""
Example of custom check: check if all files are bigger than ``20 bytes``
:param files: List of S3 object attributes.
:return: true if the criteria is met
"""
return all(f.get("Size", 0) > 20 for f in files)
tests/system/amazon/aws/example_s3.py
# Check if a file exists and match a certain pattern defined in check_fn
sensor_key_with_function = S3KeySensor(
task_id="sensor_key_with_function",
bucket_name=bucket_name,
bucket_key=key,
check_fn=check_fn,
)
您还可以通过将参数 deferrable
设置为 True,在可延期模式下运行此操作符。 这将导致有效地利用 Airflow 工作线程,因为作业状态的轮询会在触发器上异步进行。 请注意,这需要您的 Airflow 部署中提供触发器。
检查一个文件
tests/system/amazon/aws/example_s3.py
# Check if a file exists
sensor_one_key_deferrable = S3KeySensor(
task_id="sensor_one_key_deferrable",
bucket_name=bucket_name,
bucket_key=key,
deferrable=True,
)
检查多个文件
tests/system/amazon/aws/example_s3.py
# Check if both files exist
sensor_two_keys_deferrable = S3KeySensor(
task_id="sensor_two_keys_deferrable",
bucket_name=bucket_name,
bucket_key=[key, key_2],
deferrable=True,
)
使用正则表达式检查文件
tests/system/amazon/aws/example_s3.py
# Check if a file exists and match a certain regular expression pattern
sensor_key_with_regex_deferrable = S3KeySensor(
task_id="sensor_key_with_regex_deferrable",
bucket_name=bucket_name,
bucket_key=key_regex_pattern,
use_regex=True,
deferrable=True,
)
等待 Amazon S3 前缀更改¶
要检查 Amazon S3 存储桶中特定前缀的对象数量是否发生变化,并等待不活动期过去,且对象数量没有增加,可以使用 S3KeysUnchangedSensor
。 请注意,此传感器在重新调度模式下不会正常运行,因为在重新调度的调用之间,Amazon S3 存储桶中列出的对象的状态将会丢失。
tests/system/amazon/aws/example_s3.py
sensor_keys_unchanged = S3KeysUnchangedSensor(
task_id="sensor_keys_unchanged",
bucket_name=bucket_name_2,
prefix=PREFIX,
inactivity_period=10, # inactivity_period in seconds
)
您还可以通过将参数 deferrable
设置为 True,在可延期模式下运行此传感器。 这将导致有效地利用 Airflow 工作线程,因为作业状态的轮询会在触发器上异步进行。 请注意,这需要您的 Airflow 部署中提供触发器。