DAG 序列化¶
为了使 Airflow Webserver 无状态,Airflow >=1.10.7 支持 DAG 序列化和数据库持久化。从 Airflow 2.0.0 开始,调度器也使用序列化的 dag 来保持一致性并做出调度决策。

在没有 DAG 序列化和数据库持久化的情况下,Webserver 和调度器都需要访问 DAG 文件。调度器和 Webserver 都会解析 DAG 文件。
通过 DAG 序列化,我们旨在将 Webserver 与 DAG 解析解耦,这将使 Webserver 变得非常轻量级。
如上图所示,使用此功能时,调度器中的 DagFileProcessorProcess
会解析 DAG 文件,将其序列化为 JSON 格式并作为 SerializedDagModel
模型保存到元数据数据库 (Metadata DB) 中。
现在,Webserver 无需再次解析 DAG 文件,而是读取 JSON 格式的序列化 dag,将其反序列化并创建 DagBag,然后使用它在 UI 中显示。而调度器无需实际的 dag 来做出调度决策,从 Airflow 2.0.0 开始,我们使用包含调度 dag所需所有信息的序列化 dag,而不是使用 dag 文件(这是 调度器高可用性 (Scheduler HA) 的一部分)。
DAG 序列化实现的关键特性之一是,Webserver 启动时不再加载整个 DagBag,而是从 Serialized Dag 表中按需加载每个 DAG。这有助于减少 Webserver 的启动时间和内存消耗。当您有大量 dag 时,这种减少非常显著。
您可以启用将源代码存储在数据库中,使 Webserver 完全独立于 DAG 文件。如果您的文件嵌入在 Docker 镜像中或您可以通过其他方式将其提供给 Webserver,则无需这样做。数据存储在 DagCode
模型中。
最后一个要素是模板字段的渲染 (rendering)。启用序列化后,模板不会渲染到请求中,而是在任务工作进程 (worker) 上执行之前保存字段内容的副本。数据存储在 RenderedTaskInstanceFields
模型中。为了限制数据库的过度增长,仅保留最新的条目,旧的条目会被清除。
注意
从 Airflow 2.0+ 开始,DAG 序列化是强制要求且无法关闭的。
DAG 序列化设置¶
在 airflow.cfg
中添加以下设置
[core]
# You can also update the following default configurations based on your needs
min_serialized_dag_update_interval = 30
min_serialized_dag_fetch_interval = 10
max_num_rendered_ti_fields_per_task = 30
compress_serialized_dags = False
min_serialized_dag_update_interval
:此标志设置数据库中序列化 dag 的最小更新间隔(以秒为单位)。这有助于减少数据库写入速率。min_serialized_dag_fetch_interval
:此选项控制当序列化 DAG 已加载到 Webserver 的 DagBag 中时,多久(多频繁)会从数据库中重新获取一次。将其设置得更高会减少数据库负载,但代价是可能会显示过期的 DAG 缓存版本。max_num_rendered_ti_fields_per_task
:此选项控制每个任务在数据库中存储的最大渲染任务实例字段 (Rendered Task Instance Fields)(模板字段)数量。compress_serialized_dags
:此选项控制是否将序列化的 DAG 压缩后存储到数据库中。当您的集群中有非常大的 dag 时,这很有用。当设置为True
时,这将禁用 DAG 依赖视图。
如果您正在从 <1.10.7 版本更新 Airflow,请不要忘记运行 airflow db migrate
。
限制¶
使用用户自定义过滤器和宏时,Webserver 中的渲染视图 (Rendered View) 对于尚未执行的任务实例 (TIs) 可能会显示不正确的结果,因为它可能使用了 Webserver 无法访问的外部模块。在这种情况下,请使用
airflow tasks render
CLI 命令来调试或测试模板字段的渲染。一旦任务开始执行,渲染后的模板字段将存储在数据库的单独表中,之后 Webserver(渲染视图选项卡)中将显示正确的值。
注意
您需要 Airflow >= 1.10.10 才能获得完全无状态的 Webserver。Airflow 1.10.7 到 1.10.9 版本在某些情况下仍需要访问 DAG 文件。更多信息请参阅:https://airflow.apache.org/docs/1.10.9/dag-serialization.html#limitations
使用不同的 JSON 库¶
要使用不同的 JSON 库(例如 ujson
),而不是标准的 json
库,您需要在本地 Airflow 设置文件 (airflow_local_settings.py
) 中定义一个 json
变量,如下所示:
import ujson
json = ujson
有关如何配置本地设置的详细信息,请参阅配置本地设置。