Airflow 峰会 2025 将于 10 月 07-09 日举行。立即注册享受早鸟票优惠!

调试 Airflow DAGs

使用 dag.test() 测试 DAGs

要在 IDE 中调试 DAGs,您可以设置 dag.test 命令到您的 DAG 文件中,并在一个序列化的 Python 进程中运行您的 DAG。

这种方法可以与任何受支持的数据库(包括本地 SQLite 数据库)一起使用,并且会 快速失败,因为所有任务都在一个进程中运行。

要设置 dag.test,将这两行添加到您的 DAG 文件底部

if __name__ == "__main__":
    dag.test()

就是这样!您可以添加可选参数来微调测试,否则您可以根据需要运行或调试 DAGs。以下是一些参数示例

  • execution_date 如果您想测试特定参数的 DAG 运行

  • use_executor 如果您想使用 executor 测试 DAG。默认情况下,dag.test 在没有 executor 的情况下运行 DAG,它只在本地运行所有任务。通过提供此参数,DAG 将使用 Airflow 环境中配置的 executor 执行。

有条件地跳过任务

如果您不想在本地环境中执行某些任务子集(例如依赖检查 sensors 或清理步骤),您可以自动将它们标记为成功,方法是在 mark_success_pattern 参数中提供一个匹配它们 task_id 的模式。

在以下示例中,测试该 DAG 不会等待任何上游 DAG 完成。而是手动注入测试数据。清理步骤也被跳过,使得中间生成的 CSV 可供检查。

with DAG("example_dag", default_args=default_args) as dag:
    sensor = ExternalTaskSensor(task_id="wait_for_ingestion_dag", external_dag_id="ingest_raw_data")
    sensor2 = ExternalTaskSensor(task_id="wait_for_dim_dag", external_dag_id="ingest_dim")
    collect_stats = PythonOperator(task_id="extract_stats_csv", python_callable=extract_stats_csv)
    # ... run other tasks
    cleanup = PythonOperator(task_id="cleanup", python_callable=Path.unlink, op_args=[collect_stats.output])

    [sensor, sensor2] >> collect_stats >> cleanup

if __name__ == "__main__":
    ingest_testing_data()
    run = dag.test(mark_success_pattern="wait_for_.*|cleanup")
    print(f"Intermediate csv: {run.get_task_instance('collect_stats').xcom_pull(task_id='collect_stats')}")

在命令行中调试 Airflow DAGs

通过与上面提到的相同的两行添加,您现在也可以轻松地使用 pdb 调试 DAG。运行 python -m pdb <path to dag file>.py 以获得命令行上的交互式调试体验。

root@ef2c84ad4856:/opt/airflow# python -m pdb airflow/example_dags/example_bash_operator.py
> /opt/airflow/airflow/example_dags/example_bash_operator.py(18)<module>()
-> """Example DAG demonstrating the usage of the BashOperator."""
(Pdb) b 45
Breakpoint 1 at /opt/airflow/airflow/example_dags/example_bash_operator.py:45
(Pdb) c
> /opt/airflow/airflow/example_dags/example_bash_operator.py(45)<module>()
-> bash_command='echo 1',
(Pdb) run_this_last
<Task(EmptyOperator): run_this_last>

IDE 设置步骤

  1. 添加 main 块到您的 DAG 文件末尾,使其可运行。

if __name__ == "__main__":
    dag.test()
  1. 运行 / 调试 DAG 文件。

此条目有帮助吗?