airflow.providers.openlineage.sqlparser

属性

log

DEFAULT_NAMESPACE

DEFAULT_INFORMATION_SCHEMA_COLUMNS

DEFAULT_INFORMATION_SCHEMA_TABLE_NAME

GetTableSchemasParams

get_table_schemas 参数。

DatabaseInfo

包含处理 SQL 语句解析结果所需的数据库特定信息。

SQLParser

openlineage-sql 的接口。

函数

default_normalize_name_method(name)

from_table_meta(table_meta, database, namespace, ...)

get_openlineage_facets_with_sql(hook, sql, conn_id, ...)

模块内容

airflow.providers.openlineage.sqlparser.log[source]
airflow.providers.openlineage.sqlparser.DEFAULT_NAMESPACE = 'default'[source]
airflow.providers.openlineage.sqlparser.DEFAULT_INFORMATION_SCHEMA_COLUMNS = ['table_schema', 'table_name', 'column_name', 'ordinal_position', 'udt_name'][source]
airflow.providers.openlineage.sqlparser.DEFAULT_INFORMATION_SCHEMA_TABLE_NAME = 'information_schema.columns'[source]
airflow.providers.openlineage.sqlparser.default_normalize_name_method(name)[source]
class airflow.providers.openlineage.sqlparser.GetTableSchemasParams[source]

基类: TypedDict

get_table_schemas 参数。

normalize_name: Callable[[str], str][source]
is_cross_db: bool[source]
information_schema_columns: list[str][source]
information_schema_table: str[source]
use_flat_cross_db_query: bool[source]
is_uppercase_names: bool[source]
database: str | None[source]
class airflow.providers.openlineage.sqlparser.DatabaseInfo[source]

包含处理 SQL 语句解析结果所需的数据库特定信息。

参数:
  • scheme – OpenLineage 命名空间中 URI 的 Scheme 部分。

  • authority – OpenLineage 命名空间中 URI 的 Authority 部分。在大多数情况下,它应该返回 Airflow 连接的 {host}:{port} 部分。参阅: https://github.com/OpenLineage/OpenLineage/blob/main/spec/Naming.md

  • database – 优先于解析的数据库名称。

  • information_schema_columns – 信息模式表中的列名称列表。

  • information_schema_table_name – 信息模式表名称。

  • use_flat_cross_db_query

    指定是应使用单个“全局”信息模式表进行跨数据库查询(例如,在 Redshift 中),还是应分别查询多个基于数据库的“本地”信息模式表。

    如果为 True,则假设存在一个单一、通用的信息模式表(例如,在 Redshift 中,SVV_REDSHIFT_COLUMNS 视图)[https://docs.aws.amazon.com/redshift/latest/dg/r_SVV_REDSHIFT_COLUMNS.html]。在此模式下,我们仅直接查询 information_schema_table_name。根据 is_information_schema_cross_db 参数,您还可以在 WHERE 子句中按数据库名称进行过滤。

    如果为 False,则将每个数据库视为拥有包含该数据库元数据的自己的本地信息模式表。因此,可能会为每个数据库生成一个查询,然后进行合并(通常通过 UNION ALL)。对于不维护所有元数据的单一全局视图或需要按数据库查询的方言,此方法是必需的。根据 is_information_schema_cross_db 参数,查询可以在标识符和过滤器中包含或省略数据库信息。

    请参阅 is_information_schema_cross_db,它也会影响最终查询的构建方式。

  • is_information_schema_cross_db

    指定是否应跟踪数据库信息并将其包含在从 information_schema_table 检索模式信息的查询中。简而言之,这决定了查询是否能够跨越多个数据库。

    如果为 True,则在适用时包含数据库标识符,从而允许从多个数据库检索元数据。例如,在 Snowflake 或 MS SQL(其中每个数据库都被视为顶级命名空间)中,您可能会有如下查询:

    ` SELECT ... FROM db1.information_schema.columns WHERE ... UNION ALL SELECT ... FROM db2.information_schema.columns WHERE ... `

    在 Redshift 中,将其设置为 True 并结合 use_flat_cross_db_query=True 允许向查询添加数据库过滤器,例如

    ` SELECT ... FROM SVV_REDSHIFT_COLUMNS WHERE SVV_REDSHIFT_COLUMNS.database == db1  # 当为 False AND 时跳过此项 SVV_REDSHIFT_COLUMNS.schema == schema1 AND SVV_REDSHIFT_COLUMNS.table IN (table1, table2) OR ... `

    但是,某些数据库(例如 PostgreSQL)不允许真正的跨数据库查询。在此类方言中,启用跨数据库支持可能会导致错误或不必要。请始终查阅您的方言文档或测试示例查询,以确认是否支持跨数据库查询。

    如果为 False,则忽略数据库限定符,有效地将查询限制到单个数据库(或使数据库级别限定符成为可选)。这对于不支持跨数据库操作或仅提供两级命名空间(模式 + 表)而不是三级命名空间(数据库 + 模式 + 表)的数据库通常更安全。例如,某些 MySQL 或 PostgreSQL 上下文可能根本不需要或不允许跨数据库查询。

    请参阅 use_flat_cross_db_query,它也会影响最终查询的构建方式。

  • is_uppercase_names – 指定数据库是否仅接受大写名称(例如 Snowflake)。

  • normalize_name_method – 用于规范化数据库、模式和表名称的方法。默认为 name.lower()

scheme: str[source]
authority: str | None = None[source]
database: str | None = None[source]
information_schema_columns: list[str] = ['table_schema', 'table_name', 'column_name', 'ordinal_position', 'udt_name'][source]
information_schema_table_name: str = 'information_schema.columns'[source]
use_flat_cross_db_query: bool = False[source]
is_information_schema_cross_db: bool = False[source]
is_uppercase_names: bool = False[source]
normalize_name_method: Callable[[str], str][source]
airflow.providers.openlineage.sqlparser.from_table_meta(table_meta, database, namespace, is_uppercase)[source]
class airflow.providers.openlineage.sqlparser.SQLParser(dialect=None, default_schema=None)[source]

基类: airflow.utils.log.logging_mixin.LoggingMixin

openlineage-sql 的接口。

参数:
  • dialect (str | None) – 数据库特定的方言

  • default_schema (str | None) – 应用于每个未解析模式的表的模式

dialect = None[source]
default_schema = None[source]
parse(sql)[source]

解析单条或多条 SQL 语句。

parse_table_schemas(hook, inputs, outputs, database_info, namespace=DEFAULT_NAMESPACE, database=None, sqlalchemy_engine=None)[source]

解析输入表和输出表的模式。

get_metadata_from_parser(inputs, outputs, database_info, namespace=DEFAULT_NAMESPACE, database=None)[source]
attach_column_lineage(datasets, database, parse_result)[source]

将列沿袭方面附加到数据集列表中。

请注意,目前每个数据集设置了相同的列沿袭信息。这将在 OpenLineage SQL Parser 改进后有所变化。

generate_openlineage_metadata_from_sql(sql, hook, database_info, database=None, sqlalchemy_engine=None, use_connection=True)[source]

解析 SQL 语句并生成 OpenLineage 元数据。

生成的 OpenLineage 元数据包含

  • 已解析模式的输入表

  • 已解析模式的输出表

  • 运行方面 (run facets)

  • 作业方面 (job facets)。

参数:
static create_namespace(database_info)[source]
classmethod normalize_sql(sql)[source]

确保返回以分号分隔的 SQL 语句。

classmethod split_sql_string(sql)[source]

将 SQL 字符串拆分为语句列表。

尝试使用 DbApiHook.split_sql_string 如果可用。否则,使用相同的逻辑。

create_information_schema_query(tables, normalize_name, is_cross_db, information_schema_columns, information_schema_table, is_uppercase_names, use_flat_cross_db_query, database=None, sqlalchemy_engine=None)[source]

创建 SELECT 语句以查询信息模式表。

airflow.providers.openlineage.sqlparser.get_openlineage_facets_with_sql(hook, sql, conn_id, database)[source]

此条目有帮助吗?