在 Operator 中实现 OpenLineage

OpenLineage 通过直接修改 Airflow Operator 的方式,可以轻松地为您的数据管道添加血缘关系。如果可以修改 Operator,那么添加血缘提取就像向其添加一个方法一样简单。有关更多详细信息,请参阅 OpenLineage 方法

可能存在一些您无法修改的 Operator(例如,第三方提供商),但您仍然希望从中提取血缘关系。为了处理这种情况,OpenLineage 允许您为任何 Operator 提供自定义提取器。有关更多详细信息,请参阅 自定义提取器

如果以上所有方法都无法实现,作为回退方案,还有一种手动注释血缘关系的方法。Airflow 允许 Operator 通过指定输入和输出的入口和出口来跟踪血缘关系。有关更多详细信息,请参阅 手动注释血缘关系

提取优先级

由于有多种方法可以为 Operator 实现 OpenLineage 支持,因此请务必记住 OpenLineage 查找血缘数据的顺序

  1. 提取器 - 检查是否为 Operator 类名指定了自定义提取器。用户注册的任何自定义提取器将优先于 Airflow Provider 源代码中定义的默认提取器(例如,BashExtractor)。

  2. OpenLineage 方法 - 如果没有为 Operator 类名显式指定提取器,则使用 DefaultExtractor,它会在 Operator 中查找 OpenLineage 方法。

  3. 入口和出口 - 如果 Operator 中没有定义 OpenLineage 方法,则检查入口和出口。

如果以上所有选项都缺失,则不会从 Operator 中提取任何血缘数据。您仍然会收到包含诸如通用 Airflow 面、适当的事件时间和类型等信息的 OpenLineage 事件,但输入/输出将为空,并且缺少特定于 Operator 的面。

OpenLineage 方法

当处理您自己的 Operator(您可以直接实现 OpenLineage 方法)时,建议使用此方法。当处理您无法修改的 Operator(例如,第三方提供商),但仍然希望从中提取血缘关系时,请参阅 自定义提取器

OpenLineage 定义了一些在 Operator 中实现的方法。这些方法被称为 OpenLineage 方法。

def get_openlineage_facets_on_start() -> OperatorLineage: ...


def get_openlineage_facets_on_complete(ti: TaskInstance) -> OperatorLineage: ...


def get_openlineage_facets_on_failure(ti: TaskInstance) -> OperatorLineage: ...

当任务实例状态更改为以下状态时,会分别调用 OpenLineage 方法

  • RUNNING -> get_openlineage_facets_on_start()

  • SUCCESS -> get_openlineage_facets_on_complete()

  • FAILED -> get_openlineage_facets_on_failure()

必须实现以下方法中的至少一种:get_openlineage_facets_on_start()get_openlineage_facets_on_complete()。有关在缺少其他方法时调用哪些方法的更多详细信息,请参阅 如何正确实现 OpenLineage 方法?

Provider 定义了 OperatorLineage 结构,由 Operator 返回,而不是返回完整的 OpenLineage 事件

@define
class OperatorLineage:
    inputs: list[Dataset] = Factory(list)
    outputs: list[Dataset] = Factory(list)
    run_facets: dict[str, RunFacet] = Factory(dict)
    job_facets: dict[str, BaseFacet] = Factory(dict)

OpenLineage 集成本身负责使用诸如通用 Airflow 面、适当的事件时间和类型等信息来丰富它,从而创建适当的 OpenLineage RunEvent。

如何正确实现 OpenLineage 方法?

在 Operator 中实现 OpenLineage 时,有几点值得注意。

首先,不要在顶层导入与 OpenLineage 相关的对象,而是在 OL 方法本身中导入。这允许用户即使没有安装 OpenLineage Provider 也可以使用您的 Provider。

第二个重点是确保您的 Provider 返回符合 OpenLineage 规范的数据集名称。这允许 OpenLineage 消费者正确匹配从不同来源收集的数据集信息。命名约定在 OpenLineage 命名文档 中进行了描述。

第三,OpenLineage 实现不应浪费不使用它的用户的时间。这意味着不要在 execute 方法中进行它未使用的繁重处理或网络调用。更好的选择是将相关信息保存在 Operator 属性中 - 然后在 OpenLineage 方法中使用它。一个很好的例子是 BigQueryExecuteQueryOperator。它保存已执行的查询的 job_idsget_openlineage_facets_on_complete 然后可以调用 BigQuery API,请求这些表的血缘关系,并将其转换为 OpenLineage 格式。

第四,没有必要实现所有方法。如果在调用 execute 之前知道所有数据集,并且没有相关的运行时数据,则可能没有必要实现 get_openlineage_facets_on_complete - get_openlineage_facets_on_start 方法可以提供所有数据。反之,如果在执行之前所有内容都是未知的,则可能没有必要编写 _on_start 方法。类似地,如果没有相关的失败数据 - 或者失败条件未知,则可能不值得实现 get_openlineage_facets_on_failure。一般来说:如果没有 on_failure 方法,则会改为调用 on_complete 方法。如果没有 on_failureon_complete 方法,则会改为调用 on_start 方法(在任务启动和任务完成时都会调用)。如果没有 on_start 方法,则血缘信息将不会包含在 START 事件中,并且将在任务完成时调用 on_complete 方法。

如何测试 OpenLineage 方法?

在 Operator 中对 OpenLineage 集成进行单元测试与测试 Operator 本身非常相似。这些测试的目标是确保 get_openlineage_* 方法返回带有相关字段的正确 OperatorLineage 数据结构。建议模拟任何外部调用。测试的作者需要记住调用不同 OL 方法的条件是不同的。get_openlineage_facets_on_startexecute 之前调用,因此,不能依赖于在那里设置的值。

有关如何在本地排除 OpenLineage 故障的详细信息,请参阅 故障排除

没有现有的框架可以进行 OpenLineage 集成的系统测试,但实现此目的的最简单方法是将发出的事件(例如,使用 FileTransport)与预期事件进行比较。OpenLineage 系统测试作者的目标是提供事件键的预期字典。事件键标识从特定 Operator 和方法发送的事件:它们具有 <dag_id>.<task_id>.event.<event_type> 结构;始终可以通过这种方式识别从特定任务发送的特定事件。提供的事件结构不必包含结果事件中的所有字段。只能比较测试作者提供的字段;这允许仅检查特定测试关心的字段。它还允许跳过(半)随机生成的字段,例如 runIdeventTime,或者在 Airflow 中的 OpenLineage 上下文中始终相同的字段,例如 producer

示例

以下是针对 GcsToGcsOperator 的正确实现的 get_openlineage_facets_on_complete 方法示例。由于在 execute 方法中进行了一些处理,并且没有相关的失败数据,因此实现此单个方法就足够了。

def get_openlineage_facets_on_complete(self, task_instance):
    """
    Implementing _on_complete because execute method does preprocessing on internals.
    This means we won't have to normalize self.source_object and self.source_objects,
    destination bucket and so on.
    """
    from airflow.providers.common.compat.openlineage.facet import Dataset
    from airflow.providers.openlineage.extractors import OperatorLineage

    return OperatorLineage(
        inputs=[
            Dataset(namespace=f"gs://{self.source_bucket}", name=source)
            for source in sorted(self.resolved_source_objects)
        ],
        outputs=[
            Dataset(namespace=f"gs://{self.destination_bucket}", name=target)
            for target in sorted(self.resolved_target_objects)
        ],
    )

有关已实现的 OpenLineage 方法的更多示例,请查看 支持的类 的源代码。

自定义提取器

当处理您无法修改的 Operator(例如,第三方供应商提供的 Operator)但仍希望从中提取血缘关系时,建议使用此方法。如果您希望从自己的 Operator 中提取血缘关系,您可能更倾向于直接实现 OpenLineage 方法 中描述的 OpenLineage 方法。

此方法的工作原理是检测您的 DAG 正在使用的 Airflow Operator,并使用相应的 Extractor 类从中提取血缘数据。

接口

自定义 Extractor 必须继承自 BaseExtractor 并至少实现两个方法:_execute_extractionget_operator_classnames

BaseExtractor 定义了两个方法:extractextract_on_complete,它们被调用并用于提供实际的血缘数据。区别在于 extract 在 Operator 的 execute 方法之前调用,而 extract_on_complete 在之后调用。默认情况下,extract 调用自定义 Extractor 中实现的 _execute_extraction 方法,而 extract_on_complete 调用 extract 方法。如果您想提供在任务执行后可用的其他信息,您可以覆盖 extract_on_complete 方法。这可以用来提取 Operator 在其自身属性上设置的任何其他信息。一个很好的例子是 SnowflakeOperator,它在执行后设置 query_ids

get_operator_classnames 是一个类方法,用于提供您的 Extractor 可以从中获取血缘关系的 Operator 列表。

例如

@classmethod
def get_operator_classnames(cls) -> List[str]:
  return ['PostgresOperator']

如果 Operator 的名称与列表中的某个名称匹配,则将实例化 Extractor,并将 Operator 提供给 Extractor 的 self.operator 属性,并且将调用 extractextract_on_complete 方法。

这两种方法都返回 OperatorLineage 结构

@define
class OperatorLineage:
    """Structure returned from lineage extraction."""

    inputs: list[Dataset] = Factory(list)
    outputs: list[Dataset] = Factory(list)
    run_facets: dict[str, RunFacet] = Factory(dict)
    job_facets: dict[str, BaseFacet] = Factory(dict)

输入和输出是普通的 OpenLineage 数据集列表(openlineage.client.event_v2.Dataset)。

run_facetsjob_facets 是可选的 RunFacets 和 JobFacets 的字典,这些字典将附加到作业上 - 例如,如果您的 Operator 正在执行 SQL,您可能希望附加 SqlJobFacet

要了解有关 OpenLineage 中 facets 的更多信息,请参阅 自定义 Facets

注册自定义 Extractor

OpenLineage 集成不知道您已提供 Extractor,除非您注册它。

可以通过在 Airflow 配置中使用 extractors 选项来完成。

[openlineage]
transport = {"type": "http", "url": "http://example.com:5000"}
extractors = full.path.to.ExtractorClass;full.path.to.AnotherExtractorClass

AIRFLOW__OPENLINEAGE__EXTRACTORS 环境变量是等效的。

AIRFLOW__OPENLINEAGE__EXTRACTORS='full.path.to.ExtractorClass;full.path.to.AnotherExtractorClass'

或者,您可以使用空格分隔它们。如果您将它们作为 YAML 文件的一部分提供,这将很有用。

AIRFLOW__OPENLINEAGE__EXTRACTORS: >-
  full.path.to.FirstExtractor;
  full.path.to.SecondExtractor

请记住确保该路径对于调度程序和 worker 是可导入的。

调试自定义 Extractor

自定义 Extractor 有两个常见问题。

首先,是为 Airflow 配置中的 extractors 选项提供了错误的路径。该路径需要与您在代码中使用的路径完全相同。如果路径错误或无法从 worker 导入,则插件将无法加载 Extractor,并且不会为该 Operator 发出正确的 OpenLineage 事件。

第二个问题,可能更隐蔽,是来自 Airflow 的导入。由于 OpenLineage 代码在 Airflow worker 本身启动时被实例化,因此任何来自 Airflow 的导入都可能在不知不觉中循环依赖。这会导致 OpenLineage 提取失败。

为了避免此问题,请仅在本地(在 _execute_extractionextract_on_complete 方法中)从 Airflow 导入。如果需要导入进行类型检查,请在 typing.TYPE_CHECKING 后面保护它们。

测试自定义 Extractor

与所有代码一样,应测试自定义 Extractor。本节将提供有关编写测试的最重要数据结构的一些信息以及一些关于故障排除的说明。我们假设您事先了解如何编写自定义 Extractor。要了解更多关于 Operator 和 Extractor 如何在底层协同工作的信息,请查看 自定义 Extractor

在测试 Extractor 时,我们首先要验证是否正在创建 OperatorLineage 对象,特别是验证该对象是否使用正确的输入和输出数据集以及相关的 facets 构建。这是在 OpenLineage 中通过 pytest 完成的,使用适当的模拟和修补来连接和对象。查看 示例测试

测试每个 facet 也很重要,因为如果 facets 错误,UI 中的数据或图表可能会错误呈现。例如,如果在 Extractor 中错误地创建了 facet 名称,则 Operator 的任务将不会显示在血缘关系图中,从而在管道可观察性中产生间隙。

即使进行了单元测试,Extractor 可能仍然无法按预期运行。判断数据是否未正确传递的最简单方法是查看 UI 元素是否未在“血缘关系”选项卡中正确显示。

有关如何在本地排除 OpenLineage 故障的详细信息,请参阅 故障排除

示例

这是一个简单的 Extractor 示例,适用于在 BigQuery 中执行导出查询并将结果保存到 S3 文件的 Operator。在调用 Operator 的 execute 方法之前,我们已经知道一些信息,并且我们可以在 _execute_extraction 方法中提取一些血缘关系。在调用 Operator 的 execute 方法之后,在 extract_on_complete 中,我们可以简单地将一些额外的 Facets (例如,包含 Bigquery Job ID)附加到我们之前准备的内容中。这样,我们可以从 Operator 获取所有可能的信息。

请注意,这只是一个示例。有一些 OpenLineage 内置功能可以促进不同的流程,例如使用 SQL 解析器从 SQL 查询中提取列级血缘关系和输入/输出。

from airflow.models.baseoperator import BaseOperator
from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage
from airflow.providers.common.compat.openlineage.facet import (
    Dataset,
    ExternalQueryRunFacet,
    SQLJobFacet,
)


class ExampleOperator(BaseOperator):
    def __init__(self, query, bq_table_reference, s3_path) -> None:
        self.bq_table_reference = bq_table_reference
        self.s3_path = s3_path
        self.s3_file_name = s3_file_name
        self._job_id = None

    def execute(self, context) -> Any:
        self._job_id = run_query(query=self.query)


class ExampleExtractor(BaseExtractor):
    @classmethod
    def get_operator_classnames(cls):
        return ["ExampleOperator"]

    def _execute_extraction(self) -> OperatorLineage:
        """Define what we know before Operator's extract is called."""
        return OperatorLineage(
            inputs=[Dataset(namespace="bigquery", name=self.operator.bq_table_reference)],
            outputs=[Dataset(namespace=self.operator.s3_path, name=self.operator.s3_file_name)],
            job_facets={
                "sql": SQLJobFacet(
                    query="EXPORT INTO ... OPTIONS(FORMAT=csv, SEP=';' ...) AS SELECT * FROM ... "
                )
            },
        )

    def extract_on_complete(self, task_instance) -> OperatorLineage:
        """Add what we received after Operator's extract call."""
        lineage_metadata = self.extract()
        lineage_metadata.run_facets = {
            "parent": ExternalQueryRunFacet(externalQueryId=task_instance.task._job_id, source="bigquery")
        }
        return lineage_metadata

有关 OpenLineage Extractor 的更多示例,请查看 BashExtractorPythonExtractor 的源代码。

手动注释的血缘关系

很少推荐使用此方法,仅在非常特殊的情况下使用,当无法从 Operator 本身提取某些血缘关系信息时。如果您想从自己的 Operator 中提取血缘关系,您可能更倾向于直接实现 OpenLineage 方法 中描述的 OpenLineage 方法。当处理您无法修改的 Operator(例如,第三方供应商提供的 Operator)但仍希望从中提取血缘关系时,请参阅 自定义 Extractor

Airflow 允许 Operator 通过 inlets 和 outlets 指定 Operator 的输入和输出来跟踪血缘关系。如果 OpenLineage 无法从 OpenLineage 方法或 Extractor 中找到任何成功的提取,则默认情况下会使用 inlets 和 outlets 作为输入/输出数据集。

Airflow 支持 inlets 和 outlets 为 Table、Column、File 或 User 实体,OpenLineage 也是如此。

示例

可以像下面的示例中那样使用 inlets 和 outlets 注释 Airflow DAG 中的 Operator

"""Example DAG demonstrating the usage of the extraction via Inlets and Outlets."""

import pendulum

from airflow import DAG
from airflow.providers.standard.operators.bash import BashOperator
from airflow.lineage.entities import Table, File, Column, User


t1 = Table(
    cluster="c1",
    database="d1",
    name="t1",
    owners=[User(email="[email protected]", first_name="Joe", last_name="Doe")],
)
t2 = Table(
    cluster="c1",
    database="d1",
    name="t2",
    columns=[
        Column(name="col1", description="desc1", data_type="type1"),
        Column(name="col2", description="desc2", data_type="type2"),
    ],
    owners=[
        User(email="[email protected]", first_name="Mike", last_name="Smith"),
        User(email="[email protected]", first_name="Theo"),
        User(email="[email protected]", last_name="Smith"),
        User(email="[email protected]"),
    ],
)
t3 = Table(
    cluster="c1",
    database="d1",
    name="t3",
    columns=[
        Column(name="col3", description="desc3", data_type="type3"),
        Column(name="col4", description="desc4", data_type="type4"),
    ],
)
t4 = Table(cluster="c1", database="d1", name="t4")
f1 = File(url="s3://bucket/dir/file1")


with DAG(
    dag_id="example_operator",
    schedule="@once",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
) as dag:
    task1 = BashOperator(
        task_id="task_1_with_inlet_outlet",
        bash_command='echo "{{ task_instance_key_str }}" && sleep 1',
        inlets=[t1, t2],
        outlets=[t3],
    )

    task2 = BashOperator(
        task_id="task_2_with_inlet_outlet",
        bash_command='echo "{{ task_instance_key_str }}" && sleep 1',
        inlets=[t3, f1],
        outlets=[t4],
    )

    task1 >> task2

if __name__ == "__main__":
    dag.cli()

从 Airflow Table 实体到 OpenLineage 数据集的转换方式如下: - table 实体的 CLUSTER 变为 OpenLineage 数据集的命名空间 - 数据集的名称由 {{DATABASE}}.{{NAME}} 组成,其中 DATABASENAME 是 Airflow Table 实体指定的属性。

自定义 Facets

要了解有关 OpenLineage 中 facets 的更多信息,请参阅 facet 文档。另请查看 可用的 facets 和一篇关于 使用 facets 进行扩展 的博客文章。

OpenLineage 规范可能不包含您编写提取器所需的所有 facets,在这种情况下,您将必须创建自己的 自定义 facets

您还可以使用 custom_run_facets Airflow 配置将您自己的自定义 facets 注入血缘关系事件的 run facet 中。

要采取的步骤,

  1. 编写一个返回自定义 facets 的函数。您可以编写任意多个自定义 facet 函数。

  2. 使用 custom_run_facets Airflow 配置注册这些函数。

Airflow OpenLineage 监听器将在血缘关系事件生成期间自动执行这些函数,并将它们的返回值附加到血缘关系事件中的 run facet 中。

编写自定义 facet 函数

  • 输入参数: 该函数应接受两个输入参数:TaskInstanceTaskInstanceState

  • 函数体: 执行生成自定义 facets 所需的逻辑。自定义 facets 必须继承自 RunFacet,以便为 facet 自动添加 _producer_schemaURL

  • 返回值: 要添加到血缘事件的自定义 facets。返回类型应为 dict[str, RunFacet]None。 如果您不想为某些条件添加自定义 facets,可以选择返回 None

自定义 facet 函数示例

import attrs
from airflow.models.taskinstance import TaskInstance, TaskInstanceState
from airflow.providers.common.compat.openlineage.facet import RunFacet


@attrs.define
class MyCustomRunFacet(RunFacet):
    """Define a custom facet."""

    name: str
    jobState: str
    uniqueName: str
    displayName: str
    dagId: str
    taskId: str
    cluster: str
    custom_metadata: dict


def get_my_custom_facet(
    task_instance: TaskInstance, ti_state: TaskInstanceState
) -> dict[str, RunFacet] | None:
    operator_name = task_instance.task.operator_name
    custom_metadata = {}
    if operator_name == "BashOperator":
        return None
    if ti_state == TaskInstanceState.FAILED:
        custom_metadata["custom_key_failed"] = "custom_value"
    job_unique_name = f"TEST.{task_instance.dag_id}.{task_instance.task_id}"
    return {
        "additional_run_facet": MyCustomRunFacet(
            name="test-lineage-namespace",
            jobState=task_instance.state,
            uniqueName=job_unique_name,
            displayName=f"{task_instance.dag_id}.{task_instance.task_id}",
            dagId=task_instance.dag_id,
            taskId=task_instance.task_id,
            cluster="TEST",
            custom_metadata=custom_metadata,
        )
    }

注册自定义 facet 函数

使用 custom_run_facets Airflow 配置,通过传递以分号分隔的函数完整导入路径字符串来注册自定义运行 facet 函数。

[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
custom_run_facets = full.path.to.get_my_custom_facet;full.path.to.another_custom_facet_function

AIRFLOW__OPENLINEAGE__CUSTOM_RUN_FACETS 环境变量是等效的。

AIRFLOW__OPENLINEAGE__CUSTOM_RUN_FACETS='full.path.to.get_my_custom_facet;full.path.to.another_custom_facet_function'

注意

  • 自定义 facet 函数在 TaskInstance 的 START 和 COMPLETE/FAIL 时都会执行,并添加到相应的 OpenLineage 事件中。

  • 当创建 TaskInstance 状态的条件时,您应该使用提供的第二个参数(TaskInstanceState),它将包含任务应处于的状态。这可能与 ti.current_state() 不同,因为 OpenLineage 监听器可能会在 Airflow 数据库中更新 TaskInstance 的状态之前被调用。

  • 当同一个函数的路径被注册多次时,它仍然只会执行一次。

  • 当多个注册的函数返回重复的自定义 facet 键时,将随机选择一个函数的结果添加到血缘事件中。请避免使用重复的 facet 键,因为它可能会产生意外的行为。

Job 层次结构

Apache Airflow 具有固有的作业层次结构:DAG(大型且可独立调度的单元)包含较小的可执行任务。

OpenLineage 在其 Job 层次结构模型中反映了这种结构。

  • 在 DAG 调度时,会发出一个 START 事件。

  • 随后,按照 Airflow 的任务顺序,每个任务都会在 TaskInstance 启动时触发

    • START 事件。

    • 在完成时触发 COMPLETE/FAILED 事件。

  • 最后,在 DAG 终止时,会发出完成事件(COMPLETE 或 FAILED)。

TaskInstance 事件的 ParentRunFacet 引用了原始的 DAG 运行。

故障排除

在本地测试代码时,可以使用 Marquez 来检查正在发出或未发出的数据。使用 Marquez 将帮助您确定错误是由 Extractor 还是 API 引起的。如果数据正如预期的那样从 Extractor 发出,但没有到达 UI,那么 Extractor 正常,应该在 OpenLineage 中提出问题。但是,如果数据没有正确发出,则可能需要更多单元测试来覆盖 Extractor 的行为。Marquez 可以帮助您查明哪些 facets 没有正确形成,以便您知道在哪里添加测试覆盖率。

调试设置

为了进行调试,请确保 Airflow 日志级别 设置为 DEBUG,并且 OpenLineage 集成启用了 debug_mode。这将增加 Airflow 日志中的详细信息,并在 OpenLineage 事件中包含其他环境信息。

当寻求调试帮助时,请始终尝试提供以下信息

  • 将日志级别设置为 DEBUG 的 Airflow 调度器日志

  • 将日志级别设置为 DEBUG 的 Airflow worker 日志(任务日志)

  • 启用了 debug_mode 的 OpenLineage 事件

在哪里可以了解更多?

反馈

您可以通过 slack 联系我们并留下反馈!

如何贡献

我们欢迎您的贡献! OpenLineage 是一个正在积极开发中的开源项目,我们很乐意得到您的帮助!

听起来有趣吗? 查看我们的 新贡献者指南 以开始。

此条目是否有帮助?