在数据管道中,我们通常需要在进行实际“工作”之前创建基础设施资源,例如集群或现有集群中的 GPU 节点,并在工作完成后将其删除。Airflow 2.7 添加了“设置”和“清理”任务,以更好地支持这种类型的数据管道。本文旨在重点介绍其主要功能,以便您了解可能实现的功能。有关如何使用设置和清理任务的完整文档,请参阅设置和清理文档。
为什么需要设置和清理任务?
在我们深入研究示例之前,让我先从高层面阐述一下设置和清理任务带来的益处。
更富有表现力的依赖关系
在设置和清理任务出现之前,上游和下游关系只能表示一件事:“这在之前发生”。有了设置和清理任务,实际上我们可以说“这依赖于那个”。而在实践中这意味着,如果你清除一个任务,并且它需要一个设置任务,那么该设置任务也会被清除。如果该设置任务关联了一个清理任务,该清理任务也将再次运行。
将工作与基础设施分离
有时你关心的 DAG 部分并不是清理任务。例如,假设你的 DAG 加载一些数据然后删除临时文件。只要数据加载成功,你就希望你的 DAG 被标记为成功。默认情况下,清理任务就是这样工作的;也就是说,在确定 DAG 运行状态时,它们会被忽略。
简单示例
一个简单的示例是一个设置/清理任务对和一个普通或“工作”任务。
设置任务和清理任务分别用向上和向下箭头表示。由此我们可以看到,create_cluster
是一个设置任务,而 delete_cluster
是一个清理任务。设置任务和清理任务之间的链接始终是点状的,以突出其特殊关系。
一些值得注意的事项
- 如果
create_cluster
失败,则run_query
和delete_cluster
都不会运行。 - 如果
create_cluster
成功而run_query
失败,则delete_cluster
仍将运行。 - 如果
create_cluster
被跳过,则run_query
和delete_cluster
也将被跳过。 - 默认情况下,如果
run_query
成功而delete_cluster
失败,则 DAG 运行仍将被标记为成功。(此行为可以覆盖)。
使用任务组编排
当我们设置某个任务位于任务组的下游时,任务组中的任何清理任务都会被忽略。这反映了一个假设,即通常我们不希望仅仅因为一个清理任务失败就停止 DAG 执行。所以,让我们将上面的 DAG 包装到一个任务组中,看看会发生什么。
以下是我们在代码中如何关联这些组的方式
with TaskGroup("do_emr") as do_emr:
create_cluster_task = create_cluster()
run_query(create_cluster_task) >> delete_cluster(create_cluster_task)
with TaskGroup("load") as load:
create_config_task = create_configuration()
load_data(create_config_task) >> delete_configuration(create_config_task)
do_emr >> load
在这段代码中,每个组都有一个清理任务,我们将第一个组指向第二个组。正如所描述的,delete_cluster
,一个清理任务,被忽略了。这带来了两个重要的结果:第一,即使它失败了,load
组仍然会运行;第二,delete_cluster
和 create_configuration
可以并行运行(一般来说,我们认为您不会希望等待清理操作完成再继续执行 DAG 中的其他任务)。当然,您可以通过在 delete_cluster
和 create_configuration
之间添加箭头来覆盖此行为。此外,此 DAG 的成功将仅取决于 load_data
任务是否成功完成。
结论
关于如何使用设置和清理任务编写 DAG 的详细信息,本文省略了很多,请参阅设置和清理文档。但希望本文能让您对设置和清理任务的可能性有足够的了解,从而开始看到它们如何改进您在 Airflow 中的数据管道。
想知道 Airflow 2.7 中还有哪些新功能?前往Airflow 2.7 主博客文章了解更多!
致谢
设置和清理任务是 AIP-52 的成果。感谢所有为此做出贡献的人,包括阅读 AIP 并投票的人。特别感谢 Ash Berlin-Taylor, Brent Bovenzi, Daniel Standish, Ephraim Anierobi, Jed Cunningham, Rahul Vats 和 Vikram Koka。
分享