Airflow 峰会 2025 即将于 10 月 07-09 日召开。立即注册获取早鸟票!

任务

任务 (Task) 是 Airflow 中的基本执行单元。任务被组织到 DAGs 中,并通过设置上游和下游依赖关系来表达它们的运行顺序。

任务有三种基本类型

  • 操作符 (Operators),是预定义的任务模板,您可以快速组合它们来构建 DAG 的大部分。

  • 传感器 (Sensors),是 Operator 的特殊子类,完全用于等待外部事件发生。

  • TaskFlow 修饰的 @task,是打包成任务的自定义 Python 函数。

内部来说,这些实际上都是 Airflow BaseOperator 的子类,任务(Task)和操作符(Operator)的概念在某种程度上可以互换,但将它们视为独立概念很有用——本质上,操作符和传感器是*模板*,当您在 DAG 文件中调用它们时,您正在创建一个任务。

关系

使用任务的关键部分在于定义它们之间的关系——它们的*依赖关系*,或者正如我们在 Airflow 中所说的,它们的*上游*任务和*下游*任务。您首先声明任务,然后声明它们的依赖关系。

注意

我们将*上游*任务称为直接位于另一个任务之前的任务。我们以前称之为父任务。请注意,这个概念并不描述任务层级结构中更高的任务(即它们不是任务的直接父级)。*下游*任务也适用同样的定义,它需要是另一个任务的直接子级。

有两种声明依赖关系的方法——使用 >><<(位移)操作符

first_task >> second_task >> [third_task, fourth_task]

或者更显式的 set_upstreamset_downstream 方法

first_task.set_downstream(second_task)
third_task.set_upstream(second_task)

这两种方法的功能完全相同,但总的来说,我们建议您使用位移操作符,因为在大多数情况下它们更易读。

默认情况下,当一个任务的所有上游(父)任务都成功时,它才会运行,但有很多方法可以修改此行为,例如添加分支、只等待部分上游任务,或根据当前运行在历史中的位置改变行为。更多信息请参见 控制流

任务默认情况下不相互传递信息,并完全独立运行。如果您想从一个任务向另一个任务传递信息,您应该使用 XComs

任务实例

就像 DAG 每次运行时会被实例化为 DAG Run 一样,一个 DAG 下的任务会被实例化为*任务实例*。

任务实例是该任务在给定 DAG(以及给定数据间隔)下的特定运行。它们也是具有*状态*的任务的表现,表示它所处的生命周期阶段。

任务实例可能的状态有

  • none: 任务尚未排队执行(其依赖项尚未满足)

  • scheduled: 调度器已确定任务的依赖项已满足并应运行

  • queued: 任务已被分配给执行器,正在等待工作进程

  • running: 任务正在工作进程上运行(或在本地/同步执行器上)

  • success: 任务运行完成无错误

  • restarting: 任务在运行时被外部请求重新启动

  • failed: 任务在执行期间发生错误并运行失败

  • skipped: 任务因分支、LatestOnly 或类似原因被跳过。

  • upstream_failed: 一个上游任务失败,且 触发规则 (Trigger Rule) 要求它必须成功

  • up_for_retry: 任务失败,但还有剩余重试次数,将被重新调度。

  • up_for_reschedule: 任务是一个 传感器(Sensor),处于 reschedule 模式

  • deferred: 任务已被延迟到触发器 (trigger)

  • removed: 自运行开始以来,任务已从 DAG 中消失

../_images/diagram_task_lifecycle.png

理想情况下,任务应从 none 依次流经 scheduledqueuedrunning,最终达到 success 状态。

当任何自定义任务(Operator)运行时,它将获得传递给它的任务实例副本;除了能够检查任务元数据外,它还包含诸如 XComs 之类的方法。

关系术语

对于任何给定的任务实例,它与其他实例有两种类型的关系。

首先,它可以有*上游*和*下游*任务

task1 >> task2 >> task3

当 DAG 运行时,它会为相互之间存在上游/下游关系的这些任务创建实例,但所有这些实例都具有相同的数据间隔。

可能还有*同一任务*的实例,但针对不同的数据间隔——来自同一 DAG 的其他运行。我们将这些称为*前一个*和*后一个*——这与*上游*和*下游*的关系不同!

注意

一些较旧的 Airflow 文档可能仍将“previous”用来表示“upstream”。如果您发现这种情况,请帮助我们修正!

超时

如果您希望任务有最大运行时间,请将其 execution_timeout 属性设置为一个 datetime.timedelta 值,作为最大允许运行时间。这适用于所有 Airflow 任务,包括传感器。execution_timeout 控制每次执行允许的最大时间。如果 execution_timeout 被超出,任务将超时并抛出 AirflowTaskTimeout 异常。

此外,传感器有一个 timeout 参数。这仅对处于 reschedule 模式的传感器重要。timeout 控制传感器成功允许的最大时间。如果 timeout 被超出,AirflowSensorTimeout 将被抛出,传感器将立即失败且不重试。

以下 SFTPSensor 示例对此进行了说明。该 sensor 处于 reschedule 模式,这意味着它会周期性地执行并重新调度,直到成功。

  • 每次传感器探测 SFTP 服务器时,允许的最大时间为 execution_timeout 定义的 60 秒。

  • 如果传感器探测 SFTP 服务器耗时超过 60 秒,AirflowTaskTimeout 将被抛出。发生这种情况时,传感器可以重试。最多可以重试 retries 定义的 2 次。

  • 从第一次执行开始,直到最终成功(即文件 ‘root/test’ 出现后),传感器允许的最大时间为 timeout 定义的 3600 秒。换句话说,如果文件在 3600 秒内没有出现在 SFTP 服务器上,传感器将抛出 AirflowSensorTimeout。抛出此错误时不会重试。

  • 如果在 3600 秒间隔内,传感器由于网络中断等其他原因失败,最多可以重试 retries 定义的 2 次。重试不会重置 timeout。它总共有最多 3600 秒的时间来成功。

sensor = SFTPSensor(
    task_id="sensor",
    path="/root/test",
    execution_timeout=timedelta(seconds=60),
    timeout=3600,
    retries=2,
    mode="reschedule",
)

SLA

Airflow 2 中的 SLA 功能在 3.0 中已被移除,并将在 Airflow 3.1 中被新的实现替代。

特殊异常

如果您想从自定义任务/Operator 代码中控制任务的状态,Airflow 提供了两个您可以抛出的特殊异常

  • AirflowSkipException 将当前任务标记为已跳过

  • AirflowFailException 将当前任务标记为失败,*忽略任何剩余的重试尝试*

如果您的代码对执行环境有额外了解并希望更快失败/跳过,这些异常会很有用——例如,当知道没有可用数据时跳过,或者当检测到 API 密钥无效时快速失败(因为重试无法解决此问题)。

任务实例心跳超时

没有系统能完美运行,任务实例偶尔会发生故障。

任务实例(TaskInstances)即使其关联的作业处于非活动状态,也可能卡在 running 状态(例如,如果任务实例的工作进程内存不足)。这类任务以前被称为僵尸任务。Airflow 会定期查找这些任务,清理它们,并将任务实例标记为失败,如果还有可用重试次数则重试。任务实例的心跳可能因多种原因超时,包括

  • Airflow 工作进程内存不足并被 OOMKilled。

  • Airflow 工作进程的活跃度探测失败,因此系统(例如 Kubernetes)重启了工作进程。

  • 系统(例如 Kubernetes)缩减规模并将 Airflow 工作进程从一个节点迁移到另一个节点。

在本地复现任务实例心跳超时

如果您希望在开发/测试过程中复现任务实例心跳超时,请遵循以下步骤

  1. 为您的本地 Airflow 设置以下环境变量(或者您可以修改 airflow.cfg 中的相应配置值)

export AIRFLOW__SCHEDULER__TASK_INSTANCE_HEARTBEAT_SEC=600
export AIRFLOW__SCHEDULER__TASK_INSTANCE_HEARTBEAT_TIMEOUT=2
export AIRFLOW__SCHEDULER__TASK_INSTANCE_HEARTBEAT_TIMEOUT_DETECTION_INTERVAL=5
  1. 创建一个 DAG,其中包含一个需要大约 10 分钟才能完成的任务(即一个长时间运行的任务)。例如,您可以使用以下 DAG

from airflow.sdk import dag
from airflow.providers.standard.operators.bash import BashOperator
from datetime import datetime


@dag(start_date=datetime(2021, 1, 1), schedule="@once", catchup=False)
def sleep_dag():
    t1 = BashOperator(
        task_id="sleep_10_minutes",
        bash_command="sleep 600",
    )


sleep_dag()

运行上面的 DAG 并等待一段时间。任务实例(TaskInstance)将在 <task_instance_heartbeat_timeout> 秒后被标记为失败。

执行器配置

一些 执行器(Executors) 允许可选的按任务配置——例如 KubernetesExecutor,它允许您设置运行任务的镜像。

这是通过向任务或 Operator 传递 executor_config 参数实现的。以下是为一个将在 KubernetesExecutor 上运行的任务设置 Docker 镜像的示例

MyOperator(...,
    executor_config={
        "KubernetesExecutor":
            {"image": "myCustomDockerImage"}
    }
)

您可以传递给 executor_config 的设置因执行器而异,因此请阅读各个执行器文档以了解您可以设置的内容。

这篇文章有帮助吗?