任务¶
任务是 Airflow 中执行的基本单元。任务被组织成 DAGs,然后在它们之间设置上游和下游依赖关系,以表达它们应该运行的顺序。
有三种基本类型的任务
运算符,预定义的任务模板,您可以快速将它们组合在一起以构建 DAG 的大部分部分。
传感器,运算符的一个特殊子类,它完全是关于等待外部事件发生。
一个 TaskFlow 装饰的
@task
,它是一个打包为任务的自定义 Python 函数。
在内部,这些实际上都是 Airflow 的 BaseOperator
的子类,并且任务和运算符的概念在某种程度上是可以互换的,但将它们视为单独的概念是有用的 - 本质上,运算符和传感器是模板,当你在 DAG 文件中调用一个时,你正在创建一个任务。
关系¶
使用任务的关键部分是定义它们如何相互关联 - 它们的依赖关系,或者正如我们在 Airflow 中所说的,它们的上游和下游任务。你首先声明你的任务,然后你声明它们的依赖关系。
注意
我们称上游任务为直接位于另一个任务之前的任务。我们以前称之为父任务。请注意,这个概念并不描述任务层次结构中更高的任务(即,它们不是该任务的直接父级)。同样的定义适用于下游任务,它需要是另一个任务的直接子级。
有两种声明依赖关系的方法 - 使用 >>
和 <<
(位移)运算符
first_task >> second_task >> [third_task, fourth_task]
或者更明确的 set_upstream
和 set_downstream
方法
first_task.set_downstream(second_task)
third_task.set_upstream(second_task)
这两者做的事情完全相同,但一般来说,我们建议你使用位移运算符,因为在大多数情况下它们更容易阅读。
默认情况下,当它的所有上游(父)任务都成功完成时,任务才会运行,但有很多方法可以修改此行为以添加分支,仅等待某些上游任务,或根据当前运行在历史记录中的位置更改行为。有关更多信息,请参阅 控制流。
默认情况下,任务不会相互传递信息,并且完全独立运行。如果你想将信息从一个任务传递到另一个任务,你应该使用 XComs。
任务实例¶
就像每次运行 DAG 时都会实例化为 DAG 运行一样,DAG 下的任务也被实例化为任务实例。
任务的实例是该任务针对给定 DAG(因此针对给定的数据间隔)的特定运行。它们也是任务的表示,该任务具有状态,表示它所处的生命周期阶段。
任务实例的可能状态是
none
: 该任务尚未排队等待执行(其依赖关系尚未满足)scheduled
: 调度程序已确定任务的依赖关系已满足,它应该运行queued
: 该任务已分配给执行器并正在等待工作者running
: 该任务正在工作者上运行(或在本地/同步执行器上运行)success
: 该任务完成运行且没有错误restarting
: 该任务在运行时被外部请求重新启动failed
: 该任务在执行过程中出现错误且未能运行skipped
: 由于分支、LatestOnly 或类似情况,该任务被跳过。upstream_failed
: 上游任务失败,并且 触发规则 表示我们需要它up_for_retry
: 该任务失败,但仍有重试尝试次数,并且将被重新调度。up_for_reschedule
: 该任务是一个处于reschedule
模式的 传感器deferred
: 该任务已推迟到触发器removed
: 自运行开始以来,该任务已从 DAG 中消失
理想情况下,任务应该从 none
流向 scheduled
,流向 queued
,流向 running
,最后流向 success
。
当任何自定义任务(运算符)正在运行时,它将获得传递给它的任务实例的副本;除了能够检查任务元数据外,它还包含诸如 XComs 之类的方法。
关系术语¶
对于任何给定的任务实例,它与其他实例有两种类型的关系。
首先,它可以有上游和下游任务
task1 >> task2 >> task3
当 DAG 运行时,它将为彼此上游/下游的每个任务创建实例,但这些实例都具有相同的数据间隔。
也可能有同一任务的实例,但对于不同的数据间隔 - 来自同一 DAG 的其他运行。我们称这些为前一个和下一个 - 它与上游和下游的关系不同!
注意
一些较旧的 Airflow 文档可能仍使用“前一个”来表示“上游”。如果您发现这种情况,请帮助我们修复它!
超时¶
如果您希望任务具有最大运行时,请将其 execution_timeout
属性设置为一个 datetime.timedelta
值,该值是允许的最大运行时。这适用于所有 Airflow 任务,包括传感器。execution_timeout
控制每次执行允许的最大时间。如果违反了 execution_timeout
,则任务超时并引发 AirflowTaskTimeout
。
此外,传感器还有一个 timeout
参数。这仅对 reschedule
模式下的传感器很重要。timeout
控制传感器成功允许的最大时间。如果违反了 timeout
,将引发 AirflowSensorTimeout
并且传感器会立即失败而不会重试。
以下 SFTPSensor
示例说明了这一点。该 sensor
处于 reschedule
模式,这意味着它会定期执行和重新调度,直到成功为止。
每次传感器探测 SFTP 服务器时,它最多允许花费 60 秒,如
execution_timeout
所定义。如果传感器探测 SFTP 服务器的时间超过 60 秒,将引发
AirflowTaskTimeout
。允许传感器在这种情况下重试。它最多可以重试 2 次,如retries
所定义。从第一次执行开始,直到最终成功(即在文件 ‘root/test’ 出现后),允许传感器最多花费 3600 秒,如
timeout
所定义。换句话说,如果文件在 3600 秒内未出现在 SFTP 服务器上,则传感器将引发AirflowSensorTimeout
。当引发此错误时,它不会重试。如果传感器由于其他原因(例如 3600 秒间隔内的网络中断)而失败,则它可以重试最多 2 次,如
retries
所定义。重试不会重置timeout
。它仍然有最多 3600 秒的总时间才能成功。
sensor = SFTPSensor(
task_id="sensor",
path="/root/test",
execution_timeout=timedelta(seconds=60),
timeout=3600,
retries=2,
mode="reschedule",
)
如果您只是想在任务超时运行时收到通知,但仍然让它运行完成,那么您需要使用 SLAs。
SLAs¶
SLA,即服务级别协议,是指任务相对于 DAG 运行开始时间应完成的最大时间期望。如果任务运行时间超过此时间,则会在用户界面的“SLA 未命中”部分显示,并会通过电子邮件发送所有未命中 SLA 的任务。
但是,超过 SLA 的任务不会被取消,它们会被允许运行到完成。 如果你希望在达到某个运行时后取消任务,则需要使用超时。
要为任务设置 SLA,请将 datetime.timedelta
对象传递给 Task/Operator 的 sla
参数。你还可以提供一个 sla_miss_callback
,如果你想运行自己的逻辑,它将在 SLA 未命中时被调用。
如果你想完全禁用 SLA 检查,可以在 Airflow 的 [core]
配置中设置 check_slas = False
。
要了解有关配置电子邮件的更多信息,请参阅电子邮件配置。
注意
手动触发的任务和事件驱动 DAG 中的任务将不会检查 SLA 是否未命中。有关 DAG schedule
值的更多信息,请参阅DAG 运行。
sla_miss_callback¶
你还可以提供一个 sla_miss_callback
,如果你想运行自己的逻辑,它将在 SLA 未命中时被调用。 sla_miss_callback
的函数签名需要 5 个参数。
dag
task_list
自上次运行
sla_miss_callback
以来,所有未命中其 SLA 的任务的字符串列表(以换行符分隔,\n)。
blocking_task_list
slas
与
task_list
参数中的任务关联的SlaMiss
对象列表。
blocking_tis
与
blocking_task_list
参数中的任务关联的 TaskInstance 对象列表。
sla_miss_callback
函数签名的示例
def my_sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis):
...
def my_sla_miss_callback(*args):
...
示例 DAG
def sla_callback(dag, task_list, blocking_task_list, slas, blocking_tis):
print(
"The callback arguments are: ",
{
"dag": dag,
"task_list": task_list,
"blocking_task_list": blocking_task_list,
"slas": slas,
"blocking_tis": blocking_tis,
},
)
@dag(
schedule="*/2 * * * *",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
sla_miss_callback=sla_callback,
default_args={"email": "[email protected]"},
)
def example_sla_dag():
@task(sla=datetime.timedelta(seconds=10))
def sleep_20():
"""Sleep for 20 seconds"""
time.sleep(20)
@task
def sleep_30():
"""Sleep for 30 seconds"""
time.sleep(30)
sleep_20() >> sleep_30()
example_dag = example_sla_dag()
特殊异常¶
如果你想从自定义 Task/Operator 代码中控制任务的状态,Airflow 提供了两个你可以引发的特殊异常
AirflowSkipException
会将当前任务标记为跳过AirflowFailException
会将当前任务标记为失败,忽略任何剩余的重试尝试
如果你的代码对其环境有额外的了解并希望更快地失败/跳过,这些可能很有用 - 例如,当它知道没有可用数据时跳过,或者当它检测到其 API 密钥无效时快速失败(因为重试无法解决问题)。
僵尸/不死任务¶
没有系统是完美运行的,任务实例偶尔会死机。Airflow 检测两种任务/进程不匹配的情况
僵尸任务 是
TaskInstances
卡在running
状态,尽管它们关联的作业处于非活动状态(例如,它们的进程没有发送最近的心跳,因为它被终止了,或者机器死机了)。Airflow 会定期查找这些僵尸任务,清理它们,并根据其设置使任务失败或重试。任务可能由于多种原因变成僵尸,包括Airflow worker 内存不足并被 OOMKilled。
Airflow worker 未通过其活跃性探测,因此系统(例如,Kubernetes)重新启动了 worker。
系统(例如,Kubernetes)缩减规模并将 Airflow worker 从一个节点移动到另一个节点。
不死任务 是不应该运行但正在运行的任务,通常是由于你通过 UI 手动编辑任务实例而引起的。Airflow 会定期查找它们并终止它们。
以下是 Airflow 调度程序中定期运行以检测僵尸/不死任务的代码片段。
def _find_and_purge_zombies(self) -> None:
"""
Find and purge zombie task instances.
Zombie instances are tasks that failed to heartbeat for too long, or
have a no-longer-running LocalTaskJob.
A TaskCallbackRequest is also created for the killed zombie to be
handled by the DAG processor, and the executor is informed to no longer
count the zombie as running when it calculates parallelism.
"""
with create_session() as session:
if zombies := self._find_zombies(session=session):
self._purge_zombies(zombies, session=session)
def _find_zombies(self, *, session: Session) -> list[tuple[TI, str, str]]:
from airflow.jobs.job import Job
self.log.debug("Finding 'running' jobs without a recent heartbeat")
limit_dttm = timezone.utcnow() - timedelta(seconds=self._zombie_threshold_secs)
zombies = (
session.execute(
select(TI, DM.fileloc, DM.processor_subdir)
.with_hint(TI, "USE INDEX (ti_state)", dialect_name="mysql")
.join(Job, TI.job_id == Job.id)
.join(DM, TI.dag_id == DM.dag_id)
.where(TI.state == TaskInstanceState.RUNNING)
.where(or_(Job.state != JobState.RUNNING, Job.latest_heartbeat < limit_dttm))
.where(Job.job_type == "LocalTaskJob")
.where(TI.queued_by_job_id == self.job.id)
)
.unique()
.all()
)
if zombies:
self.log.warning("Failing (%s) jobs without heartbeat after %s", len(zombies), limit_dttm)
return zombies
def _purge_zombies(self, zombies: list[tuple[TI, str, str]], *, session: Session) -> None:
for ti, file_loc, processor_subdir in zombies:
zombie_message_details = self._generate_zombie_message_details(ti)
request = TaskCallbackRequest(
full_filepath=file_loc,
processor_subdir=processor_subdir,
simple_task_instance=SimpleTaskInstance.from_ti(ti),
msg=str(zombie_message_details),
)
session.add(
Log(
event="heartbeat timeout",
task_instance=ti.key,
extra=(
f"Task did not emit heartbeat within time limit ({self._zombie_threshold_secs} "
"seconds) and will be terminated. "
"See https://airflow.apache.org/docs/apache-airflow/"
"stable/core-concepts/tasks.html#zombie-undead-tasks"
),
)
)
self.log.error(
"Detected zombie job: %s "
"(See https://airflow.apache.org/docs/apache-airflow/"
"stable/core-concepts/tasks.html#zombie-undead-tasks)",
request,
)
self.job.executor.send_callback(request)
if (executor := self._try_to_load_executor(ti.executor)) is None:
self.log.warning("Cannot clean up zombie %r with non-existent executor %s", ti, ti.executor)
continue
executor.change_state(ti.key, TaskInstanceState.FAILED, remove_running=True)
Stats.incr("zombies_killed", tags={"dag_id": ti.dag_id, "task_id": ti.task_id})
以下是上述代码片段中用于检测僵尸任务的标准说明
任务实例状态
仅考虑处于 RUNNING 状态的任务实例作为潜在的僵尸任务。
作业状态和心跳检查
如果关联的作业未处于 RUNNING 状态,或者如果该作业的最新心跳早于计算的时间阈值 (limit_dttm),则将识别为僵尸任务。心跳是一种机制,用于指示任务或作业仍处于活动状态并正在运行。
作业类型
与任务关联的作业必须是
LocalTaskJob
类型。按作业 ID 排队
仅考虑由当前正在处理的同一作业排队的任务。
这些条件共同帮助根据其状态、关联的作业状态、心跳状态、作业类型以及将它们排队的特定作业来识别可能为僵尸的正在运行的任务。如果任务满足这些条件,则将其视为潜在的僵尸,并采取进一步的操作,例如记录日志和发送回调请求。
在本地重现僵尸任务¶
如果你想为开发/测试过程重现僵尸任务,请按照以下步骤操作
为本地 Airflow 设置设置以下环境变量(或者你可以调整 airflow.cfg 中的相应配置值)
export AIRFLOW__SCHEDULER__LOCAL_TASK_JOB_HEARTBEAT_SEC=600
export AIRFLOW__SCHEDULER__SCHEDULER_ZOMBIE_TASK_THRESHOLD=2
export AIRFLOW__SCHEDULER__ZOMBIE_DETECTION_INTERVAL=5
有一个 DAG,其中包含一个大约需要 10 分钟才能完成的任务(即,一个长时间运行的任务)。例如,你可以使用以下 DAG
from airflow.decorators import dag
from airflow.operators.bash import BashOperator
from datetime import datetime
@dag(start_date=datetime(2021, 1, 1), schedule="@once", catchup=False)
def sleep_dag():
t1 = BashOperator(
task_id="sleep_10_minutes",
bash_command="sleep 600",
)
sleep_dag()
运行上述 DAG 并等待一段时间。你应该看到任务实例变成僵尸任务,然后被调度程序杀死。
执行器配置¶
一些 执行器 允许可选的按任务配置 - 例如 KubernetesExecutor
,它允许你设置一个在其上运行任务的镜像。
这是通过 Task 或 Operator 的 executor_config
参数实现的。以下是在 KubernetesExecutor
上运行的任务设置 Docker 镜像的示例
MyOperator(...,
executor_config={
"KubernetesExecutor":
{"image": "myCustomDockerImage"}
}
)
你可以传递到 executor_config
中的设置因执行器而异,因此请阅读各个执行器文档,以查看你可以设置的内容。