使用 OpenLineage 集成

OpenLineage 是一个用于数据沿袭收集和分析的开放框架。其核心是一个可扩展的规范,系统可以使用该规范来互操作沿袭元数据。 查看 OpenLineage 文档

使用 OpenLineage 不需要更改用户 DAG 文件。需要进行基本配置,以便 OpenLineage 知道将事件发送到哪里。

快速入门

注意

OpenLineage Provider 提供了多种数据传输选项(http、kafka、文件等),包括创建自定义解决方案的灵活性。可以通过多种方法管理配置,并且用户可以使用大量的设置来微调和增强其对 OpenLineage 的使用。有关这些功能的全面说明,请参阅本文档的后续部分。

此示例是 OpenLineage 设置的基本演示。

  1. 安装提供程序包或将其添加到 requirements.txt 文件中。

    pip install apache-airflow-providers-openlineage
    
  2. 提供 Transport 配置,以便 OpenLineage 知道将事件发送到哪里。在 airflow.cfg 文件中

    [openlineage]
    transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
    

    或使用 AIRFLOW__OPENLINEAGE__TRANSPORT 环境变量

    AIRFLOW__OPENLINEAGE__TRANSPORT='{"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}'
    
  3. 就是这样! 运行 DAG 时,OpenLineage 事件应发送到已配置的后端。

用法

启用并配置后,集成不需要用户采取进一步的操作。它将自动

  • 收集任务输入/输出元数据(来源、架构等)。

  • 收集任务运行级别元数据(执行时间、状态、参数等)

  • 收集任务作业级别元数据(所有者、类型、描述等)

  • 收集特定于任务的元数据(bigquery 作业 ID、python 源代码等) - 取决于操作器

所有这些数据都将作为 OpenLineage 事件发送到已配置的后端,如 作业层次结构 中所述。

传输设置

配置 OpenLineage Airflow Provider 的主要和推荐方法是 Airflow 配置(airflow.cfg 文件)。所有可能的配置选项以及示例值都可以在 配置部分 中找到。

至少,在每种情况下都需要设置一件事,即 Transport - 您希望事件最终出现在哪里 - 例如 Marquez

将传输作为 JSON 字符串

Airflow 配置中的 transport 选项用于此目的。

[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}

AIRFLOW__OPENLINEAGE__TRANSPORT 环境变量是等效的。

AIRFLOW__OPENLINEAGE__TRANSPORT='{"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}'

如果您想查看 OpenLineage 事件而不将它们发送到任何地方,则可以设置 ConsoleTransport - 事件将最终出现在任务日志中。

[openlineage]
transport = {"type": "console"}

注意

有关内置传输类型的完整列表、特定传输的选项或有关如何实现自定义传输的说明,请参阅 Python 客户端文档

将传输作为配置文件

您还可以使用 YAML 文件(例如 openlineage.yml)配置 OpenLineage Transport。在 Airflow 配置中提供 YAML 文件的路径作为 config_path 选项。

[openlineage]
config_path = '/path/to/openlineage.yml'

AIRFLOW__OPENLINEAGE__CONFIG_PATH 环境变量是等效的。

AIRFLOW__OPENLINEAGE__CONFIG_PATH='/path/to/openlineage.yml'

配置文件 YAML 文件的内容示例

transport:
  type: http
  url: https://backend:5000
  endpoint: events/receive
  auth:
    type: api_key
    apiKey: f048521b-dfe8-47cd-9c65-0cb07d57591e

注意

有关该配置方法的详细说明以及配置文件示例,请参阅 Python 客户端文档

配置优先级

由于配置 OpenLineage 的方法有多种,因此牢记不同配置的优先级非常重要。OpenLineage Airflow Provider 按以下顺序查找配置

  1. openlineage 部分下检查 airflow.cfg 中的 config_path(或 AIRFLOW__OPENLINEAGE__CONFIG_PATH 环境变量)

  2. openlineage 部分下检查 airflow.cfg 中的 transport(或 AIRFLOW__OPENLINEAGE__TRANSPORT 环境变量)

  3. 如果缺少上述所有选项,则下面使用的 OpenLineage Python 客户端将按照 文档中描述的顺序查找配置。请注意,鼓励使用 Airflow 配置,并且这是唯一面向未来的解决方案。

向后兼容性

警告

不应使用以下变量,并且将来可能会删除。请考虑使用 Airflow 配置(如上所述)以获得面向未来的解决方案。

为了与 openlineage-airflow 包向后兼容,某些环境变量仍然可用

  • OPENLINEAGE_DISABLED 等效于 AIRFLOW__OPENLINEAGE__DISABLED

  • OPENLINEAGE_CONFIG 等效于 AIRFLOW__OPENLINEAGE__CONFIG_PATH

  • OPENLINEAGE_NAMESPACE 等效于 AIRFLOW__OPENLINEAGE__NAMESPACE

  • OPENLINEAGE_EXTRACTORS 等效于设置 AIRFLOW__OPENLINEAGE__EXTRACTORS

  • OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE 等效于 AIRFLOW__OPENLINEAGE__DISABLE_SOURCE_CODE

  • OPENLINEAGE_URL 可用于设置简单的 http 传输。此方法有一些限制,可能需要使用其他环境变量才能实现所需的输出。请参阅 文档

其他选项

命名空间

为这个特定实例设置 OpenLineage 命名空间非常有用。这样,如果您使用多个 OpenLineage 生产者,则来自它们的事件将在逻辑上分开。如果未设置,则使用 default 命名空间。在 Airflow 配置中提供命名空间的名称作为 namespace 选项。

[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
namespace = 'my-team-airflow-instance'

AIRFLOW__OPENLINEAGE__NAMESPACE 环境变量是等效的。

AIRFLOW__OPENLINEAGE__NAMESPACE='my-team-airflow-instance'

禁用

您可以通过在 Airflow 配置中将 disabled 选项设置为 true 来禁用发送 OpenLineage 事件,而无需卸载 OpenLineage 提供程序。

[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
disabled = true

AIRFLOW__OPENLINEAGE__DISABLED 环境变量是等效的。

AIRFLOW__OPENLINEAGE__DISABLED=true

禁用源代码

默认情况下,多个操作器(例如 Python、Bash)会将它们的源代码包含在它们的 OpenLineage 事件中。为防止这种情况,请在 Airflow 配置中将 disable_source_code 选项设置为 true

[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
disable_source_code = true

AIRFLOW__OPENLINEAGE__DISABLE_SOURCE_CODE 环境变量是等效的。

AIRFLOW__OPENLINEAGE__DISABLE_SOURCE_CODE=true

为操作器禁用

您可以通过将以分号分隔的 Airflow 操作器完整导入路径字符串传递给 Airflow 配置中的 disabled_for_operators 字段,轻松地排除某些操作器发出 OpenLineage 事件。

[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
disabled_for_operators = 'airflow.operators.bash.BashOperator;airflow.operators.python.PythonOperator'

AIRFLOW__OPENLINEAGE__DISABLED_FOR_OPERATORS 环境变量是等效的。

AIRFLOW__OPENLINEAGE__DISABLED_FOR_OPERATORS='airflow.operators.bash.BashOperator;airflow.operators.python.PythonOperator'

自定义提取器

如果您使用 自定义提取器 功能,请通过将以分号分隔的 Airflow 操作器完整导入路径字符串传递给 Airflow 配置中的 extractors 选项来注册提取器。

[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
extractors = full.path.to.ExtractorClass;full.path.to.AnotherExtractorClass

AIRFLOW__OPENLINEAGE__EXTRACTORS 环境变量是等效的。

AIRFLOW__OPENLINEAGE__EXTRACTORS='full.path.to.ExtractorClass;full.path.to.AnotherExtractorClass'

在 DAG/任务级别启用 OpenLineage

可以通过使用 selective_enable 策略选择性地为特定 DAG 和任务启用 OpenLineage。要启用此策略,请在 Airflow 配置文件的 [openlineage] 部分中将 selective_enable 选项设置为 True

[openlineage]
selective_enable = True

虽然 selective_enable 启用选择性控制,但 disabled 选项 仍然具有优先级。如果在配置中将 disabled 设置为 True,则无论 selective_enable 设置如何,都将为所有 DAG 和任务禁用 OpenLineage。

启用 selective_enable 策略后,您可以选择使用 enable_lineagedisable_lineage 函数为单个 DAG 和任务启用 OpenLineage。

  1. 在 DAG 上启用沿袭

from airflow.providers.openlineage.utils.selective_enable import disable_lineage, enable_lineage

with enable_lineage(DAG(...)):
    # Tasks within this DAG will have lineage tracking enabled
    MyOperator(...)

    AnotherOperator(...)
  1. 在任务上启用沿袭

虽然在 DAG 上启用沿袭会隐式地为该 DAG 中的所有任务启用它,但您仍然可以选择性地为特定任务禁用它

from airflow.providers.openlineage.utils.selective_enable import disable_lineage, enable_lineage

with DAG(...) as dag:
    t1 = MyOperator(...)
    t2 = AnotherOperator(...)

# Enable lineage for the entire DAG
enable_lineage(dag)

# Disable lineage for task t1
disable_lineage(t1)

在 DAG 级别启用沿袭会自动为该 DAG 中的所有任务启用它,除非在每个任务中明确禁用。

在任务级别启用沿袭会隐式地在其 DAG 上启用沿袭。这是因为每个发出任务都会发送一个 ParentRunFacet,这要求在某些 OpenLineage 后端系统中启用 DAG 级别的沿袭。在启用任务级别沿袭的同时禁用 DAG 级别沿袭可能会导致错误或不一致。

故障排除

有关如何在本地对 OpenLineage 进行故障排除的详细信息,请参阅本地故障排除

添加对自定义运算符的支持

如果您想为特定运算符添加 OpenLineage 覆盖范围,请查看在运算符中实现 OpenLineage

我在哪里可以了解更多信息?

反馈

您可以通过slack联系我们并向我们提供反馈!

如何贡献

我们欢迎您的贡献!OpenLineage 是一个正在积极开发中的开源项目,我们非常感谢您的帮助!

听起来很有趣?查看我们的新贡献者指南以开始使用。

此条目有帮助吗?