Airflow 的监听器插件¶
Airflow 具有一项功能,允许使用插件添加监听器来监视和跟踪任务状态。
这是一个简单的 Airflow 监听器插件示例,可帮助跟踪任务状态并收集有关任务、DAG 运行和 DAG 的有用元数据信息。
这是一个 Airflow 示例插件,允许创建 Airflow 的监听器插件。 此插件通过使用 SQLAlchemy 的事件机制工作。 它会监视表中任务实例状态的更改并触发事件。 这将通知所有 DAG 中的所有任务。
在此插件中,对象引用派生自基类 airflow.plugins_manager.AirflowPlugin
。
监听器插件在底层使用 pluggy 应用程序。 Pluggy 是一个为 Pytest 构建的插件管理和钩子调用应用程序。 Pluggy 启用函数钩子,因此它允许构建“可插入”的系统,并通过该钩子进行自定义。
- 使用此插件,可以监听以下事件
任务实例处于运行状态。
任务实例处于成功状态。
任务实例处于失败状态。
DAG 运行处于运行状态。
DAG 运行处于成功状态。
DAG 运行处于失败状态。
在诸如 airflow job、调度程序或 backfilljob 等事件之前启动
在诸如 airflow job、调度程序或 backfilljob 等事件之前停止
监听器注册¶
具有监听器对象对象引用的监听器插件注册为 Airflow 插件的一部分。 以下是我们实现新监听器的框架
from airflow.plugins_manager import AirflowPlugin
# This is the listener file created where custom code to monitor is added over hookimpl
import listener
class MetadataCollectionPlugin(AirflowPlugin):
name = "MetadataCollectionPlugin"
listeners = [listener]
接下来,我们可以检查添加到 listener
中的代码,并查看每个监听器的实现方法。 实现后,监听器部分将在所有 DAG 的所有任务执行期间执行
作为参考,这里是 listener.py
类中的插件代码,显示了数据库中的表列表
此示例在任务实例处于运行状态时进行侦听
@hookimpl
def on_task_instance_running(previous_state: TaskInstanceState, task_instance: TaskInstance, session):
"""
This method is called when task state changes to RUNNING.
Through callback, parameters like previous_task_state, task_instance object can be accessed.
This will give more information about current task_instance that is running its dag_run,
task and dag information.
"""
print("Task instance is in running state")
print(" Previous state of the Task instance:", previous_state)
state: TaskInstanceState = task_instance.state
name: str = task_instance.task_id
start_date = task_instance.start_date
dagrun = task_instance.dag_run
dagrun_status = dagrun.state
task = task_instance.task
if TYPE_CHECKING:
assert task
dag = task.dag
dag_name = None
if dag:
dag_name = dag.dag_id
print(f"Current task name:{name} state:{state} start_date:{start_date}")
print(f"Dag name:{dag_name} and current dag run status:{dagrun_status}")
类似地,可以实现用于在 task_instance 成功和失败后进行侦听的代码。
此示例在 dag 运行更改为失败状态时进行侦听
@hookimpl
def on_dag_run_failed(dag_run: DagRun, msg: str):
"""
This method is called when dag run state changes to FAILED.
"""
print("Dag run in failure state")
dag_id = dag_run.dag_id
run_id = dag_run.run_id
external_trigger = dag_run.external_trigger
print(f"Dag information:{dag_id} Run id: {run_id} external trigger: {external_trigger}")
print(f"Failed with message: {msg}")
类似地,可以实现用于在 dag_run 成功后以及在运行状态期间进行侦听的代码。
添加监听器实现所需的监听器插件文件将作为 Airflow 插件的一部分添加到 $AIRFLOW_HOME/plugins/
文件夹中,并在 Airflow 启动期间加载。