使用 OpenLineage 集成¶
OpenLineage 是一个用于数据沿袭收集和分析的开放框架。其核心是一个可扩展的规范,系统可以使用该规范来互操作沿袭元数据。 查看 OpenLineage 文档。
使用 OpenLineage 不需要更改用户 DAG 文件。需要进行基本配置,以便 OpenLineage 知道将事件发送到哪里。
快速入门¶
注意
OpenLineage Provider 提供了多种数据传输选项(http、kafka、文件等),包括创建自定义解决方案的灵活性。可以通过多种方法管理配置,并且用户可以使用大量的设置来微调和增强其对 OpenLineage 的使用。有关这些功能的全面说明,请参阅本文档的后续部分。
此示例是 OpenLineage 设置的基本演示。
安装提供程序包或将其添加到
requirements.txt
文件中。pip install apache-airflow-providers-openlineage
提供
Transport
配置,以便 OpenLineage 知道将事件发送到哪里。在airflow.cfg
文件中[openlineage] transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
或使用
AIRFLOW__OPENLINEAGE__TRANSPORT
环境变量AIRFLOW__OPENLINEAGE__TRANSPORT='{"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}'
就是这样! 运行 DAG 时,OpenLineage 事件应发送到已配置的后端。
用法¶
启用并配置后,集成不需要用户采取进一步的操作。它将自动
收集任务输入/输出元数据(来源、架构等)。
收集任务运行级别元数据(执行时间、状态、参数等)
收集任务作业级别元数据(所有者、类型、描述等)
收集特定于任务的元数据(bigquery 作业 ID、python 源代码等) - 取决于操作器
所有这些数据都将作为 OpenLineage 事件发送到已配置的后端,如 作业层次结构 中所述。
传输设置¶
配置 OpenLineage Airflow Provider 的主要和推荐方法是 Airflow 配置(airflow.cfg
文件)。所有可能的配置选项以及示例值都可以在 配置部分 中找到。
至少,在每种情况下都需要设置一件事,即 Transport
- 您希望事件最终出现在哪里 - 例如 Marquez。
将传输作为 JSON 字符串¶
Airflow 配置中的 transport
选项用于此目的。
[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
AIRFLOW__OPENLINEAGE__TRANSPORT
环境变量是等效的。
AIRFLOW__OPENLINEAGE__TRANSPORT='{"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}'
如果您想查看 OpenLineage 事件而不将它们发送到任何地方,则可以设置 ConsoleTransport
- 事件将最终出现在任务日志中。
[openlineage]
transport = {"type": "console"}
注意
有关内置传输类型的完整列表、特定传输的选项或有关如何实现自定义传输的说明,请参阅 Python 客户端文档。
将传输作为配置文件¶
您还可以使用 YAML 文件(例如 openlineage.yml
)配置 OpenLineage Transport
。在 Airflow 配置中提供 YAML 文件的路径作为 config_path
选项。
[openlineage]
config_path = '/path/to/openlineage.yml'
AIRFLOW__OPENLINEAGE__CONFIG_PATH
环境变量是等效的。
AIRFLOW__OPENLINEAGE__CONFIG_PATH='/path/to/openlineage.yml'
配置文件 YAML 文件的内容示例
transport:
type: http
url: https://backend:5000
endpoint: events/receive
auth:
type: api_key
apiKey: f048521b-dfe8-47cd-9c65-0cb07d57591e
注意
有关该配置方法的详细说明以及配置文件示例,请参阅 Python 客户端文档。
配置优先级¶
由于配置 OpenLineage 的方法有多种,因此牢记不同配置的优先级非常重要。OpenLineage Airflow Provider 按以下顺序查找配置
在
openlineage
部分下检查airflow.cfg
中的config_path
(或 AIRFLOW__OPENLINEAGE__CONFIG_PATH 环境变量)在
openlineage
部分下检查airflow.cfg
中的transport
(或 AIRFLOW__OPENLINEAGE__TRANSPORT 环境变量)如果缺少上述所有选项,则下面使用的 OpenLineage Python 客户端将按照 此 文档中描述的顺序查找配置。请注意,鼓励使用 Airflow 配置,并且这是唯一面向未来的解决方案。
向后兼容性¶
警告
不应使用以下变量,并且将来可能会删除。请考虑使用 Airflow 配置(如上所述)以获得面向未来的解决方案。
为了与 openlineage-airflow
包向后兼容,某些环境变量仍然可用
OPENLINEAGE_DISABLED
等效于AIRFLOW__OPENLINEAGE__DISABLED
。OPENLINEAGE_CONFIG
等效于AIRFLOW__OPENLINEAGE__CONFIG_PATH
。OPENLINEAGE_NAMESPACE
等效于AIRFLOW__OPENLINEAGE__NAMESPACE
。OPENLINEAGE_EXTRACTORS
等效于设置AIRFLOW__OPENLINEAGE__EXTRACTORS
。OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE
等效于AIRFLOW__OPENLINEAGE__DISABLE_SOURCE_CODE
。OPENLINEAGE_URL
可用于设置简单的 http 传输。此方法有一些限制,可能需要使用其他环境变量才能实现所需的输出。请参阅 文档。
其他选项¶
命名空间¶
为这个特定实例设置 OpenLineage 命名空间非常有用。这样,如果您使用多个 OpenLineage 生产者,则来自它们的事件将在逻辑上分开。如果未设置,则使用 default
命名空间。在 Airflow 配置中提供命名空间的名称作为 namespace
选项。
[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
namespace = 'my-team-airflow-instance'
AIRFLOW__OPENLINEAGE__NAMESPACE
环境变量是等效的。
AIRFLOW__OPENLINEAGE__NAMESPACE='my-team-airflow-instance'
禁用¶
您可以通过在 Airflow 配置中将 disabled
选项设置为 true
来禁用发送 OpenLineage 事件,而无需卸载 OpenLineage 提供程序。
[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
disabled = true
AIRFLOW__OPENLINEAGE__DISABLED
环境变量是等效的。
AIRFLOW__OPENLINEAGE__DISABLED=true
禁用源代码¶
默认情况下,多个操作器(例如 Python、Bash)会将它们的源代码包含在它们的 OpenLineage 事件中。为防止这种情况,请在 Airflow 配置中将 disable_source_code
选项设置为 true
。
[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
disable_source_code = true
AIRFLOW__OPENLINEAGE__DISABLE_SOURCE_CODE
环境变量是等效的。
AIRFLOW__OPENLINEAGE__DISABLE_SOURCE_CODE=true
为操作器禁用¶
您可以通过将以分号分隔的 Airflow 操作器完整导入路径字符串传递给 Airflow 配置中的 disabled_for_operators
字段,轻松地排除某些操作器发出 OpenLineage 事件。
[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
disabled_for_operators = 'airflow.operators.bash.BashOperator;airflow.operators.python.PythonOperator'
AIRFLOW__OPENLINEAGE__DISABLED_FOR_OPERATORS
环境变量是等效的。
AIRFLOW__OPENLINEAGE__DISABLED_FOR_OPERATORS='airflow.operators.bash.BashOperator;airflow.operators.python.PythonOperator'
自定义提取器¶
如果您使用 自定义提取器 功能,请通过将以分号分隔的 Airflow 操作器完整导入路径字符串传递给 Airflow 配置中的 extractors
选项来注册提取器。
[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
extractors = full.path.to.ExtractorClass;full.path.to.AnotherExtractorClass
AIRFLOW__OPENLINEAGE__EXTRACTORS
环境变量是等效的。
AIRFLOW__OPENLINEAGE__EXTRACTORS='full.path.to.ExtractorClass;full.path.to.AnotherExtractorClass'
在 DAG/任务级别启用 OpenLineage¶
可以通过使用 selective_enable
策略选择性地为特定 DAG 和任务启用 OpenLineage。要启用此策略,请在 Airflow 配置文件的 [openlineage] 部分中将 selective_enable
选项设置为 True
[openlineage]
selective_enable = True
虽然 selective_enable
启用选择性控制,但 disabled
选项 仍然具有优先级。如果在配置中将 disabled
设置为 True,则无论 selective_enable
设置如何,都将为所有 DAG 和任务禁用 OpenLineage。
启用 selective_enable
策略后,您可以选择使用 enable_lineage
和 disable_lineage
函数为单个 DAG 和任务启用 OpenLineage。
在 DAG 上启用沿袭
from airflow.providers.openlineage.utils.selective_enable import disable_lineage, enable_lineage
with enable_lineage(DAG(...)):
# Tasks within this DAG will have lineage tracking enabled
MyOperator(...)
AnotherOperator(...)
在任务上启用沿袭
虽然在 DAG 上启用沿袭会隐式地为该 DAG 中的所有任务启用它,但您仍然可以选择性地为特定任务禁用它
from airflow.providers.openlineage.utils.selective_enable import disable_lineage, enable_lineage
with DAG(...) as dag:
t1 = MyOperator(...)
t2 = AnotherOperator(...)
# Enable lineage for the entire DAG
enable_lineage(dag)
# Disable lineage for task t1
disable_lineage(t1)
在 DAG 级别启用沿袭会自动为该 DAG 中的所有任务启用它,除非在每个任务中明确禁用。
在任务级别启用沿袭会隐式地在其 DAG 上启用沿袭。这是因为每个发出任务都会发送一个 ParentRunFacet,这要求在某些 OpenLineage 后端系统中启用 DAG 级别的沿袭。在启用任务级别沿袭的同时禁用 DAG 级别沿袭可能会导致错误或不一致。
添加对自定义运算符的支持¶
如果您想为特定运算符添加 OpenLineage 覆盖范围,请查看在运算符中实现 OpenLineage
我在哪里可以了解更多信息?¶
访问我们的GitHub 存储库。
观看多个关于 OpenLineage 的演讲。