设置和清理¶
在数据工作流中,通常会创建一个资源(例如计算资源),使用它来完成一些工作,然后再将其清理。Airflow 提供设置和清理任务来支持此需求。
设置和清理任务的关键特性
如果您清除一个任务,其设置和清理也将被清除。
默认情况下,清理任务在评估 DAG 运行状态时会被忽略。
即使其工作任务失败,如果其设置成功,清理任务也会运行。
在设置针对任务组的依赖项时,清理任务会被忽略。
设置和清理的工作原理¶
基本用法¶
假设您有一个 DAG,它创建一个集群、运行一个查询并删除该集群。如果不使用设置和清理任务,您可能会设置以下关系
create_cluster >> run_query >> delete_cluster
要将 create_cluster 和 delete_cluster 作为设置和清理任务启用,我们将它们标记为 as_setup
和 as_teardown
方法,并在它们之间添加上游/下游关系
create_cluster.as_setup() >> run_query >> delete_cluster.as_teardown()
create_cluster >> delete_cluster
为了方便起见,我们可以通过将 create_cluster
传递给 as_teardown
方法来在一行中完成此操作
create_cluster >> run_query >> delete_cluster.as_teardown(setups=create_cluster)
这是此 DAG 的图
观察
如果您清除
run_query
以再次运行它,则create_cluster
和delete_cluster
都将被清除。如果
run_query
失败,delete_cluster
仍将运行。DAG 运行的成功与否将仅取决于
run_query
的成功。
此外,如果我们有多个任务需要包装,我们可以将清理用作上下文管理器
with delete_cluster().as_teardown(setups=create_cluster()):
[RunQueryOne(), RunQueryTwo()] >> DoSomeOtherStuff()
WorkOne() >> [do_this_stuff(), do_other_stuff()]
这将设置 create_cluster 在上下文中任务之前运行,并在它们之后运行 delete_cluster。
这是它,在图中显示
请注意,如果您尝试将已实例化的任务添加到设置上下文中,则需要显式执行此操作
with my_teardown_task as scope:
scope.add_task(work_task) # work_task was already instantiated elsewhere
设置“作用域”¶
设置及其清理之间的任务位于设置/清理对的“作用域”内。
让我们看一个例子
s1 >> w1 >> w2 >> t1.as_teardown(setups=s1) >> w3
w2 >> w4
以及图
在上面的示例中,w1
和 w2
“位于” s1
和 t1
之间,因此假定需要 s1
。因此,如果 w1
或 w2
被清除,s1
和 t1
也将被清除。但是,如果 w3
或 w4
被清除,则 s1
和 t1
都不会被清除。
您可以将多个设置任务连接到单个清理。如果至少有一个设置成功完成,清理将运行。
您可以有一个没有清理的设置
create_cluster >> run_query >> other_task
在这种情况下,create_cluster 的所有下游都被假定为需要它。因此,如果您清除 other_task,它也会清除 create_cluster。假设我们在 run_query 之后为 create_cluster 添加一个清理
create_cluster >> run_query >> other_task
run_query >> delete_cluster.as_teardown(setups=create_cluster)
现在,Airflow 可以推断出 other_task 不需要 create_cluster,因此如果我们清除 other_task,则不会同时清除 create_cluster。
在该示例中,我们(在假想的文档世界中)实际上想要删除该集群。但是,假设我们没有这样做,而只是想说“other_task 不需要 create_cluster”,那么我们可以使用 EmptyOperator 来限制设置的作用域
create_cluster >> run_query >> other_task
run_query >> EmptyOperator(task_id="cluster_teardown").as_teardown(setups=create_cluster)
隐式 ALL_SUCCESS 约束¶
设置作用域中的任何任务对其设置都有隐式的“all_success”约束。这对于确保如果清除了具有间接设置的任务,它将等待它们完成是必要的。如果设置失败或跳过,则依赖于它们的工作任务将被标记为失败或跳过。我们还要求直接位于设置下游的任何非清理任务都必须具有触发规则 ALL_SUCCESS。
控制 DAG 运行状态¶
设置/清理任务的另一个特点是您可以选择清理任务是否应对 DAG 运行状态产生影响。也许您不关心清理任务执行的“清理”工作是否失败,并且仅当“工作”任务失败时,您才认为 DAG 运行失败。默认情况下,DAG 运行状态不会考虑清理任务。
继续上面的示例,如果您希望运行的成功与否取决于 delete_cluster
,则在将 delete_cluster
设置为清理时,设置 on_failure_fail_dagrun=True
。例如
create_cluster >> run_query >> delete_cluster.as_teardown(setups=create_cluster, on_failure_fail_dagrun=True)
使用任务组进行创作¶
当从任务组指向任务组,或从任务组指向任务时,我们会忽略清理。这允许清理并行运行,并允许即使清理任务失败,DAG 执行也能继续进行。
考虑以下示例
with TaskGroup("my_group") as tg:
s1 = s1()
w1 = w1()
t1 = t1()
s1 >> w1 >> t1.as_teardown(setups=s1)
w2 = w2()
tg >> w2
图
如果 t1
不是清理任务,则此 DAG 将有效地成为 s1 >> w1 >> t1 >> w2
。但是,由于我们将 t1
标记为清理,因此在 tg >> w2
中会被忽略。因此,DAG 等效于以下内容
s1 >> w1 >> [t1.as_teardown(setups=s1), w2]
现在让我们考虑一个嵌套的示例
with TaskGroup("my_group") as tg:
s1 = s1()
w1 = w1()
t1 = t1()
s1 >> w1 >> t1.as_teardown(setups=s1)
w2 = w2()
tg >> w2
dag_s1 = dag_s1()
dag_t1 = dag_t1()
dag_s1 >> [tg, w2] >> dag_t1.as_teardown(setups=dag_s1)
图
在此示例中,s1
位于 dag_s1
的下游,因此它必须等待 dag_s1
成功完成。但是 t1
和 dag_t1
可以并发运行,因为 t1
在表达式 tg >> dag_t1
中被忽略。如果您清除 w2
,它将清除 dag_s1
和 dag_t1
,但不会清除任务组中的任何内容。
并行运行设置和清理¶
您可以并行运行设置任务
(
[create_cluster, create_bucket]
>> run_query
>> [delete_cluster.as_teardown(setups=create_cluster), delete_bucket.as_teardown(setups=create_bucket)]
)
图
将它们放在一个组中,在视觉上会更好
with TaskGroup("setup") as tg_s:
create_cluster = create_cluster()
create_bucket = create_bucket()
run_query = run_query()
with TaskGroup("teardown") as tg_t:
delete_cluster = delete_cluster().as_teardown(setups=create_cluster)
delete_bucket = delete_bucket().as_teardown(setups=create_bucket)
tg_s >> run_query >> tg_t
以及图
清理的触发规则行为¶
清理使用一个(不可配置的)触发规则,称为 ALL_DONE_SETUP_SUCCESS。使用此规则,只要所有上游都完成且至少有一个直接连接的设置成功,清理就会运行。如果清理的所有设置都被跳过或失败,这些状态将传播到清理。