将 Airflow™ 升级到更新版本

为什么需要升级

较新的 Airflow 版本可能包含数据库迁移,因此您必须运行 airflow db migrate 以使用要升级到的 Airflow 版本中的架构更改来迁移数据库。别担心,即使没有要执行的迁移,运行它也是安全的。

Airflow 版本 x 和 y 之间有哪些变化?

发行说明 列出了任何给定 Airflow 版本中包含的更改。

升级准备 - 备份数据库

强烈建议您在任何迁移之前备份元数据数据库。如果您没有数据库的“热备份”功能,则应在关闭 Airflow 实例后进行备份,以便备份保持一致。如果您没有进行备份并且迁移失败,则您最终可能会处于半迁移状态,从备份恢复数据库并重复迁移可能是唯一的简单出路。例如,这可能是由于在迁移发生时 CLI 和数据库之间的网络连接断开造成的,因此进行备份是避免此类问题的重要预防措施。

何时需要升级

如果您有一个基于 virtualenv 或 Docker 容器的自定义部署,您通常需要在升级过程中手动运行数据库迁移。

在某些情况下,升级会自动发生 - 这取决于您的部署中升级是否作为安装后操作内置。例如,当您使用启用了升级后钩子的 Apache Airflow 的 Helm Chart 时,数据库升级会在安装新软件后立即自动进行。类似地,所有 Airflow 即服务解决方案都会在您选择通过其 UI 升级 Airflow 时自动为您执行升级。

如何升级

为了手动迁移数据库,您应该在您的环境中运行 airflow db migrate 命令。它可以在您的虚拟环境中运行,也可以在允许您访问 Airflow CLI 使用命令行界面 和数据库的容器中运行。

离线 SQL 迁移脚本

如果要离线运行升级脚本,可以使用 -s--show-sql-only 标志来获取将要执行的 SQL 语句。您还可以使用 --from-version 标志指定起始 Airflow 版本,并使用 -n--to-version 标志指定结束 Airflow 版本。从 Airflow 2.0.0 开始,Postgres 和 MySQL 支持此功能。

Airflow 2.7.0 或更高版本的示例用法

airflow db migrate -s --from-version "2.4.3" -n "2.7.3" airflow db migrate --show-sql-only --from-version "2.4.3" --to-version "2.7.3"

注意

自 Airflow 2.7.0 版本以来,airflow db upgrade 已被 airflow db migrate 替换,前者已被弃用。

处理迁移问题

MySQL 数据库中的错误编码

如果您使用的是旧的 Airflow 1.10 作为最初手动创建或使用以前版本的 MySQL 创建的数据库,则根据数据库的原始字符集,您在迁移到较新版本的 Airflow 时可能会遇到问题,并且您的迁移可能会失败并出现奇怪的错误(“密钥大小太大”、“索引丢失”等)。下一章介绍如何手动解决此问题。

为什么您可能会收到错误消息?MySQL 8 数据库的推荐字符集/排序规则分别是 utf8mb4utf8mb4_bin。但是,这在不同版本的 MySQL 中一直在变化,并且您可能已经使用不同的字符集自定义创建了数据库。如果您的数据库是使用旧版本的 Airflow 或 MySQL 创建的,则在创建数据库或在迁移过程中损坏时,编码可能会有误。

不幸的是,MySQL 限制了索引密钥大小,并且使用 utf8mb4,Airflow 索引密钥大小对于 MySQL 来说可能太大而无法处理。因此,在 Airflow 中,我们强制所有“ID”键使用 utf8 字符集(在 MySQL 8 中等效于 utf8mb3)。这限制了索引的大小,以便 MySQL 可以处理它们。

以下是您可以在尝试迁移之前按照以下步骤解决此问题(但如果您知道自己在做什么,您也可以选择按照自己的方式进行)。

熟悉 Airflow 的内部数据库结构,您可以在 数据库的 ERD 模式 和您可以在 数据库迁移参考 中找到的迁移列表中找到它。

  1. 备份您的数据库,以便您可以在出错时恢复它。

  2. 检查您的哪些表需要修复。查看这些表

SHOW CREATE TABLE task_reschedule;
SHOW CREATE TABLE xcom;
SHOW CREATE TABLE task_fail;
SHOW CREATE TABLE rendered_task_instance_fields;
SHOW CREATE TABLE task_instance;

确保复制输出。您将在最后一步中需要它。您的 dag_idrun_idtask_idkey 列应显式设置 utf8utf8mb3 字符集,类似于

``task_id`` varchar(250) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL,  # correct

``task_id`` varchar(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin NOT NULL,  # correct

问题是如果您的字段没有编码

``task_id`` varchar(250),  # wrong !!

或者只是将排序规则设置为 utf8mb4

``task_id`` varchar(250) COLLATE utf8mb4_unicode_ci DEFAULT NULL,  # wrong !!

或将字符集和排序规则设置为 utf8mb4

``task_id`` varchar(250) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL,  # wrong !!

您需要修复那些设置了错误字符集/排序规则的字段。

3. 删除您需要修改的表的外键索引(您不需要删除所有索引 - 只需对您需要修改的那些表执行此操作)。您需要在最后一步中重新创建它们(这就是为什么您需要保留第 2 步中的 SHOW CREATE TABLE 输出)。

ALTER TABLE task_reschedule DROP FOREIGN KEY task_reschedule_ti_fkey;
ALTER TABLE xcom DROP FOREIGN KEY xcom_task_instance_fkey;
ALTER TABLE task_fail DROP FOREIGN KEY task_fail_ti_fkey;
ALTER TABLE rendered_task_instance_fields DROP FOREIGN KEY rtif_ti_fkey;

4. 修改您的 ID 字段以使其具有正确的字符集/编码。仅对编码错误的字段执行此操作(以下是您可能需要使用的所有潜在命令)

ALTER TABLE task_instance MODIFY task_id VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;
ALTER TABLE task_reschedule MODIFY task_id VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;

ALTER TABLE rendered_task_instance_fields MODIFY task_id VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;
ALTER TABLE rendered_task_instance_fields MODIFY dag_id VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;

ALTER TABLE task_fail MODIFY task_id VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;
ALTER TABLE task_fail MODIFY dag_id VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;

ALTER TABLE sla_miss MODIFY task_id VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;
ALTER TABLE sla_miss MODIFY dag_id VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;

ALTER TABLE task_map MODIFY task_id VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;
ALTER TABLE task_map MODIFY dag_id VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;
ALTER TABLE task_map MODIFY run_id VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;

ALTER TABLE xcom MODIFY task_id VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;
ALTER TABLE xcom MODIFY dag_id VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;
ALTER TABLE xcom MODIFY run_id VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;
ALTER TABLE xcom MODIFY key VARCHAR(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;
  1. 重新创建在步骤 3 中删除的外键。

对所有已删除的索引重复此操作。请注意,根据您拥有的 Airflow 版本,索引可能略有不同(例如,map_index 是在 2.3.0 版本中添加的),但如果您保留在步骤 2 中准备的 SHOW CREATE TABLE 输出,您将找到要使用的正确的 CONSTRAINT_NAMECONSTRAINT

# Here you have to copy the statements from SHOW CREATE TABLE output
ALTER TABLE <TABLE> ADD CONSTRAINT `<CONSTRAINT_NAME>` <CONSTRAINT>

这应该会将数据库恢复到可以运行到新 Airflow 版本的迁移的状态。

升级后警告

通常,您只需要成功运行 airflow db migrate 命令即可。但是,在某些情况下,迁移可能会在您的数据库中发现一些旧的、过时的、可能错误的数据,并将它们移到一个单独的表中。在这种情况下,您可能会在 Web 服务器 UI 中收到有关找到的数据的警告。

您可能会看到的典型消息

Airflow 在升级数据库迁移期间在元数据库的 <原始表> 表中发现了不兼容的数据,并将它们移动到了 <新表>。请检查移动的数据以确定是否需要保留它们,并手动删除 <新表> 表以消除此警告。

当您看到此类消息时,这意味着您的某些数据已损坏,您应该对其进行检查以确定是要保留还是删除其中一些数据。这些数据很可能是已损坏的,并且是某些错误的遗留物,可以安全删除 - 因为这些数据在 Airflow 中无论如何都不可见且无用。但是,如果您出于审计或历史原因有特殊需要,您可以选择将其存储在其他地方。除非您有特定的理由保留数据,否则删除它很可能是您的最佳选择。

您可以通过多种方式检查和删除数据 - 如果您可以使用自己的工具(通常是显示数据库对象的图形工具)直接访问数据库,则可以使用这些工具删除、重命名或将此类表移动到另一个数据库。如果您没有此类工具,则可以使用 airflow db shell 命令 - 这会将您带到数据库的数据库 shell 工具中,您将能够检查和删除该表。

如何使用 Kubernetes 删除表

  1. 进入任何 Airflow pod - Web 服务器或调度器:kubectl exec -it <your-webserver-pod> python

  2. 在 Python shell 中运行以下命令

from airflow.settings import Session

session = Session()
session.execute("DROP TABLE _airflow_moved__2_2__task_instance")
session.commit()

请将示例中的 <table> 替换为警告消息中打印的实际表名。

检查表

SELECT * FROM <table>;

删除表

DROP TABLE <table>;

迁移最佳实践

根据数据库的大小和实际迁移,迁移可能需要相当长的时间,因此,如果您的历史记录很长且数据库很大,建议先创建数据库副本并执行测试迁移以评估迁移所需的时间。通常,“主要”升级可能需要更长的时间,因为添加新功能有时需要重构数据库。

此条目有帮助吗?