任务¶
任务 (Task) 是 Airflow 中的基本执行单元。任务被组织到 DAGs 中,并通过设置上游和下游依赖关系来表达它们的运行顺序。
任务有三种基本类型
操作符 (Operators),是预定义的任务模板,您可以快速组合它们来构建 DAG 的大部分。
传感器 (Sensors),是 Operator 的特殊子类,完全用于等待外部事件发生。
TaskFlow 修饰的
@task
,是打包成任务的自定义 Python 函数。
内部来说,这些实际上都是 Airflow BaseOperator
的子类,任务(Task)和操作符(Operator)的概念在某种程度上可以互换,但将它们视为独立概念很有用——本质上,操作符和传感器是*模板*,当您在 DAG 文件中调用它们时,您正在创建一个任务。
关系¶
使用任务的关键部分在于定义它们之间的关系——它们的*依赖关系*,或者正如我们在 Airflow 中所说的,它们的*上游*任务和*下游*任务。您首先声明任务,然后声明它们的依赖关系。
注意
我们将*上游*任务称为直接位于另一个任务之前的任务。我们以前称之为父任务。请注意,这个概念并不描述任务层级结构中更高的任务(即它们不是任务的直接父级)。*下游*任务也适用同样的定义,它需要是另一个任务的直接子级。
有两种声明依赖关系的方法——使用 >>
和 <<
(位移)操作符
first_task >> second_task >> [third_task, fourth_task]
或者更显式的 set_upstream
和 set_downstream
方法
first_task.set_downstream(second_task)
third_task.set_upstream(second_task)
这两种方法的功能完全相同,但总的来说,我们建议您使用位移操作符,因为在大多数情况下它们更易读。
默认情况下,当一个任务的所有上游(父)任务都成功时,它才会运行,但有很多方法可以修改此行为,例如添加分支、只等待部分上游任务,或根据当前运行在历史中的位置改变行为。更多信息请参见 控制流。
任务默认情况下不相互传递信息,并完全独立运行。如果您想从一个任务向另一个任务传递信息,您应该使用 XComs。
任务实例¶
就像 DAG 每次运行时会被实例化为 DAG Run 一样,一个 DAG 下的任务会被实例化为*任务实例*。
任务实例是该任务在给定 DAG(以及给定数据间隔)下的特定运行。它们也是具有*状态*的任务的表现,表示它所处的生命周期阶段。
任务实例可能的状态有
none
: 任务尚未排队执行(其依赖项尚未满足)scheduled
: 调度器已确定任务的依赖项已满足并应运行queued
: 任务已被分配给执行器,正在等待工作进程running
: 任务正在工作进程上运行(或在本地/同步执行器上)success
: 任务运行完成无错误restarting
: 任务在运行时被外部请求重新启动failed
: 任务在执行期间发生错误并运行失败skipped
: 任务因分支、LatestOnly 或类似原因被跳过。upstream_failed
: 一个上游任务失败,且 触发规则 (Trigger Rule) 要求它必须成功up_for_retry
: 任务失败,但还有剩余重试次数,将被重新调度。up_for_reschedule
: 任务是一个 传感器(Sensor),处于reschedule
模式deferred
: 任务已被延迟到触发器 (trigger)removed
: 自运行开始以来,任务已从 DAG 中消失

理想情况下,任务应从 none
依次流经 scheduled
、queued
、running
,最终达到 success
状态。
当任何自定义任务(Operator)运行时,它将获得传递给它的任务实例副本;除了能够检查任务元数据外,它还包含诸如 XComs 之类的方法。
关系术语¶
对于任何给定的任务实例,它与其他实例有两种类型的关系。
首先,它可以有*上游*和*下游*任务
task1 >> task2 >> task3
当 DAG 运行时,它会为相互之间存在上游/下游关系的这些任务创建实例,但所有这些实例都具有相同的数据间隔。
可能还有*同一任务*的实例,但针对不同的数据间隔——来自同一 DAG 的其他运行。我们将这些称为*前一个*和*后一个*——这与*上游*和*下游*的关系不同!
注意
一些较旧的 Airflow 文档可能仍将“previous”用来表示“upstream”。如果您发现这种情况,请帮助我们修正!
超时¶
如果您希望任务有最大运行时间,请将其 execution_timeout
属性设置为一个 datetime.timedelta
值,作为最大允许运行时间。这适用于所有 Airflow 任务,包括传感器。execution_timeout
控制每次执行允许的最大时间。如果 execution_timeout
被超出,任务将超时并抛出 AirflowTaskTimeout
异常。
此外,传感器有一个 timeout
参数。这仅对处于 reschedule
模式的传感器重要。timeout
控制传感器成功允许的最大时间。如果 timeout
被超出,AirflowSensorTimeout
将被抛出,传感器将立即失败且不重试。
以下 SFTPSensor
示例对此进行了说明。该 sensor
处于 reschedule
模式,这意味着它会周期性地执行并重新调度,直到成功。
每次传感器探测 SFTP 服务器时,允许的最大时间为
execution_timeout
定义的 60 秒。如果传感器探测 SFTP 服务器耗时超过 60 秒,
AirflowTaskTimeout
将被抛出。发生这种情况时,传感器可以重试。最多可以重试retries
定义的 2 次。从第一次执行开始,直到最终成功(即文件 ‘root/test’ 出现后),传感器允许的最大时间为
timeout
定义的 3600 秒。换句话说,如果文件在 3600 秒内没有出现在 SFTP 服务器上,传感器将抛出AirflowSensorTimeout
。抛出此错误时不会重试。如果在 3600 秒间隔内,传感器由于网络中断等其他原因失败,最多可以重试
retries
定义的 2 次。重试不会重置timeout
。它总共有最多 3600 秒的时间来成功。
sensor = SFTPSensor(
task_id="sensor",
path="/root/test",
execution_timeout=timedelta(seconds=60),
timeout=3600,
retries=2,
mode="reschedule",
)
SLA¶
Airflow 2 中的 SLA 功能在 3.0 中已被移除,并将在 Airflow 3.1 中被新的实现替代。
特殊异常¶
如果您想从自定义任务/Operator 代码中控制任务的状态,Airflow 提供了两个您可以抛出的特殊异常
AirflowSkipException
将当前任务标记为已跳过AirflowFailException
将当前任务标记为失败,*忽略任何剩余的重试尝试*
如果您的代码对执行环境有额外了解并希望更快失败/跳过,这些异常会很有用——例如,当知道没有可用数据时跳过,或者当检测到 API 密钥无效时快速失败(因为重试无法解决此问题)。
任务实例心跳超时¶
没有系统能完美运行,任务实例偶尔会发生故障。
任务实例(TaskInstances
)即使其关联的作业处于非活动状态,也可能卡在 running
状态(例如,如果任务实例的工作进程内存不足)。这类任务以前被称为僵尸任务。Airflow 会定期查找这些任务,清理它们,并将任务实例标记为失败,如果还有可用重试次数则重试。任务实例的心跳可能因多种原因超时,包括
Airflow 工作进程内存不足并被 OOMKilled。
Airflow 工作进程的活跃度探测失败,因此系统(例如 Kubernetes)重启了工作进程。
系统(例如 Kubernetes)缩减规模并将 Airflow 工作进程从一个节点迁移到另一个节点。
在本地复现任务实例心跳超时¶
如果您希望在开发/测试过程中复现任务实例心跳超时,请遵循以下步骤
为您的本地 Airflow 设置以下环境变量(或者您可以修改 airflow.cfg 中的相应配置值)
export AIRFLOW__SCHEDULER__TASK_INSTANCE_HEARTBEAT_SEC=600
export AIRFLOW__SCHEDULER__TASK_INSTANCE_HEARTBEAT_TIMEOUT=2
export AIRFLOW__SCHEDULER__TASK_INSTANCE_HEARTBEAT_TIMEOUT_DETECTION_INTERVAL=5
创建一个 DAG,其中包含一个需要大约 10 分钟才能完成的任务(即一个长时间运行的任务)。例如,您可以使用以下 DAG
from airflow.sdk import dag
from airflow.providers.standard.operators.bash import BashOperator
from datetime import datetime
@dag(start_date=datetime(2021, 1, 1), schedule="@once", catchup=False)
def sleep_dag():
t1 = BashOperator(
task_id="sleep_10_minutes",
bash_command="sleep 600",
)
sleep_dag()
运行上面的 DAG 并等待一段时间。任务实例(TaskInstance
)将在 <task_instance_heartbeat_timeout> 秒后被标记为失败。
执行器配置¶
一些 执行器(Executors) 允许可选的按任务配置——例如 KubernetesExecutor
,它允许您设置运行任务的镜像。
这是通过向任务或 Operator 传递 executor_config
参数实现的。以下是为一个将在 KubernetesExecutor
上运行的任务设置 Docker 镜像的示例
MyOperator(...,
executor_config={
"KubernetesExecutor":
{"image": "myCustomDockerImage"}
}
)
您可以传递给 executor_config
的设置因执行器而异,因此请阅读各个执行器文档以了解您可以设置的内容。