Amazon S3 to SQL¶
Use the S3ToSqlOperator transfer to copy data from an Amazon Simple Storage Service (S3) file into an existing SQL table. By supplying a parser function that is applied to the downloaded file, this operator can accept a variety of file formats.
Prerequisite Tasks¶
To use these operators, you must do the following:
Install the API library via pip.

pip install 'apache-airflow[amazon]'

Detailed information is available in Installation of Airflow™.
Set up a connection.
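Besides the UI, Airflow resolves connections from AIRFLOW_CONN_<CONN_ID> environment variables. Below is a minimal sketch, assuming the connection ids aws_default and sql_default and a Postgres target (all assumptions for illustration, not part of this guide):

import os

# An empty "aws://" URI falls back to the default boto3 credential chain.
os.environ["AIRFLOW_CONN_AWS_DEFAULT"] = "aws://"
# Hypothetical SQL connection for the table this operator writes to.
os.environ["AIRFLOW_CONN_SQL_DEFAULT"] = "postgres://user:password@host:5432/mydb"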
Operators¶
Amazon S3 to SQL transfer operator¶
To get more information about this operator, visit: S3ToSqlOperator
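The operator is part of the Amazon provider package and can be imported as follows:

from airflow.providers.amazon.aws.transfers.s3_to_sql import S3ToSqlOperator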
Example usage with a CSV file parser. This parser loads the file into memory and returns a list of rows:
#
# This operator requires a parser method. The parser should take a filename as input
# and return an iterable of rows.
# This example parser uses the builtin csv library and returns a list of rows
#
def parse_csv_to_list(filepath):
    import csv

    with open(filepath, newline="") as file:
        return list(csv.reader(file))
transfer_s3_to_sql = S3ToSqlOperator(
    task_id="transfer_s3_to_sql",
    s3_bucket=s3_bucket_name,
    s3_key=s3_key,
    table=SQL_TABLE_NAME,
    column_list=SQL_COLUMN_LIST,
    parser=parse_csv_to_list,
    sql_conn_id=conn_id_name,
)
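Because all file handling is delegated to the parser callable, supporting another format only means supplying a different parser. Below is a minimal sketch (not part of the Airflow examples) for a JSON Lines file; it assumes one JSON object per line, with keys matching the names in SQL_COLUMN_LIST:

#
# Hypothetical parser sketch: one JSON object per line, keys assumed to
# match SQL_COLUMN_LIST. Rows are returned as tuples in column order.
#
def parse_json_lines_to_list(filepath):
    import json

    with open(filepath) as file:
        return [
            tuple(json.loads(line)[col] for col in SQL_COLUMN_LIST)
            for line in file
            if line.strip()
        ]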
Example usage with a parser function that returns a generator:
#
# As the parser can return any kind of iterator, a generator is also allowed.
# This example parser returns a generator which prevents python from loading
# the whole file into memory.
#
def parse_csv_to_generator(filepath):
    import csv

    with open(filepath, newline="") as file:
        yield from csv.reader(file)
transfer_s3_to_sql_generator = S3ToSqlOperator(
    task_id="transfer_s3_to_sql_parser_to_generator",
    s3_bucket=s3_bucket_name,
    s3_key=s3_key,
    table=SQL_TABLE_NAME,
    column_list=SQL_COLUMN_LIST,
    parser=parse_csv_to_generator,
    sql_conn_id=conn_id_name,
)
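Since the target columns are already passed via column_list, a file that ships with a header row should not have that row inserted. A hypothetical variant of the generator parser above (an illustration, not from the Airflow examples) discards the first row before streaming the rest:

#
# Hypothetical variant: skip an assumed header row, then stream the data rows.
#
def parse_csv_skip_header(filepath):
    import csv

    with open(filepath, newline="") as file:
        reader = csv.reader(file)
        next(reader, None)  # discard the assumed header row
        yield from reader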