Amazon Redshift 数据¶
Amazon Redshift 管理着数据仓库的设置、操作和扩展的所有工作:配置容量、监控和备份集群,以及对 Amazon Redshift 引擎应用补丁和升级。您可以专注于使用您的数据来为您的业务和客户获取新的见解。
先决条件任务¶
要使用这些操作符,您必须执行以下几项操作
通过 pip 安装 API 库。
pip install 'apache-airflow[amazon]'详细信息请参阅 Airflow® 的安装
设置连接.
通用参数¶
- aws_conn_id
引用 Amazon Web Services 连接 ID。如果此参数设置为
None
,则使用默认的 boto3 行为,不进行连接查找。否则,使用存储在连接中的凭据。默认值:aws_default
- region_name
AWS 区域名称。如果此参数设置为
None
或省略,则使用 AWS 连接额外参数 中的 region_name。否则,使用指定的值而不是连接值。默认值:None
- verify
是否验证 SSL 证书。
False
- 不验证 SSL 证书。path/to/cert/bundle.pem - 要使用的 CA 证书包的文件名。如果要使用与 botocore 使用的不同的 CA 证书包,可以指定此参数。
如果此参数设置为
None
或省略,则使用 AWS 连接额外参数 中的 verify。否则,使用指定的值而不是连接值。默认值:None
- botocore_config
提供的字典用于构造 botocore.config.Config。此配置可用于配置 避免节流异常、超时等。
{ "signature_version": "unsigned", "s3": { "us_east_1_regional_endpoint": True, }, "retries": { "mode": "standard", "max_attempts": 10, }, "connect_timeout": 300, "read_timeout": 300, "tcp_keepalive": True, }
如果此参数设置为
None
或省略,则使用 AWS 连接额外参数 中的 config_kwargs。否则,使用指定的值而不是连接值。默认值:None
注意
指定一个空字典
{}
,将覆盖 botocore.config.Config 的连接配置
操作符¶
在 Amazon Redshift 集群上执行语句¶
使用 RedshiftDataOperator
对 Amazon Redshift 集群执行语句。
这与 RedshiftSQLOperator
的不同之处在于,它允许用户通过 AWS API 查询和检索数据,并避免了使用 Postgres 连接的必要性。
tests/system/amazon/aws/example_redshift.py
create_table_redshift_data = RedshiftDataOperator(
task_id="create_table_redshift_data",
cluster_identifier=redshift_cluster_identifier,
database=DB_NAME,
db_user=DB_LOGIN,
sql=[
"""
CREATE TABLE IF NOT EXISTS fruit (
fruit_id INTEGER,
name VARCHAR NOT NULL,
color VARCHAR NOT NULL
);
"""
],
poll_interval=POLL_INTERVAL,
wait_for_completion=True,
)
在执行多个语句时重用会话¶
在上游任务中指定 session_keep_alive_seconds
参数。在下游任务中,从 XCom 获取会话 ID 并将其传递给 session_id
参数。当您使用临时表时,这非常有用。
tests/system/amazon/aws/example_redshift.py
create_tmp_table_data_api = RedshiftDataOperator(
task_id="create_tmp_table_data_api",
cluster_identifier=redshift_cluster_identifier,
database=DB_NAME,
db_user=DB_LOGIN,
sql=[
"""
CREATE TEMPORARY TABLE tmp_people (
id INTEGER,
first_name VARCHAR(100),
age INTEGER
);
"""
],
poll_interval=POLL_INTERVAL,
wait_for_completion=True,
session_keep_alive_seconds=600,
)
insert_data_reuse_session = RedshiftDataOperator(
task_id="insert_data_reuse_session",
sql=[
"INSERT INTO tmp_people VALUES ( 1, 'Bob', 30);",
"INSERT INTO tmp_people VALUES ( 2, 'Alice', 35);",
"INSERT INTO tmp_people VALUES ( 3, 'Charlie', 40);",
],
poll_interval=POLL_INTERVAL,
wait_for_completion=True,
session_id="{{ task_instance.xcom_pull(task_ids='create_tmp_table_data_api', key='session_id') }}",
)