模块管理¶
Airflow 允许你在 DAG 和 Airflow 配置中使用你自己的 Python 模块。以下文章将描述如何创建自己的模块,以便 Airflow 可以正确加载它,以及在模块未正确加载时诊断问题。
通常,您希望在 Airflow 部署中使用自己的 Python 代码,例如通用代码、库,您可能希望使用共享 Python 代码生成 DAG,并拥有多个 DAG Python 文件。
您可以通过以下方式之一进行操作
将您的模块添加到 Airflow 自动添加到
PYTHONPATH
的文件夹之一将您保留代码的额外文件夹添加到
PYTHONPATH
将您的代码打包到 Python 包中,并与 Airflow 一起安装。
下一章对 Python 如何加载包和模块进行了总体描述,并深入探讨了上述三个可能性的具体内容。
Python 中的包/模块加载工作原理¶
Python 尝试从中加载模块的目录列表由变量 sys.path
给出。Python 确实会尝试 智能地确定此变量的内容,具体取决于操作系统以及如何安装 Python 和使用的 Python 版本。
您可以通过运行交互式终端(如下例所示)来检查当前 Python 环境中此变量的内容
>>> import sys
>>> from pprint import pprint
>>> pprint(sys.path)
['',
'/home/arch/.pyenv/versions/3.8.4/lib/python37.zip',
'/home/arch/.pyenv/versions/3.8.4/lib/python3.8',
'/home/arch/.pyenv/versions/3.8.4/lib/python3.8/lib-dynload',
'/home/arch/venvs/airflow/lib/python3.8/site-packages']
sys.path
在程序启动期间初始化。第一个优先级是给当前目录,即 path[0]
是包含用于调用或在它是交互式 shell 时为空字符串的当前脚本的目录。第二个优先级是(如果提供)给 PYTHONPATH
,然后是 site 模块管理的安装相关默认路径。
您还可以通过简单地使用追加(例如,sys.path.append("/path/to/custom/package")
)在 Python 会话期间修改 sys.path
。一旦添加了较新的路径,Python 将开始在其中搜索包。Airflow 利用了此功能,如 将目录添加到 PYTHONPATH 一节中所述。
在变量 sys.path
中有一个目录 site-packages
,其中包含已安装的外部包,这意味着你可以使用 pip
或 anaconda
安装包,并且可以在 Airflow 中使用它们。在下一部分中,你将学习如何创建自己的简单可安装包,以及如何使用环境变量 PYTHONPATH
指定要添加到 sys.path
的其他目录。
还要确保 向你的文件夹中添加 init 文件。
包的典型结构¶
这是你可能在 dags
文件夹中拥有的一个示例结构
<DIRECTORY ON PYTHONPATH>
| .airflowignore -- only needed in ``dags`` folder, see below
| -- my_company
| __init__.py
| common_package
| | __init__.py
| | common_module.py
| | subpackage
| | __init__.py
| | subpackaged_util_module.py
|
| my_custom_dags
| __init__.py
| my_dag1.py
| my_dag2.py
| base_dag.py
在上述情况下,这些是你导入 python 文件的方法
from my_company.common_package.common_module import SomeClass
from my_company.common_package.subpackage.subpackaged_util_module import AnotherClass
from my_company.my_custom_dags.base_dag import BaseDag
你可以在文件夹的根目录中看到 .airflowignore
文件。这是一个你可以放在 dags
文件夹中的文件,用于告诉 Airflow 当 Airflow 调度程序查找 DAG 时应忽略文件夹中的哪些文件。它应包含正则表达式(默认)或应忽略的路径的 glob 表达式。你无需在 PYTHONPATH
中的任何其他文件夹中拥有该文件(并且你只能在其他文件夹中保留共享代码,而不是实际的 DAG)。
在上面的示例中,DAG 仅位于 my_custom_dags
文件夹中,调度程序在搜索 DAG 时不应扫描 common_package
,因此我们应该忽略 common_package
文件夹。如果你在那里保留一个基础 DAG,并且 my_dag1.py
和 my_dag2.py
从该基础 DAG 派生,那么你还要忽略 base_dag.py
。你的 .airflowignore
应如下所示
my_company/common_package/.*
my_company/my_custom_dags/base_dag\.py
如果 DAG_IGNORE_FILE_SYNTAX
设置为 glob
,则等效的 .airflowignore
文件将是
my_company/common_package/
my_company/my_custom_dags/base_dag.py
Airflow 中的内置 PYTHONPATH
条目¶
Airflow 在动态运行时会向 sys.path
添加三个目录
dags
文件夹:它通过[core]
部分中的dags_folder
选项进行配置。config
文件夹:它通过设置AIRFLOW_HOME
变量(默认情况下为{AIRFLOW_HOME}/config
)进行配置。plugins
文件夹:它通过[core]
部分中的plugins_folder
选项进行配置。
注意
Airflow 2 中的 DAGS 文件夹不应与 Web 服务器共享。虽然你可以这样做,但与 Airflow 1.10 不同,Airflow 并不期望 Web 服务器中存在 DAGS 文件夹。事实上,将 dags
文件夹与 Web 服务器共享会带来一定的安全风险,因为这意味着编写 DAGS 的人员可以编写 Web 服务器能够执行的代码(理想情况下,Web 服务器永远不应运行可由编写 DAG 的用户修改的代码)。因此,如果你需要与 Web 服务器共享一些代码,强烈建议你通过 config
或 plugins
文件夹或通过已安装的 Airflow 软件包(见下文)进行共享。这些文件夹通常由与 DAG 文件夹不同的用户(管理员/DevOps)管理和访问(这些文件夹通常是数据科学家),因此它们被认为是安全的,因为它们是 Airflow 安装配置的一部分,并由管理安装的人员控制。
代码命名的最佳实践¶
在你导入代码时,需要注意一些需要注意的陷阱。
有时,您可能会看到 Airflow 或您使用的其他库代码引发的异常,例如 module 'X' has no attribute 'Y'
。这通常是由于您在 PYTHONPATH
的顶层有一个名为“X”的模块或包,并且导入了该模块或包,而不是原始代码预期的模块。
您应始终为您的包和模块使用唯一名称,并且有方法可以确保强制执行唯一性,如下所述。
使用唯一的顶级包名称¶
最重要的是,避免对直接添加到 PYTHONPATH
顶层的任何内容使用通用名称。例如,如果您将包含 __init__.py
的 airflow
文件夹添加到您的 DAGS_FOLDER
,它将与 Airflow 包冲突,您将无法从 Airflow 包中导入任何内容。同样,不要直接在那里添加 airflow.py
文件。此外,标准库包使用的通用名称,例如 multiprocessing
或 logging
等,不应作为顶级使用 - 无论是作为包(即包含 __init__.py
的文件夹)还是作为模块(即 .py
文件)。
这同样适用于 config
和 plugins
文件夹,它们也在 PYTHONPATH
中,以及您手动添加到 PYTHONPATH
中的任何内容(请参阅以下章节中的详细信息)。
建议您始终将 DAG/常用文件放置在对您的部署来说唯一的子包中(如下例中的 my_company
)。使用通用名称来命名文件夹很容易与系统中已有的其他包发生冲突。例如,如果您创建 airflow/operators
子文件夹,则无法访问它,因为 Airflow 已有一个名为 airflow.operators
的包,并且在导入 from airflow.operators
时它会查找该包。
不要使用相对导入¶
切勿使用 Python 3 中添加的相对导入(以 .
开头)。
在 my_dag1.py
中执行类似操作很诱人
from .base_dag import BaseDag # NEVER DO THAT!!!!
您应该使用完整路径(从添加到 PYTHONPATH
的目录开始)导入此类共享 DAG
from my_company.my_custom_dags.base_dag import BaseDag # This is cool
相对导入违反直觉,并且根据您启动 Python 代码的方式,它们的行为可能有所不同。在 Airflow 中,同一 DAG 文件可能会在不同的上下文中进行解析(由调度程序、工作进程或在测试期间),并且在这些情况下,相对导入的行为可能有所不同。在 Airflow DAG 中导入任何内容时,请始终使用完整的 Python 包路径,这将为您省去很多麻烦。您可以在 此 Stack Overflow 线程 中阅读有关相对导入注意事项的更多信息。
在包文件夹中添加 __init__.py
¶
创建文件夹时,您应该在文件夹中添加 __init__.py
文件作为空文件。虽然 Python 3 中有一个隐式命名空间的概念,您不必将这些文件添加到文件夹,但 Airflow 期望将文件添加到您添加的所有包。
检查您的 PYTHONPATH
加载配置¶
您还可以使用 airflow info
命令查看确切路径,并使用它们类似于使用环境变量 PYTHONPATH
指定的目录。此命令指定的 sys.path 变量的内容示例可能如下
Python PATH: [/home/rootcss/venvs/airflow/bin:/usr/lib/python38.zip:/usr/lib/python3.8:/usr/lib/python3.8/lib-dynload:/home/rootcss/venvs/airflow/lib/python3.8/site-packages:/home/rootcss/airflow/dags:/home/rootcss/airflow/config:/home/rootcss/airflow/plugins]
以下是 airflow info
命令的示例输出
另请参阅
Apache Airflow: 2.0.0b3
System info
OS | Linux
architecture | x86_64
uname | uname_result(system='Linux', node='85cd7ab7018e', release='4.19.76-linuxkit', version='#1 SMP Tue May 26 11:42:35 UTC 2020', machine='x86_64', processor='')
locale | ('en_US', 'UTF-8')
python_version | 3.8.6 (default, Nov 25 2020, 02:47:44) [GCC 8.3.0]
python_location | /usr/local/bin/python
Tools info
git | git version 2.20.1
ssh | OpenSSH_7.9p1 Debian-10+deb10u2, OpenSSL 1.1.1d 10 Sep 2019
kubectl | NOT AVAILABLE
gcloud | NOT AVAILABLE
cloud_sql_proxy | NOT AVAILABLE
mysql | mysql Ver 8.0.22 for Linux on x86_64 (MySQL Community Server - GPL)
sqlite3 | 3.27.2 2019-02-25 16:06:06 bd49a8271d650fa89e446b42e513b595a717b9212c91dd384aab871fc1d0alt1
psql | psql (PostgreSQL) 11.9 (Debian 11.9-0+deb10u1)
Paths info
airflow_home | /root/airflow
system_path | /usr/local/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin
python_path | /usr/local/bin:/opt/airflow:/files/plugins:/usr/local/lib/python38.zip:/usr/local/lib/python3.8:/usr/
| local/lib/python3.8/lib-dynload:/usr/local/lib/python3.8/site-packages:/files/dags:/root/airflow/conf
| ig:/root/airflow/plugins
airflow_on_path | True
Config info
executor | LocalExecutor
task_logging_handler | airflow.utils.log.file_task_handler.FileTaskHandler
sql_alchemy_conn | postgresql+psycopg2://postgres:airflow@postgres/airflow
dags_folder | /files/dags
plugins_folder | /root/airflow/plugins
base_log_folder | /root/airflow/logs
Providers info
apache-airflow-providers-amazon | 1.0.0b2
apache-airflow-providers-apache-cassandra | 1.0.0b2
apache-airflow-providers-apache-druid | 1.0.0b2
apache-airflow-providers-apache-hdfs | 1.0.0b2
apache-airflow-providers-apache-hive | 1.0.0b2
将目录添加到 PYTHONPATH
¶
您可以使用环境变量 PYTHONPATH
指定要添加到 sys.path
的其他目录。使用以下命令提供项目的根路径来启动 Python shell
PYTHONPATH=/home/arch/projects/airflow_operators python
sys.path
变量将如下所示
>>> import sys
>>> from pprint import pprint
>>> pprint(sys.path)
['',
'/home/arch/projects/airflow_operators'
'/home/arch/.pyenv/versions/3.8.4/lib/python37.zip',
'/home/arch/.pyenv/versions/3.8.4/lib/python3.8',
'/home/arch/.pyenv/versions/3.8.4/lib/python3.8/lib-dynload',
'/home/arch/venvs/airflow/lib/python3.8/site-packages']
正如我们所看到的,我们提供的目录现在已添加到路径中,我们现在尝试导入包
>>> import airflow_operators
Hello from airflow_operators
>>>
我们还可以将 PYTHONPATH
变量与 airflow 命令一起使用。例如,如果我们运行以下 airflow 命令
PYTHONPATH=/home/arch/projects/airflow_operators airflow info
我们将看到 Python PATH
使用我们提到的 PYTHONPATH
值进行更新,如下所示
Python PATH: [/home/arch/venv/bin:/home/arch/projects/airflow_operators:/usr/lib/python38.zip:/usr/lib/python3.8:/usr/lib/python3.8/lib-dynload:/home/arch/venv/lib/python3.8/site-packages:/home/arch/airflow/dags:/home/arch/airflow/config:/home/arch/airflow/plugins]
在 Python 中创建包¶
这是添加自定义代码的最有组织的方式。由于使用了包,您可以组织您的版本控制方法,控制已安装的共享代码版本,并以受控方式将代码部署到所有实例和容器中 - 所有这些都由系统管理员/DevOps 而不是 DAG 编写者完成。当您有一个单独的团队来管理此共享代码时,这通常是合适的,但如果您了解您的 python 方式,您也可以在较小的部署中以这种方式分发您的代码。您还可以安装您的 插件 和 提供程序包 作为 python 包,因此学习如何构建您的包非常方便。
以下是创建包的方法
1. 在开始之前,选择并安装您将使用的构建/打包工具,理想情况下它应该是 PEP-621 兼容的,以便能够轻松切换到不同的工具。流行的选择有 setuptools、poetry、hatch、flit。
决定何时创建自己的包。创建包目录 - 在我们的例子中,我们将称其为
airflow_operators
。
mkdir airflow_operators
在包中创建文件
__init__.py
并添加以下代码
print("Hello from airflow_operators")
当我们导入这个包时,它应该打印上述消息。
4. 创建 pyproject.toml
并用您选择的构建工具配置填充它,请参阅 The pyproject.toml specification
使用您选择的工具构建您的项目。例如,对于 hatch,它可以是
hatch build -t wheel
这将在您的 dist
文件夹中创建 .whl 文件
使用 pip 安装 .whl 文件
pip install dist/airflow_operators-0.0.0-py3-none-any.whl
该包现在可以使用了!
>>> import airflow_operators
Hello from airflow_operators
>>>
可以使用 pip 命令删除该包
pip uninstall airflow_operators
有关如何创建和发布 python 包的更多详细信息,请参阅 Packaging Python Projects。