优先级权重

priority_weight 定义执行器队列中的优先级。默认的 priority_weight1,可以提升到任何整数。此外,每个任务都有一个真正的 priority_weight,它是基于其 weight_rule 计算的,该规则定义了用于计算任务有效总优先级权重的方法。

以下是权重计算方法。默认情况下,Airflow 的权重计算方法是 downstream

downstream

任务的有效权重是所有下游后代的总和。因此,当使用正权重值时,上游任务将具有更高的权重,并将更积极地被调度。当你有多个 DAG 运行实例,并且希望在每个 DAG 继续处理下游任务之前完成所有运行的所有上游任务时,这很有用。

upstream

有效权重是所有上游祖先的总和。这是相反的情况,即下游任务具有更高的权重,并且在使用正权重值时将更积极地被调度。当您有多个 DAG 运行实例,并且希望在启动其他 DAG 运行的上游任务之前完成每个 DAG 时,这很有用。

absolute

有效权重是指定的精确的 priority_weight,没有额外的权重。当您确切知道每个任务应该具有的优先级权重时,您可能希望这样做。此外,当设置为 absolute 时,还有一个额外的好处,即可以显著加快大型 DAG 的任务创建过程。

priority_weight 参数可以与 一起使用。

注意

由于大多数数据库引擎都使用 32 位整数,因此任何计算或定义的 priority_weight 的最大值为 2,147,483,647,最小值为 -2,147,483,648。

自定义权重规则

2.9.0 版本中的新功能。

您可以通过扩展 PriorityWeightStrategy 类并在插件中注册它来实现您自己的自定义权重计算方法。

airflow/example_dags/plugins/decreasing_priority_weight_strategy.py[源代码]

class DecreasingPriorityStrategy(PriorityWeightStrategy):
    """A priority weight strategy that decreases the priority weight with each attempt of the DAG task."""

    def get_weight(self, ti: TaskInstance):
        return max(3 - ti.try_number + 1, 1)


class DecreasingPriorityWeightStrategyPlugin(AirflowPlugin):
    name = "decreasing_priority_weight_strategy_plugin"
    priority_weight_strategies = [DecreasingPriorityStrategy]


然后要使用它,您可以创建自定义类的实例,并在任务的 weight_rule 参数中提供它,或者提供自定义类的路径

from custom_weight_rule_module import CustomPriorityWeightStrategy

# provide the class instance
task1 = BashOperator(task_id="task", bash_command="echo 1", weight_rule=CustomPriorityWeightStrategy())

# or provide the path of the class
task1 = BashOperator(
    task_id="task",
    bash_command="echo 1",
    weight_rule="custom_weight_rule_module.CustomPriorityWeightStrategy",
)

这是一个 实验性功能

这个条目有帮助吗?