优先级权重¶
priority_weight
定义了执行器队列中的优先级。默认的 priority_weight
是 1
,可以提升到任意整数。此外,每个任务都有一个真实的 priority_weight
,它是根据其 weight_rule
计算得出的,weight_rule
定义了用于计算任务有效总优先级权重的加权方法。
以下是加权方法。默认情况下,Airflow 的加权方法是 downstream
。
downstream
任务的有效权重是所有下游后代的总和。因此,当使用正权重值时,上游任务将具有更高的权重,并在调度时更积极。当你有多个 DAG 运行实例,并且希望在每个 DAG 继续处理下游任务之前,所有运行的所有上游任务都能完成时,这很有用。
upstream
有效权重是所有上游祖先的总和。这与 downstream
相反,下游任务具有更高的权重,并在调度时更积极(当使用正权重值时)。当你有多个 DAG 运行实例,并且更倾向于让每个 DAG 完成后再开始其他 DAG 运行的上游任务时,这很有用。
absolute
有效权重就是指定的 priority_weight
,不进行额外的加权。当你清楚地知道每个任务应该具有的确切优先级权重时,可能希望这样做。此外,当设置为 absolute
时,还有一个额外的好处,可以显著加快任务创建过程,尤其是对于非常大的 DAG。
priority_weight
参数可以与 Pools (资源池) 结合使用。
注意
由于大多数数据库引擎对整数使用 32 位,任何计算或定义的 priority_weight
的最大值为 2,147,483,647,最小值为 -2,147,483,648。
自定义权重规则¶
在 2.9.0 版本中添加。
你可以通过扩展 PriorityWeightStrategy
类并在插件中注册它来实现自己的自定义加权方法。
src/airflow/example_dags/plugins/decreasing_priority_weight_strategy.py
class DecreasingPriorityStrategy(PriorityWeightStrategy):
"""A priority weight strategy that decreases the priority weight with each attempt of the DAG task."""
def get_weight(self, ti: TaskInstance):
return max(3 - ti.try_number + 1, 1)
class DecreasingPriorityWeightStrategyPlugin(AirflowPlugin):
name = "decreasing_priority_weight_strategy_plugin"
priority_weight_strategies = [DecreasingPriorityStrategy]
要检查自定义优先级权重策略是否已在 Airflow 中可用,你可以运行 bash 命令 airflow plugins
。然后要使用它,你可以创建自定义类的实例并将其提供给任务的 weight_rule
参数,或者提供自定义类的路径。
src/airflow/example_dags/example_custom_weight.py
with DAG(
dag_id="example_custom_weight",
schedule="0 0 * * *",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
dagrun_timeout=datetime.timedelta(minutes=60),
tags=["example", "example2"],
) as dag:
start = EmptyOperator(
task_id="start",
)
# provide the class instance
task_1 = BashOperator(task_id="task_1", bash_command="echo 1", weight_rule=DecreasingPriorityStrategy())
# or provide the path of the class
task_2 = BashOperator(
task_id="task_2",
bash_command="echo 1",
weight_rule="airflow.example_dags.plugins.decreasing_priority_weight_strategy.DecreasingPriorityStrategy",
)
task_non_custom = BashOperator(task_id="task_non_custom", bash_command="echo 1", priority_weight=2)
start >> [task_1, task_2, task_non_custom]
在 DAG 运行后,你可以在任务上检查 priority_weight
参数,以验证它正在使用自定义优先级策略规则。
这是一项实验性功能。