调度器

Airflow 调度器监控所有任务和 DAG,并在其依赖项完成后触发任务实例。在幕后,调度器启动一个子进程,该子进程监控指定 DAG 目录中的所有 DAG 并与其保持同步。默认情况下,调度器每分钟收集一次 DAG 解析结果,并检查是否可以触发任何活动任务。

Airflow 调度器旨在作为 Airflow 生产环境中的持久服务运行。要启动它,您只需执行 airflow scheduler 命令。它使用 airflow.cfg 中指定的配置。

调度器使用配置的 执行器 来运行已准备好的任务。

要启动调度器,只需运行以下命令

airflow scheduler

调度器成功运行后,您的 DAG 将开始执行。

注意

第一个 DAG 运行是根据 DAG 中任务的最小 start_date 创建的。后续 DAG 运行将根据您的 DAG 的 时间表 创建。

对于具有 cron 或 timedelta 调度的 DAG,调度器在涵盖的周期结束后才会触发您的任务,例如,schedule 设置为 @daily 的作业将在一天结束后运行。这种技术确保在执行 DAG 之前,该周期所需的所有数据都已完全可用。在用户界面中,Airflow 似乎将您的任务运行 **延迟** 了一天

注意

如果您以一天的 schedule 运行 DAG,则数据间隔从 2019-11-21 开始的运行将在 2019-11-21T23:59 之后触发。

**让我们重复一遍**,调度器在开始日期之后的一个 schedule 运行您的作业,即在间隔的 **结束** 时。

您应该参考 DAG 运行 了解有关调度 DAG 的详细信息。

注意

调度器专为高吞吐量而设计。这是一个明智的设计决策,旨在尽快实现任务调度。调度器会检查池中有多少个可用插槽,并在一次迭代中调度最多该数量的任务实例。这意味着只有在等待的已调度任务多于队列插槽时,任务优先级才会生效。因此,如果低优先级任务和高优先级任务共享同一批次,则可能会出现先调度低优先级任务的情况。有关更多信息,您可以参考 此 GitHub 讨论

DAG 文件处理

您可以让 Airflow 调度器负责启动将 DAG 文件夹中包含的 Python 文件转换为包含要调度的任务的 DAG 对象的过程。

有关如何实现这一点的详细信息,请参阅 DAG 文件处理

使用未来日期触发 DAG

如果要使用“外部触发器”来运行未来日期的数据间隔,请在 airflow.cfgscheduler 部分中设置 allow_trigger_in_future = True。这仅在您的 DAG 使用 schedule=None 定义时才有效。如果设置为 False(默认值),如果您手动触发具有未来日期数据间隔的运行,则调度器在 data_interval_start 成为过去之前不会执行它。

运行多个调度器

Airflow 支持同时运行多个调度器,这既是为了提高性能,也是为了提高弹性。

概述

HA 调度器旨在利用现有的元数据数据库。这主要是为了简化操作:每个组件都已经需要与该数据库通信,并且通过不使用调度器之间的直接通信或共识算法(Raft、Paxos 等)或其他共识工具(例如 Apache Zookeeper 或 Consul),我们将“操作表面积”保持在最低限度。

调度器现在使用序列化的 DAG 表示来做出调度决策,调度循环的大致轮廓如下

  • 检查是否有任何 DAG 需要新的 DagRun,并创建它们

  • 检查一批 DagRun 是否存在可调度的 TaskInstance 或已完成的 DagRun

  • 选择可调度的 TaskInstance,并在遵守池限制和其他并发限制的同时,将它们排队以供执行

但是,这对数据库有一些要求。

数据库要求

简而言之,PostgreSQL 12+ 或 MySQL 8.0+ 的用户都已准备就绪 - 您可以根据需要启动任意数量的调度器副本 - 无需进一步设置或配置选项。如果您使用的是其他数据库,请继续阅读。

为了保持性能和吞吐量,调度循环中有一部分在内存中执行大量计算(因为每次 TaskInstance 都要往返于数据库会太慢),因此我们需要确保一次只有一个调度器位于此关键部分 - 否则将无法正确遵守限制。为了实现这一点,我们使用数据库行级锁(使用 SELECT ... FOR UPDATE)。

在这个关键部分中,TaskInstance 从已调度状态转变为排队到执行器,同时确保遵守各种并发和池限制。通过请求对池表中的每一行进行行级写锁来获取关键部分(大致相当于 SELECT * FROM slot_pool FOR UPDATE NOWAIT,但确切的查询略有不同)。

以下数据库完全受支持,并提供“最佳”体验

  • PostgreSQL 12+

  • MySQL 8.0+

警告

MariaDB 在 10.6.0 版本之前没有实现 SKIP LOCKEDNOWAIT SQL 子句。如果没有这些功能,则不支持运行多个调度器,并且已报告死锁错误。MariaDB 10.6.0 及更高版本可能适用于多个调度器,但这尚未经过测试。

注意

Microsoft SQL Server 尚未经过 HA 测试。

微调调度器性能

影响调度器性能的因素

调度器负责两个操作

  • 持续解析 DAG 文件并与数据库中的 DAG 同步

  • 持续调度任务以供执行

调度器并行执行这两个任务,并在不同的进程中独立运行。为了微调您的调度器,您需要考虑许多因素

  • 您拥有的部署类型
    • 您必须使用哪种文件系统来共享 DAG(影响持续读取 DAG 的性能)

    • 文件系统的速度有多快(在许多分布式云文件系统的情况下,您可以额外付费以获得更高的吞吐量/更快的文件系统)

    • 您有多少内存可用于处理

    • 您有多少可用的 CPU

    • 您有多少可用的网络吞吐量

  • 您的 DAG 结构的逻辑和定义
    • 您有多少个 DAG 文件

    • 您的文件中包含多少个 DAG

    • DAG 文件有多大(请记住,DAG 解析器需要每隔 n 秒读取并解析一次文件)

    • 它们的复杂程度如何(即解析速度有多快,有多少个任务和依赖项)

    • 解析您的 DAG 文件是否涉及导入大量库或在顶层进行大量处理(提示!不应该。请参阅 顶层 Python 代码

  • 调度器配置
    • 您有多少个调度器

    • 您的调度器中有多少个解析进程

    • 调度器在重新解析同一个 DAG 之间等待多长时间(持续进行)

    • 调度器在一个循环中处理多少个任务实例

    • 每个循环应创建/调度多少个新的 DAG 运行

    • 调度程序应多久执行一次清理并检查孤立任务/采用它们

为了执行微调,最好了解 Scheduler 如何在后台工作。您可以查看 Airflow Summit 2021 的演讲深入了解 Airflow Scheduler 演讲来执行微调。

如何进行 Scheduler 的微调

Airflow 为您提供了许多“旋钮”来微调性能,但这是一项单独的任务,取决于您的特定部署、DAG 结构、硬件可用性和期望,以确定要转动哪些旋钮才能获得最佳效果。管理部署的部分工作是确定您要优化的目标。一些用户可以接受 30 秒的新 DAG 解析延迟,以降低 CPU 使用率,而另一些用户则希望 DAG 在 DAG 文件夹中出现时几乎立即被解析,但代价是更高的 CPU 使用率。

Airflow 为您提供了决定的灵活性,但您应该找出性能的哪个方面对您最重要,并决定要朝哪个方向转动哪些旋钮。

通常,对于微调,您的方法应该与任何性能改进和优化相同(我们不会推荐任何特定工具 - 只需使用您通常用于观察和监控系统的工具)

  • 使用您通常用于监控系统的一组正确工具来监控您的系统非常重要。本文档不详细介绍您可以使用的特定指标和工具,它只描述您应该监控哪种资源,但您应该遵循您的最佳监控实践来获取正确的数据。

  • 确定性能的哪个方面对您最重要(您想要改进什么)

  • 观察您的系统以查看瓶颈在哪里:CPU、内存、I/O 是常见的限制因素

  • 根据您的期望和观察结果 - 确定您的下一个改进目标,然后返回到对性能和瓶颈的观察。性能改进是一个迭代过程。

哪些资源可能会限制 Scheduler 的性能

您应该注意以下几个资源使用领域

  • 文件系统性能。Airflow Scheduler 严重依赖于解析(有时是大量)Python 文件,这些文件通常位于共享文件系统上。Airflow Scheduler 会持续读取和重新解析这些文件。必须向工作程序提供相同的文件,因此它们通常存储在分布式文件系统中。您可以为此目的使用各种文件系统(NFS、CIFS、EFS、GCS fuse、Azure 文件系统都是很好的例子)。您可以控制这些文件系统的各种参数并微调其性能,但这超出了本文档的范围。您应该观察文件系统的统计信息和使用情况,以确定问题是否来自文件系统性能。例如,有传闻证据表明,当使用 EFS 时,增加 EFS 性能的 IOPS(并支付更多费用)会显着提高解析 Airflow DAG 的稳定性和速度。

  • 如果文件系统性能成为您的瓶颈,那么解决此问题的另一种方法是转向替代机制来分发您的 DAG。将 DAG 嵌入到您的映像中和 GitSync 分发都具有以下特性:Scheduler 可以在本地访问文件,并且它不必使用分布式文件系统来读取文件,Scheduler 可以在本地访问文件,并且它通常尽可能快,特别是如果您的机器使用快速的 SSD 磁盘进行本地存储。这些分发机制具有其他特性,可能使它们不是您的最佳选择,但如果您的性能问题来自分布式文件系统性能,它们可能是最佳的遵循方法。

  • 当您想要提高性能并并行处理更多内容时,数据库连接和数据库使用可能会成为一个问题。众所周知,Airflow 是“数据库连接密集型”的 - 您拥有的 DAG 越多,并且您想要并行处理的 DAG 越多,将打开的数据库连接就越多。对于 MySQL 来说,这通常不是问题,因为它处理连接的模型是基于线程的,但这对于 Postgres 来说可能是一个问题,因为 Postgres 的连接处理是基于进程的。普遍的共识是,如果您有中等规模的基于 Postgres 的 Airflow 安装,最好的解决方案是使用PGBouncer作为数据库的代理。Apache Airflow 的 Helm Chart开箱即用地支持 PGBouncer。

  • CPU 使用率对于 FileProcessor 来说是最重要的 - 这些是解析和执行 Python DAG 文件的进程。由于 Scheduler 会持续触发此类解析,因此当您有大量 DAG 时,处理可能会占用大量 CPU。您可以通过增加min_file_process_interval来缓解这种情况,但这是前面提到的权衡之一,其结果是对此类文件的更改将被更慢地获取,并且您将在提交文件与在 Airflow UI 中看到它们并由 Scheduler 执行之间看到延迟。优化 DAG 的构建方式、避免使用外部数据源是提高 CPU 使用率的最佳方法。如果您有更多可用的 CPU,则可以增加处理线程的数量parsing_processes,此外,Airflow Scheduler 可以通过多个实例几乎线性扩展,因此如果您的 Scheduler 的性能受 CPU 限制,您也可以添加更多 Scheduler。

  • 当您尝试从 Airflow 中获得更高性能时,它可能会使用相当多的内存。通常,通过增加处理负载的进程数量,可以在 Airflow 中实现更高的性能,并且每个进程都需要加载完整的 Python 解释器、导入大量类、临时内存存储。Airflow 通过使用分叉和写时复制内存来优化其中很大一部分,但在分叉后导入新类的情况下,这可能会导致额外的内存压力。您需要注意您的系统是否使用了超过其拥有的内存 - 这会导致使用交换磁盘,从而大大降低性能。请注意,2.1.4版本之前的 Airflow Scheduler 会生成大量由日志文件使用的页面缓存内存(当日志文件未被删除时)。这通常是无害的,因为内存只是缓存,可以随时由系统回收,但是,在2.1.4及更高版本中,写入日志不会生成过多的页面缓存内存。无论如何 - 确保在查看内存使用情况时,要注意您正在观察的内存类型。通常,您应该查看工作内存(名称可能会因您的部署而异),而不是已用总内存

您可以做些什么来提高 Scheduler 的性能

当您了解了您的资源使用情况后,您可以考虑的改进可能是

  • 改进逻辑、解析效率并降低顶级 DAG Python 代码的复杂性。它会被持续解析,因此优化该代码可能会带来巨大的改进,尤其是在解析 DAG 时尝试访问某些外部数据库等时(应该不惜一切代价避免这种情况)。顶级 Python 代码解释了编写顶级 Python 代码的最佳实践。降低 DAG 复杂性文档提供了一些您在想要降低代码复杂性时可以查看的领域。

  • 提高资源利用率。这是当您的系统中有空闲容量似乎未得到充分利用时(同样,CPU、内存 I/O、网络是主要候选者)- 您可以采取诸如增加调度程序数量、解析进程或减少间隔以进行更频繁的操作等操作,这些操作可能会以提高这些资源的利用率为代价来提高性能。

  • 增加硬件容量(例如,如果您发现 CPU 或用于 DAG 文件系统的 I/O 已达到极限)。通常,调度程序性能出现问题仅仅是因为您的系统“能力”不足,这可能是唯一的方法。例如,如果您发现您正在使用机器上的所有 CPU,您可能希望在新机器上添加另一个调度程序 - 在大多数情况下,当您添加第二个或第三个调度程序时,调度容量会线性增长(除非共享数据库或文件系统是瓶颈)。

  • 尝试使用“调度程序可调参数”的不同值。通常,您只需将一个性能方面换成另一个性能方面,就可以获得更好的效果。例如,如果您想降低 CPU 使用率,您可以增加文件处理间隔(但结果是新 DAG 的出现延迟会更大)。通常,性能调优是平衡不同方面的艺术。

  • 有时,您会稍微更改调度程序行为(例如,更改解析排序顺序),以便为您的特定部署获得更好的微调结果。

调度程序配置选项

以下配置设置可用于控制 Scheduler 的各个方面。但是,您也可以查看配置参考[scheduler]部分中提供的其他与性能无关的调度程序配置参数。

  • max_dagruns_to_create_per_loop

    这会更改每个调度程序在创建 DAG 运行时锁定的 DAG 数量。将其设置得更低的一个可能原因是,如果您有巨大的 DAG(每个 DAG 大约有 10k+ 个任务)并且正在运行多个调度程序,您不希望一个调度程序完成所有工作。

  • max_dagruns_per_loop_to_schedule

    调度程序在调度和排队任务时应检查(并锁定)多少个 DagRun。增加此限制将允许较小的 DAG 具有更高的吞吐量,但可能会降低较大的 DAG(例如 >500 个任务)的吞吐量。在使用多个调度程序时将其设置得太高也可能导致一个调度程序占用所有 DAG 运行,而其他调度程序则没有工作可做。

  • use_row_level_locking

    调度程序是否应在相关查询中发出SELECT ... FOR UPDATE。如果将其设置为 False,则一次不应运行多个调度程序。

  • pool_metrics_interval

    池使用情况统计信息应多久(以秒为单位)发送到 StatsD(如果启用了 statsd_on)。计算此查询的成本相对较高,因此应将其设置为与您的 StatsD 汇总周期相同的时间段。

  • orphaned_tasks_check_interval

    调度程序应多久(以秒为单位)检查一次孤立任务或失效的 SchedulerJob。

    此设置控制如何注意到失效的调度程序以及它正在“监督”的任务如何被另一个调度程序接收。这些任务将继续运行,因此在一段时间内未检测到这种情况并无害处。

    当一个 SchedulerJob 被检测为“dead”(由 scheduler_health_check_threshold 决定)时,任何由该dead进程启动的正在运行或排队的任务都将被此调度器“接管”并监控。

  • dag_dir_list_interval 扫描 DAGs 目录以查找新文件的频率(以秒为单位)。

  • file_parsing_sort_mode 调度器将列出并对 DAG 文件进行排序,以确定解析顺序。

  • max_tis_per_query 调度主循环中查询的批量大小。这不应大于 core.parallelism。如果此值过高,则 SQL 查询性能可能会受到查询谓词复杂性和/或过度锁定的影响。

    此外,您可能会达到数据库允许的最大查询长度。将其设置为 0 可使用 core.parallelism 的值。

  • min_file_process_interval 重新解析 DAG 文件之前的秒数。每隔 min_file_process_interval 秒解析一次 DAG 文件。在此间隔后,将反映对 DAG 的更新。保持较低的数字会增加 CPU 使用率。

  • parsing_processes 调度器可以并行运行多个进程来解析 DAG 文件。这定义了将运行多少个进程。

  • scheduler_idle_sleep_time 控制调度器在循环之间休眠的时间,但如果循环中没有任何操作。即,如果它安排了一些东西,它将立即开始下一个循环迭代。此参数的名称不正确(历史原因),将来会重命名,并弃用当前名称。

  • schedule_after_task_execution 任务主管进程是否应执行“迷你调度器”以尝试调度同一 DAG 的更多任务。保持启用此选项将意味着同一 DAG 中的任务执行速度更快,但在某些情况下可能会饿死其他 DAG。

此条目有帮助吗?