血缘¶
注意
血缘支持处于非常实验性的阶段,可能会发生变化。
Airflow 提供了一个强大的功能,用于跟踪任务之间以及任务中使用的 Hook 的数据血缘。此功能可帮助您了解数据在整个 Airflow 管道中的流向。
HookLineageCollector
的全局实例充当收集血缘信息的中央枢纽。Hook 可以将它们交互的资产的详细信息发送到此收集器。然后,收集器使用这些数据来构建符合 AIP-60 的资产,这是一种描述资产的标准格式。
from airflow.lineage.hook import get_hook_lineage_collector
class CustomHook(BaseHook):
def run(self):
# run actual code
collector = get_hook_lineage_collector()
collector.add_input_asset(self, asset_kwargs={"scheme": "file", "path": "/tmp/in"})
collector.add_output_asset(self, asset_kwargs={"scheme": "file", "path": "/tmp/out"})
通过 HookLineageCollector
收集的血缘数据可以使用在 Airflow 插件中注册的 HookLineageReader
实例访问。
from airflow.lineage.hook_lineage import HookLineageReader
from airflow.plugins_manager import AirflowPlugin
class CustomHookLineageReader(HookLineageReader):
def get_inputs(self):
return self.lineage_collector.collected_assets.inputs
class HookLineageCollectionPlugin(AirflowPlugin):
name = "HookLineageCollectionPlugin"
hook_lineage_readers = [CustomHookLineageReader]
如果 Airflow 中未注册 HookLineageReader
,则会改用默认的 NoOpCollector
。此收集器不会创建符合 AIP-60 的资产或收集血缘信息。