回调¶
日志记录和监控的一个有价值的组件是使用任务回调对给定任务的状态更改或给定 DAG 中所有任务的状态更改采取行动。例如,您可能希望在某些任务失败时发出警报,或让 DAG 中的最后一个任务在成功时调用回调。
警告
回调函数在任务完成后执行。回调函数中的错误将显示在调度程序日志中,而不是任务日志中。默认情况下,调度程序日志不会显示在 UI 中,而是在 $AIRFLOW_HOME/logs/scheduler/latest/PROJECT/DAG_FILE.py.log
中。
回调类型¶
有五种任务事件可以触发回调
名称 |
说明 |
---|---|
|
当任务成功时调用 |
|
当任务失败时调用 |
|
当任务错过其定义的SLA时调用 |
|
当任务准备重试时调用 |
|
在任务开始执行之前调用。 |
|
当任务正在运行且引发 AirflowSkipException 时调用。如果由于 DAG 中前面的分支决策或导致执行跳过的触发器规则,导致任务未开始执行,则明确不会调用此回调,因此任务执行永远不会被调度。 |
示例¶
在以下示例中,任何任务中的失败都会调用 task_failure_alert
函数,最后一个任务中的成功会调用 dag_success_alert
函数
import datetime
import pendulum
from airflow import DAG
from airflow.operators.empty import EmptyOperator
def task_failure_alert(context):
print(f"Task has failed, task_instance_key_str: {context['task_instance_key_str']}")
def dag_success_alert(context):
print(f"DAG has succeeded, run_id: {context['run_id']}")
with DAG(
dag_id="example_callback",
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
dagrun_timeout=datetime.timedelta(minutes=60),
catchup=False,
on_success_callback=None,
on_failure_callback=task_failure_alert,
tags=["example"],
):
task1 = EmptyOperator(task_id="task1")
task2 = EmptyOperator(task_id="task2")
task3 = EmptyOperator(task_id="task3", on_success_callback=[dag_success_alert])
task1 >> task2 >> task3
注意
从 Airflow 2.6.0 开始,回调现在支持回调函数列表,允许用户指定在所需事件中执行多个函数。只需在定义 DAG/任务回调时将回调函数列表传递给回调参数:例如 on_failure_callback=[callback_func_1, callback_func_2]