Airflow Summit 2025 即将于 10 月 07-09 日举行。立即注册以获得早鸟票!

模块管理

Airflow 允许您在 DAG 和 Airflow 配置中使用您自己的 Python 模块。 下面的文章将描述如何创建您自己的模块,以便 Airflow 可以正确加载它,以及在模块未正确加载时诊断问题。

通常,您希望在 Airflow 部署中使用您自己的 python 代码,例如通用代码、库,您可能希望使用共享 python 代码生成 dag,并拥有多个 DAG python 文件。

您可以通过以下方式之一来完成它

  • 将您的模块添加到 Airflow 自动添加到 PYTHONPATH 的文件夹之一

  • 将您保存代码的额外文件夹添加到 PYTHONPATH

  • 将您的代码打包到 Python 包中,并与 Airflow 一起安装。

下一章将概述 Python 如何加载包和模块,并深入探讨上述三种可能性的具体细节。

Python 中包/模块加载的工作原理

Python 尝试从中加载模块的目录列表由变量 sys.path 给出。 Python 确实尝试 智能地确定 此变量的内容,具体取决于操作系统以及 Python 的安装方式和使用的 Python 版本。

您可以通过运行交互式终端来检查当前 Python 环境中此变量的内容,如下例所示

>>> import sys
>>> from pprint import pprint
>>> pprint(sys.path)
['',
 '/home/arch/.pyenv/versions/3.9.4/lib/python37.zip',
 '/home/arch/.pyenv/versions/3.9.4/lib/python3.9',
 '/home/arch/.pyenv/versions/3.9.4/lib/python3.9/lib-dynload',
 '/home/arch/venvs/airflow/lib/python3.9/site-packages']

sys.path 在程序启动期间初始化。 第一个优先级被赋予当前目录,即 path[0] 是包含用于调用的当前脚本的目录,或者如果它是交互式 shell,则为空字符串。 第二个优先级被赋予 PYTHONPATH(如果已提供),然后是由 site 模块管理的、依赖于安装的默认路径。

也可以在 Python 会话期间通过简单地使用 append 来修改 sys.path(例如,sys.path.append("/path/to/custom/package"))。 Python 将在添加新路径后开始在较新的路径中搜索包。 Airflow 利用此功能,如 将目录添加到 PYTHONPATH 一节中所述。

在变量 sys.path 中,有一个目录 site-packages,其中包含已安装的 **外部包**,这意味着您可以使用 pipanaconda 安装包,并且可以在 Airflow 中使用它们。 在下一节中,您将学习如何创建自己的简单可安装包,以及如何使用环境变量 PYTHONPATH 指定要添加到 sys.path 的其他目录。

另请确保 将 init 文件添加到您的文件夹

包的典型结构

这是您在 dags 文件夹中可能拥有的示例结构

<DIRECTORY ON PYTHONPATH>
| .airflowignore  -- only needed in ``dags`` folder, see below
| -- my_company
              | __init__.py
              | common_package
              |              |  __init__.py
              |              | common_module.py
              |              | subpackage
              |                         | __init__.py
              |                         | subpackaged_util_module.py
              |
              | my_custom_dags
                              | __init__.py
                              | my_dag1.py
                              | my_dag2.py
                              | base_dag.py

在上面的例子中,这些是您可以导入 python 文件的方式

from my_company.common_package.common_module import SomeClass
from my_company.common_package.subpackage.subpackaged_util_module import AnotherClass
from my_company.my_custom_dags.base_dag import BaseDag

您可以在文件夹的根目录中看到 .airflowignore 文件。 这是一个您可以放在 dags 文件夹中的文件,用于告诉 Airflow 在 Airflow 调度程序查找 dags 时应忽略文件夹中的哪些文件。 它应包含正则表达式(默认)或 glob 表达式,用于要忽略的路径。 您不需要在 PYTHONPATH 中的任何其他文件夹中拥有该文件(并且您只能将共享代码保留在其他文件夹中,而不是实际的 dags)。

在上面的例子中,dags 仅位于 my_custom_dags 文件夹中,当调度程序搜索 DAGS 时,不应扫描 common_package,因此我们应该忽略 common_package 文件夹。 如果您在此处保留一个基本 DAG,my_dag1.pymy_dag2.py 派生自该基本 DAG,您还需要忽略 base_dag.py。 您的 .airflowignore 应如下所示(使用默认的 glob 语法)

my_company/common_package/
my_company/my_custom_dags/base_dag.py

Airflow 中内置的 PYTHONPATH 条目

Airflow 在动态运行时,会将三个目录添加到 sys.path

  • dags 文件夹:它在 [core] 部分使用选项 dags_folder 配置。

  • config 文件夹:它通过设置 AIRFLOW_HOME 变量(默认为 {AIRFLOW_HOME}/config)进行配置。

  • plugins 文件夹:它在 [core] 部分使用选项 plugins_folder 配置。

注意

Airflow 2 中的 DAGS 文件夹不应与 webserver 共享。 虽然您可以这样做,但与 Airflow 1.10 不同,Airflow 并不期望 webserver 中存在 DAGS 文件夹。 实际上,与 webserver 共享 dags 文件夹存在一定的安全风险,因为这意味着编写 DAGS 的人可以编写 webserver 能够执行的代码(理想情况下,webserver 永远不应运行可以由编写 dags 的用户修改的代码)。 因此,如果您需要与 webserver 共享一些代码,强烈建议您通过 configplugins 文件夹或通过已安装的 Airflow 包(见下文)共享它。 这些文件夹通常由与 DAG 文件夹(通常是数据科学家)不同的用户(管理员/DevOps)管理和访问,因此它们被认为是安全的,因为它们是 Airflow 安装配置的一部分,并且由管理安装的人员控制。

代码命名的最佳实践

导入代码时,需要注意一些陷阱。

有时,您可能会看到 Airflow 或您使用的其他库代码引发异常 module 'X' has no attribute 'Y'。 这通常是由于您在顶级 PYTHONPATH 中有一个名为“X”的模块或包,并且它被导入而不是原始代码期望的模块。

您应该始终为您的包和模块使用唯一的名称,并且有多种方法可以确保强制执行唯一性,如下所述。

使用唯一的顶级包名

最重要的是,避免对直接添加到 PYTHONPATH 顶层的任何内容使用通用名称。 例如,如果您将包含 __init__.pyairflow 文件夹添加到您的 DAGS_FOLDER,它将与 Airflow 包冲突,您将无法从 Airflow 包中导入任何内容。 类似地,不要直接在此处添加 airflow.py 文件。 也不应将标准库包使用的通用名称(例如 multiprocessinglogging 等)用作顶级 - 无论是作为包(即包含 __init__.py 的文件夹)还是作为模块(即 .py 文件)。

这同样适用于 configplugins 文件夹,它们也位于 PYTHONPATH,以及您手动添加到 PYTHONPATH 的任何内容(请参阅以下章节中的详细信息)。

建议您始终将 DAG 和通用文件放在特定于您部署的子包中(以下示例中的 my_company)。 为文件夹使用通用名称很容易,这会与系统中已存在的其他包冲突。 例如,如果您创建 airflow/operators 子文件夹,则它将无法访问,因为 Airflow 已经有一个名为 airflow.operators 的包,并且在导入 from airflow.operators 时它会在此处查找。

不要使用相对导入

永远不要使用 Python 3 中添加的相对导入(以 . 开头)。

my_dag1.py 中这样做是很诱人的。

from .base_dag import BaseDag  # NEVER DO THAT!!!!

您应该使用完整路径(从添加到 PYTHONPATH 的目录开始)导入此类共享 DAG。

from my_company.my_custom_dags.base_dag import BaseDag  # This is cool

相对导入是违反直觉的,并且取决于您启动 Python 代码的方式,它们的行为可能会有所不同。 在 Airflow 中,同一个 DAG 文件可能会在不同的上下文中被解析(由调度程序、worker 或在测试期间),在这些情况下,相对导入的行为可能会有所不同。 在 Airflow DAG 中导入任何内容时,始终使用完整的 Python 包路径,这将为您节省很多麻烦。 您可以在 此 Stack Overflow 线程 中阅读更多关于相对导入注意事项的信息。

在包文件夹中添加 __init__.py

创建文件夹时,应将 __init__.py 文件作为空文件添加到文件夹中。 虽然在 Python 3 中有一个隐式命名空间的概念,您不必将这些文件添加到文件夹中,但 Airflow 希望将这些文件添加到您添加的所有包中。

检查您的 PYTHONPATH 加载配置

您还可以使用 airflow info 命令查看确切的路径,并像使用环境变量 PYTHONPATH 指定的目录一样使用它们。 此命令指定的 sys.path 变量的内容示例如下

Python PATH: [/home/rootcss/venvs/airflow/bin:/usr/lib/python38.zip:/usr/lib/python3.9:/usr/lib/python3.9/lib-dynload:/home/rootcss/venvs/airflow/lib/python3.9/site-packages:/home/rootcss/airflow/dags:/home/rootcss/airflow/config:/home/rootcss/airflow/plugins]

以下是 airflow info 命令的示例输出

Apache Airflow: 2.0.0b3

System info
OS              | Linux
architecture    | x86_64
uname           | uname_result(system='Linux', node='85cd7ab7018e', release='4.19.76-linuxkit', version='#1 SMP Tue May 26 11:42:35 UTC 2020', machine='x86_64', processor='')
locale          | ('en_US', 'UTF-8')
python_version  | 3.9.6 (default, Nov 25 2020, 02:47:44)  [GCC 8.3.0]
python_location | /usr/local/bin/python

Tools info
git             | git version 2.20.1
ssh             | OpenSSH_7.9p1 Debian-10+deb10u2, OpenSSL 1.1.1d  10 Sep 2019
kubectl         | NOT AVAILABLE
gcloud          | NOT AVAILABLE
cloud_sql_proxy | NOT AVAILABLE
mysql           | mysql  Ver 8.0.22 for Linux on x86_64 (MySQL Community Server - GPL)
sqlite3         | 3.27.2 2019-02-25 16:06:06 bd49a8271d650fa89e446b42e513b595a717b9212c91dd384aab871fc1d0alt1
psql            | psql (PostgreSQL) 11.9 (Debian 11.9-0+deb10u1)

Paths info
airflow_home    | /root/airflow
system_path     | /usr/local/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin
python_path     | /usr/local/bin:/opt/airflow:/files/plugins:/usr/local/lib/python38.zip:/usr/local/lib/python3.9:/usr/
                | local/lib/python3.9/lib-dynload:/usr/local/lib/python3.9/site-packages:/files/dags:/root/airflow/conf
                | ig:/root/airflow/plugins
airflow_on_path | True

Config info
executor             | LocalExecutor
task_logging_handler | airflow.utils.log.file_task_handler.FileTaskHandler
sql_alchemy_conn     | postgresql+psycopg2://postgres:airflow@postgres/airflow
dags_folder          | /files/dags
plugins_folder       | /root/airflow/plugins
base_log_folder      | /root/airflow/logs

Providers info
apache-airflow-providers-amazon           | 1.0.0b2
apache-airflow-providers-apache-cassandra | 1.0.0b2
apache-airflow-providers-apache-druid     | 1.0.0b2
apache-airflow-providers-apache-hdfs      | 1.0.0b2
apache-airflow-providers-apache-hive      | 1.0.0b2

将目录添加到 PYTHONPATH

您可以使用环境变量 PYTHONPATH 指定要添加到 sys.path 的其他目录。 通过使用以下命令提供项目根目录的路径来启动 python shell

PYTHONPATH=/home/arch/projects/airflow_operators python

sys.path 变量将如下所示

>>> import sys
>>> from pprint import pprint
>>> pprint(sys.path)
['',
 '/home/arch/projects/airflow_operators'
 '/home/arch/.pyenv/versions/3.9.4/lib/python37.zip',
 '/home/arch/.pyenv/versions/3.9.4/lib/python3.9',
 '/home/arch/.pyenv/versions/3.9.4/lib/python3.9/lib-dynload',
 '/home/arch/venvs/airflow/lib/python3.9/site-packages']

我们可以看到我们提供的目录现在已添加到路径中,让我们尝试现在导入包

>>> import airflow_operators
Hello from airflow_operators
>>>

我们还可以将 PYTHONPATH 变量与 airflow 命令一起使用。 例如,如果我们运行以下 Airflow 命令

PYTHONPATH=/home/arch/projects/airflow_operators airflow info

我们将看到 Python PATH 使用我们提到的 PYTHONPATH 值进行更新,如下所示

Python PATH: [/home/arch/venv/bin:/home/arch/projects/airflow_operators:/usr/lib/python38.zip:/usr/lib/python3.9:/usr/lib/python3.9/lib-dynload:/home/arch/venv/lib/python3.9/site-packages:/home/arch/airflow/dags:/home/arch/airflow/config:/home/arch/airflow/plugins]

在 Python 中创建包

这是添加自定义代码的最有组织的方式。 感谢使用包,您可以组织您的版本控制方法,控制安装共享代码的哪些版本,并通过受控的方式将代码部署到您的所有实例和容器 - 全部由系统管理员/DevOps 而不是 DAG 编写者完成。 当您有一个单独的团队管理此共享代码时,它通常是合适的,但是如果您了解您的 Python 方式,您也可以在较小的部署中以这种方式分发您的代码。 您还可以将您的 插件Providers 作为 Python 包安装,因此学习如何构建您的包是很有用的。

以下是如何创建您的包

1. 在开始之前,选择并安装您将使用的构建/打包工具,理想情况下它应该符合 PEP-621,以便能够轻松切换到其他工具。 流行的选择是 setuptools、poetry、hatch、flit。

  1. 决定何时创建您自己的包。 创建包目录 - 在我们的例子中,我们将其称为 airflow_operators

mkdir airflow_operators
  1. 在包内创建文件 __init__.py 并添加以下代码

print("Hello from airflow_operators")

当我们导入这个包时,它应该打印上面的消息。

4. 创建 pyproject.toml 并使用您选择的构建工具配置填充它。 请参阅 pyproject.toml 规范

  1. 使用您选择的工具构建您的项目。 例如,对于 hatch,它可以是

hatch build -t wheel

这将在您的 dist 文件夹中创建 .whl 文件

  1. 使用 pip 安装 .whl 文件

pip install dist/airflow_operators-0.0.0-py3-none-any.whl
  1. 该包现在可以使用了!

>>> import airflow_operators
Hello from airflow_operators
>>>

可以使用 pip 命令删除该包

pip uninstall airflow_operators

有关如何创建和发布 Python 包的更多详细信息,请参阅 打包 Python 项目

此条目对您有帮助吗?