Airflow 峰会 2025 将于 10 月 07-09 日举行。立即注册获取早鸟票!

设置和拆卸

在数据工作流程中,通常需要创建一个资源(例如计算资源),使用它完成一些工作,然后将其拆卸。Airflow 提供了设置和拆卸任务来支持这一需求。

设置和拆卸任务的关键特性

  • 如果您清除一个任务,其设置和拆卸任务也将被清除。

  • 默认情况下,拆卸任务在评估 DAG 运行状态时会被忽略。

  • 如果其设置成功,即使工作任务失败,拆卸任务也会运行。但如果设置被跳过,拆卸任务也会跳过。

  • 在针对任务组设置依赖关系时,拆卸任务会被忽略。

  • 即使 DAG 运行被手动设置为“失败”或“成功”,拆卸任务也会执行,以确保资源被清理。

设置和拆卸的工作原理

基本用法

假设您有一个 DAG,它创建集群,运行查询,然后删除集群。如果不使用设置和拆卸任务,您可能会设置以下依赖关系

create_cluster >> run_query >> delete_cluster

要将 create_clusterdelete_cluster 启用为设置和拆卸任务,我们使用 as_setupas_teardown 方法标记它们,并在它们之间添加上游/下游依赖关系

create_cluster.as_setup() >> run_query >> delete_cluster.as_teardown()
create_cluster >> delete_cluster

为了方便,我们可以通过将 create_cluster 传递给 as_teardown 方法在一行中完成此操作

create_cluster >> run_query >> delete_cluster.as_teardown(setups=create_cluster)

这是此 DAG 的图

../_images/setup-teardown-simple.png

注意事项

  • 如果您清除 run_query 以便再次运行它,则 create_clusterdelete_cluster 都会被清除。

  • 如果 run_query 失败,则 delete_cluster 仍将运行。

  • DAG 运行的成功取决于 run_query 的成功。

此外,如果我们有多个要包裹的任务,我们可以使用拆卸作为上下文管理器

with delete_cluster().as_teardown(setups=create_cluster()):
    [RunQueryOne(), RunQueryTwo()] >> DoSomeOtherStuff()
    WorkOne() >> [do_this_stuff(), do_other_stuff()]

这将设置 create_cluster 在上下文中的任务之前运行,并在这些任务之后运行 delete_cluster。

图表中显示如下

../_images/setup-teardown-complex.png

请注意,如果您尝试将一个已实例化的任务添加到设置上下文中,则需要显式执行此操作

with my_teardown_task as scope:
    scope.add_task(work_task)  # work_task was already instantiated elsewhere

设置的“范围”

设置与其拆卸之间的任务处于设置/拆卸对的“范围”内。

我们来看一个示例

s1 >> w1 >> w2 >> t1.as_teardown(setups=s1) >> w3
w2 >> w4

以及图表

../_images/setup-teardown-scope.png

在上面的示例中,w1w2 处于 s1t1 “之间”,因此假定它们需要 s1。因此,如果清除 w1w2s1t1 也将被清除。但如果清除 w3w4s1t1 都不会被清除。

您可以将多个设置任务连接到一个拆卸任务。如果至少有一个设置任务成功完成,拆卸任务将运行。

您可以有一个没有拆卸的设置

create_cluster >> run_query >> other_task

在这种情况下,create_cluster 的所有下游任务都假定需要它。因此,如果您清除 other_task,它也会清除 create_cluster。假设我们在 run_query 之后为 create_cluster 添加一个拆卸

create_cluster >> run_query >> other_task
run_query >> delete_cluster.as_teardown(setups=create_cluster)

现在,Airflow 可以推断 other_task 不需要 create_cluster,因此如果我们清除 other_task,create_cluster 也不会被清除。

在那个示例中,我们(在假想的文档中)实际上想要删除集群。但假设我们不这样做,而只是想说“other_task 不需要 create_cluster”,那么我们可以使用 EmptyOperator 来限制设置的范围

create_cluster >> run_query >> other_task
run_query >> EmptyOperator(task_id="cluster_teardown").as_teardown(setups=create_cluster)

隐式的 ALL_SUCCESS 约束

任何处于设置范围内的任务对其设置都有一个隐式的“all_success”约束。这对于确保当一个具有间接设置的任务被清除时,它会等待这些设置完成是必要的。如果设置失败或被跳过,依赖于它们的工作任务将被标记为失败或跳过。我们还要求直接位于设置下游的任何非拆卸任务必须具有触发规则 ALL_SUCCESS。

控制 DAG 运行状态

设置/拆卸任务的另一个特性是您可以选择拆卸任务是否应对 DAG 运行状态产生影响。也许您不关心拆卸任务执行的“清理”工作是否失败,而只在“工作”任务失败时才认为 DAG 运行失败。默认情况下,拆卸任务不计入 DAG 运行状态的评估。

继续上面的示例,如果您希望运行的成功依赖于 delete_cluster,则在将 delete_cluster 设置为拆卸任务时,设置 on_failure_fail_dagrun=True。例如

create_cluster >> run_query >> delete_cluster.as_teardown(setups=create_cluster, on_failure_fail_dagrun=True)

使用任务组进行编排

从任务组到任务组,或从任务组到任务添加依赖关系时,我们会忽略拆卸任务。这允许拆卸任务并行运行,并允许 DAG 执行即使在拆卸任务失败的情况下也能继续进行。

考虑这个示例

with TaskGroup("my_group") as tg:
    s1 = s1()
    w1 = w1()
    t1 = t1()
    s1 >> w1 >> t1.as_teardown(setups=s1)
w2 = w2()
tg >> w2

图表

../_images/setup-teardown-group.png

如果 t1 不是一个拆卸任务,那么这个 DAG 实际上将是 s1 >> w1 >> t1 >> w2。但是由于我们已将 t1 标记为拆卸任务,它在 tg >> w2 中被忽略了。因此,该 DAG 等同于以下内容

s1 >> w1 >> [t1.as_teardown(setups=s1), w2]

现在我们考虑一个嵌套的示例

with TaskGroup("my_group") as tg:
    s1 = s1()
    w1 = w1()
    t1 = t1()
    s1 >> w1 >> t1.as_teardown(setups=s1)
w2 = w2()
tg >> w2
dag_s1 = dag_s1()
dag_t1 = dag_t1()
dag_s1 >> [tg, w2] >> dag_t1.as_teardown(setups=dag_s1)

图表

../_images/setup-teardown-nesting.png

在此示例中,s1dag_s1 的下游任务,因此它必须等待 dag_s1 成功完成。但是 t1dag_t1 可以并行运行,因为 t1 在表达式 tg >> dag_t1 中被忽略了。如果您清除 w2,它将清除 dag_s1dag_t1,但不会清除任务组中的任何内容。

并行运行设置和拆卸

您可以并行运行设置任务

(
    [create_cluster, create_bucket]
    >> run_query
    >> [delete_cluster.as_teardown(setups=create_cluster), delete_bucket.as_teardown(setups=create_bucket)]
)

图表

../_images/setup-teardown-parallel.png

将它们放入一个组中在视觉上可能更佳

with TaskGroup("setup") as tg_s:
    create_cluster = create_cluster()
    create_bucket = create_bucket()
run_query = run_query()
with TaskGroup("teardown") as tg_t:
    delete_cluster = delete_cluster().as_teardown(setups=create_cluster)
    delete_bucket = delete_bucket().as_teardown(setups=create_bucket)
tg_s >> run_query >> tg_t

以及图表

../_images/setup-teardown-setup-group.png

拆卸的触发规则行为

拆卸任务使用一个(不可配置的)触发规则,称为 ALL_DONE_SETUP_SUCCESS。使用此规则,只要所有上游任务完成且至少一个直接连接的设置任务成功,拆卸任务就会运行。如果拆卸任务的所有设置都被跳过或失败,这些状态将传播到拆卸任务。

手动更改 DAG 状态的副作用

由于拆卸任务通常用于清理资源,因此即使 DAG 被手动终止,它们也需要运行。为了尽早终止,用户可以手动将 DAG 运行标记为“成功”或“失败”,这会在完成之前杀死所有任务。如果 DAG 包含拆卸任务,它们仍将执行。因此,允许调度拆卸任务的一个副作用是,即使用户请求,DAG 也不会立即进入终止状态。

此条目是否有帮助?