设置和拆卸¶
在数据工作流程中,通常需要创建一个资源(例如计算资源),使用它完成一些工作,然后将其拆卸。Airflow 提供了设置和拆卸任务来支持这一需求。
设置和拆卸任务的关键特性
如果您清除一个任务,其设置和拆卸任务也将被清除。
默认情况下,拆卸任务在评估 DAG 运行状态时会被忽略。
如果其设置成功,即使工作任务失败,拆卸任务也会运行。但如果设置被跳过,拆卸任务也会跳过。
在针对任务组设置依赖关系时,拆卸任务会被忽略。
即使 DAG 运行被手动设置为“失败”或“成功”,拆卸任务也会执行,以确保资源被清理。
设置和拆卸的工作原理¶
基本用法¶
假设您有一个 DAG,它创建集群,运行查询,然后删除集群。如果不使用设置和拆卸任务,您可能会设置以下依赖关系
create_cluster >> run_query >> delete_cluster
要将 create_cluster
和 delete_cluster
启用为设置和拆卸任务,我们使用 as_setup
和 as_teardown
方法标记它们,并在它们之间添加上游/下游依赖关系
create_cluster.as_setup() >> run_query >> delete_cluster.as_teardown()
create_cluster >> delete_cluster
为了方便,我们可以通过将 create_cluster
传递给 as_teardown
方法在一行中完成此操作
create_cluster >> run_query >> delete_cluster.as_teardown(setups=create_cluster)
这是此 DAG 的图

注意事项
如果您清除
run_query
以便再次运行它,则create_cluster
和delete_cluster
都会被清除。如果
run_query
失败,则delete_cluster
仍将运行。DAG 运行的成功仅取决于
run_query
的成功。
此外,如果我们有多个要包裹的任务,我们可以使用拆卸作为上下文管理器
with delete_cluster().as_teardown(setups=create_cluster()):
[RunQueryOne(), RunQueryTwo()] >> DoSomeOtherStuff()
WorkOne() >> [do_this_stuff(), do_other_stuff()]
这将设置 create_cluster 在上下文中的任务之前运行,并在这些任务之后运行 delete_cluster。
图表中显示如下

请注意,如果您尝试将一个已实例化的任务添加到设置上下文中,则需要显式执行此操作
with my_teardown_task as scope:
scope.add_task(work_task) # work_task was already instantiated elsewhere
设置的“范围”¶
设置与其拆卸之间的任务处于设置/拆卸对的“范围”内。
我们来看一个示例
s1 >> w1 >> w2 >> t1.as_teardown(setups=s1) >> w3
w2 >> w4
以及图表

在上面的示例中,w1
和 w2
处于 s1
和 t1
“之间”,因此假定它们需要 s1
。因此,如果清除 w1
或 w2
,s1
和 t1
也将被清除。但如果清除 w3
或 w4
,s1
和 t1
都不会被清除。
您可以将多个设置任务连接到一个拆卸任务。如果至少有一个设置任务成功完成,拆卸任务将运行。
您可以有一个没有拆卸的设置
create_cluster >> run_query >> other_task
在这种情况下,create_cluster 的所有下游任务都假定需要它。因此,如果您清除 other_task,它也会清除 create_cluster。假设我们在 run_query 之后为 create_cluster 添加一个拆卸
create_cluster >> run_query >> other_task
run_query >> delete_cluster.as_teardown(setups=create_cluster)
现在,Airflow 可以推断 other_task 不需要 create_cluster,因此如果我们清除 other_task,create_cluster 也不会被清除。
在那个示例中,我们(在假想的文档中)实际上想要删除集群。但假设我们不这样做,而只是想说“other_task 不需要 create_cluster”,那么我们可以使用 EmptyOperator 来限制设置的范围
create_cluster >> run_query >> other_task
run_query >> EmptyOperator(task_id="cluster_teardown").as_teardown(setups=create_cluster)
隐式的 ALL_SUCCESS 约束¶
任何处于设置范围内的任务对其设置都有一个隐式的“all_success”约束。这对于确保当一个具有间接设置的任务被清除时,它会等待这些设置完成是必要的。如果设置失败或被跳过,依赖于它们的工作任务将被标记为失败或跳过。我们还要求直接位于设置下游的任何非拆卸任务必须具有触发规则 ALL_SUCCESS。
控制 DAG 运行状态¶
设置/拆卸任务的另一个特性是您可以选择拆卸任务是否应对 DAG 运行状态产生影响。也许您不关心拆卸任务执行的“清理”工作是否失败,而只在“工作”任务失败时才认为 DAG 运行失败。默认情况下,拆卸任务不计入 DAG 运行状态的评估。
继续上面的示例,如果您希望运行的成功依赖于 delete_cluster
,则在将 delete_cluster
设置为拆卸任务时,设置 on_failure_fail_dagrun=True
。例如
create_cluster >> run_query >> delete_cluster.as_teardown(setups=create_cluster, on_failure_fail_dagrun=True)
并行运行设置和拆卸¶
您可以并行运行设置任务
(
[create_cluster, create_bucket]
>> run_query
>> [delete_cluster.as_teardown(setups=create_cluster), delete_bucket.as_teardown(setups=create_bucket)]
)
图表

将它们放入一个组中在视觉上可能更佳
with TaskGroup("setup") as tg_s:
create_cluster = create_cluster()
create_bucket = create_bucket()
run_query = run_query()
with TaskGroup("teardown") as tg_t:
delete_cluster = delete_cluster().as_teardown(setups=create_cluster)
delete_bucket = delete_bucket().as_teardown(setups=create_bucket)
tg_s >> run_query >> tg_t
以及图表

拆卸的触发规则行为¶
拆卸任务使用一个(不可配置的)触发规则,称为 ALL_DONE_SETUP_SUCCESS。使用此规则,只要所有上游任务完成且至少一个直接连接的设置任务成功,拆卸任务就会运行。如果拆卸任务的所有设置都被跳过或失败,这些状态将传播到拆卸任务。
手动更改 DAG 状态的副作用¶
由于拆卸任务通常用于清理资源,因此即使 DAG 被手动终止,它们也需要运行。为了尽早终止,用户可以手动将 DAG 运行标记为“成功”或“失败”,这会在完成之前杀死所有任务。如果 DAG 包含拆卸任务,它们仍将执行。因此,允许调度拆卸任务的一个副作用是,即使用户请求,DAG 也不会立即进入终止状态。