SQL 操作符

这些操作符对 SQL 数据库执行各种查询,包括列级和表级的数据质量检查。

执行 SQL 查询

使用 SQLExecuteQueryOperator 对不同的数据库运行 SQL 查询。操作符的参数有:

  • sql - 单个字符串、字符串列表或指向要执行的模板文件的字符串;

  • autocommit (可选) 如果为 True,则每个命令会自动提交 (默认值:False);

  • parameters (可选) 用于渲染 SQL 查询的参数。

  • handler (可选) 将应用于游标的函数。如果它是 None,则不会返回结果 (默认值:fetch_all_handler)。

  • split_statements (可选) 是否将单个 SQL 字符串拆分为语句并单独运行 (默认值:False)。

  • return_last (可选) 取决于 split_statements,如果它是 True,则此参数用于仅返回最后一条语句的结果还是所有拆分语句的结果 (默认值:True)。

下面的示例展示了如何实例化 SQLExecuteQueryOperator 任务。

tests/system/common/sql/example_sql_execute_query.py[源代码]

execute_query = SQLExecuteQueryOperator(
    task_id="execute_query",
    sql=f"SELECT 1; SELECT * FROM {AIRFLOW_DB_METADATA_TABLE} LIMIT 1;",
    split_statements=True,
    return_last=False,
)

检查 SQL 表列

使用 SQLColumnCheckOperator 对给定表的列运行数据质量检查。除了连接 ID 和表之外,还必须提供描述列和要运行的测试之间关系的 column_mapping。一个示例列映射是一组三个嵌套字典,如下所示:

column_mapping = {
    "col_name": {
        "null_check": {"equal_to": 0, "partition_clause": "other_col LIKE 'this'"},
        "min": {
            "greater_than": 5,
            "leq_to": 10,
            "tolerance": 0.2,
        },
        "max": {"less_than": 1000, "geq_to": 10, "tolerance": 0.01},
    }
}

其中 col_name 是要运行检查的列的名称,其字典中的每个条目都是一个检查。有效的检查是:

  • null_check: 检查列中 NULL 值的数量

  • distinct_check: 检查列中不同值的 COUNT

  • unique_check: 检查列中不同值的数量与行数是否一致

  • min: 检查列中的最小值

  • max: 检查列中的最大值

检查字典中的每个条目要么是检查成功的条件、容差,要么是分区子句。成功的条件是:

  • greater_than

  • geq_to

  • less_than

  • leq_to

  • equal_to

指定条件时,equal_to 与其他条件不兼容。可以在同一检查中指定下限和上限条件。容差是指结果可能超出范围但仍被视为成功的百分比。

分区子句可以在操作符级别作为参数给出(此时它将对所有检查进行分区),可以在列映射中的列级别给出(此时它将对该列的所有检查进行分区),也可以在列的检查级别给出(此时它只对该检查进行分区)。

如果未使用所提供连接中的数据库,也可以指定数据库。

accept_none 参数(默认为 true)会将查询返回的 None 值转换为 0,从而允许空表返回有效的整数。

下面的示例演示了如何实例化 SQLColumnCheckOperator 任务。

tests/system/common/sql/example_sql_column_table_check.py[源代码]

column_check = SQLColumnCheckOperator(
    task_id="column_check",
    table=AIRFLOW_DB_METADATA_TABLE,
    column_mapping={
        "id": {
            "null_check": {
                "equal_to": 0,
                "tolerance": 0,
            },
            "distinct_check": {
                "equal_to": 1,
            },
        }
    },
)

检查 SQL 表值

使用 SQLTableCheckOperator 对给定表运行数据质量检查。除了连接 ID 和表之外,还必须提供描述表和要运行的测试之间关系的 checks 字典。一个示例 checks 参数是一组两个嵌套字典,如下所示:

checks = (
    {
        "row_count_check": {
            "check_statement": "COUNT(*) = 1000",
        },
        "column_sum_check": {
            "check_statement": "col_a + col_b < col_c",
            "partition_clause": "col_a IS NOT NULL",
        },
    },
)

第一组键是检查名称,这些名称在操作符构建的模板化查询中引用。检查名称下的字典键必须包含 check_statement,并且该值是一个解析为布尔值的 SQL 语句(这可以是任何解析为 airflow.operators.sql.parse_boolean 中的布尔值的字符串或整数)。要提供的另一个可能的键是 partition_clause,这是一个检查级语句,它将使用该检查的 WHERE 子句对表中的数据进行分区。此语句与参数 partition_clause 兼容,后者会在所有检查中进行筛选。

下面的示例演示了如何实例化 SQLTableCheckOperator 任务。

tests/system/common/sql/example_sql_column_table_check.py[源代码]

row_count_check = SQLTableCheckOperator(
    task_id="row_count_check",
    table=AIRFLOW_DB_METADATA_TABLE,
    checks={
        "row_count_check": {
            "check_statement": "COUNT(*) = 1",
        }
    },
)

根据阈值检查值

使用 SQLThresholdCheckOperator 将特定的 SQL 查询结果与定义的最小和最大阈值进行比较。这两个阈值都可以是数值或计算结果为数值的另一个 SQL 查询。此操作符需要一个连接 ID,以及要执行的 SQL 查询,并且允许选择指定数据库(如果应该覆盖连接_id 中的数据库)。参数包括:- sql - 要执行的 sql 查询,作为模板化字符串。- min_threshold - 要检查的最小阈值。可以作为数值或模板化 sql 查询。- max_threshold - 要检查的最大阈值。可以作为数值或模板化 sql 查询。- conn_id (可选) 用于连接到数据库的连接 ID。- database (可选) 覆盖连接中的数据库名称。

下面的示例演示了如何实例化 SQLThresholdCheckOperator 任务。

tests/system/common/sql/example_sql_threshold_check.py[源代码]

threshhold_check = SQLThresholdCheckOperator(
    task_id="threshhold_check",
    conn_id="sales_db",
    sql="SELECT count(distinct(customer_id)) FROM sales;",
    min_threshold=1,
    max_threshold=1000,
)

如果查询返回的值在阈值范围内,则任务通过。否则,它将失败。

此条目是否有帮助?