回调

日志记录和监控的一个有价值的组件是使用任务回调对给定任务的状态更改或给定 DAG 中所有任务的状态更改采取行动。例如,您可能希望在某些任务失败时发出警报,或让 DAG 中的最后一个任务在成功时调用回调。

注意

仅当任务状态因工作进程执行而更改时,才会调用回调函数。因此,通过命令行界面 (CLI) 或用户界面 (UI) 设置的任务更改不会执行回调函数。

警告

回调函数在任务完成后执行。回调函数中的错误将显示在调度程序日志中,而不是任务日志中。默认情况下,调度程序日志不会显示在 UI 中,而是在 $AIRFLOW_HOME/logs/scheduler/latest/PROJECT/DAG_FILE.py.log 中。

回调类型

有五种任务事件可以触发回调

名称

说明

on_success_callback

当任务成功时调用

on_failure_callback

当任务失败时调用

sla_miss_callback

当任务错过其定义的SLA时调用

on_retry_callback

当任务准备重试时调用

on_execute_callback

在任务开始执行之前调用。

on_skipped_callback

当任务正在运行且引发 AirflowSkipException 时调用。如果由于 DAG 中前面的分支决策或导致执行跳过的触发器规则,导致任务未开始执行,则明确不会调用此回调,因此任务执行永远不会被调度。

示例

在以下示例中,任何任务中的失败都会调用 task_failure_alert 函数,最后一个任务中的成功会调用 dag_success_alert 函数

import datetime
import pendulum

from airflow import DAG
from airflow.operators.empty import EmptyOperator


def task_failure_alert(context):
    print(f"Task has failed, task_instance_key_str: {context['task_instance_key_str']}")


def dag_success_alert(context):
    print(f"DAG has succeeded, run_id: {context['run_id']}")


with DAG(
    dag_id="example_callback",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    dagrun_timeout=datetime.timedelta(minutes=60),
    catchup=False,
    on_success_callback=None,
    on_failure_callback=task_failure_alert,
    tags=["example"],
):
    task1 = EmptyOperator(task_id="task1")
    task2 = EmptyOperator(task_id="task2")
    task3 = EmptyOperator(task_id="task3", on_success_callback=[dag_success_alert])
    task1 >> task2 >> task3

注意

从 Airflow 2.6.0 开始,回调现在支持回调函数列表,允许用户指定在所需事件中执行多个函数。只需在定义 DAG/任务回调时将回调函数列表传递给回调参数:例如 on_failure_callback=[callback_func_1, callback_func_2]

文档代码context 中可用的变量的完整列表。

此条目是否有用?