优先级权重¶
priority_weight
定义执行器队列中的优先级。默认 priority_weight
为 1
,可提升到任意整数。此外,每个任务都有一个真实的 priority_weight
,该权重根据其 weight_rule
计算得出,后者定义了用于任务的有效总优先级权重的加权方法。
以下是加权方法。默认情况下,Airflow 的加权方法是 downstream
。
downstream
任务的有效权重是所有下游后代的总和。因此,当使用正权重值时,上游任务将具有更高的权重,并会更积极地进行计划。当有多个 DAG 运行实例,并且希望在每个 DAG 继续处理下游任务之前,所有上游任务都完成所有运行时,这将非常有用。
upstream
有效权重是所有上游祖先的总和。这恰恰相反,当使用正权重值时,下游任务具有更高的权重,并且会更积极地进行计划。当有多个 DAG 运行实例,并且希望在启动其他 DAG 运行的上游任务之前完成每个 DAG 时,这将非常有用。
absolute
有效权重是指定的精确 priority_weight
,没有附加权重。当您确切知道每个任务应具有的优先级权重时,您可能希望这样做。此外,当设置为 absolute
时,对于非常大的 DAG,还有显著加快任务创建过程的额外效果。
priority_weight
参数可与 池 结合使用。
自定义权重规则¶
2.9.0 版中的新增功能。
您可以通过扩展 PriorityWeightStrategy
类并在插件中注册它来实现自己的自定义加权方法。
class DecreasingPriorityStrategy(PriorityWeightStrategy):
"""A priority weight strategy that decreases the priority weight with each attempt of the DAG task."""
def get_weight(self, ti: TaskInstance):
return max(3 - ti._try_number + 1, 1)
class DecreasingPriorityWeightStrategyPlugin(AirflowPlugin):
name = "decreasing_priority_weight_strategy_plugin"
priority_weight_strategies = [DecreasingPriorityStrategy]
然后要使用它,您可以创建自定义类的实例,并在任务的 weight_rule
参数中提供它,或提供自定义类的路径
from custom_weight_rule_module import CustomPriorityWeightStrategy
# provide the class instance
task1 = BashOperator(task_id="task", bash_command="echo 1", weight_rule=CustomPriorityWeightStrategy())
# or provide the path of the class
task1 = BashOperator(
task_id="task",
bash_command="echo 1",
weight_rule="custom_weight_rule_module.CustomPriorityWeightStrategy",
)
这是一个 实验性功能。