OpenLineage 作业层级 & 宏¶
OpenLineage 插件中包含的宏会集成到 Airflow 的主要集合中,并可供使用。
OpenLineage 中的作业层级¶
当您需要在不同作业之间建立关系(例如在 DAG 之间,或在 Airflow 任务与外部系统之间)时,可能需要显式传递父作业信息。以下章节描述了不同情景以及是否需要用户操作。
DAG 与任务层级¶
Apache Airflow 本身具有作业层级:DAG 作为可独立调度的大单元,包含更小的可执行任务。OpenLineage 在其作业层级模型中映射了这种结构。在单个 DAG 内,OpenLineage 会自动追踪 DAG 与其任务之间的层级——TaskInstance 事件会自动包含一个 ParentRunFacet,该 facet 引用其来源的 DAG 运行作为父作业。
用户操作要求: 无。OpenLineage 自动在 DAG 运行及其任务实例之间建立父子关系。
TriggerDagRunOperator¶
TriggerDagRunOperator 用于触发指定 Dag ID 的 DAG 运行。
OpenLineage 行为
自 apache-airflow-providers-standard==1.10.0 起,默认情况下,operator 会自动将 OpenLineage 父作业信息注入被触发的 DAG 运行的配置中。这会在触发任务与被触发的 DAG 运行之间创建父子关系——被触发的 DAG Run 事件将带有引用触发任务的 ParentRunFacet。
除此之外,触发任务的 OpenLineage COMPLETE 事件还会在 AirflowRunFacet 中包含以下特定于 operator 的属性:
trigger_dag_id- 被触发的 DAG 的 Dag IDtrigger_run_id- 被触发的 DagRun 的 Dag Run ID
用户操作要求: 无。operator 会自动处理父信息注入。
如需禁用自动注入,请传入 openlineage_inject_parent_info=False
TriggerDagRunOperator(
task_id="trigger_downstream",
trigger_dag_id="downstream_dag",
openlineage_inject_parent_info=False, # Disable automatic injection
)
通过 API 触发 DAG¶
通过 Airflow REST API 触发 DAG 运行时,您可以在 DAG 运行的 conf 参数中手动传递父作业和根作业信息。当 DAG 运行配置中包含带有有效元数据的 openlineage 部分时,这些信息会被自动解析并转换为 DAG 运行事件中的 parentRunFacet,根信息也会随之传播到该 DAG 运行中的所有任务。
如果未提供 DAG 运行的 openlineage 配置,运行将不包含 parentRunFacet,所有任务的根默认指向该 DAG 运行本身。
DAG 运行配置中的 openlineage 字典应包含以下键:
父作业信息(必须同时提供三项才能创建父引用)
parentRunId — 直接父作业的唯一运行 ID(UUID)
parentJobName — 父作业的名称
parentJobNamespace — 父作业的命名空间
根作业信息(必须同时提供三项才能创建根引用;若缺失,则使用父作业作为根)
rootParentRunId — 顶层(根)作业的运行 ID(UUID)
rootParentJobName — 顶层(根)作业的名称
rootParentJobNamespace — 顶层(根)作业的命名空间
注意
强烈建议提供全部六个 OpenLineage 标识符(父与根),以确保完整的血缘追踪。如果缺少根信息,将使用父信息作为根;如果缺少任意一项父字段,则不会创建父 facet。部分或混合配置不受支持——要么全部提供父的三项,要么全部提供根的三项。
示例
curl -X POST "http://<AIRFLOW_HOST>/api/v2/dags/my_dag_name/dagRuns" \
-H "Content-Type: application/json" \
-d '{
"logical_date": "2019-08-24T14:15:22Z",
"conf": {
"openlineage": {
"parentRunId": "3bb703d1-09c1-4a42-8da5-35a0b3216072",
"parentJobNamespace": "prod_biz",
"parentJobName": "get_files",
"rootParentRunId": "9d3b14f7-de91-40b6-aeef-e887e2c7673e",
"rootParentJobNamespace": "prod_analytics",
"rootParentJobName": "generate_report_sales_e2e"
}
}
}'
用户操作要求: 是——您必须在 DAG 运行的 conf 中手动包含父作业和根作业信息。
ExternalTaskSensor¶
ExternalTaskSensor 用于等待另一个 DAG 中的任务。
OpenLineage 行为
传感器任务的 OpenLineage 事件会在 AirflowRunFacet 中包含以下特定于 operator 的属性:
external_dag_id- 被等待的外部任务所在的 DAG IDexternal_task_id- 被等待的单个外部任务的 Task ID(单任务等待时)external_task_ids- 被等待的任务 ID 列表(多任务等待时)external_task_group_id- 被等待的任务组 ID(等待任务组时)external_dates_filter- 检查外部任务完成时使用的日期过滤器
这些属性提供了跨 DAG 依赖的可视性,但不会创建父子作业关系。
用户操作要求: 不会自动创建父关系。如果需要在 OpenLineage 中追踪此关系,可考虑使用 TriggerDagRunOperator、通过 API 手动传递父信息,或利用上述属性自行构建。
Airflow 资产¶
Airflow 资产允许您基于任务更新资产的时间来调度 DAG(数据依赖)。当任务更新资产且另一个 DAG 基于该资产进行调度时,OpenLineage 会追踪资产之间的关系。
OpenLineage 行为
生成资产的任务(使用 outlets=[Asset(...)])以及基于资产调度的 DAG(使用 schedule=[Asset(...)])都会被 OpenLineage 记录为消费这些资产。
当 DAG 运行因资产消费而被触发时,OpenLineage 会在该 DAG 运行的事件(START 与 COMPLETE/FAIL)中添加 JobDependenciesRunFacet。该 facet 包含上游作业依赖,展示所有被消费的资产事件以及资产生产作业的 OpenLineage 作业/运行信息。每条依赖包括:
生产任务的作业标识符(OpenLineage 命名空间和名称)
生产任务实例的运行标识符(OpenLineage run ID),若可用
依赖类型:
IMPLICIT_ASSET_DEPENDENCY资产事件信息:从该作业消费的所有资产事件的详细信息,包括资产 URI、资产 ID、来源 DAG 运行 ID 以及其他元数据
请注意,消费 DAG 运行的事件**不会**添加 ParentRunFacet。相反,JobDependenciesRunFacet 提供了更灵活的表示方式,能够处理多个上游依赖(当一个 DAG 从多个生产任务消费资产)并保留每个资产事件的详细信息。
资产关系在 OpenLineage 中创建了数据血缘连接,显示哪些任务生产和消费资产。
"run": {
"facets": {
"jobDependencies": {
"upstream": [
{
"job": {
"name": "dag_asset_1_producer.produce_dataset_1",
"namespace": "airflow"
},
"run": {
"runId": "019b6ff1-f2f0-79bf-a797-0bbe6983c753"
},
"airflow": {
"asset_events": [
{
"asset_id": 1,
"asset_uri": "s3://first-bucket/ds1.csv",
"dag_run_id": "manual__2025-12-30T15:48:06+00:00",
"asset_event_id": 1
}
]
},
"dependency_type": "IMPLICIT_ASSET_DEPENDENCY"
},
{
"job": {
"name": "dag_asset_1_producer.produce_dataset_1",
"namespace": "airflow"
},
"run": {
"runId": "019b6ff4-4c80-7b5f-9f35-da28a44030df"
},
"airflow": {
"asset_events": [
{
"asset_id": 1,
"asset_uri": "s3://first-bucket/ds1.csv",
"dag_run_id": "manual__2025-12-30T15:50:40+00:00",
"asset_event_id": 2
}
]
},
"dependency_type": "IMPLICIT_ASSET_DEPENDENCY"
},
{
"job": {
"name": "dag_asset_2_producer.produce_dataset_2",
"namespace": "airflow"
},
"run": {
"runId": "019b6ff4-7f48-7ee5-aacb-a88072516b1e"
},
"airflow": {
"asset_events": [
{
"asset_id": 2,
"asset_uri": "gs://second-bucket/ds2.xlsx",
"dag_run_id": "manual__2025-12-30T15:50:53+00:00",
"asset_event_id": 3
}
]
},
"dependency_type": "IMPLICIT_ASSET_DEPENDENCY"
}
],
"downstream": []
}
}
}
用户操作要求: 无。数据血缘关系会自动被追踪。
通过 API 手动发出资产事件¶
当您通过 Airflow REST API 手动发出资产事件(例如资产在 Airflow 任务之外被更新)时,可以在资产事件的 extra 字段中包含 OpenLineage 作业信息。这样即使资产事件未直接关联到 Airflow TaskInstance,OpenLineage 仍能追踪资产生产者与消费者之间的关系。
OpenLineage 行为
若通过 API 手动创建资产事件且没有 TaskInstance 引用,OpenLineage 会在检查 TaskInstance 与 AssetEvent 源字段后,查找 asset_event.extra["openlineage"] 中的父作业信息。若存在,该信息将被用于在消费该资产的 DAG 运行事件的 JobDependenciesRunFacet 中创建作业依赖。
``asset_event.extra[“openlineage”]`` 中的必填字段
parentJobName(必填)- 生成该资产的父作业名称
parentJobNamespace(必填)- 父作业的命名空间
parentRunId(可选)- 父作业执行的运行 ID(UUID)。若提供,必须为有效的 UUID 格式
示例 API 调用
curl -X POST "http://<AIRFLOW_HOST>/api/v2/assets/events" \
-H "Content-Type: application/json" \
-d '{
"asset_id": 3,
"extra": {
"openlineage": {
"parentJobName": "external_system.data_processor",
"parentJobNamespace": "prod_etl",
"parentRunId": "3bb703d1-09c1-4a42-8da5-35a0b3216072"
}
}
}'
用户操作要求: 是——在通过 API 发出资产事件时,必须手动在 extra 字段中加入 OpenLineage 作业信息。
Airflow 之外的子作业¶
当 Airflow 任务触发外部系统(例如 Spark 应用、外部 API、其他调度器)时,需要显式为这些子作业配置父作业信息,以在 OpenLineage 中建立层级关系。
用户操作要求: 是——您必须使用下面的宏(或自动注入机制,例如 Spark)将父作业信息传递给子作业。
使用宏保留作业层级¶
要在血缘追踪中建立正确的作业层级,子作业(如 Spark 应用、外部系统或下游 DAG)需要了解其父作业(触发它们的 Airflow 任务)。这使得子作业的 OpenLineage 集成能够自动在其事件中添加 ParentRunFacet,将子作业链接到血缘图中的来源 Airflow 作业。
OpenLineage 提供者提供的宏可帮助您从 Airflow 任务向子作业传递父作业信息。lineage_* 宏描述的是 Airflow 任务本身,对子作业而言它即为父。lineage_root_* 宏则将 Airflow 任务的根信息转发给子作业,使子作业能够保持完整的作业层级以及根作业信息。
它们可以作为 Jinja 模板调用,例如:
血缘作业 & 运行宏¶
这些宏
lineage_job_namespace()- 返回给定 task_instance 的 OpenLineage 命名空间
lineage_job_name(task_instance)- 返回给定 task_instance 的 OpenLineage 作业名称
lineage_run_id(task_instance)- 返回给定 task_instance 生成的 OpenLineage 运行 ID
它们描述 Airflow 任务,应在配置子作业时作为父信息使用。从子作业的角度看,Airflow 任务即为父作业。
示例:在 Spark 应用中使用宏
当从 Airflow 触发 Spark 作业时,您可以使用这些宏传递父作业信息。
SparkSubmitOperator(
task_id="my_task",
application="/script.py",
conf={
"spark.openlineage.parentJobNamespace": "{{ macros.OpenLineageProviderPlugin.lineage_job_namespace() }}",
"spark.openlineage.parentJobName": "{{ macros.OpenLineageProviderPlugin.lineage_job_name(task_instance) }}",
"spark.openlineage.parentRunId": "{{ macros.OpenLineageProviderPlugin.lineage_run_id(task_instance) }}",
},
)
示例:在其他子作业中使用宏
这些宏可用于任何接受父作业信息的子作业。例如,您可以将其传递给外部系统、下游 DAG 或其他处理框架。
PythonOperator(
task_id="trigger_external_job",
python_callable=call_external_api,
op_kwargs={
"parent_job_namespace": "{{ macros.OpenLineageProviderPlugin.lineage_job_namespace() }}",
"parent_job_name": "{{ macros.OpenLineageProviderPlugin.lineage_job_name(task_instance) }}",
"parent_run_id": "{{ macros.OpenLineageProviderPlugin.lineage_run_id(task_instance) }}",
},
)
血缘根宏¶
这些宏
lineage_root_job_namespace(task_instance)- 返回给定 task_instance 的根作业的 OpenLineage 命名空间
lineage_root_job_name(task_instance)- 返回给定 task_instance 的根作业的 OpenLineage 作业名称
lineage_root_run_id(task_instance)- 返回给定 task_instance 的根运行的 OpenLineage run ID
它们将 Airflow 任务的根信息转发给子作业,并应在配置子作业时作为根信息使用。这样子作业能够保持完整的作业层级,尤其在任务作为更大工作流的一部分执行时。
示例:在 Spark 应用中使用根宏
SparkSubmitOperator(
task_id="my_task",
application="/script.py",
conf={
"spark.openlineage.rootJobNamespace": "{{ macros.OpenLineageProviderPlugin.lineage_root_job_namespace(task_instance) }}",
"spark.openlineage.rootJobName": "{{ macros.OpenLineageProviderPlugin.lineage_root_job_name(task_instance) }}",
"spark.openlineage.rootRunId": "{{ macros.OpenLineageProviderPlugin.lineage_root_run_id(task_instance) }}",
},
)
联合标识符¶
与其分别传递各个组件,您也可以使用返回完整信息的组合宏。当需要一次性将完整标识符传递给子作业时,这些宏非常有用。
lineage_parent_id(task_instance) 宏将父信息(命名空间、作业名称、运行 ID)合并为形如 {namespace}/{job_name}/{run_id} 的单个字符串。该字符串代表 Airflow 任务,应在配置子作业时作为父信息使用。
同理,lineage_root_parent_id(task_instance) 宏将根信息(根命名空间、根作业名称、根运行 ID)合并为 {namespace}/{job_name}/{run_id} 的单个字符串。该字符串转发 Airflow 任务的根信息,应在配置子作业时作为根信息使用。
def my_task_function(templates_dict, **kwargs):
parent_job_namespace, parent_job_name, parent_run_id = templates_dict["parentRun"].split("/")
root_job_namespace, root_job_name, root_run_id = templates_dict["rootRun"].split("/")
...
PythonOperator(
task_id="render_template",
python_callable=my_task_function,
templates_dict={
# Parent information as one string `<namespace>/<name>/<run_id>`
"parentRun": "{{ macros.OpenLineageProviderPlugin.lineage_parent_id(task_instance) }}",
# Root information as one string `<namespace>/<name>/<run_id>`
"rootRun": "{{ macros.OpenLineageProviderPlugin.lineage_root_parent_id(task_instance) }}",
},
provide_context=False,
dag=dag,
)
示例¶
当需要同时向子作业传递父和根血缘信息时,您可以在单个 operator 配置中组合所有宏。下面的示例展示了如何在 Spark 应用中同时使用父宏和根宏。
SparkSubmitOperator(
task_id="process_data",
application="/path/to/spark/app.py",
conf={
# Parent lineage information
"spark.openlineage.parentJobNamespace": "{{ macros.OpenLineageProviderPlugin.lineage_job_namespace() }}",
"spark.openlineage.parentJobName": "{{ macros.OpenLineageProviderPlugin.lineage_job_name(task_instance) }}",
"spark.openlineage.parentRunId": "{{ macros.OpenLineageProviderPlugin.lineage_run_id(task_instance) }}",
# Root lineage information
"spark.openlineage.rootJobNamespace": "{{ macros.OpenLineageProviderPlugin.lineage_root_job_namespace(task_instance) }}",
"spark.openlineage.rootJobName": "{{ macros.OpenLineageProviderPlugin.lineage_root_job_name(task_instance) }}",
"spark.openlineage.rootRunId": "{{ macros.OpenLineageProviderPlugin.lineage_root_run_id(task_instance) }}",
},
)
有关更丰富的 Spark 示例以及自动注入选项,请参见 OpenLineage Spark 集成。