TeradataOperator¶
TeradataOperator 的目的是定义涉及与 Teradata 交互的任务。
要在 Teradata 中执行任意 SQL,请使用 TeradataOperator。
使用 TeradataOperator 的常见数据库操作¶
创建 Teradata 数据库表¶
TeradataOperator 的示例用法如下
create_table = TeradataOperator(
task_id="create_table",
sql=r"""
CREATE TABLE Country (
country_id INTEGER,
name CHAR(25),
continent CHAR(25)
);
""",
)
您还可以使用外部文件来执行 SQL 命令。外部文件必须与 DAG 的 .py 文件位于同一目录下。这样,您可以轻松地将 SQL 查询与代码分开维护。
create_table_from_external_file = TeradataOperator(
task_id="create_table_from_external_file",
sql="create_table.sql",
dag=dag,
)
您的 dags/create_table.sql 文件内容应如下所示
-- create Users table
CREATE TABLE Users, FALLBACK (
username varchar(50),
description varchar(256)
);
将数据插入 Teradata 数据库表¶
然后,我们可以创建一个 TeradataOperator 任务,用于向 Users 表中填充数据。
populate_table = TeradataOperator(
task_id="populate_table",
sql=r"""
INSERT INTO Users (username, description)
VALUES ( 'Danny', 'Musician');
INSERT INTO Users (username, description)
VALUES ( 'Simone', 'Chef');
INSERT INTO Users (username, description)
VALUES ( 'Lily', 'Florist');
INSERT INTO Users (username, description)
VALUES ( 'Tim', 'Pet shop owner');
""",
)
从 Teradata 数据库表中获取记录¶
从 Teradata 数据库表中获取记录可以像下面这样简单
get_all_countries = TeradataOperator(
task_id="get_all_countries",
sql=r"SELECT * FROM Country;",
)
将参数传递到 TeradataOperator¶
TeradataOperator 提供 params 属性(在 SQL 中通过 Jinja 模板引用,例如下例中的 {{ params.column }} 和 {{ params.value }}),它可以在运行时将值动态注入到 SQL 请求中。
找到亚洲大陆的国家
get_countries_from_continent = TeradataOperator(
task_id="get_countries_from_continent",
sql=r"SELECT * FROM Country WHERE {{ params.column }}='{{ params.value }}';",
params={"column": "continent", "value": "Asia"},
)
删除 Teradata 数据库表¶
然后,我们可以创建一个 TeradataOperator 任务,用于删除 Users 表。
drop_users_table = TeradataOperator(
task_id="drop_users_table",
sql=r"DROP TABLE Users;",
dag=dag,
)
完整的 Teradata Operator DAG¶
当我们将所有内容放在一起时,我们的 DAG 应如下所示
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
DAG_ID = "example_teradata"
CONN_ID = "teradata_default"
with DAG(
dag_id=DAG_ID,
start_date=datetime.datetime(2020, 2, 2),
schedule="@once",
catchup=False,
default_args={"teradata_conn_id": CONN_ID},
) as dag:
create_table = TeradataOperator(
task_id="create_table",
sql=r"""
CREATE TABLE Country (
country_id INTEGER,
name CHAR(25),
continent CHAR(25)
);
""",
)
create_table_from_external_file = TeradataOperator(
task_id="create_table_from_external_file",
sql="create_table.sql",
dag=dag,
)
populate_table = TeradataOperator(
task_id="populate_table",
sql=r"""
INSERT INTO Users (username, description)
VALUES ( 'Danny', 'Musician');
INSERT INTO Users (username, description)
VALUES ( 'Simone', 'Chef');
INSERT INTO Users (username, description)
VALUES ( 'Lily', 'Florist');
INSERT INTO Users (username, description)
VALUES ( 'Tim', 'Pet shop owner');
""",
)
get_all_countries = TeradataOperator(
task_id="get_all_countries",
sql=r"SELECT * FROM Country;",
)
get_countries_from_continent = TeradataOperator(
task_id="get_countries_from_continent",
sql=r"SELECT * FROM Country WHERE {{ params.column }}='{{ params.value }}';",
params={"column": "continent", "value": "Asia"},
)
drop_country_table = TeradataOperator(
task_id="drop_country_table",
sql=r"DROP TABLE Country;",
dag=dag,
)
drop_users_table = TeradataOperator(
task_id="drop_users_table",
sql=r"DROP TABLE Users;",
dag=dag,
)
create_schema = TeradataOperator(
task_id="create_schema",
sql=r"CREATE DATABASE airflow_temp AS PERM=10e6;",
)
create_table_with_schema = TeradataOperator(
task_id="create_table_with_schema",
sql=r"""
CREATE TABLE schema_table (
country_id INTEGER,
name CHAR(25),
continent CHAR(25)
);
""",
schema="airflow_temp",
)
drop_schema_table = TeradataOperator(
task_id="drop_schema_table",
sql=r"DROP TABLE schema_table;",
dag=dag,
schema="airflow_temp",
)
drop_schema = TeradataOperator(
task_id="drop_schema",
sql=r"DROP DATABASE airflow_temp;",
dag=dag,
)
(
create_table
>> create_table_from_external_file
>> populate_table
>> get_all_countries
>> get_countries_from_continent
>> drop_country_table
>> drop_users_table
>> create_schema
>> create_table_with_schema
>> drop_schema_table
>> drop_schema
)
TeradataStoredProcedureOperator¶
TeradataStoredProcedureOperator 的目的是定义涉及执行 Teradata 存储过程的任务。
在 Teradata 数据库中执行存储过程¶
要在 Teradata 中执行存储过程,请使用 TeradataStoredProcedureOperator。
假设数据库中存在一个如下所示的存储过程
REPLACE PROCEDURE TEST_PROCEDURE ( IN val_in INTEGER, INOUT val_in_out INTEGER, OUT val_out INTEGER, OUT value_str_out varchar(100) ) BEGIN set val_out = val_in * 2; set val_in_out = val_in_out * 4; set value_str_out = 'string output'; END; /
此存储过程采用一个整数参数 val_in 作为输入。它使用一个单一的 inout 参数 val_in_out 进行操作,该参数既作为输入又作为输出。此外,它返回一个整数参数 val_out 和一个字符串参数 value_str_out。
可以使用 TeradataStoredProcedureOperator 以多种方式调用此存储过程。
一种方法涉及将参数按位置作为列表传递,并将输出参数指定为 Python 数据类型
opr_sp_types = TeradataStoredProcedureOperator(
task_id="opr_sp_types",
procedure="TEST_PROCEDURE",
parameters=[3, 1, int, str],
)
或者,可以将参数按位置作为列表传递,并将输出参数指定为占位符
opr_sp_place_holder = TeradataStoredProcedureOperator(
task_id="opr_sp_place_holder",
procedure="TEST_PROCEDURE",
parameters=[3, 1, "?", "?"],
)
另一种方法需要将参数按位置作为字典传递
opr_sp_dict = TeradataStoredProcedureOperator(
task_id="opr_sp_dict",
procedure="TEST_PROCEDURE",
parameters={"val_in": 3, "val_in_out": 1, "val_out": int, "str_out": str},
)
假设数据库中存在一个如下所示的存储过程
REPLACE PROCEDURE GetTimestampOutParameter (OUT out_timestamp TIMESTAMP) BEGIN -- Assign current timestamp to the OUT parameter SET out_timestamp = CURRENT_TIMESTAMP; END; /
此存储过程生成一个单一的时间戳输出参数 out_timestamp,并且可以通过 TeradataStoredProcedureOperator 调用,其中参数按位置作为列表传递
opr_sp_timestamp = TeradataStoredProcedureOperator(
task_id="opr_sp_timestamp",
procedure="GetTimestampOutParameter",
parameters=["?"],
)
假设数据库中存在一个如下所示的存储过程
REPLACE PROCEDURE TEST_PROCEDURE (IN val_in INTEGER, OUT val_out INTEGER) BEGIN DECLARE cur1 CURSOR WITH RETURN FOR SELECT * from DBC.DBCINFO ORDER BY 1 ; DECLARE cur2 CURSOR WITH RETURN FOR SELECT infodata, infokey from DBC.DBCINFO order by 1 ; open cur1 ; open cur2 ; set val_out = val_in * 2; END; /
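此存储过程将一个整数参数 val_in 作为输入,并生成一个整数参数 val_out。此外,它还生成两个游标,表示选择查询的输出。可以使用 TeradataStoredProcedureOperator 调用此类存储过程,其中参数按位置作为列表传递。下面的示例任务使用 TeradataOperator 创建一个同样返回两个动态结果集的存储过程 examplestoredproc(调用它的任务 opr_sp_param_dr 见下文的完整 DAG)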
create_sp_param_dr = TeradataOperator(
task_id="create_sp_param_dr",
sql=r"""replace procedure examplestoredproc (in p1 integer, inout p2 integer, out p3 integer)
dynamic result sets 2
begin
declare cur1 cursor with return for select * from dbc.dbcinfo order by 1 ;
declare cur2 cursor with return for select infodata, infokey from dbc.dbcinfo order by 1 ;
open cur1 ;
open cur2 ;
set p2 = p1 + p2 ;
set p3 = p1 * p2 ;
end ;
""",
)
完整的 TeradataStoredProcedureOperator DAG¶
当我们将所有内容放在一起时,我们的 DAG 应如下所示
CONN_ID = "teradata_sp_call"
DAG_ID = "example_teradata_call_sp"
with DAG(
dag_id=DAG_ID,
max_active_runs=1,
max_active_tasks=3,
catchup=False,
default_args={"teradata_conn_id": CONN_ID},
schedule="@once",
start_date=datetime(2023, 1, 1),
) as dag:
create_sp_in_inout = TeradataOperator(
task_id="create_sp_in_inout",
sql=r"""REPLACE PROCEDURE TEST_PROCEDURE (
IN val_in INTEGER,
INOUT val_in_out INTEGER,
OUT val_out INTEGER,
OUT value_str_out varchar(100)
)
BEGIN
set val_out = val_in * 2;
set val_in_out = val_in_out * 4;
set value_str_out = 'string output';
END;
""",
)
opr_sp_types = TeradataStoredProcedureOperator(
task_id="opr_sp_types",
procedure="TEST_PROCEDURE",
parameters=[3, 1, int, str],
)
opr_sp_place_holder = TeradataStoredProcedureOperator(
task_id="opr_sp_place_holder",
procedure="TEST_PROCEDURE",
parameters=[3, 1, "?", "?"],
)
opr_sp_dict = TeradataStoredProcedureOperator(
task_id="opr_sp_dict",
procedure="TEST_PROCEDURE",
parameters={"val_in": 3, "val_in_out": 1, "val_out": int, "str_out": str},
)
create_sp_timestamp = TeradataOperator(
task_id="create_sp_timestamp",
sql=r"""REPLACE PROCEDURE GetTimestampOutParameter (OUT out_timestamp TIMESTAMP)
BEGIN
-- Assign current timestamp to the OUT parameter
SET out_timestamp = CURRENT_TIMESTAMP;
END;
""",
)
opr_sp_timestamp = TeradataStoredProcedureOperator(
task_id="opr_sp_timestamp",
procedure="GetTimestampOutParameter",
parameters=["?"],
)
create_sp_param_dr = TeradataOperator(
task_id="create_sp_param_dr",
sql=r"""replace procedure examplestoredproc (in p1 integer, inout p2 integer, out p3 integer)
dynamic result sets 2
begin
declare cur1 cursor with return for select * from dbc.dbcinfo order by 1 ;
declare cur2 cursor with return for select infodata, infokey from dbc.dbcinfo order by 1 ;
open cur1 ;
open cur2 ;
set p2 = p1 + p2 ;
set p3 = p1 * p2 ;
end ;
""",
)
opr_sp_param_dr = TeradataStoredProcedureOperator(
task_id="opr_sp_param_dr",
procedure="examplestoredproc",
parameters=[3, 2, int],
)
drop_sp = TeradataOperator(
task_id="drop_sp",
sql=r"drop procedure examplestoredproc;",
)
drop_sp_test = TeradataOperator(
task_id="drop_sp_test",
sql=r"drop procedure TEST_PROCEDURE;",
)
drop_sp_timestamp = TeradataOperator(
task_id="drop_sp_timestamp",
sql=r"drop procedure GetTimestampOutParameter;",
)
(
create_sp_in_inout
>> opr_sp_types
>> opr_sp_dict
>> opr_sp_place_holder
>> create_sp_param_dr
>> opr_sp_param_dr
>> drop_sp
>> drop_sp_test
>> create_sp_timestamp
>> opr_sp_timestamp
>> drop_sp_timestamp
)