DAG 序列化¶
为了使 Airflow Webserver 无状态,Airflow >=1.10.7 支持 DAG 序列化和数据库持久化。从 Airflow 2.0.0 开始,调度器也使用序列化的 DAG 来保持一致性并做出调度决策。
在没有 DAG 序列化和数据库持久化的情况下,Webserver 和调度器都需要访问 DAG 文件。调度器和 Webserver 都解析 DAG 文件。
通过DAG 序列化,我们旨在将 Webserver 与 DAG 解析解耦,这将使 Webserver 非常轻量级。
如上图所示,当使用此功能时,调度器中的 DagFileProcessorProcess
会解析 DAG 文件,将其序列化为 JSON 格式,并将其作为 SerializedDagModel
模型保存在元数据数据库中。
现在,Webserver 不必再次解析 DAG 文件,而是读取 JSON 格式的序列化 DAG,将其反序列化并创建 DagBag,并使用它在 UI 中显示。并且,调度器不需要实际的 DAG 来做出调度决策,而是使用序列化的 DAG,其中包含从 Airflow 2.0.0 开始调度 DAG 所需的所有信息(这是作为 调度器 HA 的一部分完成的)。
作为 DAG 序列化一部分实现的关键特性之一是,当 WebServer 启动时,我们不是加载整个 DagBag,而是仅按需从序列化的 DAG 表中加载每个 DAG。这有助于减少 Webserver 的启动时间和内存。当您拥有大量 DAG 时,这种减少非常明显。
您可以启用将源代码存储在数据库中,以使 Webserver 完全独立于 DAG 文件。如果您的文件嵌入在 Docker 镜像中,或者您可以通过其他方式将其提供给 Webserver,则此操作不是必需的。数据存储在 DagCode
模型中。
最后一个要素是渲染模板字段。启用序列化后,模板不会渲染到请求,而是在任务在工作节点上执行之前保存字段内容的副本。数据存储在 RenderedTaskInstanceFields
模型中。为了限制数据库的过度增长,仅保留最新的条目,并清除较旧的条目。
注意
从 Airflow 2.0+ 开始,严格要求 DAG 序列化,并且无法关闭。
DAG 序列化设置¶
在 airflow.cfg
中添加以下设置
[core]
# You can also update the following default configurations based on your needs
min_serialized_dag_update_interval = 30
min_serialized_dag_fetch_interval = 10
max_num_rendered_ti_fields_per_task = 30
compress_serialized_dags = False
min_serialized_dag_update_interval
: 此标志设置数据库中序列化 DAG 应更新的最小间隔(以秒为单位)。这有助于降低数据库写入速率。min_serialized_dag_fetch_interval
: 此选项控制当序列化 DAG 已加载到 Webserver 的 DagBag 中时,从数据库重新获取序列化 DAG 的频率。将其设置得更高会减少数据库上的负载,但会以显示 DAG 可能过时的缓存版本为代价。max_num_rendered_ti_fields_per_task
: 此选项控制每个任务要在数据库中存储的最大渲染任务实例字段(模板字段)数。compress_serialized_dags
: 此选项控制是否将序列化的 DAG 压缩到数据库。当您的集群中有非常大的 DAG 时,这非常有用。当True
时,这将禁用 DAG 依赖项视图。
如果您要从 <1.10.7 更新 Airflow,请不要忘记运行 airflow db migrate
。
限制¶
当使用用户定义的过滤器和宏时,Webserver 中的渲染视图对于尚未执行的 TI 可能会显示不正确的结果,因为它可能会使用 Webserver 无法访问的外部模块。在这种情况下,请使用
airflow tasks render
CLI 命令来调试或测试 template_fields 的渲染。一旦任务执行开始,渲染的模板字段将存储在数据库中的单独表中,之后 Webserver(渲染视图选项卡)中将显示正确的值。
注意
您需要 Airflow >= 1.10.10 才能获得完全无状态的 Webserver。Airflow 1.10.7 到 1.10.9 在某些情况下需要访问 DAG 文件。更多信息:https://airflow.apache.org/docs/1.10.9/dag-serialization.html#limitations