Airflow Summit 2025 将于 10 月 07-09 日举行。立即注册享早鸟票优惠!

可延迟的操作符与触发器

标准的 Operator(操作符)Sensor(传感器) 在整个运行期间都会占用一个完整的 worker slot(工作节点插槽),即使它们处于空闲状态。例如,如果您只有 100 个 worker slot 可用来运行任务,并且有 100 个 DAG 正在等待一个当前正在运行但空闲的 Sensor,那么您就 无法运行任何其他任务 - 即使您的整个 Airflow 集群实际上是空闲的。Sensor 的 reschedule 模式在一定程度上解决了这个问题,它允许 Sensor 只在固定时间间隔运行,但这不够灵活,并且只允许使用时间作为恢复的原因,而不是其他条件。

这就是 *Deferrable Operator*(可延迟操作符)的作用。当一个 Operator 除了等待别无他事时,它可以通过 *延迟* 来挂起自身,释放 worker 给其他进程使用。当 Operator 延迟时,执行会转移到 Triggerer(触发器),在那里将运行由 Operator 指定的 Trigger。Trigger 可以执行 Operator 所需的轮询或等待操作。然后,当 Trigger 完成轮询或等待后,它会发送信号通知 Operator 恢复执行。在延迟执行阶段,由于工作已卸载到 Triggerer,任务不再占用 worker slot,您拥有更多的空闲工作负载能力。默认情况下,处于延迟状态的任务不占用 pool slots。如果您希望它们占用,可以通过编辑相关 pool 来更改此设置。

*Trigger*(触发器)是小型、异步的 Python 代码段,设计用于在单个 Python 进程中运行。由于它们是异步的,因此可以在 Airflow 组件 *triggerer* 中高效地共存。

该过程的工作概述

  • 任务实例(运行中的 Operator)到达需要等待其他操作或条件的点,并使用绑定到恢复事件的 Trigger 延迟自身。这会释放 worker 以运行其他任务。

  • 新的 Trigger 实例由 Airflow 注册,并由 Triggerer 进程拾取。

  • Trigger 运行直到触发,此时其源任务将由 Scheduler(调度器)重新调度。

  • Scheduler 将任务排队以在 worker 节点上恢复。

作为 DAG 作者,您可以使用预先编写的可延迟 Operator,也可以编写自己的 Operator。然而,编写它们需要满足某些设计标准。

使用可延迟的操作符

如果您想使用 Airflow 自带的预先编写的可延迟 Operator,例如 TimeSensorAsync,则只需完成两个步骤

  • 确保您的 Airflow 安装运行至少一个 triggerer 进程以及正常的 scheduler

  • 在您的 DAG 中使用可延迟的 Operator/Sensor。

Airflow 会自动为您处理和实现延迟过程。

如果您正在升级现有 DAG 以使用可延迟 Operator,Airflow 包含与 API 兼容的 Sensor 变体,例如 TimeSensorAsync 对应 TimeSensor。将这些变体添加到您的 DAG 中即可使用可延迟 Operator,无需其他更改。

请注意,您不能在自定义 PythonOperator 或 TaskFlow Python 函数内部使用延迟能力。延迟功能仅适用于传统的、基于类的 Operator。

编写可延迟的操作符

编写可延迟 Operator 时,主要需要考虑以下几点

  • 您的 Operator 必须使用 Trigger 延迟自身。您可以使用 Airflow 核心中包含的 Trigger,或者编写一个自定义 Trigger。

  • 您的 Operator 在延迟期间将停止并在其 worker 中移除,并且不会自动保留任何状态。您可以通过指示 Airflow 在特定方法恢复 Operator 或通过传递特定的 kwargs 来保留状态。

  • 您可以多次延迟,也可以在 Operator 执行重要工作之前或之后延迟。或者,您可以在满足特定条件时延迟。例如,如果系统没有立即的响应。延迟完全由您控制。

  • 任何 Operator 都可以延迟;不需要在其类上进行特殊标记,并且不限于 Sensor。

  • 如果您想添加一个同时支持可延迟和不可延迟模式的 Operator 或 Sensor,建议将 deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False) 添加到 Operator 的 __init__ 方法中,并用它来决定是否在可延迟模式下运行 Operator。您可以通过 operator 部分的 default_deferrable 配置所有支持在可延迟和不可延迟模式之间切换的 Operator 和 Sensor 的 deferrable 默认值。这是一个支持两种模式的 Sensor 示例。

import time
from datetime import timedelta
from typing import Any

from airflow.configuration import conf
from airflow.sdk import BaseSensorOperator
from airflow.providers.standard.triggers.temporal import TimeDeltaTrigger
from airflow.utils.context import Context


class WaitOneHourSensor(BaseSensorOperator):
    def __init__(
        self, deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False), **kwargs
    ) -> None:
        super().__init__(**kwargs)
        self.deferrable = deferrable

    def execute(self, context: Context) -> None:
        if self.deferrable:
            self.defer(
                trigger=TimeDeltaTrigger(timedelta(hours=1)),
                method_name="execute_complete",
            )
        else:
            time.sleep(3600)

    def execute_complete(
        self,
        context: Context,
        event: dict[str, Any] | None = None,
    ) -> None:
        # We have no more work to do here. Mark as complete.
        return

编写触发器

一个 *Trigger*(触发器)被编写为一个继承自 BaseTrigger 的类,并实现三个方法

  • __init__: 用于接收实例化它的 Operator 传递的参数的方法。从 2.10.0 版本开始,我们可以直接从预定义的 Trigger 开始任务执行。为了利用此功能,__init__ 中的所有参数必须是可序列化的。

  • run: 一个异步方法,用于运行其逻辑并生成(yield)一个或多个 TriggerEvent 实例作为异步生成器。

  • serialize: 返回重建此 Trigger 所需的信息,作为类路径和传递给 __init__ 的关键字参数组成的元组。

此示例展示了一个基本 Trigger 的结构,它是 Airflow DateTimeTrigger 的非常简化版本

import asyncio

from airflow.triggers.base import BaseTrigger, TriggerEvent
from airflow.utils import timezone


class DateTimeTrigger(BaseTrigger):
    def __init__(self, moment):
        super().__init__()
        self.moment = moment

    def serialize(self):
        return ("airflow.providers.standard.triggers.temporal.DateTimeTrigger", {"moment": self.moment})

    async def run(self):
        while self.moment > timezone.utcnow():
            await asyncio.sleep(1)
        yield TriggerEvent(self.moment)

代码示例展示了几件事

  • __init__serialize 是成对编写的。Trigger 在由 Operator 作为其延迟请求的一部分提交时实例化一次,然后序列化并在运行该 Trigger 的任何 Triggerer 进程上重新实例化。

  • run 方法声明为 async def,因为它 *必须* 是异步的,并且使用 asyncio.sleep 而不是常规的 time.sleep(因为后者会阻塞进程)。

  • 当它发出事件时,它会包含 self.moment,这样如果此 Trigger 在多个主机上冗余运行,则可以对事件进行去重。

Trigger 可以根据您的需求复杂或简单,前提是它们符合设计约束。它们可以以高可用方式运行,并自动分配到运行 Triggerer 的主机上。我们鼓励您避免在 Trigger 中使用任何类型的持久状态。Trigger 应从其 __init__ 中获取所需的一切,以便它们可以自由序列化和移动。

如果您刚开始编写异步 Python,在编写您的 run() 方法时要非常小心。Python 的异步模型意味着如果代码在执行阻塞操作时没有正确地 await,它可能会阻塞整个进程。Airflow 会尝试检测阻塞进程的代码,并在 Triggerer 日志中发出警告。您可以在编写 Trigger 时通过设置环境变量 PYTHONASYNCIODEBUG=1 来启用 Python 的额外检查,以确保您编写的是非阻塞代码。在进行文件系统调用时要特别小心,因为如果底层文件系统是网络支持的,它可能会阻塞。

编写自定义 Trigger 时需要注意一些设计约束

  • run 方法 *必须是异步的*(使用 Python 的 asyncio),并在执行阻塞操作时正确地 await

  • run 必须 yield 其 TriggerEvent,而不是 return 它们。如果在至少生成一个事件之前返回,Airflow 将认为这是一个错误,并使任何等待它的任务实例失败。如果它抛出异常,Airflow 也会使任何依赖的任务实例失败。

  • 您应该假定 Trigger 实例可以运行 *不止一次*。如果发生网络分区,Airflow 在分离的机器上重新启动 Trigger,可能会发生这种情况。因此,您必须注意副作用。例如,您可能不想使用 Trigger 来插入数据库行。

  • 如果您的 Trigger 设计为发出多个事件(目前不支持),那么每个发出的事件 *必须* 包含一个 payload,如果在多个地方运行 Trigger,该 payload 可用于去重事件。如果您只触发一个事件并且不需要将信息传回 Operator,则可以将 payload 设置为 None

  • Trigger 可以突然从一个 Triggerer 服务中移除,并在新的服务上启动。例如,如果子网发生变化导致网络分区,或者进行部署时。如果需要,您可以实现 cleanup 方法,无论 Trigger 正常退出还是发生其他情况,此方法都会在 run 之后被调用。

  • 为了使 Trigger 的任何更改生效,*triggerer* 需要在 Trigger 修改后重新启动。

  • 您的 Trigger 不能来自 DAG bundle - sys.path 上的其他任何位置都可以。Triggerer 在运行 Trigger 时不会初始化任何 bundle。

注意

目前 Trigger 仅在其第一个事件触发后使用,因为它们仅用于恢复延迟的任务,并且任务在第一个事件触发后恢复。然而,Airflow 计划未来允许从 Trigger 启动 DAG,届时多事件 Trigger 将更有用。

触发器中的敏感信息

从 Airflow 2.9.0 开始,Trigger 的 kwargs 在存储到数据库之前会被序列化和加密。这意味着您传递给 Trigger 的任何敏感信息都将以加密形式存储在数据库中,并在从数据库读取时解密。

触发延迟

如果您想在 Operator 的任何位置触发延迟,可以调用 self.defer(trigger, method_name, kwargs, timeout)。这会为 Airflow 引发一个特殊异常。参数如下

  • trigger: 您希望延迟到的 Trigger 实例。它将被序列化到数据库中。

  • method_name: 您希望 Airflow 在恢复时调用的 Operator 方法名称。

  • kwargs:(可选)调用方法时传递的额外关键字参数。默认为 {}

  • timeout:(可选)一个 timedelta,指定此延迟将失败并导致任务实例失败的超时时间。默认为 None,表示没有超时。

以下是 Sensor 如何触发延迟的基本示例

from __future__ import annotations

from datetime import timedelta
from typing import TYPE_CHECKING, Any

from airflow.sdk import BaseSensorOperator
from airflow.providers.standard.triggers.temporal import TimeDeltaTrigger

if TYPE_CHECKING:
    from airflow.utils.context import Context


class WaitOneHourSensor(BaseSensorOperator):
    def execute(self, context: Context) -> None:
        self.defer(trigger=TimeDeltaTrigger(timedelta(hours=1)), method_name="execute_complete")

    def execute_complete(self, context: Context, event: dict[str, Any] | None = None) -> None:
        # We have no more work to do here. Mark as complete.
        return

当您选择延迟时,您的 Operator 将在该点停止执行,并从其当前 worker 中移除。不会保留任何状态,例如局部变量或在 self 上设置的属性。当您的 Operator 恢复时,它将作为 Operator 的新实例恢复。将状态从 Operator 的旧实例传递到新实例的唯一方法是使用 method_namekwargs

当您的 Operator 恢复时,Airflow 会将 context 对象和 event 对象添加到传递给 method_name 方法的 kwargs 中。这个 event 对象包含触发您的 Operator 恢复的 Trigger 事件的 payload。根据 Trigger 的不同,这可能对您的 Operator 有用,例如状态码或用于获取结果的 URL。或者,它可能是无关紧要的信息,例如日期时间。然而,您的 method_name 方法 *必须* 接受 contextevent 作为关键字参数。

如果您的 Operator 从其首次新的 execute() 方法或由 method_name 指定的后续方法返回,它将被视为已完成并结束执行。

让我们更深入地了解上面的 WaitOneHourSensor 示例。这个 Sensor 只是 Trigger 的一个简单包装。它会延迟到 Trigger,并指定当 Trigger 触发时返回到的不同方法。当它立即返回时,它会标记该 Sensor 为成功。

self.defer 调用会抛出 TaskDeferred 异常,因此它可以在 Operator 代码中的任何位置工作,即使嵌套在 execute() 方法深处。您也可以手动抛出 TaskDeferred,它使用与 self.defer 相同的参数。

Operator 的 execution_timeout 是根据 *总运行时间* 确定的,而不是延迟之间的单个执行时间。这意味着如果设置了 execution_timeout,Operator 可能会在延迟期间或延迟后运行期间失败,即使它只恢复了几秒钟。

多次延迟

想象一个场景:您希望 Operator 迭代一个长度可变的列表项,并延迟处理每个项。

例如,向数据库提交多个查询,或处理多个文件。

如果您希望 Operator 只有一个入口点,可以将 method_name 设置为 execute,但它也必须接受 event 作为可选关键字参数。

以下是实现此功能的概要。

import asyncio

from airflow.sdk import BaseOperator
from airflow.triggers.base import BaseTrigger, TriggerEvent


class MyItemTrigger(BaseTrigger):
    def __init__(self, item):
        super().__init__()
        self.item = item

    def serialize(self):
        return (self.__class__.__module__ + "." + self.__class__.__name__, {"item": self.item})

    async def run(self):
        result = None
        try:
            # Somehow process the item to calculate the result
            ...
            yield TriggerEvent({"result": result})
        except Exception as e:
            yield TriggerEvent({"error": str(e)})


class MyItemsOperator(BaseOperator):
    def __init__(self, items, **kwargs):
        super().__init__(**kwargs)
        self.items = items

    def execute(self, context, current_item_index=0, event=None):
        last_result = None
        if event is not None:
            # execute method was deferred
            if "error" in event:
                raise Exception(event["error"])
            last_result = event["result"]
            current_item_index += 1

        try:
            current_item = self.items[current_item_index]
        except IndexError:
            return last_result

        self.defer(
            trigger=MyItemTrigger(item),
            method_name="execute",  # The trigger will call this same method again
            kwargs={"current_item_index": current_item_index},
        )

从任务开始时触发延迟

在 2.10.0 版本中新增。

如果您想将任务直接延迟到 Triggerer 而不进入 worker,可以将类级别属性 start_from_trigger 设置为 True,并为您的可延迟 Operator 添加一个带有 StartTriggerArgs 对象的类级别属性 start_trigger_args,该对象包含以下 4 个属性

  • trigger_cls: 您的 Trigger 类可导入路径。

  • trigger_kwargs: 初始化 trigger_cls 时要传递的关键字参数。**请注意,所有参数都需要可由 Airflow 序列化。这是此功能的主要限制。**

  • next_method: 您希望 Airflow 在恢复时调用的 Operator 方法名称。

  • next_kwargs: 调用 next_method 时传递的额外关键字参数。

  • timeout:(可选)一个 timedelta,指定此延迟将失败并导致任务实例失败的超时时间。默认为 None,表示没有超时。

在 Sensor 部分,我们需要提供 TimeDeltaTrigger 的路径作为 trigger_cls

from __future__ import annotations

from datetime import timedelta
from typing import TYPE_CHECKING, Any

from airflow.sdk import BaseSensorOperator
from airflow.triggers.base import StartTriggerArgs

if TYPE_CHECKING:
    from airflow.utils.context import Context


class WaitOneHourSensor(BaseSensorOperator):
    start_trigger_args = StartTriggerArgs(
        trigger_cls="airflow.providers.standard.triggers.temporal.TimeDeltaTrigger",
        trigger_kwargs={"moment": timedelta(hours=1)},
        next_method="execute_complete",
        next_kwargs=None,
        timeout=None,
    )
    start_from_trigger = True

    def execute_complete(self, context: Context, event: dict[str, Any] | None = None) -> None:
        # We have no more work to do here. Mark as complete.
        return

start_from_triggertrigger_kwargs 也可以在实例级别修改,以实现更灵活的配置。

from __future__ import annotations

from datetime import timedelta
from typing import TYPE_CHECKING, Any

from airflow.sdk import BaseSensorOperator
from airflow.triggers.base import StartTriggerArgs

if TYPE_CHECKING:
    from airflow.utils.context import Context


class WaitHoursSensor(BaseSensorOperator):
    start_trigger_args = StartTriggerArgs(
        trigger_cls="airflow.providers.standard.triggers.temporal.TimeDeltaTrigger",
        trigger_kwargs={"moment": timedelta(hours=1)},
        next_method="execute_complete",
        next_kwargs=None,
        timeout=None,
    )
    start_from_trigger = True

    def __init__(self, *args: list[Any], **kwargs: dict[str, Any]) -> None:
        super().__init__(*args, **kwargs)
        self.start_trigger_args.trigger_kwargs = {"hours": 2}
        self.start_from_trigger = True

    def execute_complete(self, context: Context, event: dict[str, Any] | None = None) -> None:
        # We have no more work to do here. Mark as complete.
        return

映射任务的初始化阶段发生在 Scheduler 将它们提交给 Executor 之后。因此,此功能提供的动态任务映射支持有限,其用法也与标准实践不同。要启用动态任务映射支持,您需要在 __init__ 方法中定义 start_from_triggertrigger_kwargs。**请注意,您不需要同时定义这两个参数才能使用此功能,但需要使用完全相同的参数名称。** 例如,如果您将参数定义为 t_kwargs 并将此值赋给 self.start_trigger_args.trigger_kwargs,它将不会产生任何效果。当映射 start_from_trigger 设置为 True 的任务时,整个 __init__ 方法将被跳过。Scheduler 将使用 partialexpand 中提供的 start_from_triggertrigger_kwargs(如果未提供,则回退到类属性中的值)来确定是否以及如何将任务提交给 Executor 或 Triggerer。请注意,在此阶段不会解析 XCom 值。

Trigger 执行完成后,任务可能会被发送回 worker 执行 next_method,或者任务实例可能直接结束。(参考 从触发器退出延迟任务)如果任务被发送回 worker,__init__ 方法中的参数在 next_method 执行之前仍然会生效,但它们不会影响 Trigger 的执行。

from __future__ import annotations

from datetime import timedelta
from typing import TYPE_CHECKING, Any

from airflow.sdk import BaseSensorOperator
from airflow.triggers.base import StartTriggerArgs

if TYPE_CHECKING:
    from airflow.utils.context import Context


class WaitHoursSensor(BaseSensorOperator):
    start_trigger_args = StartTriggerArgs(
        trigger_cls="airflow.providers.standard.triggers.temporal.TimeDeltaTrigger",
        trigger_kwargs={"moment": timedelta(hours=1)},
        next_method="execute_complete",
        next_kwargs=None,
        timeout=None,
    )
    start_from_trigger = True

    def __init__(
        self,
        *args: list[Any],
        trigger_kwargs: dict[str, Any] | None,
        start_from_trigger: bool,
        **kwargs: dict[str, Any],
    ) -> None:
        # This whole method will be skipped during dynamic task mapping.

        super().__init__(*args, **kwargs)
        self.start_trigger_args.trigger_kwargs = trigger_kwargs
        self.start_from_trigger = start_from_trigger

    def execute_complete(self, context: Context, event: dict[str, Any] | None = None) -> None:
        # We have no more work to do here. Mark as complete.
        return

这将扩展为 2 个任务,其“hours”参数分别设置为 1 和 2。

WaitHoursSensor.partial(task_id="wait_for_n_hours", start_from_trigger=True).expand(
    trigger_kwargs=[{"hours": 1}, {"hours": 2}]
)

从触发器退出延迟任务

在 2.10.0 版本中新增。

如果您想直接从 Triggerer 退出任务而无需进入 worker,可以为您的可延迟 Operator 指定实例级别属性 end_from_trigger,并附带 Operator 的属性(如上所述)。这可以节省启动新 worker 所需的一些资源。

Trigger 可以有两个选项:要么将执行发送回 worker,要么直接结束任务实例。如果 Trigger 本身结束任务实例,则 method_name 无关紧要,可以为 None。否则,提供任务恢复执行时应使用的 method_name

class WaitFiveHourSensorAsync(BaseSensorOperator):
    # this sensor always exits from trigger.
    def __init__(self, **kwargs) -> None:
        super().__init__(**kwargs)
        self.end_from_trigger = True

    def execute(self, context: Context) -> NoReturn:
        self.defer(
            method_name=None,
            trigger=WaitFiveHourTrigger(duration=timedelta(hours=5), end_from_trigger=self.end_from_trigger),
        )

TaskSuccessEventTaskFailureEvent 是可用于直接结束任务实例的两个事件。这将任务标记为 task_instance_state 状态,并在适用时可选地推送 xcom。以下是使用这些事件的示例

class WaitFiveHourTrigger(BaseTrigger):
    def __init__(self, duration: timedelta, *, end_from_trigger: bool = False):
        super().__init__()
        self.duration = duration
        self.end_from_trigger = end_from_trigger

    def serialize(self) -> tuple[str, dict[str, Any]]:
        return (
            "your_module.WaitFiveHourTrigger",
            {"duration": self.duration, "end_from_trigger": self.end_from_trigger},
        )

    async def run(self) -> AsyncIterator[TriggerEvent]:
        await asyncio.sleep(self.duration.total_seconds())
        if self.end_from_trigger:
            yield TaskSuccessEvent()
        else:
            yield TriggerEvent({"duration": self.duration})

在上述示例中,如果 end_from_trigger 设置为 True,Trigger 将通过 yield TaskSuccessEvent 直接结束任务实例。否则,它将使用 Operator 中指定的方法恢复任务实例。

注意

从 Trigger 退出仅在可延迟 Operator 未集成监听器时有效。目前,当可延迟 Operator 将 end_from_trigger 属性设置为 True 并集成监听器时,它会在解析期间引发异常以指示此限制。编写自定义 Trigger 时,请确保如果从插件添加了监听器,则 Trigger 未设置为直接结束任务实例。如果 Trigger 的作者将 end_from_trigger 属性更改为其他属性,DAG 解析不会引发任何异常,并且依赖于此任务的监听器将不起作用。此限制将在未来版本中解决。

高可用性

Trigger 被设计为在高可用性(HA)架构中工作。如果您想运行高可用性设置,请在多个主机上运行多个 triggerer 副本。就像 scheduler 一样,它们通过正确的锁定和 HA 自动共存。

根据 Trigger 执行的工作量,单个 triggerer 主机可以容纳数百到数万个 Trigger。默认情况下,每个 triggerer 具有 1000 个 Trigger 的容量,可以尝试同时运行。您可以使用 --capacity 参数更改可同时运行的 Trigger 数量。如果您尝试运行的 Trigger 数量超过所有 triggerer 进程的总容量,部分 Trigger 将延迟运行,直到其他 Trigger 完成。

Airflow 尝试仅在一个地方同时运行 Trigger,并维护与当前运行的所有 triggerer 的心跳。如果 triggerer 死亡或与其运行 Airflow 数据库的网络分区,Airflow 会自动重新调度在该主机上的 Trigger 在其他地方运行。Airflow 会等待 (2.1 * triggerer.job_heartbeat_sec) 秒,等待机器重新出现,然后才重新调度 Trigger。

这意味着 Trigger 可能(但不常见)同时在多个地方运行。然而,此行为已设计到 Trigger 契约中,并且是预期行为。Airflow 会对 Trigger 同时在多个地方运行时触发的事件进行去重,因此此过程对您的 Operator 是透明的。

请注意,您运行的每个额外 triggerer 都会导致与您的数据库建立一个额外的持久连接。

传感器中 Mode=’reschedule’ 与 Deferrable=True 的区别

在 Airflow 中,Sensor 会等待特定条件满足后才继续执行下游任务。Sensor 有两种管理空闲期的方式:mode='reschedule'deferrable=True。由于 mode='reschedule' 是 Airflow 中 BaseSensorOperator 特有的参数,它允许 Sensor 在条件不满足时重新调度自身。deferrable=True 是某些 Operator 用来指示任务可以稍后重试(或延迟)的约定,但它不是 Airflow 内置的参数或模式。延迟任务的实际行为取决于具体的 Operator 实现。

mode=’reschedule’

deferrable=True

持续重新调度自身,直到条件满足

空闲时暂停执行,条件变化时恢复

资源使用较高(重复执行)

资源使用较低(空闲时暂停,释放 worker slot)

预期条件会随时间变化(例如文件创建)

等待外部事件或资源(例如 API 响应)

内置的重新调度功能

需要自定义逻辑来延迟任务和处理外部变化

此内容有帮助吗?