Amazon Redshift 数据

Amazon Redshift 管理着数据仓库的设置、操作和扩展的所有工作:配置容量、监控和备份集群,以及对 Amazon Redshift 引擎应用补丁和升级。您可以专注于使用您的数据来为您的业务和客户获取新的见解。

先决条件任务

要使用这些操作符,您必须执行以下几项操作

通用参数

aws_conn_id

引用 Amazon Web Services 连接 ID。如果此参数设置为 None,则使用默认的 boto3 行为,不进行连接查找。否则,使用存储在连接中的凭据。默认值:aws_default

region_name

AWS 区域名称。如果此参数设置为 None 或省略,则使用 AWS 连接额外参数 中的 region_name。否则,使用指定的值而不是连接值。默认值:None

verify

是否验证 SSL 证书。

  • False - 不验证 SSL 证书。

  • path/to/cert/bundle.pem - 要使用的 CA 证书包的文件名。如果要使用与 botocore 使用的不同的 CA 证书包,可以指定此参数。

如果此参数设置为 None 或省略,则使用 AWS 连接额外参数 中的 verify。否则,使用指定的值而不是连接值。默认值:None

botocore_config

提供的字典用于构造 botocore.config.Config。此配置可用于配置 避免节流异常、超时等。

示例,有关参数的更多详细信息,请查看 botocore.config.Config
{
    "signature_version": "unsigned",
    "s3": {
        "us_east_1_regional_endpoint": True,
    },
    "retries": {
      "mode": "standard",
      "max_attempts": 10,
    },
    "connect_timeout": 300,
    "read_timeout": 300,
    "tcp_keepalive": True,
}

如果此参数设置为 None 或省略,则使用 AWS 连接额外参数 中的 config_kwargs。否则,使用指定的值而不是连接值。默认值:None

注意

指定一个空字典 {},将覆盖 botocore.config.Config 的连接配置

操作符

在 Amazon Redshift 集群上执行语句

使用 RedshiftDataOperator 对 Amazon Redshift 集群执行语句。

这与 RedshiftSQLOperator 的不同之处在于,它允许用户通过 AWS API 查询和检索数据,并避免了使用 Postgres 连接的必要性。

tests/system/amazon/aws/example_redshift.py

create_table_redshift_data = RedshiftDataOperator(
    task_id="create_table_redshift_data",
    cluster_identifier=redshift_cluster_identifier,
    database=DB_NAME,
    db_user=DB_LOGIN,
    sql=[
        """
        CREATE TABLE IF NOT EXISTS fruit (
        fruit_id INTEGER,
        name VARCHAR NOT NULL,
        color VARCHAR NOT NULL
        );
    """
    ],
    poll_interval=POLL_INTERVAL,
    wait_for_completion=True,
)

在执行多个语句时重用会话

在上游任务中指定 session_keep_alive_seconds 参数。在下游任务中,从 XCom 获取会话 ID 并将其传递给 session_id 参数。当您使用临时表时,这非常有用。

tests/system/amazon/aws/example_redshift.py

create_tmp_table_data_api = RedshiftDataOperator(
    task_id="create_tmp_table_data_api",
    cluster_identifier=redshift_cluster_identifier,
    database=DB_NAME,
    db_user=DB_LOGIN,
    sql=[
        """
        CREATE TEMPORARY TABLE tmp_people (
        id INTEGER,
        first_name VARCHAR(100),
        age INTEGER
        );
    """
    ],
    poll_interval=POLL_INTERVAL,
    wait_for_completion=True,
    session_keep_alive_seconds=600,
)

insert_data_reuse_session = RedshiftDataOperator(
    task_id="insert_data_reuse_session",
    sql=[
        "INSERT INTO tmp_people VALUES ( 1, 'Bob', 30);",
        "INSERT INTO tmp_people VALUES ( 2, 'Alice', 35);",
        "INSERT INTO tmp_people VALUES ( 3, 'Charlie', 40);",
    ],
    poll_interval=POLL_INTERVAL,
    wait_for_completion=True,
    session_id="{{ task_instance.xcom_pull(task_ids='create_tmp_table_data_api', key='session_id') }}",
)

此条目是否有帮助?