Amazon S3 到 DynamoDB¶
使用 S3ToDynamoDBOperator
传输操作符将存储在 Amazon 简单存储服务 (S3) 存储桶中的数据加载到现有或新的 Amazon DynamoDB 表中。
先决条件任务¶
要使用这些操作符,您必须执行以下几项操作
通过 pip 安装 API 库。
pip install 'apache-airflow[amazon]'详细信息请参阅 Airflow® 的安装
设置连接.
操作符¶
Amazon S3 到 DynamoDB 传输操作符¶
此操作符将数据从 Amazon S3 加载到 Amazon DynamoDB 表。它使用 Amazon DynamoDB ImportTable 服务,该服务与不同的 AWS 服务(如 Amazon S3 和 CloudWatch)进行交互。默认行为是将 S3 数据加载到新的 Amazon DynamoDB 表中。该服务目前不支持导入到现有表中。因此,该操作符使用自定义方法。它会创建一个临时的 DynamoDB 表,并将 S3 数据加载到该表中。然后,它会扫描临时的 Amazon DynamoDB 表,并将接收到的记录写入目标表。
要获取更多信息,请访问: S3ToDynamoDBOperator
示例用法
tests/system/amazon/aws/example_s3_to_dynamodb.py
transfer_1 = S3ToDynamoDBOperator(
task_id="s3_to_dynamodb",
s3_bucket=bucket_name,
s3_key=s3_key,
dynamodb_table_name=new_table_name,
input_format="CSV",
import_table_kwargs={
"InputFormatOptions": {
"Csv": {
"Delimiter": ",",
}
}
},
dynamodb_attributes=[
{"AttributeName": "cocktail_id", "AttributeType": "S"},
],
dynamodb_key_schema=[
{"AttributeName": "cocktail_id", "KeyType": "HASH"},
],
)
要将 S3 数据加载到现有的 DynamoDB 表中,请使用
tests/system/amazon/aws/example_s3_to_dynamodb.py
transfer_2 = S3ToDynamoDBOperator(
task_id="s3_to_dynamodb_new_table",
s3_bucket=bucket_name,
s3_key=s3_key,
dynamodb_table_name=existing_table_name,
use_existing_table=True,
input_format="CSV",
import_table_kwargs={
"InputFormatOptions": {
"Csv": {
"Delimiter": ",",
}
}
},
dynamodb_attributes=[
{"AttributeName": "cocktail_id", "AttributeType": "S"},
],
dynamodb_key_schema=[
{"AttributeName": "cocktail_id", "KeyType": "HASH"},
],
)