Airflow Summit 2025 将于 10 月 07-09 日举行。立即注册以获得早鸟票!

回调

任务回调是日志和监控的一个重要组成部分,用于在给定任务状态变化时或给定 DAG 中所有任务状态变化时采取行动。例如,您可能希望在某些任务失败时发出警报,或者让 DAG 中的最后一个任务在成功时调用回调函数。

注意

回调函数仅在任务状态因 worker 执行而改变时被调用。因此,通过命令行界面 (CLI) 或用户界面 (UI) 设置的任务更改不会执行回调函数。

警告

回调函数在任务完成后执行。回调函数中的错误将出现在调度器日志中,而不是任务日志中。默认情况下,调度器日志不会在 UI 中显示,而可以在 $AIRFLOW_HOME/logs/scheduler/latest/DAG_FILE.py.log 中找到。

回调类型

有五种任务事件类型可以触发回调

名称

描述

on_success_callback

当任务成功时调用

on_failure_callback

当任务失败时调用

on_retry_callback

当任务准备重试时调用

on_execute_callback

在任务开始执行前立即调用。

on_skipped_callback

当任务正在运行并抛出 AirflowSkipException 时调用。明确地说,如果任务由于 DAG 中前一个分支决策或触发规则导致跳过执行而从未被调度执行,则不会调用此函数。

示例

在以下示例中,任何任务失败都会调用 task_failure_alert 函数,最后一个任务成功时会调用 dag_success_alert 函数

import datetime
import pendulum

from airflow.sdk import DAG
from airflow.providers.standard.operators.empty import EmptyOperator


def task_failure_alert(context):
    print(f"Task has failed, task_instance_key_str: {context['task_instance_key_str']}")


def dag_success_alert(context):
    print(f"DAG has succeeded, run_id: {context['run_id']}")


with DAG(
    dag_id="example_callback",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    dagrun_timeout=datetime.timedelta(minutes=60),
    catchup=False,
    on_success_callback=None,
    on_failure_callback=task_failure_alert,
    tags=["example"],
):
    task1 = EmptyOperator(task_id="task1")
    task2 = EmptyOperator(task_id="task2")
    task3 = EmptyOperator(task_id="task3", on_success_callback=[dag_success_alert])
    task1 >> task2 >> task3

注意

自 Airflow 2.6.0 起,回调现在支持回调函数列表,允许用户指定多个函数在所需的事件发生时执行。只需在定义 DAG/任务回调时将回调函数列表传递给回调参数:例如 on_failure_callback=[callback_func_1, callback_func_2]

context 中可用的变量完整列表见文档代码

本条目有帮助吗?