动态 DAG 生成¶
本文档描述了创建具有动态生成结构的 DAG,但 DAG 中任务数量在 DAG 运行之间保持不变。如果您想实现一个 DAG,其任务(或 Airflow 2.6 中的任务组)的数量可以根据前一个任务的输出/结果而变化,请参阅动态任务映射。
注意
生成任务和任务组的一致序列
在所有动态生成 DAG 的情况下,您都应该确保任务和任务组每次生成 DAG 时都以一致的序列生成,否则每次刷新页面时,网格视图(Grid View)中的任务和任务组序列可能会发生变化。这可以通过例如在数据库查询中使用稳定的排序机制或在 Python 中使用 sorted()
函数来实现。
使用环境变量的动态 DAG¶
如果您想使用变量来配置代码,在顶层代码中应始终使用环境变量,而不是Airflow 变量。在顶层代码中使用 Airflow 变量需要连接 Airflow 的元数据数据库来获取值,这会减慢解析速度并增加数据库的负载。请参阅Airflow 变量的最佳实践,了解如何在 DAG 中使用 Jinja 模板来最好地利用 Airflow 变量。
例如,您可以为生产和开发环境设置不同的 DEPLOYMENT
变量。在生产环境中,变量 DEPLOYMENT
可以设置为 PROD
,在开发环境中设置为 DEV
。然后,您可以根据环境变量的值,在生产和开发环境中构建不同的 DAG。
deployment = os.environ.get("DEPLOYMENT", "PROD")
if deployment == "PROD":
task = Operator(param="prod-param")
elif deployment == "DEV":
task = Operator(param="dev-param")
生成嵌入元数据的 Python 代码¶
您可以外部生成包含元数据的 Python 代码作为可导入的常量。然后,您的 DAG 可以直接导入这些常量,并用于构建对象和依赖关系。这使得可以轻松地从多个 DAG 导入此类代码,而无需查找、加载和解析存储在常量中的元数据——这由 Python 解释器在处理“import”语句时自动完成。这听起来最初很奇怪,但生成此类代码并确保它是您可以从 DAG 中导入的有效 Python 代码却出奇地容易。
例如,假设您在 DAG 文件夹中动态生成了文件 my_company_utils/common.py
# This file is generated automatically !
ALL_TASKS = ["task1", "task2", "task3"]
然后您可以在所有 DAG 中像这样导入和使用 ALL_TASKS
常量
from my_company_utils.common import ALL_TASKS
with DAG(
dag_id="my_dag",
schedule=None,
start_date=datetime(2021, 1, 1),
catchup=False,
):
for task in ALL_TASKS:
# create your operators and relations here
...
请不要忘记,在这种情况下,您需要在 my_company_utils
文件夹中添加一个空的 __init__.py
文件,并且应将 my_company_utils/*
行添加到 .airflowignore
文件(使用默认的 glob 语法)中,以便调度器在查找 DAG 时忽略整个文件夹。
使用结构化数据文件的外部配置生成动态 DAG¶
如果您需要使用更复杂的元数据来准备 DAG 结构,并且希望将数据保存在结构化的非 Python 格式中,您应该将数据导出到 DAG 文件夹中的文件并将其推送到 DAG 文件夹,而不是试图通过 DAG 的顶层代码来拉取数据——原因已在父章节 顶层 Python 代码 中解释。
元数据应以方便的文件格式(JSON, YAML 格式是好的选择)导出并与 DAG 一起存储在 DAG 文件夹中。理想情况下,元数据应与您从中加载 DAG 文件的模块位于同一包/文件夹中,因为这样您可以轻松地在 DAG 中找到元数据文件的位置。要读取的文件位置可以使用包含 DAG 的模块的 __file__
属性找到
my_dir = os.path.dirname(os.path.abspath(__file__))
configuration_file_path = os.path.join(my_dir, "config.yaml")
with open(configuration_file_path) as yaml_file:
configuration = yaml.safe_load(yaml_file)
# Configuration dict is available here
注册动态 DAG¶
在使用 @dag
修饰器或 with DAG(..)
上下文管理器时,您可以动态生成 DAG,Airflow 将自动注册它们。
from datetime import datetime
from airflow.sdk import dag, task
configs = {
"config1": {"message": "first DAG will receive this message"},
"config2": {"message": "second DAG will receive this message"},
}
for config_name, config in configs.items():
dag_id = f"dynamic_generated_dag_{config_name}"
@dag(dag_id=dag_id, start_date=datetime(2022, 2, 1))
def dynamic_generated_dag():
@task
def print_message(message):
print(message)
print_message(config["message"])
dynamic_generated_dag()
以下代码将为每个配置生成一个 DAG:dynamic_generated_dag_config1
和 dynamic_generated_dag_config2
。它们都可以独立运行,并带有相关的配置。
如果您不希望自动注册 DAG,可以通过将 DAG 上的 auto_register=False
设置为禁用此行为。
版本 2.4 中的变化:自版本 2.4 起,通过调用 @dag
修饰的函数(或在 with DAG(...)
上下文管理器中使用的)创建的 DAG 会自动注册,不再需要存储在全局变量中。
优化执行期间的 DAG 解析延迟¶
版本 2.4 中新增。
这是一项实验性功能。
有时,当您从单个 DAG 文件生成大量动态 DAG 时,在任务执行期间解析 DAG 文件可能会导致不必要的延迟。其影响是任务开始前的延迟。
为什么会这样?您可能没有意识到,就在您的任务执行之前,Airflow 会解析 DAG 所属的 Python 文件。
Airflow 调度器(或更确切地说,DAG 文件处理器)需要加载完整的 DAG 文件来处理所有元数据。然而,任务执行只需要单个 DAG 对象来执行任务。知道了这一点,我们可以在执行任务时跳过生成不必要的 DAG 对象,从而缩短解析时间。当生成的 DAG 数量很大时,这种优化效果最明显。
您可以采用一种实验性的方法来优化此行为。请注意,并非总是可以使用此方法(例如,当下游 DAG 的生成依赖于上游 DAG 时),或者当您的 DAG 生成存在一些副作用时。此外,下面的代码片段相当复杂,虽然我们已经测试过并且在大多数情况下有效,但可能存在无法检测当前解析的 DAG 的情况,此时将恢复创建所有 DAG 或失败。请谨慎使用此解决方案并进行彻底测试。
Airflow 的魔法循环 博客文章中展示了一个很好的性能改进示例,该文章描述了如何在任务执行期间将解析时间从 120 秒缩短到 200 毫秒。(该示例写于 Airflow 2.4 之前,因此使用了 Airflow 的未文档化行为。)
在 Airflow 2.4 中,您可以改为使用 get_parsing_context()
方法以文档化且可预测的方式检索当前上下文。
遍历需要生成 DAG 的对象集合时,您可以使用上下文来确定是需要生成所有 DAG 对象(在 DAG 文件处理器中解析时),还是只生成单个 DAG 对象(执行任务时)。
get_parsing_context()
返回当前的解析上下文。上下文类型为 AirflowParsingContext
,如果只需要单个 DAG/任务,则其 dag_id
和 task_id
字段会被设置。如果需要“完整”解析(例如在 DAG 文件处理器中),上下文的 dag_id
和 task_id
将设置为 None
。
from airflow.sdk import DAG
from airflow.sdk import get_parsing_context
current_dag_id = get_parsing_context().dag_id
for thing in list_of_things:
dag_id = f"generated_dag_{thing}"
if current_dag_id is not None and current_dag_id != dag_id:
continue # skip generation of non-selected DAG
with DAG(dag_id=dag_id, ...):
...