AWS DataSync¶
AWS DataSync 是一种数据传输服务,可以简化、自动化并加速在本地存储系统和 AWS 存储服务之间通过互联网或 AWS Direct Connect 移动和复制数据。
先决条件任务¶
要使用这些操作符,您必须执行以下几项操作
通过 pip 安装 API 库。
pip install 'apache-airflow[amazon]'详细信息请参考 Airflow® 的安装
设置连接.
通用参数¶
- aws_conn_id
引用 Amazon Web Services 连接 ID。如果此参数设置为
None
,则使用默认的 boto3 行为,而无需进行连接查找。否则,使用存储在连接中的凭据。默认值:aws_default
- region_name
AWS 区域名称。如果此参数设置为
None
或省略,则将使用来自 AWS 连接额外参数 的 region_name。否则,请使用指定的值代替连接值。默认值:None
- verify
是否验证 SSL 证书。
False
- 不验证 SSL 证书。path/to/cert/bundle.pem - 要使用的 CA 证书包的文件名。如果要使用与 botocore 使用的 CA 证书包不同的证书包,则可以指定此参数。
如果此参数设置为
None
或省略,则将使用来自 AWS 连接额外参数 的 verify。否则,请使用指定的值代替连接值。默认值:None
- botocore_config
提供的字典用于构造 botocore.config.Config。此配置可用于配置 避免限流异常、超时等。
{ "signature_version": "unsigned", "s3": { "us_east_1_regional_endpoint": True, }, "retries": { "mode": "standard", "max_attempts": 10, }, "connect_timeout": 300, "read_timeout": 300, "tcp_keepalive": True, }
如果此参数设置为
None
或省略,则将使用来自 AWS 连接额外参数 的 config_kwargs。否则,请使用指定的值代替连接值。默认值:None
注意
指定空字典
{}
将覆盖 botocore.config.Config 的连接配置
操作符¶
与 AWS DataSync 任务交互¶
您可以使用 DataSyncOperator
来查找、创建、更新、执行和删除 AWS DataSync 任务。
一旦 DataSyncOperator
识别出要运行的正确的 TaskArn(因为您指定了它,或者因为它被找到了),它将被执行。每当执行 AWS DataSync 任务时,它都会创建一个 AWS DataSync TaskExecution,由 TaskExecutionArn 标识。
TaskExecutionArn 将被监控直到完成(成功/失败),并且其状态将定期写入 Airflow 任务日志。
DataSyncOperator
支持将额外的 kwargs 可选地传递给底层的 boto3.start_task_execution()
API。这是通过 task_execution_kwargs
参数完成的。例如,这对于限制带宽或过滤包含的文件非常有用,有关更多详细信息,请参阅 boto3 Datasync 文档。
执行任务¶
要执行特定任务,您可以将 task_arn
传递给操作符。
tests/system/amazon/aws/example_datasync.py
# Execute a specific task
execute_task_by_arn = DataSyncOperator(
task_id="execute_task_by_arn",
task_arn=created_task_arn,
)
搜索并执行任务¶
要搜索任务,您可以将 source_location_uri
和 destination_location_uri
指定给操作符。如果找到一个任务,则将执行该任务。如果找到多个任务,则操作符将引发异常。要避免这种情况,您可以将 allow_random_task_choice
设置为 True
以从候选任务中随机选择。
tests/system/amazon/aws/example_datasync.py
# Search and execute a task
execute_task_by_locations = DataSyncOperator(
task_id="execute_task_by_locations",
source_location_uri=f"s3://{s3_bucket_source}/test",
destination_location_uri=f"s3://{s3_bucket_destination}/test",
# Only transfer files from /test/subdir folder
task_execution_kwargs={
"Includes": [{"FilterType": "SIMPLE_PATTERN", "Value": "/test/subdir"}],
},
)
创建并执行任务¶
在搜索任务时,如果没有找到任务,您可以选择在执行前创建一个任务。为此,您需要提供额外的参数 create_task_kwargs
、create_source_location_kwargs
和 create_destination_location_kwargs
。
这些额外的参数为操作符提供了一种在未找到合适的现有任务时自动创建任务和/或位置的方法。如果这些参数保留为其默认值 (None),则不会尝试创建。
此外,由于 delete_task_after_execution
设置为 True
,因此该任务将在成功完成后从 AWS DataSync 中删除。
tests/system/amazon/aws/example_datasync.py
# Create a task (the task does not exist)
create_and_execute_task = DataSyncOperator(
task_id="create_and_execute_task",
source_location_uri=f"s3://{s3_bucket_source}/test_create",
destination_location_uri=f"s3://{s3_bucket_destination}/test_create",
create_task_kwargs={"Name": "Created by Airflow"},
create_source_location_kwargs={
"Subdirectory": "test_create",
"S3BucketArn": get_s3_bucket_arn(s3_bucket_source),
"S3Config": {
"BucketAccessRoleArn": test_context[ROLE_ARN_KEY],
},
},
create_destination_location_kwargs={
"Subdirectory": "test_create",
"S3BucketArn": get_s3_bucket_arn(s3_bucket_destination),
"S3Config": {
"BucketAccessRoleArn": test_context[ROLE_ARN_KEY],
},
},
delete_task_after_execution=False,
)
在创建任务时,DataSyncOperator
将尝试查找和使用现有的 LocationArns,而不是创建新的 LocationArns。如果多个 LocationArns 与指定的 URI 匹配,则我们需要选择一个使用。在这种情况下,操作符的行为类似于它如何从多个任务中选择单个任务
操作符将引发异常。要避免这种情况,您可以将 allow_random_location_choice
设置为 True
以从候选位置中随机选择。