创建通知程序¶
BaseNotifier
是一个抽象类,它为使用各种 on_*__callback
在 Airflow 中发送通知提供了一个基本结构。它旨在供提供商扩展并根据其特定需求进行自定义。
要扩展 BaseNotifier 类,您需要创建一个继承自它的新类。在这个新类中,您应该使用您自己的发送通知的实现来覆盖 notify
方法。 notify
方法采用一个参数,即 Airflow 上下文,其中包含有关当前任务和执行的信息。
您还可以设置 template_fields
属性以指定哪些属性应作为模板呈现。
以下是如何创建 Notifier 类的示例
from airflow.notifications.basenotifier import BaseNotifier
from my_provider import send_message
class MyNotifier(BaseNotifier):
template_fields = ("message",)
def __init__(self, message):
self.message = message
def notify(self, context):
# Send notification here, below is an example
title = f"Task {context['task_instance'].task_id} failed"
send_message(title, self.message)
使用通知器¶
一旦您有了通知器实现,您就可以在 DAG
定义中使用它,方法是将其作为参数传递给 on_*_callbacks
。例如,您可以将其与 on_success_callback
或 on_failure_callback
一起使用,以根据任务或 DAG 运行的状态发送通知。
以下是如何使用上述通知器的示例
from datetime import datetime
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from myprovider.notifier import MyNotifier
with DAG(
dag_id="example_notifier",
start_date=datetime(2022, 1, 1),
schedule_interval=None,
on_success_callback=MyNotifier(message="Success!"),
on_failure_callback=MyNotifier(message="Failure!"),
):
task = BashOperator(
task_id="example_task",
bash_command="exit 1",
on_success_callback=MyNotifier(message="Task Succeeded!"),
)
有关社区管理的通知器的列表,请参阅 通知。