TeradataOperator

TeradataOperator defines tasks that interact with Teradata.

To execute arbitrary SQL in Teradata, use TeradataOperator.

Common database operations with TeradataOperator

Creating a Teradata database table

An example usage of TeradataOperator is as follows:

tests/system/providers/teradata/example_teradata.py

create_table = TeradataOperator(
    task_id="create_table",
    sql=r"""
    CREATE TABLE Country (
        country_id INTEGER,
        name CHAR(25),
        continent CHAR(25)
    );
    """,
)

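You can also execute SQL commands from an external file. The file must sit at the same level as the DAG's .py file. This lets you maintain SQL queries separately from the Python code.

tests/system/providers/teradata/example_teradata.py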

    create_table_from_external_file = TeradataOperator(
        task_id="create_table_from_external_file",
        sql="create_table.sql",
        dag=dag,
    )

Your dags/create_table.sql should look like this:

-- create Users table
CREATE TABLE Users, FALLBACK (
    username varchar(50),
    description varchar(256)
);
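
The "same level" requirement can be checked before deployment with a few lines of standard-library Python. This is only a sketch: the helper name sql_file_next_to_dag and the temporary directory layout are hypothetical, chosen for illustration, and nothing here calls Airflow itself.

```python
from pathlib import Path
import tempfile


def sql_file_next_to_dag(dag_file: Path, sql_name: str) -> Path:
    """Return the location where a relative SQL file name is expected:
    the directory that holds the DAG file (hypothetical helper)."""
    return dag_file.parent / sql_name


with tempfile.TemporaryDirectory() as tmp:
    # Lay out a throwaway dags/ folder containing a DAG file.
    dag_file = Path(tmp) / "dags" / "example_teradata.py"
    dag_file.parent.mkdir()
    dag_file.write_text("# DAG definition goes here\n")

    # Place create_table.sql beside the DAG file.
    sql_path = sql_file_next_to_dag(dag_file, "create_table.sql")
    sql_path.write_text(
        "CREATE TABLE Users, FALLBACK (\n"
        "    username varchar(50),\n"
        "    description varchar(256)\n"
        ");\n"
    )

    # The SQL file exists and shares a directory with the DAG file.
    found = sql_path.exists() and sql_path.parent == dag_file.parent
    print(found)
```

With this layout, the task can refer to the file simply as sql="create_table.sql", as in the example above.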

Inserting data into a Teradata database table

We can then create a TeradataOperator task that populates the Users table.

tests/system/providers/teradata/example_teradata.py

    populate_table = TeradataOperator(
        task_id="populate_table",
        sql=r"""
        INSERT INTO Users (username, description)
            VALUES ( 'Danny', 'Musician');
        INSERT INTO Users (username, description)
            VALUES ( 'Simone', 'Chef');
        INSERT INTO Users (username, description)
            VALUES ( 'Lily', 'Florist');
        INSERT INTO Users (username, description)
            VALUES ( 'Tim', 'Pet shop owner');
        """,
    )

Fetching records from a Teradata database table

Fetching records from a Teradata database table can be as simple as:

tests/system/providers/teradata/example_teradata.py

    get_all_countries = TeradataOperator(
        task_id="get_all_countries",
        sql=r"SELECT * FROM Country;",
    )

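Passing parameters to TeradataOperator

TeradataOperator provides a parameters attribute for injecting values into a SQL request at runtime. The example below uses the related params argument, whose values are rendered into the SQL string through Jinja templating.

To find the countries on the Asian continent:

tests/system/providers/teradata/example_teradata.py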

    get_countries_from_continent = TeradataOperator(
        task_id="get_countries_from_continent",
        sql=r"SELECT * FROM Country WHERE {{ params.column }}='{{ params.value }}';",
        params={"column": "continent", "value": "Asia"},
    )
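
To see which statement reaches the database, it can help to render the template by hand. The sketch below is a stand-in for Jinja, not Airflow's actual rendering code: render_params is a hypothetical helper that only understands the {{ params.name }} form used in this example.

```python
import re


def render_params(sql: str, params: dict) -> str:
    """Replace each ``{{ params.<name> }}`` placeholder with its value.

    Hypothetical, simplified substitute for Jinja templating.
    """
    pattern = re.compile(r"\{\{\s*params\.(\w+)\s*\}\}")
    return pattern.sub(lambda match: str(params[match.group(1)]), sql)


sql = r"SELECT * FROM Country WHERE {{ params.column }}='{{ params.value }}';"
rendered = render_params(sql, {"column": "continent", "value": "Asia"})
print(rendered)  # SELECT * FROM Country WHERE continent='Asia';
```

Because the values are substituted as plain text, both the column name and the quoted literal become part of the SQL itself, so params should only carry trusted values.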

Dropping a Teradata database table

We can then create a TeradataOperator task that drops the Users table.

tests/system/providers/teradata/example_teradata.py

    drop_users_table = TeradataOperator(
        task_id="drop_users_table",
        sql=r"DROP TABLE Users;",
        dag=dag,
    )

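Complete Teradata Operator DAG

Putting everything together, the DAG should look like this:

tests/system/providers/teradata/example_teradata.py

import datetime
import os

from airflow import DAG
from airflow.providers.teradata.operators.teradata import TeradataOperator
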
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
DAG_ID = "example_teradata"
CONN_ID = "teradata_default"

with DAG(
    dag_id=DAG_ID,
    start_date=datetime.datetime(2020, 2, 2),
    schedule="@once",
    catchup=False,
    default_args={"teradata_conn_id": CONN_ID},
) as dag:
    create_table = TeradataOperator(
        task_id="create_table",
        sql=r"""
        CREATE TABLE Country (
            country_id INTEGER,
            name CHAR(25),
            continent CHAR(25)
        );
        """,
    )
    create_table_from_external_file = TeradataOperator(
        task_id="create_table_from_external_file",
        sql="create_table.sql",
        dag=dag,
    )
    populate_table = TeradataOperator(
        task_id="populate_table",
        sql=r"""
        INSERT INTO Users (username, description)
            VALUES ( 'Danny', 'Musician');
        INSERT INTO Users (username, description)
            VALUES ( 'Simone', 'Chef');
        INSERT INTO Users (username, description)
            VALUES ( 'Lily', 'Florist');
        INSERT INTO Users (username, description)
            VALUES ( 'Tim', 'Pet shop owner');
        """,
    )
    get_all_countries = TeradataOperator(
        task_id="get_all_countries",
        sql=r"SELECT * FROM Country;",
    )
    get_countries_from_continent = TeradataOperator(
        task_id="get_countries_from_continent",
        sql=r"SELECT * FROM Country WHERE {{ params.column }}='{{ params.value }}';",
        params={"column": "continent", "value": "Asia"},
    )
    drop_country_table = TeradataOperator(
        task_id="drop_country_table",
        sql=r"DROP TABLE Country;",
        dag=dag,
    )
    drop_users_table = TeradataOperator(
        task_id="drop_users_table",
        sql=r"DROP TABLE Users;",
        dag=dag,
    )
    create_schema = TeradataOperator(
        task_id="create_schema",
        sql=r"CREATE DATABASE airflow_temp AS PERM=10e6;",
    )
    create_table_with_schema = TeradataOperator(
        task_id="create_table_with_schema",
        sql=r"""
        CREATE TABLE schema_table (
           country_id INTEGER,
           name CHAR(25),
           continent CHAR(25)
        );
        """,
        schema="airflow_temp",
    )
    drop_schema_table = TeradataOperator(
        task_id="drop_schema_table",
        sql=r"DROP TABLE schema_table;",
        dag=dag,
        schema="airflow_temp",
    )
    drop_schema = TeradataOperator(
        task_id="drop_schema",
        sql=r"DROP DATABASE airflow_temp;",
        dag=dag,
    )
    (
        create_table
        >> create_table_from_external_file
        >> populate_table
        >> get_all_countries
        >> get_countries_from_continent
        >> drop_country_table
        >> drop_users_table
        >> create_schema
        >> create_table_with_schema
        >> drop_schema_table
        >> drop_schema
    )

TeradataStoredProcedureOperator

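TeradataStoredProcedureOperator defines tasks that execute Teradata stored procedures.

Executing a stored procedure in a Teradata database

To execute a stored procedure in Teradata, use TeradataStoredProcedureOperator.

Suppose the database contains a stored procedure like the following: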

REPLACE PROCEDURE TEST_PROCEDURE (
    IN val_in INTEGER,
    INOUT val_in_out INTEGER,
    OUT val_out INTEGER,
    OUT value_str_out varchar(100)
)
    BEGIN
        set val_out = val_in * 2;
        set val_in_out = val_in_out * 4;
        set value_str_out = 'string output';
    END;
/

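This stored procedure takes one integer parameter, val_in, as input. It also uses a single INOUT parameter, val_in_out, which serves as both input and output. In addition, it returns an integer parameter, val_out, and a string parameter, value_str_out.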
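
The arithmetic in the procedure body can be traced in plain Python to see what a call should produce. This is a model of the SQL above, not a call to Teradata; the function name simulate_test_procedure is hypothetical, and the inputs 3 and 1 are the ones used by the operator examples in this section.

```python
def simulate_test_procedure(val_in: int, val_in_out: int) -> dict:
    """Mirror the three SET statements of TEST_PROCEDURE (illustration only)."""
    return {
        "val_in_out": val_in_out * 4,      # set val_in_out = val_in_out * 4
        "val_out": val_in * 2,             # set val_out = val_in * 2
        "value_str_out": "string output",  # set value_str_out = 'string output'
    }


result = simulate_test_procedure(3, 1)
print(result)
# {'val_in_out': 4, 'val_out': 6, 'value_str_out': 'string output'}
```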

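This stored procedure can be invoked with TeradataStoredProcedureOperator in several ways.

One approach passes the parameters positionally as a list and specifies the output parameters as Python data types:

tests/system/providers/teradata/example_teradata_call_sp.py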

    opr_sp_types = TeradataStoredProcedureOperator(
        task_id="opr_sp_types",
        procedure="TEST_PROCEDURE",
        parameters=[3, 1, int, str],
    )

Alternatively, the parameters can be passed positionally as a list with the output parameters specified as placeholders:

tests/system/providers/teradata/example_teradata_call_sp.py

    opr_sp_place_holder = TeradataStoredProcedureOperator(
        task_id="opr_sp_place_holder",
        procedure="TEST_PROCEDURE",
        parameters=[3, 1, "?", "?"],
    )

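Another approach passes the parameters as a dictionary, with the entries listed in the same order as the procedure's parameters:

tests/system/providers/teradata/example_teradata_call_sp.py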

    opr_sp_dict = TeradataStoredProcedureOperator(
        task_id="opr_sp_dict",
        procedure="TEST_PROCEDURE",
        parameters={"val_in": 3, "val_in_out": 1, "val_out": int, "str_out": str},
    )

Suppose the database contains a stored procedure like the following:

REPLACE PROCEDURE GetTimestampOutParameter (OUT out_timestamp TIMESTAMP)
   BEGIN
       -- Assign current timestamp to the OUT parameter
       SET out_timestamp = CURRENT_TIMESTAMP;
   END;
 /

This stored procedure produces a single timestamp output parameter, out_timestamp, and can be invoked with TeradataStoredProcedureOperator with the parameters passed positionally as a list:

tests/system/providers/teradata/example_teradata_call_sp.py

    opr_sp_timestamp = TeradataStoredProcedureOperator(
        task_id="opr_sp_timestamp",
        procedure="GetTimestampOutParameter",
        parameters=["?"],
    )

Suppose the database contains a stored procedure like the following:

REPLACE PROCEDURE
TEST_PROCEDURE (IN val_in INTEGER, OUT val_out INTEGER)
  DYNAMIC RESULT SETS 2
  BEGIN
    DECLARE cur1 CURSOR WITH RETURN FOR SELECT * from DBC.DBCINFO ORDER BY 1 ;
    DECLARE cur2 CURSOR WITH RETURN FOR SELECT infodata, infokey from DBC.DBCINFO order by 1 ;
    open cur1 ;
    open cur2 ;
    set val_out = val_in * 2;
  END;
/

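This stored procedure takes an integer parameter, val_in, as input and produces an integer parameter, val_out. It also opens two cursors that return the results of SELECT queries. The system test defines a similar procedure, examplestoredproc, through the TeradataOperator task shown here; the complete DAG below then calls it with TeradataStoredProcedureOperator, passing the parameters positionally as a list (the opr_sp_param_dr task).

tests/system/providers/teradata/example_teradata_call_sp.py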

    create_sp_param_dr = TeradataOperator(
        task_id="create_sp_param_dr",
        sql=r"""replace procedure examplestoredproc (in p1 integer, inout p2 integer, out p3 integer)
                dynamic result sets 2
                begin
                    declare cur1 cursor with return for select * from dbc.dbcinfo order by 1 ;
                    declare cur2 cursor with return for select infodata, infokey from dbc.dbcinfo order by 1 ;
                    open cur1 ;
                    open cur2 ;
                    set p2 = p1 + p2 ;
                    set p3 = p1 * p2 ;
                end ;
            """,
    )
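
The two SET statements in examplestoredproc run in order, so p3 is computed from the already updated p2. The sketch below traces that in plain Python for the inputs 3 and 2, the values the complete DAG passes as parameters=[3, 2, int]. The function name simulate_examplestoredproc is hypothetical, and the two result sets returned by the cursors are not modeled.

```python
def simulate_examplestoredproc(p1: int, p2: int) -> tuple[int, int]:
    """Mirror the SET statements of examplestoredproc (illustration only)."""
    p2 = p1 + p2  # set p2 = p1 + p2
    p3 = p1 * p2  # set p3 = p1 * p2, using the updated p2
    return p2, p3


p2_out, p3_out = simulate_examplestoredproc(3, 2)
print(p2_out, p3_out)  # 5 15
```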

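Complete TeradataStoredProcedureOperator DAG

Putting everything together, the DAG should look like this:

tests/system/providers/teradata/example_teradata_call_sp.py

from datetime import datetime

from airflow import DAG
from airflow.providers.teradata.operators.teradata import (
    TeradataOperator,
    TeradataStoredProcedureOperator,
)
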
CONN_ID = "teradata_sp_call"
DAG_ID = "example_teradata_call_sp"

with DAG(
    dag_id=DAG_ID,
    max_active_runs=1,
    max_active_tasks=3,
    catchup=False,
    default_args={"teradata_conn_id": CONN_ID},
    schedule="@once",
    start_date=datetime(2023, 1, 1),
) as dag:
    create_sp_in_inout = TeradataOperator(
        task_id="create_sp_in_inout",
        sql=r"""REPLACE PROCEDURE TEST_PROCEDURE (
                    IN val_in INTEGER,
                    INOUT val_in_out INTEGER,
                    OUT val_out INTEGER,
                    OUT value_str_out varchar(100)
                )
                BEGIN
                    set val_out = val_in * 2;
                    set val_in_out = val_in_out * 4;
                    set value_str_out = 'string output';
                END;
            """,
    )
    opr_sp_types = TeradataStoredProcedureOperator(
        task_id="opr_sp_types",
        procedure="TEST_PROCEDURE",
        parameters=[3, 1, int, str],
    )
    opr_sp_place_holder = TeradataStoredProcedureOperator(
        task_id="opr_sp_place_holder",
        procedure="TEST_PROCEDURE",
        parameters=[3, 1, "?", "?"],
    )
    opr_sp_dict = TeradataStoredProcedureOperator(
        task_id="opr_sp_dict",
        procedure="TEST_PROCEDURE",
        parameters={"val_in": 3, "val_in_out": 1, "val_out": int, "str_out": str},
    )
    create_sp_timestamp = TeradataOperator(
        task_id="create_sp_timestamp",
        sql=r"""REPLACE PROCEDURE GetTimestampOutParameter (OUT out_timestamp TIMESTAMP)
                    BEGIN
                        -- Assign current timestamp to the OUT parameter
                        SET out_timestamp = CURRENT_TIMESTAMP;
                    END;
                 """,
    )
    opr_sp_timestamp = TeradataStoredProcedureOperator(
        task_id="opr_sp_timestamp",
        procedure="GetTimestampOutParameter",
        parameters=["?"],
    )
    create_sp_param_dr = TeradataOperator(
        task_id="create_sp_param_dr",
        sql=r"""replace procedure examplestoredproc (in p1 integer, inout p2 integer, out p3 integer)
                dynamic result sets 2
                begin
                    declare cur1 cursor with return for select * from dbc.dbcinfo order by 1 ;
                    declare cur2 cursor with return for select infodata, infokey from dbc.dbcinfo order by 1 ;
                    open cur1 ;
                    open cur2 ;
                    set p2 = p1 + p2 ;
                    set p3 = p1 * p2 ;
                end ;
            """,
    )
    opr_sp_param_dr = TeradataStoredProcedureOperator(
        task_id="opr_sp_param_dr",
        procedure="examplestoredproc",
        parameters=[3, 2, int],
    )
    drop_sp = TeradataOperator(
        task_id="drop_sp",
        sql=r"drop procedure examplestoredproc;",
    )
    drop_sp_test = TeradataOperator(
        task_id="drop_sp_test",
        sql=r"drop procedure TEST_PROCEDURE;",
    )
    drop_sp_timestamp = TeradataOperator(
        task_id="drop_sp_timestamp",
        sql=r"drop procedure GetTimestampOutParameter;",
    )
    (
        create_sp_in_inout
        >> opr_sp_types
        >> opr_sp_dict
        >> opr_sp_place_holder
        >> create_sp_param_dr
        >> opr_sp_param_dr
        >> drop_sp
        >> drop_sp_test
        >> create_sp_timestamp
        >> opr_sp_timestamp
        >> drop_sp_timestamp
    )
