SQL 运算符

这些运算符对 SQL 数据库执行各种查询,包括列级和表级数据质量检查。

执行 SQL 查询

使用 SQLExecuteQueryOperator 对不同的数据库运行 SQL 查询。运算符的参数是

  • sql - 要执行的单个字符串、字符串列表或指向模板文件的字符串;

  • autocommit (可选)如果为 True,则自动提交每个命令(默认值:False);

  • parameters (可选)用于渲染 SQL 查询的参数。

  • handler (可选)将应用于游标的函数。如果它是 None,则不会返回结果(默认值:fetch_all_handler)。

  • split_statements (可选)如果将单个 SQL 字符串拆分为语句并分别运行(默认值:False)。

  • return_last (可选)取决于 split_statements,如果它是 True,则此参数用于仅返回最后一个语句或所有拆分语句的结果(默认值:True)。

下面的示例显示了如何实例化 SQLExecuteQueryOperator 任务。

tests/system/providers/common/sql/example_sql_execute_query.py[源代码]

execute_query = SQLExecuteQueryOperator(
    task_id="execute_query",
    sql=f"SELECT 1; SELECT * FROM {AIRFLOW_DB_METADATA_TABLE} LIMIT 1;",
    split_statements=True,
    return_last=False,
)

检查 SQL 表列

使用 SQLColumnCheckOperator 对给定表的列运行数据质量检查。除了连接 ID 和表之外,还必须提供描述列与要运行的测试之间关系的 column_mapping。一个示例列映射是一组三个嵌套字典,如下所示

column_mapping = {
    "col_name": {
        "null_check": {"equal_to": 0, "partition_clause": "other_col LIKE 'this'"},
        "min": {
            "greater_than": 5,
            "leq_to": 10,
            "tolerance": 0.2,
        },
        "max": {"less_than": 1000, "geq_to": 10, "tolerance": 0.01},
    }
}

其中 col_name 是要对其运行检查的列的名称,其字典中的每个条目都是一个检查。有效的检查是

  • null_check:检查列中 NULL 值的数量

  • distinct_check:检查列中不同值的 COUNT

  • unique_check:根据行数检查列中不同值的數量

  • min:检查列中的最小值

  • max:检查列中的最大值

检查字典中的每个条目都是检查成功的条件、容差或分区子句。成功的条件是

  • greater_than

  • geq_to

  • less_than

  • leq_to

  • equal_to

指定条件时,equal_to 与其他条件不兼容。可以在同一个检查中指定下限和上限条件。容差是结果可能超出界限但仍被视为成功的百分比。

分区子句可以在运算符级别作为对所有检查进行分区的参数给出,也可以在列级别作为对该列的所有检查进行分区的 column_mapping 中给出,也可以在列的检查级别作为仅对该检查进行分区的参数给出。

如果未使用提供的连接中的数据库,也可以指定数据库。

accept_none 参数默认为 true,它会将查询返回的 None 值转换为 0,从而允许空表返回有效的整数。

下面的示例演示了如何实例化 SQLColumnCheckOperator 任务。

tests/system/providers/common/sql/example_sql_column_table_check.py[源代码]

column_check = SQLColumnCheckOperator(
    task_id="column_check",
    table=AIRFLOW_DB_METADATA_TABLE,
    column_mapping={
        "id": {
            "null_check": {
                "equal_to": 0,
                "tolerance": 0,
            },
            "distinct_check": {
                "equal_to": 1,
            },
        }
    },
)

检查 SQL 表值

使用 SQLTableCheckOperator 对给定表运行数据质量检查。除了连接 ID 和表之外,还必须提供描述表与要运行的测试之间关系的 checks 字典。一个示例 checks 参数是一组两个嵌套字典,如下所示

checks = (
    {
        "row_count_check": {
            "check_statement": "COUNT(*) = 1000",
        },
        "column_sum_check": {
            "check_statement": "col_a + col_b < col_c",
            "partition_clause": "col_a IS NOT NULL",
        },
    },
)

第一组键是检查名称,它们在运算符构建的模板化查询中被引用。检查名称下的字典键必须包含 check_statement,其值为解析为布尔值的 SQL 语句(这可以是任何在 airflow.operators.sql.parse_boolean 中解析为布尔值的字符串或整数)。可以提供的另一个键是 partition_clause,它是一个检查级别语句,将使用 WHERE 子句对该检查的表中的数据进行分区。此语句与参数 partition_clause 兼容,后者过滤所有检查。

下面的示例演示了如何实例化 SQLTableCheckOperator 任务。

tests/system/providers/common/sql/example_sql_column_table_check.py[源代码]

row_count_check = SQLTableCheckOperator(
    task_id="row_count_check",
    table=AIRFLOW_DB_METADATA_TABLE,
    checks={
        "row_count_check": {
            "check_statement": "COUNT(*) = 1",
        }
    },
)

此条目有帮助吗?