将 Airflow® 升级到较新版本

为什么需要升级

较新的 Airflow 版本可能包含数据库迁移,因此您必须运行 airflow db migrate 来使用您要升级到的 Airflow 版本中的模式更改来迁移您的数据库。不用担心,即使没有要执行的迁移,也可以安全运行。

Airflow 版本 x 和 y 之间有哪些变化?

发行说明 列出了任何给定 Airflow 版本中包含的更改。

升级准备 - 备份数据库

强烈建议在任何迁移之前备份您的元数据数据库。如果您的数据库没有“热备份”功能,您应该在关闭 Airflow 实例后进行备份,以便您的数据库备份保持一致。如果您没有进行备份并且您的迁移失败,您可能会最终处于半迁移状态,并且从备份恢复数据库并重复迁移可能是唯一的简单方法。例如,这可能是由于 CLI 和数据库之间的网络连接在迁移发生时断开而导致的,因此进行备份是避免此类问题的重要预防措施。

何时需要升级

如果您有基于 virtualenv 或 Docker 容器的自定义部署,您通常需要在升级过程中手动运行数据库迁移。

在某些情况下,升级会自动发生 - 这取决于您的部署中,升级是否内置为安装后操作。例如,当您使用 适用于 Apache Airflow 的 Helm Chart 并启用了升级后钩子时,数据库升级会在新软件安装后自动发生。类似地,当您选择通过其 UI 升级 airflow 时,所有 Airflow-As-A-Service 解决方案都会自动为您执行升级。

如何升级

重新安装 Apache Airflow®,指定所需的新版本。

要升级引导的本地实例,您可以在重新运行安装命令之前将 AIRFLOW_VERSION 环境变量设置为预期版本。按补丁版本增量升级:例如,如果从 2.8.2 版升级到 2.8.4 版,则先升级到 2.8.3 版。有关更详细的指导,请参阅快速入门

要升级 PyPI 包,请在您的环境中使用所需的版本作为约束重新运行 pip install 命令。有关更详细的指导,请参阅从 PyPI 安装

为了手动迁移数据库,您应该在您的环境中运行 airflow db migrate 命令。它可以在您的虚拟环境或允许您访问 Airflow CLI 使用命令行界面 和数据库的容器中运行。

离线 SQL 迁移脚本

如果要离线运行升级脚本,可以使用 -s--show-sql-only 标志来获取将要执行的 SQL 语句。您还可以使用 --from-version 标志指定起始 airflow 版本,并使用 -n--to-version 标志指定结束 airflow 版本。从 Airflow 2.0.0 开始,Postgres 和 MySQL 支持此功能。

Airflow 2.7.0 或更高版本的示例用法

airflow db migrate -s --from-version "2.4.3" -n "2.7.3" airflow db migrate --show-sql-only --from-version "2.4.3" --to-version "2.7.3"

注意

自 Airflow 2.7.0 版本起,airflow db upgrade 已被 airflow db migrate 取代,并且前者已被弃用。

处理迁移问题

MySQL 数据库中错误的编码

如果您使用旧的 Airflow 1.10 作为最初手动创建或使用先前版本的 MySQL 创建的数据库,则根据数据库的原始字符集,您可能在迁移到较新版本的 Airflow 时遇到问题,并且您的迁移可能会失败并出现奇怪的错误(“键大小太大”、“缺少索引”等)。下一章将介绍如何手动解决此问题。

为什么会出现错误?MySQL 8 数据库的建议字符集/排序规则分别为 utf8mb4utf8mb4_bin。但是,这在不同版本的 MySQL 中一直在变化,并且您可能会有使用不同字符集自定义创建的数据库。如果您的数据库是使用旧版本的 Airflow 或 MySQL 创建的,则在创建数据库时或在迁移过程中,编码可能已错误。

不幸的是,MySQL 限制了索引键大小,而使用 utf8mb4,Airflow 索引键大小可能太大而无法由 MySQL 处理。因此,在 Airflow 中,我们强制所有“ID”键使用 utf8 字符集(这在 MySQL 8 中等效于 utf8mb3)。这限制了索引的大小,以便 MySQL 可以处理它们。

以下是您可以在尝试迁移之前遵循的步骤(如果您知道自己在做什么,也可以选择以自己的方式执行)。

熟悉 Airflow 的内部数据库结构,您可以在 数据库 ERD 模式 中找到它,并在 数据库迁移参考 中找到迁移列表。

  1. 备份您的数据库,以便您在出现错误时可以恢复它。

  2. 检查哪些表需要修复。查看这些表

SHOW CREATE TABLE task_reschedule;
SHOW CREATE TABLE xcom;
SHOW CREATE TABLE task_fail;
SHOW CREATE TABLE rendered_task_instance_fields;
SHOW CREATE TABLE task_instance;

请务必复制输出。您将在最后一步中需要它。您的 dag_idrun_idtask_idkey 列应明确设置了 utf8utf8mb3 字符集,类似于

``task_id`` varchar(250) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL,  # correct

``task_id`` varchar(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin NOT NULL,  # correct

问题在于如果您的字段没有编码

``task_id`` varchar(250),  # wrong !!

或者只是排序规则设置为 utf8mb4

``task_id`` varchar(250) COLLATE utf8mb4_unicode_ci DEFAULT NULL,  # wrong !!

或者将字符集和排序规则设置为 utf8mb4

``task_id`` varchar(250) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL,  # wrong !!

你需要修复那些字符集/排序规则设置错误的字段。

3. 删除你需要修改的表的外键索引(你不需要删除所有索引 - 只需删除你需要修改的表的索引)。你需要在最后一步重新创建它们(这就是为什么你需要保留第 2 步中的 SHOW CREATE TABLE 输出)。

ALTER TABLE task_reschedule DROP FOREIGN KEY task_reschedule_ti_fkey;
ALTER TABLE xcom DROP FOREIGN KEY xcom_task_instance_fkey;
ALTER TABLE task_fail DROP FOREIGN KEY task_fail_ti_fkey;
ALTER TABLE rendered_task_instance_fields DROP FOREIGN KEY rtif_ti_fkey;

4. 修改你的 ID 字段,使其具有正确的字符集/编码。只对编码错误的字段执行此操作(以下是你可能需要使用的所有潜在命令)

ALTER TABLE task_instance MODIFY task_id VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;
ALTER TABLE task_reschedule MODIFY task_id VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;

ALTER TABLE rendered_task_instance_fields MODIFY task_id VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;
ALTER TABLE rendered_task_instance_fields MODIFY dag_id VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;

ALTER TABLE task_fail MODIFY task_id VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;
ALTER TABLE task_fail MODIFY dag_id VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;

ALTER TABLE sla_miss MODIFY task_id VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;
ALTER TABLE sla_miss MODIFY dag_id VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;

ALTER TABLE task_map MODIFY task_id VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;
ALTER TABLE task_map MODIFY dag_id VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;
ALTER TABLE task_map MODIFY run_id VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;

ALTER TABLE xcom MODIFY task_id VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;
ALTER TABLE xcom MODIFY dag_id VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;
ALTER TABLE xcom MODIFY run_id VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;
ALTER TABLE xcom MODIFY key VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;
  1. 重新创建在第 3 步中删除的外键。

对你删除的所有索引重复此操作。请注意,根据你拥有的 Airflow 版本,索引可能会略有不同(例如,map_index 是在 2.3.0 中添加的),但如果你保留第 2 步中准备的 SHOW CREATE TABLE 输出,你将找到正确的 CONSTRAINT_NAMECONSTRAINT 来使用。

# Here you have to copy the statements from SHOW CREATE TABLE output
ALTER TABLE <TABLE> ADD CONSTRAINT `<CONSTRAINT_NAME>` <CONSTRAINT>

这应该将数据库带到可以运行迁移到新 Airflow 版本的状态。

升级后警告

通常,你只需要成功运行 airflow db migrate 命令即可。但是,在某些情况下,迁移可能会在你的数据库中找到一些旧的、过时的并且可能错误的数据,并将其移到一个单独的表中。在这种情况下,你可能会在你的 Web 服务器 UI 中收到有关已找到数据的警告。

你可能会看到的典型消息

Airflow 在元数据库的 <原始表> 表中找到了不兼容的数据,并且在数据库迁移升级期间已将其移动到 <新表>。请检查移动的数据以决定是否需要保留它们,并手动删除 <新表> 表以消除此警告。

当你看到这样的消息时,这意味着你的一些数据已损坏,你应该检查它以确定是否要保留或删除其中的一些数据。最有可能的是,数据已损坏并且是从某些错误中遗留下来的,可以安全地删除 - 因为这些数据在 Airflow 中不会以任何方式可见且有用。但是,如果你出于特定的审计或历史原因而需要,你可能会选择将其存储在某处。除非你有特定的原因要保留数据,否则最有可能的是删除它是你的最佳选择。

你可以通过多种方式检查和删除数据 - 如果你可以使用自己的工具(通常是显示数据库对象的图形工具)直接访问数据库,则可以使用这些工具删除此类表或重命名它或将其移动到另一个数据库。如果你没有此类工具,则可以使用 airflow db shell 命令 - 这会将你放入数据库的 shell 工具中,你将能够检查和删除表。

如何使用 Kubernetes 删除表

  1. 进入任何 Airflow pod - Web 服务器或调度器:kubectl exec -it <你的-web服务器-pod> python

  2. 在 python shell 中运行以下命令

from airflow.settings import Session

session = Session()
session.execute("DROP TABLE _airflow_moved__2_2__task_instance")
session.commit()

请将示例中的 <table> 替换为警告消息中打印的实际表名。

检查表

SELECT * FROM <table>;

删除表

DROP TABLE <table>;

迁移最佳实践

根据你的数据库的大小和实际迁移情况,迁移可能需要相当长的时间,因此如果你的历史记录很长且数据库很大,建议先制作数据库的副本并执行测试迁移以评估迁移需要多长时间。通常,“主要”升级可能需要更长的时间,因为添加新功能有时需要重组数据库。

此条目是否有帮助?