Airflow 峰会 2025 将于 10 月 07 日至 09 日举行。立即注册以获取早鸟票!

血缘

注意

血缘支持处于非常实验性的阶段,可能会发生变化。

Airflow 提供了一个强大的功能,用于跟踪任务之间以及任务中使用的 Hook 的数据血缘。此功能可帮助您了解数据在整个 Airflow 管道中的流向。

HookLineageCollector 的全局实例充当收集血缘信息的中央枢纽。Hook 可以将它们交互的资产的详细信息发送到此收集器。然后,收集器使用这些数据来构建符合 AIP-60 的资产,这是一种描述资产的标准格式。

from airflow.lineage.hook import get_hook_lineage_collector


class CustomHook(BaseHook):
    def run(self):
        # run actual code
        collector = get_hook_lineage_collector()
        collector.add_input_asset(self, asset_kwargs={"scheme": "file", "path": "/tmp/in"})
        collector.add_output_asset(self, asset_kwargs={"scheme": "file", "path": "/tmp/out"})

通过 HookLineageCollector 收集的血缘数据可以使用在 Airflow 插件中注册的 HookLineageReader 实例访问。

from airflow.lineage.hook_lineage import HookLineageReader
from airflow.plugins_manager import AirflowPlugin


class CustomHookLineageReader(HookLineageReader):
    def get_inputs(self):
        return self.lineage_collector.collected_assets.inputs


class HookLineageCollectionPlugin(AirflowPlugin):
    name = "HookLineageCollectionPlugin"
    hook_lineage_readers = [CustomHookLineageReader]

如果 Airflow 中未注册 HookLineageReader,则会改用默认的 NoOpCollector。此收集器不会创建符合 AIP-60 的资产或收集血缘信息。

本条目是否有帮助?