DAG 文件处理¶
DAG 文件处理是指将 DAG 文件夹中包含的 Python 文件转换为包含要调度的任务的 DAG 对象的过程。
DAG 文件处理涉及两个主要组件。DagFileProcessorManager
是执行无限循环的进程,用于确定需要处理哪些文件,而 DagFileProcessorProcess
是一个单独的进程,用于启动将单个文件转换为一个或多个 DAG 对象。
DagFileProcessorManager
运行用户代码。因此,你可以决定在调度程序进程的不同主机中以独立进程的形式运行它。如果你决定以独立进程的形式运行它,则需要设置此配置:AIRFLOW__SCHEDULER__STANDALONE_DAG_PROCESSOR=True
并运行 airflow dag-processor
CLI 命令,否则,启动调度程序进程(airflow scheduler
)也会启动 DagFileProcessorManager
。
DagFileProcessorManager
具有以下步骤
检查新文件:如果自上次刷新 DAG 以来经过的时间 > dag_dir_list_interval,则更新文件路径列表
排除最近处理的文件:排除最近处理时间晚于 min_file_process_interval 且未修改的文件
排队文件路径:将发现的文件添加到文件路径队列
处理文件:为每个文件启动一个新的
DagFileProcessorProcess
,最多 parsing_processes 个收集结果:收集任何已完成 DAG 处理程序的结果
记录统计信息:打印统计信息并发出
dag_processing.total_parse_time
DagFileProcessorProcess
具有以下步骤
处理文件:整个过程必须在 dag_file_processor_timeout 内完成
DAG 文件作为 Python 模块加载:必须在 dagbag_import_timeout 内完成
进程模块:在 Python 模块中查找 DAG 对象
返回 DagBag:为
DagFileProcessorManager
提供已发现 DAG 对象的列表