调试 Airflow DAGs¶
使用 dag.test() 测试 DAG¶
要在 IDE 中调试 DAG,您可以在您的 DAG 文件中设置 dag.test
命令,并在单个序列化的 Python 进程中运行您的 DAG。
此方法可以与任何受支持的数据库(包括本地 SQLite 数据库)一起使用,并且由于所有任务都在单个进程中运行,因此会快速失败。
要设置 dag.test
,请将以下两行添加到您的 DAG 文件底部
if __name__ == "__main__":
dag.test()
就是这样!您可以添加可选参数来微调测试,否则您可以根据需要运行或调试 DAG。以下是一些参数示例
execution_date
如果您想测试特定参数的 DAG 运行use_executor
如果您想使用执行器测试 DAG。默认情况下,dag.test
在没有执行器的情况下运行 DAG,它只是在本地运行所有任务。通过提供此参数,DAG 将使用 Airflow 环境中配置的执行器执行。
有条件地跳过任务¶
如果您不希望在本地环境中执行某些任务子集(例如,依赖检查传感器或清理步骤),您可以提供在 mark_success_pattern
参数中与其 task_id
匹配的模式来自动将它们标记为成功。
在以下示例中,测试 DAG 不会等待任何上游 DAG 完成。相反,测试数据是手动提取的。清理步骤也被跳过,使得中间 CSV 文件可用于检查。
with DAG("example_dag", default_args=default_args) as dag:
sensor = ExternalTaskSensor(task_id="wait_for_ingestion_dag", external_dag_id="ingest_raw_data")
sensor2 = ExternalTaskSensor(task_id="wait_for_dim_dag", external_dag_id="ingest_dim")
collect_stats = PythonOperator(task_id="extract_stats_csv", python_callable=extract_stats_csv)
# ... run other tasks
cleanup = PythonOperator(task_id="cleanup", python_callable=Path.unlink, op_args=[collect_stats.output])
[sensor, sensor2] >> collect_stats >> cleanup
if __name__ == "__main__":
ingest_testing_data()
run = dag.test(mark_success_pattern="wait_for_.*|cleanup")
print(f"Intermediate csv: {run.get_task_instance('collect_stats').xcom_pull(task_id='collect_stats')}")
与 DebugExecutor 的比较¶
与现在已弃用的 DebugExecutor
类相比,dag.test
命令具有以下优点
它根本不需要运行执行器。任务一次运行一个,没有执行器或调度器日志。
它比使用 DebugExecutor 运行代码快得多,因为它不需要经过调度器循环。
它不执行回填。
在命令行调试 Airflow DAGs¶
通过与上一节中提到的相同的两行添加,您现在也可以使用 pdb 轻松调试 DAG。运行 python -m pdb <dag 文件的路径>.py
,以便在命令行上获得交互式调试体验。
root@ef2c84ad4856:/opt/airflow# python -m pdb airflow/example_dags/example_bash_operator.py
> /opt/airflow/airflow/example_dags/example_bash_operator.py(18)<module>()
-> """Example DAG demonstrating the usage of the BashOperator."""
(Pdb) b 45
Breakpoint 1 at /opt/airflow/airflow/example_dags/example_bash_operator.py:45
(Pdb) c
> /opt/airflow/airflow/example_dags/example_bash_operator.py(45)<module>()
-> bash_command='echo 1',
(Pdb) run_this_last
<Task(EmptyOperator): run_this_last>
Debug Executor (已弃用)¶
DebugExecutor
旨在作为调试工具,可以在 IDE 中使用。它是一个单进程执行器,它将 TaskInstance
排队,并通过运行 _run_raw_task
方法执行它们。
由于其性质,该执行器可以与 SQLite 数据库一起使用。当与传感器一起使用时,执行器会将传感器模式更改为 reschedule
,以避免阻塞 DAG 的执行。
此外,DebugExecutor
可以在快速失败模式下使用,这将使所有其他正在运行或计划的任务立即失败。要启用此选项,请设置 AIRFLOW__DEBUG__FAIL_FAST=True
或在您的 airflow.cfg
中调整 fail_fast
选项。有关设置配置的更多信息,请参阅 设置配置选项。
IDE 设置步骤
在您的 DAG 文件末尾添加
main
代码块使其可运行。
它将运行回填作业
if __name__ == "__main__":
from airflow.utils.state import State
dag.clear()
dag.run()
在您的 IDE 的运行配置中设置
AIRFLOW__CORE__EXECUTOR=DebugExecutor
。在此步骤中,您还应该设置 DAG 所需的所有环境变量。运行/调试 DAG 文件。