SQL 运算符¶
这些运算符对 SQL 数据库执行各种查询,包括列级和表级数据质量检查。
执行 SQL 查询¶
使用 SQLExecuteQueryOperator
对不同的数据库运行 SQL 查询。运算符的参数是
sql
- 要执行的单个字符串、字符串列表或指向模板文件的字符串;autocommit
(可选)如果为 True,则自动提交每个命令(默认值:False);parameters
(可选)用于渲染 SQL 查询的参数。handler
(可选)将应用于游标的函数。如果它是None
,则不会返回结果(默认值:fetch_all_handler)。split_statements
(可选)如果将单个 SQL 字符串拆分为语句并分别运行(默认值:False)。return_last
(可选)取决于split_statements
,如果它是True
,则此参数用于仅返回最后一个语句或所有拆分语句的结果(默认值:True)。
下面的示例显示了如何实例化 SQLExecuteQueryOperator 任务。
execute_query = SQLExecuteQueryOperator(
task_id="execute_query",
sql=f"SELECT 1; SELECT * FROM {AIRFLOW_DB_METADATA_TABLE} LIMIT 1;",
split_statements=True,
return_last=False,
)
检查 SQL 表列¶
使用 SQLColumnCheckOperator
对给定表的列运行数据质量检查。除了连接 ID 和表之外,还必须提供描述列与要运行的测试之间关系的 column_mapping。一个示例列映射是一组三个嵌套字典,如下所示
column_mapping = {
"col_name": {
"null_check": {"equal_to": 0, "partition_clause": "other_col LIKE 'this'"},
"min": {
"greater_than": 5,
"leq_to": 10,
"tolerance": 0.2,
},
"max": {"less_than": 1000, "geq_to": 10, "tolerance": 0.01},
}
}
其中 col_name 是要对其运行检查的列的名称,其字典中的每个条目都是一个检查。有效的检查是
null_check:检查列中 NULL 值的数量
distinct_check:检查列中不同值的 COUNT
unique_check:根据行数检查列中不同值的數量
min:检查列中的最小值
max:检查列中的最大值
检查字典中的每个条目都是检查成功的条件、容差或分区子句。成功的条件是
greater_than
geq_to
less_than
leq_to
equal_to
指定条件时,equal_to 与其他条件不兼容。可以在同一个检查中指定下限和上限条件。容差是结果可能超出界限但仍被视为成功的百分比。
分区子句可以在运算符级别作为对所有检查进行分区的参数给出,也可以在列级别作为对该列的所有检查进行分区的 column_mapping 中给出,也可以在列的检查级别作为仅对该检查进行分区的参数给出。
如果未使用提供的连接中的数据库,也可以指定数据库。
accept_none 参数默认为 true,它会将查询返回的 None 值转换为 0,从而允许空表返回有效的整数。
下面的示例演示了如何实例化 SQLColumnCheckOperator 任务。
column_check = SQLColumnCheckOperator(
task_id="column_check",
table=AIRFLOW_DB_METADATA_TABLE,
column_mapping={
"id": {
"null_check": {
"equal_to": 0,
"tolerance": 0,
},
"distinct_check": {
"equal_to": 1,
},
}
},
)
检查 SQL 表值¶
使用 SQLTableCheckOperator
对给定表运行数据质量检查。除了连接 ID 和表之外,还必须提供描述表与要运行的测试之间关系的 checks 字典。一个示例 checks 参数是一组两个嵌套字典,如下所示
checks = (
{
"row_count_check": {
"check_statement": "COUNT(*) = 1000",
},
"column_sum_check": {
"check_statement": "col_a + col_b < col_c",
"partition_clause": "col_a IS NOT NULL",
},
},
)
第一组键是检查名称,它们在运算符构建的模板化查询中被引用。检查名称下的字典键必须包含 check_statement,其值为解析为布尔值的 SQL 语句(这可以是任何在 airflow.operators.sql.parse_boolean 中解析为布尔值的字符串或整数)。可以提供的另一个键是 partition_clause,它是一个检查级别语句,将使用 WHERE 子句对该检查的表中的数据进行分区。此语句与参数 partition_clause 兼容,后者过滤所有检查。
下面的示例演示了如何实例化 SQLTableCheckOperator 任务。
row_count_check = SQLTableCheckOperator(
task_id="row_count_check",
table=AIRFLOW_DB_METADATA_TABLE,
checks={
"row_count_check": {
"check_statement": "COUNT(*) = 1",
}
},
)