任务

任务是 Airflow 中的基本执行单元。任务被安排到 DAG 中,然后在其之间设置上游和下游依赖关系,以表达它们应该运行的顺序。

任务有三种基本类型

  • 操作器,预定义的任务模板,您可以快速将它们串在一起以构建 DAG 的大部分内容。

  • 传感器,操作器的一个特殊子类,完全用于等待外部事件的发生。

  • 使用 TaskFlow 装饰的 @task,它是一个打包为任务的自定义 Python 函数。

在内部,这些实际上都是 Airflow 的 BaseOperator 的子类,任务和操作器的概念在某种程度上是可以互换的,但将它们视为独立的概念是有用的 - 本质上,操作器和传感器是*模板*,当您在 DAG 文件中调用一个时,您正在创建一个任务。

关系

使用任务的关键部分是定义它们之间的关系 - 它们的*依赖关系*,或者正如我们在 Airflow 中所说的,它们的*上游*和*下游*任务。您首先声明您的任务,然后您声明它们的依赖关系。

注意

我们将*上游*任务称为直接位于另一个任务之前的任务。我们以前称之为父任务。请注意,此概念并未描述任务层次结构中较高的任务(即它们不是任务的直接父级)。相同的定义适用于*下游*任务,它需要是另一个任务的直接子级。

有两种声明依赖关系的方法 - 使用 >><<(位移)运算符

first_task >> second_task >> [third_task, fourth_task]

或者更明确的 set_upstreamset_downstream 方法

first_task.set_downstream(second_task)
third_task.set_upstream(second_task)

这两者都做完全相同的事情,但总的来说,我们建议您使用位移运算符,因为在大多数情况下它们更容易阅读。

默认情况下,当其所有上游(父)任务都成功时,任务将运行,但是有很多方法可以修改此行为以添加分支,仅等待某些上游任务,或根据当前运行在历史记录中的位置更改行为。有关更多信息,请参阅 控制流

默认情况下,任务不会相互传递信息,而是完全独立运行。如果您想将信息从一个任务传递到另一个任务,您应该使用 XComs

任务实例

与 DAG 在每次运行时实例化为 DAG 运行 非常相似,DAG 下的任务被实例化为*任务实例*。

任务的实例是给定 DAG(因此是给定数据间隔)的该任务的特定运行。它们也是具有*状态*的任务的表示形式,表示它处于生命周期的哪个阶段。

任务实例的可能状态为

  • none:任务尚未排队等待执行(其依赖项尚未满足)

  • scheduled:调度程序已确定满足任务的依赖关系,并且应该运行

  • queued:该任务已分配给执行器,正在等待工作器

  • running:任务正在工作器上运行(或在本地/同步执行器上运行)

  • success:任务运行完成,没有错误

  • restarting:任务在运行时被外部请求重新启动

  • failed:任务在执行过程中出错,无法运行

  • skipped:由于分支、LatestOnly 或类似原因,任务被跳过。

  • upstream_failed:上游任务失败,并且 触发规则 表示我们需要它

  • up_for_retry:任务失败,但还有重试次数,将重新安排。

  • up_for_reschedule:任务是处于 reschedule 模式的 传感器

  • deferred:任务已 推迟到触发器

  • removed:自运行开始以来,任务已从 DAG 中消失

../_images/task_lifecycle_diagram.png

理想情况下,任务应该从 none 流向 scheduled,再到 queued,再到 running,最后到 success

当任何自定义任务(操作器)正在运行时,它将获得传递给它的任务实例的副本;除了能够检查任务元数据之外,它还包含 XComs 之类的方法。

关系术语

对于任何给定的任务实例,它与其他实例有两种类型的关系。

首先,它可以有*上游*和*下游*任务

task1 >> task2 >> task3

当 DAG 运行时,它将为这些彼此上游/下游但都具有相同数据间隔的任务创建实例。

也可能存在*相同任务*的实例,但数据间隔不同 - 来自同一 DAG 的其他运行。我们称这些为*上一个*和*下一个* - 它是与*上游*和*下游*不同的关系!

注意

一些较旧的 Airflow 文档可能仍然使用“上一个”来表示“上游”。如果您发现这种情况,请帮助我们修复它!

超时

如果您希望任务具有最大运行时间,请将其 execution_timeout 属性设置为 datetime.timedelta 值,该值是允许的最大运行时间。这适用于所有 Airflow 任务,包括传感器。 execution_timeout 控制每次执行允许的最长时间。如果超过 execution_timeout,则任务超时并引发 AirflowTaskTimeout

此外,传感器还有一个 timeout 参数。这仅适用于 reschedule 模式的传感器。 timeout 控制传感器成功允许的最长时间。如果超过 timeout,将引发 AirflowSensorTimeout,并且传感器立即失败,不会重试。

以下 SFTPSensor 示例说明了这一点。 sensor 处于 reschedule 模式,这意味着它会定期执行并重新安排,直到成功。

  • 每次传感器探测 SFTP 服务器时,它最多允许花费 60 秒,如 execution_timeout 定义的那样。

  • 如果传感器探测 SFTP 服务器的时间超过 60 秒,将引发 AirflowTaskTimeout。发生这种情况时,允许传感器重试。它最多可以重试 2 次,如 retries 定义的那样。

  • 从第一次执行开始,直到最终成功(即在文件“root/test”出现之后),传感器最多允许 3600 秒,如 timeout 所定义。换句话说,如果文件在 3600 秒内没有出现在 SFTP 服务器上,传感器将引发 AirflowSensorTimeout。引发此错误时,它将不会重试。

  • 如果传感器由于其他原因(例如 3600 秒间隔内的网络中断)而失败,则它最多可以重试 2 次,如 retries 所定义。重试不会重置 timeout。它总共仍然有最多 3600 秒的时间来成功。

sensor = SFTPSensor(
    task_id="sensor",
    path="/root/test",
    execution_timeout=timedelta(seconds=60),
    timeout=3600,
    retries=2,
    mode="reschedule",
)

如果您只是想在任务超时运行时收到通知,但仍然希望它运行到完成,那么您需要的是 SLA

SLA

SLA 或服务级别协议,是对任务相对于 Dag 运行开始时间应完成的最长时间的预期。如果任务花费的时间超过此时间,则它将在用户界面的“SLA 未达标”部分中可见,并且会通过电子邮件发送所有未达到其 SLA 的任务。

但是,超过其 SLA 的任务不会被取消 - 它们被允许运行到完成。如果您想在达到一定的运行时间后取消任务,则需要 超时

要为任务设置 SLA,请将 datetime.timedelta 对象传递给任务/运算符的 sla 参数。您还可以提供一个 sla_miss_callback,如果希望运行自己的逻辑,则在 SLA 未达标时调用它。

如果您想完全禁用 SLA 检查,可以在 Airflow 的 [core] 配置中设置 check_slas = False

要详细了解如何配置电子邮件,请参阅 电子邮件配置

注意

不会检查手动触发的任务和事件驱动 DAG 中的任务是否出现 SLA 未达标的情况。有关 DAG schedule 值的更多信息,请参阅 DAG 运行

sla_miss_callback

您还可以提供一个 sla_miss_callback,如果希望运行自己的逻辑,则在 SLA 未达标时调用它。sla_miss_callback 的函数签名需要 5 个参数。

  1. dag

    • DAG 对象,用于 DAGRun 中的任务未达到其 SLA

  2. task_list

    • 字符串列表(以换行符分隔,\n),包含自上次 sla_miss_callback 运行以来所有未达到其 SLA 的任务。

  3. blocking_task_list

    • DAGRun(s) 中的任何任务(与未达到 SLA 的任务具有相同的 execution_date),在 sla_miss_callback 运行时未处于 **SUCCESS** 状态。例如“正在运行”、“失败”。这些任务被描述为阻止自身或其他任务在其 SLA 窗口完成之前完成的任务。

  4. slas

    • task_list 参数中的任务关联的 SlaMiss 对象列表。

  5. blocking_tis

    • blocking_task_list 参数中的任务关联的 TaskInstance 对象列表。

sla_miss_callback 函数签名的示例

def my_sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis):
    ...
def my_sla_miss_callback(*args):
    ...

示例 DAG

airflow/example_dags/example_sla_dag.py[源代码]

def sla_callback(dag, task_list, blocking_task_list, slas, blocking_tis):
    print(
        "The callback arguments are: ",
        {
            "dag": dag,
            "task_list": task_list,
            "blocking_task_list": blocking_task_list,
            "slas": slas,
            "blocking_tis": blocking_tis,
        },
    )


@dag(
    schedule="*/2 * * * *",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    sla_miss_callback=sla_callback,
    default_args={"email": "[email protected]"},
)
def example_sla_dag():
    @task(sla=datetime.timedelta(seconds=10))
    def sleep_20():
        """Sleep for 20 seconds"""
        time.sleep(20)

    @task
    def sleep_30():
        """Sleep for 30 seconds"""
        time.sleep(30)

    sleep_20() >> sleep_30()


example_dag = example_sla_dag()

特殊异常

如果您想从自定义任务/运算符代码中控制任务的状态,Airflow 提供了两个可以引发的特殊异常

  • AirflowSkipException 会将当前任务标记为已跳过

  • AirflowFailException 会将当前任务标记为已失败,*忽略任何剩余的重试尝试*

如果您的代码对其环境有额外的了解,并且希望更快地失败/跳过,这些异常会很有用 - 例如,在知道没有可用数据时跳过,或者在其检测到其 API 密钥无效时快速失败(因为重试无法解决此问题)。

僵尸/不死任务

没有哪个系统可以完美运行,任务实例预计会偶尔死亡。Airflow 检测两种任务/进程不匹配

  • “僵尸任务”是 TaskInstances 停留在 running 状态,尽管与其关联的作业处于非活动状态(例如,它们的进程在被终止时没有发送最近的心跳,或者机器死机)。Airflow 会定期查找这些任务,清理它们,并根据其设置使任务失败或重试。

  • “不死任务”是不应该运行但仍在运行的任务,通常是通过 UI 手动编辑任务实例时造成的。Airflow 会定期查找它们并终止它们。

以下是 Airflow 调度程序中定期运行以检测僵尸/不死任务的代码片段。

airflow/jobs/scheduler_job_runner.py[源代码]

    def _find_zombies(self) -> None:
        """
        Find zombie task instances and create a TaskCallbackRequest to be handled by the DAG processor.

        Zombie instances are tasks haven't heartbeated for too long or have a no-longer-running LocalTaskJob.
        """
        from airflow.jobs.job import Job

        self.log.debug("Finding 'running' jobs without a recent heartbeat")
        limit_dttm = timezone.utcnow() - timedelta(seconds=self._zombie_threshold_secs)

        with create_session() as session:
            zombies: list[tuple[TI, str, str]] = (
                session.execute(
                    select(TI, DM.fileloc, DM.processor_subdir)
                    .with_hint(TI, "USE INDEX (ti_state)", dialect_name="mysql")
                    .join(Job, TI.job_id == Job.id)
                    .join(DM, TI.dag_id == DM.dag_id)
                    .where(TI.state == TaskInstanceState.RUNNING)
                    .where(
                        or_(
                            Job.state != JobState.RUNNING,
                            Job.latest_heartbeat < limit_dttm,
                        )
                    )
                    .where(Job.job_type == "LocalTaskJob")
                    .where(TI.queued_by_job_id == self.job.id)
                )
                .unique()
                .all()
            )

        if zombies:
            self.log.warning("Failing (%s) jobs without heartbeat after %s", len(zombies), limit_dttm)

        for ti, file_loc, processor_subdir in zombies:
            zombie_message_details = self._generate_zombie_message_details(ti)
            request = TaskCallbackRequest(
                full_filepath=file_loc,
                processor_subdir=processor_subdir,
                simple_task_instance=SimpleTaskInstance.from_ti(ti),
                msg=str(zombie_message_details),
            )
            log_message = (
                f"Detected zombie job: {request} "
                "(See https://airflow.apache.org/docs/apache-airflow/"
                "stable/core-concepts/tasks.html#zombie-undead-tasks)"
            )
            self._task_context_logger.error(log_message, ti=ti)
            self.job.executor.send_callback(request)
            Stats.incr("zombies_killed", tags={"dag_id": ti.dag_id, "task_id": ti.task_id})

以上代码片段中用于检测僵尸任务的标准解释如下

  1. 任务实例状态

    只有处于 RUNNING 状态的任务实例才被视为潜在的僵尸。

  2. 作业状态和心跳检查

    如果关联的作业未处于 RUNNING 状态,或者作业的最新心跳早于计算出的时间阈值 (limit_dttm),则会识别出僵尸任务。心跳是一种机制,用于指示任务或作业仍在运行。

  3. 作业类型

    与任务关联的作业必须是 LocalTaskJob 类型。

  4. 按作业 ID 排队

    仅考虑由当前正在处理的同一作业排队的任务。

这些条件共同有助于根据任务的状态、关联的作业状态、心跳状态、作业类型以及对它们进行排队的特定作业来识别可能是僵尸的正在运行的任务。如果任务满足这些条件,则将其视为潜在的僵尸,并采取进一步的操作,例如记录和发送回调请求。

在本地重现僵尸任务

如果您想为开发/测试过程重现僵尸任务,请按照以下步骤操作

  1. 为您的本地 Airflow 设置设置以下环境变量(或者,您可以在 airflow.cfg 中调整相应的配置值)

export AIRFLOW__SCHEDULER__LOCAL_TASK_JOB_HEARTBEAT_SEC=600
export AIRFLOW__SCHEDULER__SCHEDULER_ZOMBIE_TASK_THRESHOLD=2
export AIRFLOW__SCHEDULER__ZOMBIE_DETECTION_INTERVAL=5
  1. 使用一个包含大约需要 10 分钟才能完成的任务的 DAG(即长时间运行的任务)。例如,您可以使用以下 DAG

from airflow.decorators import dag
from airflow.operators.bash import BashOperator
from datetime import datetime


@dag(start_date=datetime(2021, 1, 1), schedule="@once", catchup=False)
def sleep_dag():
    t1 = BashOperator(
        task_id="sleep_10_minutes",
        bash_command="sleep 600",
    )


sleep_dag()

运行上述 DAG 并等待一段时间。您应该会看到任务实例变成僵尸任务,然后被调度程序终止。

执行程序配置

某些 执行程序 允许可选的每任务配置 - 例如 KubernetesExecutor,它允许您设置运行任务的映像。

这是通过任务或运算符的 executor_config 参数实现的。以下是在 KubernetesExecutor 上运行的任务的 Docker 映像设置示例

MyOperator(...,
    executor_config={
        "KubernetesExecutor":
            {"image": "myCustomDockerImage"}
    }
)

您可以传递给 executor_config 的设置因执行程序而异,因此请阅读 各个执行程序文档 以了解可以设置的内容。

此条目有帮助吗?