池 (Pools)

当太多进程同时访问某些系统时,这些系统可能会不堪重负。Airflow 池可用于限制在任意任务集上的执行并行度。池的列表在 UI 中管理(菜单 -> 管理 -> ),方法是给池命名并为其分配一定数量的工作槽位。您还可以在那里决定池是否应在其对已占用槽位的计算中包含延迟任务

然后,可以通过在创建任务时使用 pool 参数将任务与现有池之一关联起来。

aggregate_db_message_job = BashOperator(
    task_id="aggregate_db_message_job",
    execution_timeout=timedelta(hours=3),
    pool="ep_data_pipeline_db_msg_agg",
    bash_command=aggregate_db_message_job_cmd,
    dag=dag,
)
aggregate_db_message_job.set_upstream(wait_for_empty_queue)

当槽位被填满时,任务将照常调度。任务占用的槽位数量可以通过 pool_slots 配置(见下节)。一旦达到容量,可运行的任务就会排队,并且它们的状态将在 UI 中显示。随着槽位的释放,排队的任务将根据任务及其后代的优先级权重开始运行。

请注意,如果任务未被分配到池,它们将被分配到默认池 default_pool,该池初始化时具有 128 个槽位,可以通过 UI 或 CLI 修改(但不能删除)。

使用多个池槽位

默认情况下,Airflow 任务将各自占用一个池槽位,但如果需要,可以使用 pool_slots 参数将其配置为占用更多槽位。当属于同一池的几个任务不具有相同的“计算权重”时,这尤其有用。

例如,考虑一个具有 2 个槽位的池,Pool(pool='maintenance', slots=2),以及以下任务

BashOperator(
    task_id="heavy_task",
    bash_command="bash backup_data.sh",
    pool_slots=2,
    pool="maintenance",
)

BashOperator(
    task_id="light_task1",
    bash_command="bash check_files.sh",
    pool_slots=1,
    pool="maintenance",
)

BashOperator(
    task_id="light_task2",
    bash_command="bash remove_files.sh",
    pool_slots=1,
    pool="maintenance",
)

由于重型任务配置为使用 2 个池槽位,因此它在运行时会耗尽池。因此,任何轻型任务都必须排队并等待重型任务完成才能执行。在这里,就资源使用而言,重型任务相当于两个轻型任务同时运行。

此实现可以防止系统资源不堪重负,这(在本例中)可能在重型和轻型任务同时运行时发生。另一方面,两个轻型任务可以同时运行,因为它们每个只占用一个池槽位,而重型任务必须等待两个池槽位可用才能执行。

警告

池和子 DAG 的交互方式可能与您最初的预期不同。子 DAG 将遵守您在顶层设置的任何池;必须直接在子 DAG 内部的任务上设置池。

此条目是否有帮助?