设置数据库后端¶
Airflow 构建为使用 SqlAlchemy 与其元数据进行交互。
以下文档描述了数据库引擎配置、要与 Airflow 一起使用需要对其配置进行的必要更改,以及对 Airflow 配置的更改以连接到这些数据库。
选择数据库后端¶
如果您想真正试用 Airflow,您应该考虑将数据库后端设置为 PostgreSQL 或 MySQL。默认情况下,Airflow 使用 SQLite,它仅用于开发目的。
Airflow 支持以下数据库引擎版本,因此请确保您拥有哪个版本。旧版本可能不支持所有 SQL 语句。
PostgreSQL:12、13、14、15、16
MySQL:8.0、创新版
SQLite:3.15.0+
如果您计划运行多个调度程序,则必须满足其他要求。有关详细信息,请参阅 调度程序 HA 数据库要求。
警告
尽管 MariaDB 和 MySQL 之间有很大的相似之处,但我们不支持将 MariaDB 作为 Airflow 的后端。MariaDB 和 MySQL 之间存在已知问题(例如索引处理),我们不会在 MariaDB 上测试我们的迁移脚本或应用程序执行。我们知道有些人将 MariaDB 用于 Airflow,这给他们带来了很多运营上的麻烦,因此我们强烈建议不要尝试将 MariaDB 用作后端,用户不能期望为此获得任何社区支持,因为尝试使用 MariaDB 的用户数量非常少用于气流。
数据库 URI¶
Airflow 使用 SQLAlchemy 连接到数据库,这需要您配置数据库 URL。您可以在 [database]
部分中的选项 sql_alchemy_conn
中执行此操作。通常也使用 AIRFLOW__DATABASE__SQL_ALCHEMY_CONN
环境变量配置此选项。
注意
有关设置配置的更多信息,请参阅 设置配置选项。
如果要检查当前值,可以使用 airflow config get-value database sql_alchemy_conn
命令,如下例所示。
$ airflow config get-value database sql_alchemy_conn
sqlite:////tmp/airflow/airflow.db
确切的格式描述在 SQLAlchemy 文档中进行了描述,请参阅 数据库 URL。我们还将在下面展示一些示例。
设置 SQLite 数据库¶
SQLite 数据库可用于运行 Airflow 以进行开发,因为它不需要任何数据库服务器(数据库存储在本地文件中)。使用 SQLite 数据库有很多限制(例如,它只能与 Sequential Executor 一起使用),并且它永远不应该用于生产环境。
运行 Airflow 2.0+ 需要最低版本的 sqlite3 - 最低版本为 3.15.0。一些较旧的系统默认安装了早期版本的 sqlite,对于这些系统,您需要手动升级 SQLite 以使用 3.15.0 以上的版本。请注意,这不是 python library
版本,而是需要升级的 SQLite 系统级应用程序。安装 SQLite 的方式有很多种,您可以在 SQLite 官方网站 和您的操作系统发行版特定的文档中找到有关此信息。
故障排除
有时,即使您将 SQLite 升级到更高版本并且您的本地 python 报告了更高版本,Airflow 使用的 python 解释器可能仍然使用为用于启动 Airflow 的 python 解释器设置的 LD_LIBRARY_PATH
中可用的旧版本。
您可以通过运行以下检查来确保解释器使用哪个版本
root@b8a8e73caa2c:/opt/airflow# python
Python 3.8.10 (default, Mar 15 2022, 12:22:08)
[GCC 8.3.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import sqlite3
>>> sqlite3.sqlite_version
'3.27.2'
>>>
但请注意,为您的 Airflow 部署设置环境变量可能会更改首先找到哪个 SQLite 库,因此您可能希望确保“足够高”版本的 SQLite 是系统中安装的唯一版本。
sqlite 数据库的示例 URI
sqlite:////home/airflow/airflow.db
在 AmazonLinux AMI 或容器镜像上升级 SQLite
AmazonLinux SQLite 只能使用源存储库升级到 v3.7。Airflow 需要 v3.15 或更高版本。使用以下说明使用最新的 SQLite3 设置基本镜像(或 AMI)
先决条件:您需要 wget
、tar
、gzip
、gcc
、make
和 expect
才能使升级过程正常工作。
yum -y install wget tar gzip gcc make expect
从 https://sqlite.ac.cn/ 下载源代码,在本地进行 make 和 install。
wget https://www.sqlite.org/src/tarball/sqlite.tar.gz
tar xzf sqlite.tar.gz
cd sqlite/
export CFLAGS="-DSQLITE_ENABLE_FTS3 \
-DSQLITE_ENABLE_FTS3_PARENTHESIS \
-DSQLITE_ENABLE_FTS4 \
-DSQLITE_ENABLE_FTS5 \
-DSQLITE_ENABLE_JSON1 \
-DSQLITE_ENABLE_LOAD_EXTENSION \
-DSQLITE_ENABLE_RTREE \
-DSQLITE_ENABLE_STAT4 \
-DSQLITE_ENABLE_UPDATE_DELETE_LIMIT \
-DSQLITE_SOUNDEX \
-DSQLITE_TEMP_STORE=3 \
-DSQLITE_USE_URI \
-O2 \
-fPIC"
export PREFIX="/usr/local"
LIBS="-lm" ./configure --disable-tcl --enable-shared --enable-tempstore=always --prefix="$PREFIX"
make
make install
安装后将 /usr/local/lib
添加到库路径
export LD_LIBRARY_PATH=/usr/local/lib:$LD_LIBRARY_PATH
设置 PostgreSQL 数据库¶
您需要创建一个数据库和一个数据库用户,Airflow 将使用它们来访问此数据库。在下面的示例中,将创建一个数据库 airflow_db
和用户名为 airflow_user
、密码为 airflow_pass
的用户
CREATE DATABASE airflow_db;
CREATE USER airflow_user WITH PASSWORD 'airflow_pass';
GRANT ALL PRIVILEGES ON DATABASE airflow_db TO airflow_user;
-- PostgreSQL 15 requires additional privileges:
GRANT ALL ON SCHEMA public TO airflow_user;
注意
数据库必须使用 UTF-8 字符集
您可能需要更新您的 Postgres pg_hba.conf
以将 airflow
用户添加到数据库访问控制列表中;并重新加载数据库配置以加载您的更改。请参阅 Postgres 文档中的 pg_hba.conf 文件 以了解更多信息。
警告
当您使用 SQLAlchemy 1.4.0+ 时,您需要在 sql_alchemy_conn
中使用 postgresql://
作为数据库。在 SQLAlchemy 的先前版本中,可以使用 postgres://
,但在 SQLAlchemy 1.4.0+ 中使用它会导致
> raise exc.NoSuchModuleError(
"Can't load plugin: %s:%s" % (self.group, name)
)
E sqlalchemy.exc.NoSuchModuleError: Can't load plugin: sqlalchemy.dialects:postgres
如果您不能立即更改 URL 的前缀,Airflow 将继续使用 SQLAlchemy 1.3,您可以降级 SQLAlchemy,但我们建议更新前缀。
详细信息请参阅 SQLAlchemy 更新日志。
我们建议使用 psycopg2
驱动程序并在您的 SqlAlchemy 连接字符串中指定它。
postgresql+psycopg2://<user>:<password>@<host>/<db>
另请注意,由于 SqlAlchemy 没有公开在数据库 URI 中定位特定模式的方法,因此您需要确保模式 public
在您的 Postgres 用户的 search_path 中。
如果您为 Airflow 创建了一个新的 Postgres 帐户
新 Postgres 用户的默认 search_path 为:
"$user", public
,无需更改。
如果您使用具有自定义 search_path 的当前 Postgres 用户,则可以使用以下命令更改 search_path
ALTER USER airflow_user SET search_path = public;
有关设置 PostgreSQL 连接的更多信息,请参阅 SQLAlchemy 文档中的 PostgreSQL 方言。
注意
众所周知,Airflow,尤其是在高性能设置中,会打开许多到元数据数据库的连接。这可能会导致 Postgres 资源使用出现问题,因为在 Postgres 中,每个连接都会创建一个新进程,当打开大量连接时,这会使 Postgres 资源占用过多。因此,我们建议对所有 Postgres 生产安装使用 PGBouncer 作为数据库代理。PGBouncer 可以处理来自多个组件的连接池,而且,如果您有连接可能不稳定的远程数据库,它将使您的数据库连接对临时网络问题的适应能力更强。PGBouncer 部署的示例实现可以在 Apache Airflow 的 Helm Chart 中找到,您可以在其中通过翻转布尔标志来启用预先配置的 PGBouncer 实例。在您准备自己的部署时,即使您不使用官方 Helm Chart,也可以参考我们采用的方法并将其作为灵感。
另请参阅 Helm Chart 生产指南
注意
对于托管 Postgres(例如 Azure Postgresql、CloudSQL、Amazon RDS),您应该在连接参数中使用 keepalives_idle
并将其设置为小于空闲时间,因为这些服务会在一段时间不活动(通常为 300 秒)后关闭空闲连接,这会导致错误 错误: psycopg2.operationalerror: SSL SYSCALL 错误: 检测到 EOF
。keepalive
设置可以通过 [database]
部分中的 sql_alchemy_connect_args
配置参数 配置参考 进行更改。例如,您可以在 local_settings.py 中配置 args,并且 sql_alchemy_connect_args
应该是存储配置参数的字典的完整导入路径。您可以阅读有关 Postgres Keepalives 的信息。已观察到可以解决此问题的 keepalives
示例设置可能是
keepalive_kwargs = {
"keepalives": 1,
"keepalives_idle": 30,
"keepalives_interval": 5,
"keepalives_count": 5,
}
然后,如果将其放置在 airflow_local_settings.py
中,则配置导入路径将为
sql_alchemy_connect_args = airflow_local_settings.keepalive_kwargs
有关如何配置本地设置的详细信息,请参阅 配置本地设置。
设置 MySQL 数据库¶
您需要创建一个数据库和一个数据库用户,Airflow 将使用它们来访问此数据库。在下面的示例中,将创建一个数据库 airflow_db
和用户名为 airflow_user
、密码为 airflow_pass
的用户
CREATE DATABASE airflow_db CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
CREATE USER 'airflow_user' IDENTIFIED BY 'airflow_pass';
GRANT ALL PRIVILEGES ON airflow_db.* TO 'airflow_user';
注意
数据库必须使用 UTF-8 字符集。您必须注意的一个小问题是,较新版本 MySQL 中的 utf8 实际上是 utf8mb4,这会导致 Airflow 索引变得过大(请参阅 https://github.com/apache/airflow/pull/17603#issuecomment-901121618)。因此,从 Airflow 2.2 开始,所有 MySQL 数据库都将 sql_engine_collation_for_ids
自动设置为 utf8mb3_bin
(除非您覆盖它)。这可能会导致 Airflow 数据库中 id 字段的排序规则 id 混合,但这不会产生负面后果,因为 Airflow 中所有相关的 ID 都只使用 ASCII 字符。
为了获得合理的默认值,我们依赖于 MySQL 更严格的 ANSI SQL 设置。确保在 my.cnf
文件的 [mysqld]
部分下指定了 explicit_defaults_for_timestamp=1
选项。您也可以使用传递给 mysqld
可执行文件的 --explicit-defaults-for-timestamp
开关来激活这些选项
我们建议使用 mysqlclient
驱动程序并在您的 SqlAlchemy 连接字符串中指定它。
mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>
重要
MySQL 后端的集成仅在 Apache Airflow 的持续集成 (CI) 过程中使用 mysqlclient
驱动程序进行了验证。
如果您想使用其他驱动程序,请访问 SQLAlchemy 文档中的 MySQL Dialect,以获取有关 SqlAlchemy 连接的下载和设置的更多信息。
此外,您还应特别注意 MySQL 的编码。尽管 utf8mb4
字符集在 MySQL 中越来越流行(实际上,utf8mb4
成为 MySQL8.0 中的默认字符集),但在 Airflow 2+ 中使用 utf8mb4
编码需要进行其他设置(有关详细信息,请参阅 #7570。)。如果您使用 utf8mb4
作为字符集,则还应设置 sql_engine_collation_for_ids=utf8mb3_bin
。
注意
在严格模式下,MySQL 不允许将 0000-00-00
作为有效日期。然后,在某些情况下(某些 Airflow 表使用 0000-00-00 00:00:00
作为时间戳字段默认值),您可能会收到类似 “'end_date' 的默认值无效”
的错误。为了避免此错误,您可以在 MySQL 服务器上禁用 NO_ZERO_DATE
模式。阅读 https://stackoverflow.com/questions/9192027/invalid-default-value-for-create-date-timestamp-field 了解如何禁用它。有关详细信息,请参阅 SQL 模式 - NO_ZERO_DATE。
MsSQL 数据库¶
警告
经过 讨论 和 投票,Airflow 的 PMC 和提交者已达成决议,不再维护 MsSQL 作为受支持的数据库后端。
从 Airflow 2.9.0 开始,Airflow 数据库后端不再支持 MsSQL。这不会影响现有的提供程序包(运算符和钩子),DAG 仍然可以访问和处理来自 MsSQL 的数据。
从 MsSQL Server 迁移¶
由于 Airflow 2.9.0 已停止支持 MSSQL,因此可以使用迁移脚本帮助从 Airflow 版本 2.7.x 或 2.8.x 迁移到 SQL-Server。迁移脚本可在 Github 上的 airflow-mssql-migration 存储库 中找到。
请注意,迁移脚本不提供支持和保修。
其他配置选项¶
还有更多用于配置 SQLAlchemy 行为的配置选项。有关详细信息,请参阅 [database]
部分中 sqlalchemy_*
选项的 参考文档。
例如,您可以指定一个数据库架构,Airflow 将在其中创建其所需的表。如果希望 Airflow 在 PostgreSQL 数据库的 airflow
架构中安装其表,请指定以下环境变量
export AIRFLOW__DATABASE__SQL_ALCHEMY_CONN="postgresql://postgres@localhost:5432/my_database?options=-csearch_path%3Dairflow"
export AIRFLOW__DATABASE__SQL_ALCHEMY_SCHEMA="airflow"
请注意 SQL_ALCHEMY_CONN
数据库 URL 末尾的 search_path
。
Airflow 中的数据库监控和维护¶
Airflow 广泛利用关系元数据数据库进行任务调度和执行。监控和正确配置此数据库对于实现最佳 Airflow 性能至关重要。
关键问题¶
性能影响:长时间或过多的查询会严重影响 Airflow 的功能。这些可能是由于工作流的特殊性、缺乏优化或代码错误造成的。
数据库统计信息:数据库引擎做出的不正确的优化决策(通常是由于数据统计信息过时)会导致性能下降。
职责¶
Airflow 环境中数据库监控和维护的职责因您是使用自管数据库和 Airflow 实例还是选择托管服务而异。
自管环境:
在数据库和 Airflow 都是自管的设置中,部署管理员负责设置、配置和维护数据库。这包括监控其性能、管理备份、定期清理以及确保其与 Airflow 的最佳运行。
托管服务:
托管数据库服务:使用托管数据库服务时,许多维护任务(如备份、修补和基本监控)都由提供商处理。但是,部署管理员仍然需要监督 Airflow 的配置并优化特定于其工作流的性能设置,管理定期清理并监控其数据库以确保与 Airflow 的最佳运行。
托管 Airflow 服务:使用托管 Airflow 服务时,这些服务提供商负责 Airflow 及其数据库的配置和维护。但是,部署管理员需要与服务配置协作,以确保规模和工作流要求与托管服务的规模和配置相匹配。
工具和策略¶
Airflow 不提供用于数据库监控的直接工具。
使用服务器端监控和日志记录来获取指标。
根据定义的阈值启用对长时间运行查询的跟踪。
定期运行维护任务(如
ANALYZE
SQL 命令)。
数据库清理工具¶
Airflow DB 清理命令:使用
airflow db clean
命令帮助管理和清理数据库。``airflow.utils.db_cleanup`` 中的 Python 方法:此模块提供了用于数据库清理和维护的其他 Python 方法,为特定需求提供了更精细的控制和定制。
建议¶
主动监控:在生产环境中实施监控和日志记录,而不会显著影响性能。
特定于数据库的指导:查阅所选数据库的文档,了解具体的监控设置说明。
托管数据库服务:检查您的数据库提供商是否提供自动维护任务。
SQLAlchemy 日志记录¶
要进行详细的查询分析,请启用 SQLAlchemy 客户端日志记录(在 SQLAlchemy 引擎配置中使用 echo=True
)。
此方法侵入性更强,可能会影响 Airflow 的客户端性能。
它会生成大量日志,尤其是在繁忙的 Airflow 环境中。
适用于非生产环境,例如测试系统。
您可以按照 SQLAlchemy 日志记录文档 中的说明,使用 echo=True
作为 sqlalchemy 引擎配置来实现。
使用 sql_alchemy_engine_args 配置参数将 echo 参数设置为 True。
注意¶
启用大量日志记录时,请注意对 Airflow 性能和系统资源的影响。
对于生产环境,优先选择服务器端监控而不是客户端日志记录,以最大程度地减少对性能的干扰。