调试 Airflow DAG¶
使用 dag.test() 测试 DAG¶
要在 IDE 中调试 DAG,可以在 dag 文件中设置 dag.test
命令,并通过单个序列化 Python 进程运行 DAG。
此方法可与任何受支持的数据库(包括本地 SQLite 数据库)一起使用,并且会快速失败,因为所有任务都在单个进程中运行。
要设置 dag.test
,请将这两行添加到 dag 文件的底部
if __name__ == "__main__":
dag.test()
就是这样!你可以添加参数,例如 execution_date
,如果你想测试特定参数的 DAG 运行,但否则你可以根据需要运行或调试 DAG。
与 DebugExecutor 的比较¶
与现已弃用的 DebugExecutor
类相比,dag.test
命令具有以下优点
它根本不需要运行执行器。任务一次运行一个,没有执行器或调度器日志。
它比使用 DebugExecutor 运行代码快得多,因为它不需要经过调度器循环。
它不执行回填。
在命令行上调试 Airflow DAG¶
使用与上述部分中提到的相同的两行代码,您现在还可以使用 pdb 轻松调试 DAG。运行 python -m pdb <path to dag file>.py
以在命令行上获得交互式调试体验。
root@ef2c84ad4856:/opt/airflow# python -m pdb airflow/example_dags/example_bash_operator.py
> /opt/airflow/airflow/example_dags/example_bash_operator.py(18)<module>()
-> """Example DAG demonstrating the usage of the BashOperator."""
(Pdb) b 45
Breakpoint 1 at /opt/airflow/airflow/example_dags/example_bash_operator.py:45
(Pdb) c
> /opt/airflow/airflow/example_dags/example_bash_operator.py(45)<module>()
-> bash_command='echo 1',
(Pdb) run_this_last
<Task(EmptyOperator): run_this_last>
Debug Executor(已弃用)¶
旨在作为调试工具,可以从 IDE 使用。它是一个单进程执行器,对 DebugExecutor
TaskInstance
排队并通过运行 _run_raw_task
方法执行它们。
由于其性质,执行器可以与 SQLite 数据库一起使用。当与传感器一起使用时,执行器会将传感器模式更改为 reschedule
以避免阻止 DAG 的执行。
此外,
可以以快速失败模式使用,这将使所有其他正在运行或已调度的任务立即失败。要启用此选项,请设置 DebugExecutor
AIRFLOW__DEBUG__FAIL_FAST=True
或调整 airflow.cfg
中的 fail_fast
选项。有关设置配置的更多信息,请参阅 设置配置选项。
IDE 设置步骤
在 DAG 文件的末尾添加
main
块,以使其可运行。
它将运行回填作业
if __name__ == "__main__":
from airflow.utils.state import State
dag.clear()
dag.run()
在 IDE 的运行配置中设置
AIRFLOW__CORE__EXECUTOR=DebugExecutor
。在此步骤中,你还应设置 DAG 所需的所有环境变量。运行/调试 DAG 文件。