在操作器中实现 OpenLineage¶
OpenLineage 通过支持直接修改 Airflow 操作器,可以轻松地将血缘添加到您的数据管道中。如果可以修改操作器,添加血缘提取就像向其中添加一个方法一样简单。有关更多详细信息,请参阅 OpenLineage 方法。
可能有一些您无法修改的操作器(例如第三方提供程序),但您仍然希望从中提取血缘。为了处理这种情况,OpenLineage 允许您为任何操作器提供自定义提取器。有关更多详细信息,请参阅 自定义提取器。
如果以上所有方法都无法实现,作为后备方案,有一种方法可以手动注释血缘。Airflow 允许操作器通过入口和出口指定操作器的输入和输出来跟踪血缘。有关更多详细信息,请参阅 手动注释的血缘。
提取优先级¶
由于有多种可能的方法来实现对操作器的 OpenLineage 支持,因此请务必记住 OpenLineage 查找血缘数据的顺序
提取器 - 检查是否为操作器类名指定了自定义提取器。用户注册的任何自定义提取器都将优先于 Airflow 提供程序源代码中定义的默认提取器(例如 BashExtractor)。
OpenLineage 方法 - 如果没有为操作器类名显式指定提取器,则使用 DefaultExtractor,它在操作器中查找 OpenLineage 方法。
入口和出口 - 如果操作器中没有定义 OpenLineage 方法,则检查入口和出口。
如果缺少以上所有选项,则不会从操作器中提取血缘数据。您仍然会收到 OpenLineage 事件,其中包含诸如常规 Airflow 方面、正确的事件时间和类型之类的信息,但输入/输出将为空,并且缺少特定于操作器的方面。
OpenLineage 方法¶
在处理您自己的操作器时,建议使用此方法,您可以直接在其中实现 OpenLineage 方法。当处理您无法修改的操作器(例如第三方提供程序)但仍然希望从中提取血缘时,请参阅 自定义提取器。
OpenLineage 定义了一些要在操作器中实现的方法。这些被称为 OpenLineage 方法。
def get_openlineage_facets_on_start() -> OperatorLineage:
...
def get_openlineage_facets_on_complete(ti: TaskInstance) -> OperatorLineage:
...
def get_openlineage_facets_on_failure(ti: TaskInstance) -> OperatorLineage:
...
当任务实例状态更改为 RUNNING、SUCCESS 和 FAILED 时,将分别调用 OpenLineage 方法。如果没有 on_complete
或 on_failure
方法,则调用 on_start
方法。
提供程序定义了 OperatorLineage
结构以供操作器返回,而不是返回完整的 OpenLineage 事件
@define
class OperatorLineage:
inputs: list[Dataset] = Factory(list)
outputs: list[Dataset] = Factory(list)
run_facets: dict[str, BaseFacet] = Factory(dict)
job_facets: dict[str, BaseFacet] = Factory(dict)
OpenLineage 集成本身负责使用诸如常规 Airflow 方面、正确的事件时间和类型之类的信息来丰富它,从而创建正确的 OpenLineage RunEvent。
如何正确实现 OpenLineage 方法?¶
在操作器中实现 OpenLineage 时,有几点需要注意。
首先,不要在顶级导入 OpenLineage 相关的对象,而是在 OL 方法本身中导入。这允许用户即使没有安装 OpenLineage 提供程序也可以使用您的提供程序。
第二个要点是确保您的提供程序返回符合 OpenLineage 标准的数据集名称。它允许 OpenLineage 使用者正确匹配从不同来源收集的有关数据集的信息。命名约定在 OpenLineage 命名文档 中进行了描述。
第三,OpenLineage 实现不应浪费不使用它的用户的时间。这意味着不要在 execute
方法中执行未被其使用的繁重处理或网络调用。更好的选择是将相关信息保存在操作器属性中,然后在 OpenLineage 方法中使用它。一个很好的例子是 BigQueryExecuteQueryOperator
。它保存已执行查询的 job_ids
。然后,get_openlineage_facets_on_complete
可以调用 BigQuery API,请求这些表的血缘,并将其转换为 OpenLineage 格式。
第四,没有必要实现所有方法。如果在调用 execute
之前就知道所有数据集,并且没有相关的运行时数据,则可能没有必要实现 get_openlineage_facets_on_complete
- get_openlineage_facets_on_start
方法可以提供所有数据。反之,如果在执行之前一切都未知,则编写 _on_start
方法可能没有意义。类似地,如果没有相关的故障数据,或者故障条件未知,则实现 get_openlineage_facets_on_failure
可能不值得。
如何测试 OpenLineage 方法?¶
对操作器中的 OpenLineage 集成进行单元测试与测试操作器本身非常相似。这些测试的目标是确保 get_openlineage_*
方法返回填充了相关字段的正确 OperatorLineage
数据结构。建议模拟任何外部调用。测试的作者需要记住调用不同 OL 方法的条件是不同的。get_openlineage_facets_on_start
在 execute
之前调用,因此不能依赖于在那里设置的值。
有关如何在本地对 OpenLineage 进行故障排除的详细信息,请参阅 本地故障排除。
目前还没有用于系统测试 OpenLineage 集成的框架,但最简单的实现方法是将发出的事件(例如使用 FileTransport
)与预期的事件进行比较。OpenLineage 系统测试的作者的目标是提供事件键的预期字典。事件键标识从特定操作器和方法发送的事件:它们的结构为 <dag_id>.<task_id>.event.<event_type>
;始终可以通过这种方式识别从特定任务发送的特定事件。提供的事件结构不必包含结果事件中的所有字段。只能比较测试作者提供的字段;这允许仅检查特定测试关心的字段。它还允许跳过(半)随机生成的字段,例如 runId
或 eventTime
,或者在 Airflow 中的 OpenLineage 上下文中始终相同的字段,例如 producer
。
示例¶
以下是正确实现的 get_openlineage_facets_on_complete
方法的示例,用于 GcsToGcsOperator。由于在 execute
方法中进行了一些处理,并且没有相关的故障数据,因此实现此单一方法就足够了。
def get_openlineage_facets_on_complete(self, task_instance):
"""
Implementing _on_complete because execute method does preprocessing on internals.
This means we won't have to normalize self.source_object and self.source_objects,
destination bucket and so on.
"""
from openlineage.client.run import Dataset
from airflow.providers.openlineage.extractors import OperatorLineage
return OperatorLineage(
inputs=[
Dataset(namespace=f"gs://{self.source_bucket}", name=source)
for source in sorted(self.resolved_source_objects)
],
outputs=[
Dataset(namespace=f"gs://{self.destination_bucket}", name=target)
for target in sorted(self.resolved_target_objects)
],
)
有关已实现的 OpenLineage 方法的更多示例,请查看 支持的操作器 的源代码。
自定义提取器¶
当处理您无法修改的操作器(例如第三方提供程序)但仍然希望从中提取血缘时,建议使用此方法。如果要从您自己的操作器中提取血缘,您可能更喜欢直接实现 OpenLineage 方法 中描述的 OpenLineage 方法。
此方法的工作原理是检测您的 DAG 正在使用哪些 Airflow 操作器,并使用相应的 Extractors 类从中提取血缘数据。
接口¶
自定义提取器必须派生自 BaseExtractor
并至少实现两个方法:_execute_extraction
和 get_operator_classnames
。
BaseOperator 定义了两个方法:extract
和 extract_on_complete
,它们被调用并用于提供实际的血缘数据。区别在于 extract
在操作器的 execute
方法之前调用,而 extract_on_complete
在之后调用。默认情况下,extract
调用自定义提取器中实现的 _execute_extraction
方法,而 extract_on_complete
调用 extract
方法。如果要提供任务执行后可用的其他信息,可以覆盖 extract_on_complete
方法。这可用于提取操作器在其自身属性上设置的任何其他信息。一个很好的例子是 SnowflakeOperator
,它在执行后设置 query_ids
。
get_operator_classnames
是一个类方法,用于提供您的提取器可以从中获取血缘的操作器列表。
例如
@classmethod
def get_operator_classnames(cls) -> List[str]:
return ['PostgresOperator']
如果操作器的名称与列表中的某个名称匹配,则将实例化提取器 - 在提取器的 self.operator
属性中提供操作器 - 并调用 extract
和 extract_on_complete
方法。
这两种方法都返回 OperatorLineage
结构
@define
class OperatorLineage:
"""Structure returned from lineage extraction."""
inputs: list[Dataset] = Factory(list)
outputs: list[Dataset] = Factory(list)
run_facets: dict[str, BaseFacet] = Factory(dict)
job_facets: dict[str, BaseFacet] = Factory(dict)
输入和输出是普通 OpenLineage 数据集 (openlineage.client.run.Dataset) 的列表。
run_facets
和 job_facets
是可选的 RunFacets 和 JobFacets 字典,它们将附加到作业中 - 例如,如果您的 Operator 正在执行 SQL,您可能希望附加 SqlJobFacet
。
要详细了解 OpenLineage 中的分面,请参阅自定义分面。
注册自定义提取器¶
除非您注册提取器,否则 OpenLineage 集成不会知道您已提供提取器。
可以使用 Airflow 配置中的 extractors
选项完成此操作。
[openlineage]
transport = {"type": "http", "url": "http://example.com:5000"}
extractors = full.path.to.ExtractorClass;full.path.to.AnotherExtractorClass
AIRFLOW__OPENLINEAGE__EXTRACTORS
环境变量是等效的。
AIRFLOW__OPENLINEAGE__EXTRACTORS='full.path.to.ExtractorClass;full.path.to.AnotherExtractorClass'
(可选)您可以使用空格分隔它们。如果您将它们作为某些 YAML 文件的一部分提供,这将非常有用。
AIRFLOW__OPENLINEAGE__EXTRACTORS: >-
full.path.to.FirstExtractor;
full.path.to.SecondExtractor
请记住确保调度程序和工作程序可以导入该路径。
调试自定义提取器¶
自定义提取器有两个常见问题。
首先,为 Airflow 配置中的 extractors
选项提供的路径错误。该路径需要与您在代码中使用的路径完全相同。如果路径错误或工作程序无法导入,则插件将无法加载提取器,并且不会为该 Operator 发出正确的 OpenLineage 事件。
第二个问题,也许更隐蔽,是来自 Airflow 的导入。由于 OpenLineage 代码是在 Airflow 工作程序本身启动时实例化的,因此来自 Airflow 的任何导入都可能不知不觉地循环。这会导致 OpenLineage 提取失败。
为避免此问题,请仅在 _execute_extraction
或 extract_on_complete
方法中本地导入 Airflow。如果您需要导入以进行类型检查,请使用 typing.TYPE_CHECKING 保护它们。
测试自定义提取器¶
与所有代码一样,应测试自定义提取器。本节将提供一些有关编写测试最重要的数据结构的信息,以及一些有关故障排除的说明。我们假设您事先了解如何编写自定义提取器。要详细了解 Operator 和提取器如何在后台协同工作,请查看自定义提取器。
在测试提取器时,我们首先要验证是否正在创建 OperatorLineage
对象,特别是验证该对象是否使用正确的输入和输出数据集以及相关分面构建。这在 OpenLineage 中通过 pytest 完成,并对连接和对象进行适当的模拟和修补。查看示例测试。
测试每个分面也很重要,因为如果分面错误,UI 中的数据或图形可能会呈现错误。例如,如果在提取器中错误地创建了分面名称,则 Operator 的任务将不会显示在沿袭图中,从而导致管道可观察性出现差距。
即使使用单元测试,提取器也可能无法按预期运行。判断数据是否未正确传输的最简单方法是,UI 元素是否未在“沿袭”选项卡中正确显示。
有关如何在本地对 OpenLineage 进行故障排除的详细信息,请参阅 本地故障排除。
示例¶
这是一个简单提取器的示例,该提取器用于在 BigQuery 中执行导出查询并将结果保存到 S3 文件的 Operator。在调用 Operator 的 execute
方法之前,已知一些信息,我们已经可以在 _execute_extraction
方法中提取一些沿袭。在调用 Operator 的 execute
方法后,在 extract_on_complete
中,我们可以简单地将一些额外的分面(例如 Bigquery 作业 ID)附加到我们之前准备的内容中。这样,我们就可以从 Operator 中获取所有可能的信息。
请注意,这只是一个示例。OpenLineage 内置了一些功能,可以促进不同的流程,例如使用 SQL 解析器从 SQL 查询中提取列级沿袭和输入/输出。
from openlineage.client.facet import BaseFacet, ExternalQueryRunFacet, SqlJobFacet
from openlineage.client.run import Dataset
from airflow.models.baseoperator import BaseOperator
from airflow.providers.openlineage.extractors.base import BaseExtractor
class ExampleOperator(BaseOperator):
def __init__(self, query, bq_table_reference, s3_path) -> None:
self.bq_table_reference = bq_table_reference
self.s3_path = s3_path
self.s3_file_name = s3_file_name
self.query = query
self._job_id = None
def execute(self, context) -> Any:
self._job_id = run_query(query=self.query)
class ExampleExtractor(BaseExtractor):
@classmethod
def get_operator_classnames(cls):
return ["ExampleOperator"]
def _execute_extraction(self) -> OperatorLineage:
"""Define what we know before Operator's extract is called."""
return OperatorLineage(
inputs=[Dataset(namespace="bigquery", name=self.bq_table_reference)],
outputs=[Dataset(namespace=self.s3_path, name=self.s3_file_name)],
job_facets={
"sql": SqlJobFacet(
query="EXPORT INTO ... OPTIONS(FORMAT=csv, SEP=';' ...) AS SELECT * FROM ... "
)
},
)
def extract_on_complete(self) -> OperatorLineage:
"""Add what we received after Operator's extract call."""
lineage_metadata = self.extract()
lineage_metadata.run_facets = {
"parent": ExternalQueryRunFacet(externalQueryId=self._job_id, source="bigquery")
}
return lineage_metadata
有关 OpenLineage 提取器的更多示例,请查看BashExtractor 或PythonExtractor 的源代码。
手动注释沿袭¶
不建议使用这种方法,只有在非常特殊的情况下才建议使用,即无法从 Operator 本身提取某些沿袭信息时。如果要从您自己的 Operator 中提取沿袭,您可能更愿意直接实现 OpenLineage 方法,如OpenLineage 方法中所述。在处理您无法修改的 Operator(例如第三方提供程序)但仍希望从中提取沿袭时,请参阅自定义提取器。
Airflow 允许 Operator 通过入口和出口指定 Operator 的输入和输出,从而跟踪沿袭。默认情况下,如果 OpenLineage 无法从 OpenLineage 方法或提取器中找到任何成功的提取,它将使用入口和出口作为输入/输出数据集。
Airflow 支持入口和出口为表、列、文件或用户实体,OpenLineage 也是如此。
示例¶
Airflow DAG 中的 Operator 可以使用入口和出口进行注释,如下例所示
"""Example DAG demonstrating the usage of the extraction via Inlets and Outlets."""
import pendulum
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.lineage.entities import Table, File, Column, User
t1 = Table(
cluster="c1",
database="d1",
name="t1",
owners=[User(email="[email protected]", first_name="Joe", last_name="Doe")],
)
t2 = Table(
cluster="c1",
database="d1",
name="t2",
columns=[
Column(name="col1", description="desc1", data_type="type1"),
Column(name="col2", description="desc2", data_type="type2"),
],
owners=[
User(email="[email protected]", first_name="Mike", last_name="Smith"),
User(email="[email protected]", first_name="Theo"),
User(email="[email protected]", last_name="Smith"),
User(email="[email protected]"),
],
)
t3 = Table(
cluster="c1",
database="d1",
name="t3",
columns=[
Column(name="col3", description="desc3", data_type="type3"),
Column(name="col4", description="desc4", data_type="type4"),
],
)
t4 = Table(cluster="c1", database="d1", name="t4")
f1 = File(url="s3://bucket/dir/file1")
with DAG(
dag_id="example_operator",
schedule_interval="@once",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
) as dag:
task1 = BashOperator(
task_id="task_1_with_inlet_outlet",
bash_command='echo "{{ task_instance_key_str }}" && sleep 1',
inlets=[t1, t2],
outlets=[t3],
)
task2 = BashOperator(
task_id="task_2_with_inlet_outlet",
bash_command='echo "{{ task_instance_key_str }}" && sleep 1',
inlets=[t3, f1],
outlets=[t4],
)
task1 >> task2
if __name__ == "__main__":
dag.cli()
从 Airflow 表实体到 OpenLineage 数据集的转换方式如下:- 表实体的 CLUSTER
成为 OpenLineage 数据集的命名空间 - 数据集的名称由 {{DATABASE}}.{{NAME}}
组成,其中 DATABASE
和 NAME
是 Airflow 的表实体指定的属性。
自定义分面¶
要详细了解 OpenLineage 中的分面,请参阅分面文档。另请查看可用的分面
OpenLineage 规范可能不包含编写提取器所需的所有分面,在这种情况下,您必须创建自己的自定义分面。有关创建自定义分面的更多信息,请参见此处。
作业层次结构¶
Apache Airflow 具有固有的作业层次结构:DAG 是大型且可独立调度的单元,包含较小的可执行任务。
OpenLineage 在其作业层次结构模型中反映了这种结构。
DAG 调度后,将发出 START 事件。
随后,按照 Airflow 的任务顺序,每个任务都会触发
TaskInstance 启动时的 START 事件。
完成时的 COMPLETE/FAILED 事件。
最后,在 DAG 终止时,将发出完成事件(COMPLETE 或 FAILED)。
TaskInstance 事件的 ParentRunFacet 引用原始的 DAG 运行。
本地故障排除¶
在本地测试代码时,可以使用Marquez 检查正在发出的数据(或未发出的数据)。使用 Marquez 可以让您确定错误是由提取器还是 API 引起的。如果数据按预期从提取器发出但未到达 UI,则提取器没有问题,应在 OpenLineage 中打开一个问题。但是,如果数据未正确发出,则可能需要更多单元测试来覆盖提取器行为。Marquez 可以帮助您查明哪些分面未正确形成,以便您知道在哪里添加测试覆盖率。
我在哪里可以了解更多信息?¶
访问我们的GitHub 存储库。
观看有关 OpenLineage 的多个演讲。