执行器

执行器是运行任务实例的机制。它们具有通用的 API,并且是“可插拔”的,这意味着你可以根据安装需要交换执行器。

Airflow 一次只能配置一个执行器;这是通过配置文件[core]部分中的executor选项设置的。

内置执行器通过名称引用,例如

[core]
executor = KubernetesExecutor

注意

有关 Airflow 配置的更多信息,请参阅设置配置选项

如果你想检查当前设置了哪个执行器,可以使用airflow config get-value core executor命令

$ airflow config get-value core executor
SequentialExecutor

执行器类型

有两种类型的执行器——本地运行任务的执行器(在 scheduler 进程内)和远程运行任务的执行器(通常通过工作进程池)。Airflow 默认配置为 SequentialExecutor,它是一个本地执行器,也是执行的最简单选项。但是,SequentialExecutor 不适合生产,因为它不允许并行运行任务,因此某些 Airflow 功能(例如运行传感器)无法正常工作。对于小型单机生产安装,您应该改用 LocalExecutor,对于多机/云安装,您应该改用其中一个远程执行器。

本地执行器

远程执行器

注意

新的 Airflow 用户可能认为他们需要使用本地或远程执行器之一来运行单独的执行器进程。这是不正确的。执行器逻辑在调度程序进程内部运行,并且会根据所选执行器在本地运行任务或不运行任务。

编写自己的执行器

所有 Airflow 执行器都实现了一个公共接口,以便它们可插入,并且任何执行器都可以访问 Airflow 中的所有功能和集成。主要地,Airflow 调度程序使用此接口与执行器交互,但其他组件(如日志记录、CLI 和回填)也使用此接口。公共接口是 BaseExecutor。您可以查看代码以了解最详细和最新的接口,但下面概述了一些重要的要点。

注意

有关 Airflow 公共接口的更多信息,请参阅 Airflow 的公共接口

您可能希望编写自定义执行器的一些原因包括

  • 没有适合您特定用例的执行器,例如用于计算的特定工具或服务。

  • 您希望使用一个执行器,该执行器利用您首选云提供商的计算服务。

  • 您有一个私有工具/服务用于任务执行,该工具/服务仅对您或您的组织可用。

重要的 BaseExecutor 方法

这些方法不需要重写来实现您自己的执行器,但了解它们很有用

  • 心跳:Airflow 调度程序作业循环将定期在执行器上调用心跳。这是 Airflow 调度程序和执行器之间交互的主要点之一。此方法更新一些指标,触发新排队的任务执行,并更新正在运行/已完成任务的状态。

  • 队列命令:Airflow 执行器将调用 BaseExecutor 的此方法,以提供由执行器运行的任务。BaseExecutor 只是将 TaskInstances 添加到执行器中排队任务的内部列表中。

  • 获取事件缓冲区:Airflow 调度程序调用此方法以检索执行器正在执行的 TaskInstances 的当前状态。

  • 具有任务:调度程序使用此 BaseExecutor 方法来确定执行器是否已排队或正在运行特定任务实例。

  • 发送回调:将任何回调发送到在执行器上配置的接收器。

要实现的强制方法

必须至少覆盖以下方法,以便 Airflow 支持你的执行器

  • 同步:同步将在执行器心跳期间定期调用。实现此方法以更新执行器已知任务的状态。或者,尝试执行已从调度程序接收的排队任务。

  • 异步执行:异步执行命令。在此上下文中,命令是用于运行 Airflow 任务的 Airflow CLI 命令。此方法在调度程序定期运行的执行器心跳期间(经过几层)调用。实际上,此方法通常只是将任务排队到要运行的任务的内部或外部队列中(例如 KubernetesExecutor)。但也可以直接执行任务(例如 LocalExecutor)。这取决于执行器。

要实现的可选接口方法

不必覆盖以下方法即可拥有功能性的 Airflow 执行器。但是,实现它们可以带来一些强大的功能和稳定性

  • 开始:Airflow 调度程序(和回填)作业将在初始化执行器对象后调用此方法。执行器所需的任何其他设置都可以在此处完成。

  • end:当 Airflow 调度程序(和回填)作业终止时,它将调用此方法。应在此处完成运行作业所需的任何同步清理。

  • terminate:更强制地停止执行程序,甚至杀死/停止正在进行的任务,而不是同步等待完成。

  • cleanup_stuck_queued_tasks:如果任务在队列状态中停留的时间超过 task_queued_timeout,则调度程序将收集这些任务并通过此方法提供给执行程序,以便有机会处理它们(执行任何正常清理/终止),并返回任务实例以向用户显示警告消息。

  • try_adopt_task_instances:通过此方法向执行程序提供已放弃的任务(例如,来自已终止的调度程序作业),以便采用或以其他方式处理它们。应返回任何无法采用的任务(默认情况下,BaseExector 假设所有任务都无法采用)。

  • get_cli_commands:执行程序可以通过实现此方法向用户提供 CLI 命令,有关更多详细信息,请参见下面的 CLI 部分。

  • get_task_log:执行程序可以通过实现此方法向 Airflow 任务日志提供日志消息,有关更多详细信息,请参见下面的 日志记录 部分。

兼容性属性

BaseExecutor 类接口包含一组属性,Airflow 核心代码使用这些属性来检查执行程序兼容的功能。在编写自己的 Airflow 执行程序时,请务必根据你的用例正确设置这些属性。每个属性都只是一个布尔值,用于启用/禁用功能或指示执行程序支持/不支持该功能

  • supports_pickling:执行程序是否支持在执行之前从数据库中读取腌制的 DAG(而不是从文件系统中读取 DAG 定义)。

  • supports_sentry:执行程序是否支持 Sentry

  • is_local:执行程序是远程的还是本地的。请参见上面的 执行程序类型 部分。

  • is_single_threaded:执行程序是否是单线程的。这与支持哪些数据库后端特别相关。单线程执行程序可以在任何后端(包括 SQLite)上运行。

  • is_production:执行程序是否应用于生产目的。当用户使用非生产就绪执行程序时,会向他们显示 UI 消息。

  • change_sensor_mode_to_reschedule:以轮询模式运行 Airflow 传感器可能会阻塞执行程序的线程,在某些情况下还会阻塞 Airflow。

  • serve_logs:执行程序是否支持提供日志,请参阅 任务日志记录

CLI

执行程序可能会提供 CLI 命令,这些命令将通过实现 get_cli_commands 方法包含在 airflow 命令行工具中。例如,CeleryExecutorKubernetesExecutor 等执行程序会使用此机制。这些命令可用于设置必需的工作程序、初始化环境或设置其他配置。命令仅针对当前配置的执行程序提供。下面是执行程序实现 CLI 命令提供的伪代码示例

@staticmethod
def get_cli_commands() -> list[GroupCommand]:
    sub_commands = [
        ActionCommand(
            name="command_name",
            help="Description of what this specific command does",
            func=lazy_load_command("path.to.python.function.for.command"),
            args=(),
        ),
    ]

    return [
        GroupCommand(
            name="my_cool_executor",
            help="Description of what this group of commands do",
            subcommands=sub_commands,
        ),
    ]

注意

目前,Airflow 命令名称空间没有严格的规则。开发人员应为其 CLI 命令使用足够唯一的名称,以免与其他 Airflow 执行程序或组件发生冲突。

注意

创建新执行程序或更新任何现有执行程序时,请务必不要在模块级别导入或执行任何昂贵的操作/代码。执行程序类在多个地方导入,如果导入速度较慢,这会对 Airflow 环境的性能产生负面影响,尤其是对于 CLI 命令。

日志记录

执行器可以通过实现 get_task_logs 方法来提供日志消息,这些消息将包含在 Airflow 任务日志中。如果执行环境在任务失败的情况下具有额外的上下文,这可能有助于执行环境本身而不是 Airflow 任务代码,这可能非常有用。从执行环境中包含设置/拆除日志也可能非常有用。 KubernetesExecutor 利用此功能来包含运行特定 Airflow 任务的 Pod 的日志,并将其显示在该 Airflow 任务的日志中。下面可以看到从执行器实现任务日志售卖的伪代码示例

def get_task_log(self, ti: TaskInstance, try_number: int) -> tuple[list[str], list[str]]:
    messages = []
    log = []
    try:
        res = helper_function_to_fetch_logs_from_execution_env(ti, try_number)
        for line in res:
            log.append(remove_escape_codes(line.decode()))
        if log:
            messages.append("Found logs from execution environment!")
    except Exception as e:  # No exception should cause task logs to fail
        messages.append(f"Failed to find logs from execution environment: {e}")
    return messages, ["\n".join(log)]

后续步骤

一旦你创建了一个实现 BaseExecutor 接口的新执行器类,你可以通过将 core.executor 配置值设置为执行器的模块路径来配置 Airflow 以使用它

[core]
executor = my_company.executors.MyCustomExecutor

注意

有关 Airflow 配置的更多信息,请参阅 设置配置选项,有关在 Airflow 中管理 Python 模块的更多信息,请参阅 模块管理

此条目是否有帮助?