Amazon S3

Amazon Simple Storage Service (Amazon S3) 是互联网存储服务。 您可以使用 Amazon S3 随时随地从 Web 存储和检索任意数量的数据。

先决条件任务

要使用这些操作符,您必须执行以下几项操作

操作符

创建一个 Amazon S3 存储桶

要创建 Amazon S3 存储桶,可以使用 S3CreateBucketOperator

tests/system/amazon/aws/example_s3.py

create_bucket = S3CreateBucketOperator(
    task_id="create_bucket",
    bucket_name=bucket_name,
)

删除一个 Amazon S3 存储桶

要删除 Amazon S3 存储桶,可以使用 S3DeleteBucketOperator

tests/system/amazon/aws/example_s3.py

delete_bucket = S3DeleteBucketOperator(
    task_id="delete_bucket",
    bucket_name=bucket_name,
    force_delete=True,
)

设置 Amazon S3 存储桶的标签

要设置 Amazon S3 存储桶的标签,可以使用 S3PutBucketTaggingOperator

tests/system/amazon/aws/example_s3.py

put_tagging = S3PutBucketTaggingOperator(
    task_id="put_tagging",
    bucket_name=bucket_name,
    key=TAG_KEY,
    value=TAG_VALUE,
)

获取 Amazon S3 存储桶的标签

要获取与 Amazon S3 存储桶关联的标签集,可以使用 S3GetBucketTaggingOperator

tests/system/amazon/aws/example_s3.py

get_tagging = S3GetBucketTaggingOperator(
    task_id="get_tagging",
    bucket_name=bucket_name,
)

删除 Amazon S3 存储桶的标签

要删除 Amazon S3 存储桶的标签,可以使用 S3DeleteBucketTaggingOperator

tests/system/amazon/aws/example_s3.py

delete_tagging = S3DeleteBucketTaggingOperator(
    task_id="delete_tagging",
    bucket_name=bucket_name,
)

创建一个 Amazon S3 对象

要创建新的(或替换)Amazon S3 对象,可以使用 S3CreateObjectOperator

tests/system/amazon/aws/example_s3.py

create_object = S3CreateObjectOperator(
    task_id="create_object",
    s3_bucket=bucket_name,
    s3_key=key,
    data=DATA,
    replace=True,
)

复制一个 Amazon S3 对象

要将 Amazon S3 对象从一个存储桶复制到另一个存储桶,可以使用 S3CopyObjectOperator。此处使用的 Amazon S3 连接需要有权访问源存储桶/键和目标存储桶/键。

tests/system/amazon/aws/example_s3.py

copy_object = S3CopyObjectOperator(
    task_id="copy_object",
    source_bucket_name=bucket_name,
    dest_bucket_name=bucket_name_2,
    source_bucket_key=key,
    dest_bucket_key=key_2,
)

删除 Amazon S3 对象

要删除一个或多个 Amazon S3 对象,可以使用 S3DeleteObjectsOperator

tests/system/amazon/aws/example_s3.py

delete_objects = S3DeleteObjectsOperator(
    task_id="delete_objects",
    bucket=bucket_name_2,
    keys=key_2,
)

转换一个 Amazon S3 对象

要转换来自一个 Amazon S3 对象的数据并将其保存到另一个对象,可以使用 S3FileTransformOperator。您还可以应用可选的 Amazon S3 Select 表达式,使用 select_expressionsource_s3_key 中选择要检索的数据。

tests/system/amazon/aws/example_s3.py

file_transform = S3FileTransformOperator(
    task_id="file_transform",
    source_s3_key=f"s3://{bucket_name}/{key}",
    dest_s3_key=f"s3://{bucket_name_2}/{key_2}",
    # Use `cp` command as transform script as an example
    transform_script="cp",
    replace=True,
)

列出 Amazon S3 前缀

要列出 Amazon S3 存储桶中的所有 Amazon S3 前缀,可以使用 S3ListPrefixesOperator。有关 Amazon S3 前缀的更多信息,请参阅此处

tests/system/amazon/aws/example_s3.py

list_prefixes = S3ListPrefixesOperator(
    task_id="list_prefixes",
    bucket=bucket_name,
    prefix=PREFIX,
    delimiter=DELIMITER,
)

列出 Amazon S3 对象

要列出 Amazon S3 存储桶中的所有 Amazon S3 对象,可以使用 S3ListOperator。您可以指定一个 prefix 来筛选名称以此前缀开头的对象。

tests/system/amazon/aws/example_s3.py

list_keys = S3ListOperator(
    task_id="list_keys",
    bucket=bucket_name,
    prefix=PREFIX,
)

传感器

等待 Amazon S3 键

要等待一个或多个键出现在 Amazon S3 存储桶中,可以使用 S3KeySensor。对于每个键,它调用 head_object API(如果 wildcard_matchTrue,则调用 list_objects_v2 API),以检查它是否存在。请记住,尤其是在用于检查大量键时,它会为每个键进行一次 API 调用。

检查一个文件

tests/system/amazon/aws/example_s3.py

# Check if a file exists
sensor_one_key = S3KeySensor(
    task_id="sensor_one_key",
    bucket_name=bucket_name,
    bucket_key=key,
)

检查多个文件

tests/system/amazon/aws/example_s3.py

# Check if both files exist
sensor_two_keys = S3KeySensor(
    task_id="sensor_two_keys",
    bucket_name=bucket_name,
    bucket_key=[key, key_2],
)

使用正则表达式检查文件

tests/system/amazon/aws/example_s3.py

# Check if a file exists and match a certain regular expression pattern
sensor_key_with_regex = S3KeySensor(
    task_id="sensor_key_with_regex", bucket_name=bucket_name, bucket_key=key_regex_pattern, use_regex=True
)

要使用其他自定义检查,您可以定义一个接收匹配的 S3 对象属性列表并返回布尔值的函数

  • True:满足特定条件

  • False:不满足条件

此函数会为作为参数在 bucket_key 中传递的每个键调用。此函数的参数是对象列表的原因是,当 wildcard_matchTrue 时,多个文件可以匹配一个键。匹配的 S3 对象属性列表仅包含大小,格式如下

[{"Size": int}]

tests/system/amazon/aws/example_s3.py

def check_fn(files: list, **kwargs) -> bool:
    """
    Example of custom check: check if all files are bigger than ``20 bytes``

    :param files: List of S3 object attributes.
    :return: true if the criteria is met
    """
    return all(f.get("Size", 0) > 20 for f in files)

tests/system/amazon/aws/example_s3.py

# Check if a file exists and match a certain pattern defined in check_fn
sensor_key_with_function = S3KeySensor(
    task_id="sensor_key_with_function",
    bucket_name=bucket_name,
    bucket_key=key,
    check_fn=check_fn,
)

您还可以通过将参数 deferrable 设置为 True,在可延期模式下运行此操作符。 这将导致有效地利用 Airflow 工作线程,因为作业状态的轮询会在触发器上异步进行。 请注意,这需要您的 Airflow 部署中提供触发器。

检查一个文件

tests/system/amazon/aws/example_s3.py

# Check if a file exists
sensor_one_key_deferrable = S3KeySensor(
    task_id="sensor_one_key_deferrable",
    bucket_name=bucket_name,
    bucket_key=key,
    deferrable=True,
)

检查多个文件

tests/system/amazon/aws/example_s3.py

# Check if both files exist
sensor_two_keys_deferrable = S3KeySensor(
    task_id="sensor_two_keys_deferrable",
    bucket_name=bucket_name,
    bucket_key=[key, key_2],
    deferrable=True,
)

使用正则表达式检查文件

tests/system/amazon/aws/example_s3.py

# Check if a file exists and match a certain regular expression pattern
sensor_key_with_regex_deferrable = S3KeySensor(
    task_id="sensor_key_with_regex_deferrable",
    bucket_name=bucket_name,
    bucket_key=key_regex_pattern,
    use_regex=True,
    deferrable=True,
)

等待 Amazon S3 前缀更改

要检查 Amazon S3 存储桶中特定前缀的对象数量是否发生变化,并等待不活动期过去,且对象数量没有增加,可以使用 S3KeysUnchangedSensor。 请注意,此传感器在重新调度模式下不会正常运行,因为在重新调度的调用之间,Amazon S3 存储桶中列出的对象的状态将会丢失。

tests/system/amazon/aws/example_s3.py

sensor_keys_unchanged = S3KeysUnchangedSensor(
    task_id="sensor_keys_unchanged",
    bucket_name=bucket_name_2,
    prefix=PREFIX,
    inactivity_period=10,  # inactivity_period in seconds
)

您还可以通过将参数 deferrable 设置为 True,在可延期模式下运行此传感器。 这将导致有效地利用 Airflow 工作线程,因为作业状态的轮询会在触发器上异步进行。 请注意,这需要您的 Airflow 部署中提供触发器。

此条目是否有帮助?