调度器

Airflow 调度器监控所有任务和 DAG,然后在它们的依赖关系完成时触发任务实例。在后台,调度器启动一个子进程,该子进程监控并与指定 DAG 目录中的所有 DAG 保持同步。默认情况下,调度器每分钟收集一次 DAG 解析结果,并检查是否有任何活动任务可以触发。

Airflow 调度器设计为在 Airflow 生产环境中作为持久服务运行。要启动它,您只需执行 airflow scheduler 命令。它使用 airflow.cfg 中指定的配置。

调度器使用配置的 执行器 来运行准备好的任务。

要启动调度器,只需运行以下命令

airflow scheduler

一旦调度器成功运行,您的 DAG 将开始执行。

注意

第一个 DAG 运行是根据 DAG 中任务的最小 start_date 创建的。后续的 DAG 运行根据 DAG 的 时间表 创建。

对于具有 cron 或 timedelta 计划的 DAG,调度器不会触发您的任务,直到它覆盖的时间段结束,例如,schedule 设置为 @daily 的作业会在一天结束后运行。此技术确保在执行 DAG 之前,该期间所需的任何数据都完全可用。在 UI 中,它看起来好像 Airflow 延迟 一天运行您的任务。

注意

如果您在一天的 schedule 上运行 DAG,则数据间隔从 2019-11-21 开始的运行会在 2019-11-21T23:59 之后触发。

让我们重复一遍,调度器会在开始日期之后的一个 schedule,在间隔的末尾运行您的作业。

您应该参考 DAG 运行 以了解有关调度 DAG 的详细信息。

注意

调度器专为高吞吐量而设计。这是一个明智的设计决策,旨在尽快调度任务。调度器检查池中可用的空闲插槽数量,并在一次迭代中调度最多该数量的任务实例。这意味着,只有当等待调度的任务多于队列插槽时,任务优先级才会生效。因此,在共享同一批次的情况下,可能会出现低优先级任务在优先级高的任务之前被调度的情况。有关更多信息,您可以参考 此 GitHub 讨论

DAG 文件处理

您可以让 Airflow 调度器负责启动将 DAG 文件夹中包含的 Python 文件转换为包含要调度的任务的 DAG 对象的过程。

请参阅 DAG 文件处理 以了解如何实现此目的的详细信息

使用未来日期触发 DAG

如果您想使用“外部触发器”来运行未来日期的数据间隔,请在 airflow.cfg 中的 scheduler 部分设置 allow_trigger_in_future = True。这仅在您的 DAG 定义为 schedule=None 时有效。当设置为 False (默认值)时,如果您手动触发具有未来日期数据间隔的运行,则调度器将不会执行它,直到其 data_interval_start 处于过去。

运行多个调度器

Airflow 支持同时运行多个调度器 - 这既是为了提高性能,也是为了提高弹性。

概述

HA 调度器旨在利用现有的元数据数据库。这主要是为了操作简单性:每个组件都必须与此数据库通信,并且不使用调度器之间的直接通信或共识算法(Raft、Paxos 等)或另一个共识工具(例如 Apache Zookeeper 或 Consul),我们已将“操作表面积”保持在最低限度。

调度器现在使用序列化的 DAG 表示来做出调度决策,调度循环的大致轮廓是

  • 检查是否有任何 DAG 需要新的 DagRun,并创建它们

  • 检查一批 DagRun 中可调度的 TaskInstances 或完成的 DagRun

  • 选择可调度的 TaskInstances,并在遵守池限制和其他并发限制的同时,将其排队以供执行

然而,这确实对数据库提出了一些要求。

数据库要求

简而言之,PostgreSQL 12+ 或 MySQL 8.0+ 的用户都已准备就绪 - 您可以根据需要启动任意多个调度器副本 - 无需进一步设置或配置选项。如果您使用的是其他数据库,请继续阅读。

为了保持性能和吞吐量,调度循环中有一部分会在内存中执行许多计算(因为对于每个 TaskInstance 都必须往返数据库会太慢),因此我们需要确保一次只有一个调度器处于此关键部分中 - 否则将无法正确遵守限制。为了实现这一点,我们使用数据库行级锁(使用 SELECT ... FOR UPDATE)。

此关键部分是 TaskInstances 从计划状态转换为排队到执行器的地方,同时确保遵守各种并发和池限制。通过请求对 Pool 表的每一行进行行级写锁来获得关键部分(大致相当于 SELECT * FROM slot_pool FOR UPDATE NOWAIT,但确切的查询略有不同)。

完全支持以下数据库,并提供“最佳”体验

  • PostgreSQL 12+

  • MySQL 8.0+

警告

MariaDB 直到 10.6.0 版本才实现 SKIP LOCKEDNOWAIT SQL 子句。如果没有这些功能,则不支持运行多个调度器,并且已报告死锁错误。MariaDB 10.6.0 及更高版本可能可以与多个调度器一起正常工作,但尚未对此进行测试。

注意

尚未测试 Microsoft SQL Server 的 HA。

微调调度器性能

哪些因素影响调度器的性能

调度器负责两项操作

  • 持续解析 DAG 文件并与数据库中的 DAG 同步

  • 持续调度任务以供执行

这两个任务由调度器并行执行,并在不同的进程中独立运行。为了微调您的调度器,您需要考虑许多因素

  • 您拥有的部署类型
    • 您有哪些类型的文件系统来共享 DAG(影响持续读取 DAG 的性能)

    • 文件系统的速度有多快(在许多分布式云文件系统的情况下,您可以额外付费以获得更高的吞吐量/更快的文件系统)

    • 您有多少内存用于处理

    • 您有多少可用的 CPU

    • 您有多少可用的网络吞吐量

  • 您的 DAG 结构的逻辑和定义
    • 您有多少 DAG 文件

    • 您的文件中包含多少个 DAG

    • DAG 文件有多大(请记住 DAG 解析器需要每隔 n 秒读取和解析文件)

    • 它们的复杂程度如何(即,它们可以多快被解析、它们有多少任务和依赖项)

    • 解析您的 DAG 文件是否涉及在顶层导入大量库或进行繁重的处理(提示!不应该这样。请参阅 顶层 Python 代码

  • 调度器配置
    • 您有多少个调度器

    • 您的调度器中有多少个解析进程

    • 调度器在重新解析同一个 DAG 之间等待多长时间(它会持续发生)

    • 调度器在一个循环中处理多少个任务实例

    • 每个循环应该创建/调度多少个新的 DAG 运行

    • 调度器应该多久执行一次清理并检查孤立任务/采用它们

为了进行微调,最好了解调度器的工作原理。您可以观看 Airflow Summit 2021 的 深入了解 Airflow 调度器 的演讲来进行微调。

如何进行调度器的微调

Airflow 提供了许多可以用来微调性能的“旋钮”,但这是一个单独的任务,具体取决于您的特定部署、DAG 结构、硬件可用性和期望,以决定转动哪些旋钮才能获得最佳效果。管理部署的一部分工作是决定您要优化什么。一些用户可以接受新 DAG 解析延迟 30 秒,以降低 CPU 使用率为代价,而另一些用户则希望 DAG 在 DAG 文件夹中出现时几乎立即解析,例如,以更高的 CPU 使用率为代价。

Airflow 使您可以灵活地进行决定,但是您应该找出对您而言最重要的性能方面,并决定要朝着哪个方向转动哪些旋钮。

通常,对于微调,您应该采用与任何性能改进和优化相同的方法(我们不会推荐任何特定的工具 - 只需使用您通常用于观察和监视系统的工具)。

  • 使用您通常用于监视系统的正确工具集来监视系统非常重要。本文档不详细介绍您可以使用的特定指标和工具,它仅描述了您应该监视的资源类型,但是您应该遵循最佳的监视实践来获取正确的数据。

  • 确定对您而言最重要的性能方面(您想要改进的内容)

  • 观察您的系统以查看瓶颈所在:CPU、内存、I/O 是通常的限制因素

  • 根据您的期望和观察结果 - 确定您的下一个改进,然后返回观察您的性能和瓶颈。性能改进是一个迭代过程。

哪些资源可能会限制调度器的性能

您应该注意以下几个资源使用方面:

  • 文件系统性能。Airflow 调度器严重依赖于解析(有时是大量)Python 文件,这些文件通常位于共享文件系统上。Airflow 调度器会持续读取和重新解析这些文件。相同的文件必须提供给工作节点,因此它们通常存储在分布式文件系统中。您可以使用各种文件系统来实现此目的(NFS、CIFS、EFS、GCS fuse、Azure 文件系统都是很好的例子)。您可以控制这些文件系统的各种参数并微调其性能,但这超出了本文档的范围。您应该观察文件系统的统计信息和使用情况,以确定问题是否来自文件系统性能。例如,有传闻证据表明,增加 EFS 性能的 IOPS(并支付更多费用)可以显著提高使用 EFS 时解析 Airflow DAG 的稳定性和速度。

  • 如果文件系统性能成为瓶颈,另一种解决方案是转向分发 DAG 的替代机制。将 DAG 嵌入到您的映像中和 GitSync 分发都具有以下特性:文件在本地可用于调度器,并且它不必使用分布式文件系统来读取文件,这些文件在本地可用于调度器,并且通常尽可能快,尤其是在您的机器使用快速 SSD 磁盘进行本地存储的情况下。这些分发机制具有其他特性,可能使其不是您的最佳选择,但是如果您的性能问题来自分布式文件系统性能,那么它们可能是最佳选择。

  • 当您想提高性能并并行处理更多事物时,数据库连接和数据库使用可能会成为问题。众所周知,Airflow “需要大量数据库连接” - 您拥有的 DAG 越多,并且您想并行处理的越多,将打开的数据库连接就越多。对于 MySQL 来说,这通常不是问题,因为其处理连接的模型是基于线程的,但是对于 Postgres 来说,这可能是一个问题,因为其连接处理是基于进程的。人们普遍认为,如果您有即使是中等规模的基于 Postgres 的 Airflow 安装,最好的解决方案是使用 PGBouncer 作为数据库的代理。Apache Airflow 的 Helm Chart 开箱即用地支持 PGBouncer。

  • CPU 使用率对于 FileProcessor 最重要 - 这些是解析和执行 Python DAG 文件的进程。由于调度器会持续触发此类解析,因此当您有大量 DAG 时,处理可能会占用大量 CPU。您可以通过增加 min_file_process_interval 来缓解此问题,但这是上述权衡之一,其结果是对此类文件的更改将被较慢地拾取,并且您将看到提交文件与在 Airflow UI 中获取文件并由调度器执行之间存在延迟。优化 DAG 的构建方式,避免外部数据源是提高 CPU 使用率的最佳方法。如果您有更多的 CPU 可用,则可以增加处理线程的数量 parsing_processes,此外,Airflow 调度器几乎可以随着多个实例线性扩展,因此如果您的调度器性能受 CPU 限制,您也可以添加更多的调度器。

  • 当您尝试从中获得更高的性能时,Airflow 可能会使用大量的内存。通常,在 Airflow 中,通过增加处理负载的进程数量可以获得更高的性能,并且每个进程都需要加载完整的 Python 解释器、导入许多类和临时的内存中存储。Airflow 通过使用 fork 和 copy-on-write 内存来优化其中许多内容,但是如果在 fork 之后导入新类,则可能会导致额外的内存压力。您需要观察您的系统是否使用了比它拥有的更多的内存 - 这会导致使用交换磁盘,这会显著降低性能。请注意,在 2.1.4 之前的版本中,Airflow 调度器生成了大量日志文件使用的 Page Cache 内存(当日志文件未被删除时)。这通常是无害的,因为内存只是缓存,系统可以随时回收,但是,在 2.1.4 及更高版本中,写入日志不会生成过多的 Page Cache 内存。无论如何 - 在查看内存使用情况时,请确保注意您正在观察的内存类型。通常,您应该查看 工作内存(名称可能因您的部署而异)而不是``使用的总内存``

您可以做些什么来提高调度器的性能

当您知道您的资源使用情况时,您可以考虑的改进可能是:

  • 改进逻辑,提高解析效率,并降低顶层 DAG Python 代码的复杂性。它会持续解析,因此优化该代码可能会带来巨大的改进,尤其是在您尝试在解析 DAG 时访问某些外部数据库等时(应不惜一切代价避免这种情况)。顶层 Python 代码 解释了编写顶层 Python 代码的最佳实践。降低 DAG 复杂性 文档提供了一些您在想要降低代码复杂性时可以查看的方面。

  • 提高资源利用率。当您的系统中存在似乎未被充分利用的空闲容量时(CPU、内存 I/O、网络是主要候选对象) - 您可以采取诸如增加调度器数量、解析进程数量或缩短间隔以更频繁操作的措施,这些措施可能会在提高性能的同时,以更高的利用率为代价。

  • 提高硬件容量(例如,如果您发现 CPU 限制了您,或者您用于 DAG 文件系统的 I/O 已达到极限)。调度器性能的问题通常仅仅是因为您的系统“能力”不足,这可能是唯一的方法。例如,如果您看到您在机器上使用了所有 CPU,您可能需要在新机器上添加另一个调度器 - 在大多数情况下,当您添加第二个或第三个调度器时,调度容量会线性增长(除非共享数据库或文件系统是瓶颈)。

  • 尝试使用“调度器可调参数”的不同值。通常,您只需将一个性能方面换成另一个方面即可获得更好的效果。例如,如果您想降低 CPU 使用率,您可以增加文件处理间隔(但结果是新 DAG 将会以更大的延迟出现)。通常,性能调整是平衡不同方面的艺术。

  • 有时,您可以稍微更改调度器的行为(例如,更改解析排序顺序),以便为您的特定部署获得更好的微调结果。

调度器配置选项

以下配置设置可用于控制调度器的各个方面。但是,您还可以查看 配置参考[scheduler] 部分中提供的其他与性能无关的调度器配置参数。

  • max_dagruns_to_create_per_loop

    此设置更改了在创建 DAG 运行时每个调度器锁定的 DAG 数量。降低此值的一个可能原因是,如果你的 DAG 非常庞大(每个 DAG 有 10,000 个以上的任务),并且正在运行多个调度器,你不会希望一个调度器完成所有工作。

  • max_dagruns_per_loop_to_schedule

    调度器在调度和将任务排队时应检查(和锁定)多少个 DagRun。增加此限制将允许较小的 DAG 获得更高的吞吐量,但可能会降低较大(例如,>500 个任务)DAG 的吞吐量。当使用多个调度器时,如果此设置过高,也可能导致一个调度器获取所有 DAG 运行,而其他调度器则无事可做。

  • use_row_level_locking

    调度器是否应在相关查询中发出 SELECT ... FOR UPDATE。如果此项设置为 False,则不应同时运行多个调度器。

  • pool_metrics_interval

    池使用情况统计信息应多久(以秒为单位)发送到 StatsD(如果启用了 statsd_on)。这是一个计算量相对较大的查询,因此应将其设置为与 StatsD 汇总周期相同的时间段。

  • orphaned_tasks_check_interval

    调度器应多久(以秒为单位)检查孤立的任务或失效的 SchedulerJob。

    此设置控制如何注意到失效的调度器,以及如何将它“监督”的任务由另一个调度器接管。任务将保持运行,因此暂时不检测到这种情况没有害处。

    当检测到 SchedulerJob“失效”(由 scheduler_health_check_threshold 确定)时,任何由失效进程启动的正在运行或排队的任务都将被此调度器“采用”并监控。

  • dag_dir_list_interval 扫描 DAG 目录以查找新文件的频率(以秒为单位)。

  • file_parsing_sort_mode 调度器将列出并排序 DAG 文件以决定解析顺序。

  • max_tis_per_query 调度主循环中查询的批量大小。此值不应大于 core.parallelism。如果此值过高,则 SQL 查询性能可能会受到查询谓词复杂性和/或过度锁定的影响。

    此外,你可能会达到数据库允许的最大查询长度。将其设置为 0 以使用 core.parallelism 的值。

  • min_file_process_interval 重新解析 DAG 文件之间的秒数。每隔 min_file_process_interval 秒解析一次 DAG 文件。在此间隔之后会反映对 DAG 的更新。保持此值较低会增加 CPU 使用率。

  • parsing_processes 调度器可以并行运行多个进程来解析 DAG 文件。这定义了将运行多少个进程。

  • scheduler_idle_sleep_time 控制调度器在循环之间休眠多长时间,但前提是在循环中没有要执行的操作。即,如果它调度了某些内容,它将立即开始下一个循环迭代。此参数命名不佳(历史原因),将来会重命名并弃用当前名称。

  • schedule_after_task_execution 任务主管进程是否应执行“迷你调度”以尝试调度同一 DAG 的更多任务。启用此设置将意味着同一 DAG 中的任务执行得更快,但在某些情况下可能会使其他 DAG 处于饥饿状态。

此条目是否有帮助?