将 Airflow™ 升级到更新版本¶
为什么需要升级¶
较新的 Airflow 版本可能包含数据库迁移,因此您必须运行 airflow db migrate
以使用要升级到的 Airflow 版本中的架构更改来迁移数据库。别担心,即使没有要执行的迁移,运行它也是安全的。
升级准备 - 备份数据库¶
强烈建议您在任何迁移之前备份元数据数据库。如果您没有数据库的“热备份”功能,则应在关闭 Airflow 实例后进行备份,以便备份保持一致。如果您没有进行备份并且迁移失败,则您最终可能会处于半迁移状态,从备份恢复数据库并重复迁移可能是唯一的简单出路。例如,这可能是由于在迁移发生时 CLI 和数据库之间的网络连接断开造成的,因此进行备份是避免此类问题的重要预防措施。
何时需要升级¶
如果您有一个基于 virtualenv 或 Docker 容器的自定义部署,您通常需要在升级过程中手动运行数据库迁移。
在某些情况下,升级会自动发生 - 这取决于您的部署中升级是否作为安装后操作内置。例如,当您使用启用了升级后钩子的 Apache Airflow 的 Helm Chart 时,数据库升级会在安装新软件后立即自动进行。类似地,所有 Airflow 即服务解决方案都会在您选择通过其 UI 升级 Airflow 时自动为您执行升级。
如何升级¶
为了手动迁移数据库,您应该在您的环境中运行 airflow db migrate
命令。它可以在您的虚拟环境中运行,也可以在允许您访问 Airflow CLI
使用命令行界面 和数据库的容器中运行。
离线 SQL 迁移脚本¶
如果要离线运行升级脚本,可以使用 -s
或 --show-sql-only
标志来获取将要执行的 SQL 语句。您还可以使用 --from-version
标志指定起始 Airflow 版本,并使用 -n
或 --to-version
标志指定结束 Airflow 版本。从 Airflow 2.0.0 开始,Postgres 和 MySQL 支持此功能。
- Airflow 2.7.0 或更高版本的示例用法
airflow db migrate -s --from-version "2.4.3" -n "2.7.3"
airflow db migrate --show-sql-only --from-version "2.4.3" --to-version "2.7.3"
注意
自 Airflow 2.7.0 版本以来,airflow db upgrade
已被 airflow db migrate
替换,前者已被弃用。
处理迁移问题¶
MySQL 数据库中的错误编码¶
如果您使用的是旧的 Airflow 1.10 作为最初手动创建或使用以前版本的 MySQL 创建的数据库,则根据数据库的原始字符集,您在迁移到较新版本的 Airflow 时可能会遇到问题,并且您的迁移可能会失败并出现奇怪的错误(“密钥大小太大”、“索引丢失”等)。下一章介绍如何手动解决此问题。
为什么您可能会收到错误消息?MySQL 8 数据库的推荐字符集/排序规则分别是 utf8mb4
和 utf8mb4_bin
。但是,这在不同版本的 MySQL 中一直在变化,并且您可能已经使用不同的字符集自定义创建了数据库。如果您的数据库是使用旧版本的 Airflow 或 MySQL 创建的,则在创建数据库或在迁移过程中损坏时,编码可能会有误。
不幸的是,MySQL 限制了索引密钥大小,并且使用 utf8mb4
,Airflow 索引密钥大小对于 MySQL 来说可能太大而无法处理。因此,在 Airflow 中,我们强制所有“ID”键使用 utf8
字符集(在 MySQL 8 中等效于 utf8mb3
)。这限制了索引的大小,以便 MySQL 可以处理它们。
以下是您可以在尝试迁移之前按照以下步骤解决此问题(但如果您知道自己在做什么,您也可以选择按照自己的方式进行)。
熟悉 Airflow 的内部数据库结构,您可以在 数据库的 ERD 模式 和您可以在 数据库迁移参考 中找到的迁移列表中找到它。
备份您的数据库,以便您可以在出错时恢复它。
检查您的哪些表需要修复。查看这些表
SHOW CREATE TABLE task_reschedule;
SHOW CREATE TABLE xcom;
SHOW CREATE TABLE task_fail;
SHOW CREATE TABLE rendered_task_instance_fields;
SHOW CREATE TABLE task_instance;
确保复制输出。您将在最后一步中需要它。您的 dag_id
、run_id
、task_id
和 key
列应显式设置 utf8
或 utf8mb3
字符集,类似于
``task_id`` varchar(250) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL, # correct
或
``task_id`` varchar(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin NOT NULL, # correct
问题是如果您的字段没有编码
``task_id`` varchar(250), # wrong !!
或者只是将排序规则设置为 utf8mb4
``task_id`` varchar(250) COLLATE utf8mb4_unicode_ci DEFAULT NULL, # wrong !!
或将字符集和排序规则设置为 utf8mb4
``task_id`` varchar(250) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL, # wrong !!
您需要修复那些设置了错误字符集/排序规则的字段。
3. 删除您需要修改的表的外键索引(您不需要删除所有索引 - 只需对您需要修改的那些表执行此操作)。您需要在最后一步中重新创建它们(这就是为什么您需要保留第 2 步中的 SHOW CREATE TABLE
输出)。
ALTER TABLE task_reschedule DROP FOREIGN KEY task_reschedule_ti_fkey;
ALTER TABLE xcom DROP FOREIGN KEY xcom_task_instance_fkey;
ALTER TABLE task_fail DROP FOREIGN KEY task_fail_ti_fkey;
ALTER TABLE rendered_task_instance_fields DROP FOREIGN KEY rtif_ti_fkey;
4. 修改您的 ID
字段以使其具有正确的字符集/编码。仅对编码错误的字段执行此操作(以下是您可能需要使用的所有潜在命令)
ALTER TABLE task_instance MODIFY task_id VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;
ALTER TABLE task_reschedule MODIFY task_id VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;
ALTER TABLE rendered_task_instance_fields MODIFY task_id VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;
ALTER TABLE rendered_task_instance_fields MODIFY dag_id VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;
ALTER TABLE task_fail MODIFY task_id VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;
ALTER TABLE task_fail MODIFY dag_id VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;
ALTER TABLE sla_miss MODIFY task_id VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;
ALTER TABLE sla_miss MODIFY dag_id VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;
ALTER TABLE task_map MODIFY task_id VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;
ALTER TABLE task_map MODIFY dag_id VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;
ALTER TABLE task_map MODIFY run_id VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;
ALTER TABLE xcom MODIFY task_id VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;
ALTER TABLE xcom MODIFY dag_id VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;
ALTER TABLE xcom MODIFY run_id VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;
ALTER TABLE xcom MODIFY key VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;
重新创建在步骤 3 中删除的外键。
对所有已删除的索引重复此操作。请注意,根据您拥有的 Airflow 版本,索引可能略有不同(例如,map_index
是在 2.3.0 版本中添加的),但如果您保留在步骤 2 中准备的 SHOW CREATE TABLE
输出,您将找到要使用的正确的 CONSTRAINT_NAME
和 CONSTRAINT
。
# Here you have to copy the statements from SHOW CREATE TABLE output
ALTER TABLE <TABLE> ADD CONSTRAINT `<CONSTRAINT_NAME>` <CONSTRAINT>
这应该会将数据库恢复到可以运行到新 Airflow 版本的迁移的状态。
升级后警告¶
通常,您只需要成功运行 airflow db migrate
命令即可。但是,在某些情况下,迁移可能会在您的数据库中发现一些旧的、过时的、可能错误的数据,并将它们移到一个单独的表中。在这种情况下,您可能会在 Web 服务器 UI 中收到有关找到的数据的警告。
您可能会看到的典型消息
Airflow 在升级数据库迁移期间在元数据库的 <原始表> 表中发现了不兼容的数据,并将它们移动到了 <新表>。请检查移动的数据以确定是否需要保留它们,并手动删除 <新表> 表以消除此警告。
当您看到此类消息时,这意味着您的某些数据已损坏,您应该对其进行检查以确定是要保留还是删除其中一些数据。这些数据很可能是已损坏的,并且是某些错误的遗留物,可以安全删除 - 因为这些数据在 Airflow 中无论如何都不可见且无用。但是,如果您出于审计或历史原因有特殊需要,您可以选择将其存储在其他地方。除非您有特定的理由保留数据,否则删除它很可能是您的最佳选择。
您可以通过多种方式检查和删除数据 - 如果您可以使用自己的工具(通常是显示数据库对象的图形工具)直接访问数据库,则可以使用这些工具删除、重命名或将此类表移动到另一个数据库。如果您没有此类工具,则可以使用 airflow db shell
命令 - 这会将您带到数据库的数据库 shell 工具中,您将能够检查和删除该表。
如何使用 Kubernetes 删除表
进入任何 Airflow pod - Web 服务器或调度器:
kubectl exec -it <your-webserver-pod> python
在 Python shell 中运行以下命令
from airflow.settings import Session session = Session() session.execute("DROP TABLE _airflow_moved__2_2__task_instance") session.commit()
请将示例中的 <table>
替换为警告消息中打印的实际表名。
检查表
SELECT * FROM <table>;
删除表
DROP TABLE <table>;
迁移最佳实践¶
根据数据库的大小和实际迁移,迁移可能需要相当长的时间,因此,如果您的历史记录很长且数据库很大,建议先创建数据库副本并执行测试迁移以评估迁移所需的时间。通常,“主要”升级可能需要更长的时间,因为添加新功能有时需要重构数据库。