动态 DAG 生成¶
本文档描述了创建结构动态生成但 DAG Run 之间 DAG 中任务数量不变的 DAG。如果您想实现一个基于先前任务的输出/结果可以更改任务(或自 Airflow 2.6 起的任务组)数量的 DAG,请参阅 动态任务映射。
注意
生成任务和任务组的一致序列
在所有动态生成 DAG 的情况下,您都应确保每次生成 DAG 时都以一致的序列生成任务和任务组,否则您最终可能会在每次刷新页面时在网格视图中更改任务和任务组的序列。例如,可以通过在数据库查询中使用稳定的排序机制或在 Python 中使用 sorted()
函数来实现此目的。
具有环境变量的动态 DAG¶
如果您想使用变量来配置代码,则应始终在顶级代码中使用 环境变量,而不是 Airflow 变量。在顶级代码中使用 Airflow 变量会创建到 Airflow 元数据数据库的连接以获取值,这会减慢解析速度并给数据库带来额外的负载。请参阅 关于 Airflow 变量的最佳实践,以便在 DAG 中使用 Jinja 模板充分利用 Airflow 变量。
例如,您可以为您的生产和开发环境以不同的方式设置 DEPLOYMENT
变量。变量 DEPLOYMENT
可以在您的生产环境中设置为 PROD
,在您的开发环境中设置为 DEV
。然后,您可以根据环境变量的值在生产和开发环境中以不同的方式构建您的 DAG。
deployment = os.environ.get("DEPLOYMENT", "PROD")
if deployment == "PROD":
task = Operator(param="prod-param")
elif deployment == "DEV":
task = Operator(param="dev-param")
生成带有嵌入元数据的 Python 代码¶
您可以外部生成包含元数据(作为可导入常量)的 Python 代码。您的 DAG 随后可以直接导入此类常量,并用于构建对象和构建依赖项。这样便于从多个 DAG 导入此类代码,而无需查找、加载和解析存储在常量中的元数据 - 当 Python 解释器处理“import”语句时,它会自动完成此操作。这听起来起初很奇怪,但生成此类代码并确保这是您可以从 DAG 导入的有效 Python 代码出人意料地容易。
例如,假设您动态生成(在您的 DAG 文件夹中)my_company_utils/common.py
文件
# This file is generated automatically !
ALL_TASKS = ["task1", "task2", "task3"]
然后,您可以像这样在所有 DAG 中导入并使用 ALL_TASKS
常量
from my_company_utils.common import ALL_TASKS
with DAG(
dag_id="my_dag",
schedule=None,
start_date=datetime(2021, 1, 1),
catchup=False,
):
for task in ALL_TASKS:
# create your operators and relations here
...
请不要忘记,在这种情况下,您需要在 my_company_utils
文件夹中添加空的 __init__.py
文件,并且您应该将 my_company_utils/.*
行添加到 .airflowignore
文件(如果使用正则表达式忽略语法),以便当调度程序查找 DAG 时忽略整个文件夹。
具有来自结构化数据文件的外部配置的动态 DAG¶
如果您需要使用更复杂的元数据来准备 DAG 结构,并且您更愿意将数据保留在结构化的非 Python 格式中,则您应该将数据导出到 DAG 文件夹中的一个文件中,并将其推送到 DAG 文件夹,而不是尝试通过 DAG 的顶级代码提取数据 - 原因在父级 顶级 Python 代码 中进行了说明。
元数据应导出并与 DAG 一起存储在 DAG 文件夹中的一个方便的文件格式(JSON、YAML 格式是不错的选择)中。理想情况下,元数据应与您从中加载它的 DAG 文件的模块位于同一包/文件夹中,因为这样您就可以轻松地在 DAG 中找到元数据文件的位置。可以使用包含 DAG 的模块的 __file__
属性找到要读取的文件的位置
my_dir = os.path.dirname(os.path.abspath(__file__))
configuration_file_path = os.path.join(my_dir, "config.yaml")
with open(configuration_file_path) as yaml_file:
configuration = yaml.safe_load(yaml_file)
# Configuration dict is available here
注册动态 DAG¶
当使用 @dag
装饰器或 with DAG(..)
上下文管理器时,您可以动态生成 DAG,而 Airflow 会自动注册它们。
from datetime import datetime
from airflow.decorators import dag, task
configs = {
"config1": {"message": "first DAG will receive this message"},
"config2": {"message": "second DAG will receive this message"},
}
for config_name, config in configs.items():
dag_id = f"dynamic_generated_dag_{config_name}"
@dag(dag_id=dag_id, start_date=datetime(2022, 2, 1))
def dynamic_generated_dag():
@task
def print_message(message):
print(message)
print_message(config["message"])
dynamic_generated_dag()
以下代码将为每个配置生成一个 DAG:dynamic_generated_dag_config1
和 dynamic_generated_dag_config2
。它们中的每一个都可以使用相关配置单独运行。
如果您不希望自动注册 DAG,可以通过在 DAG 上设置 auto_register=False
来禁用该行为。
2.4 版中已更改:从 2.4 版开始,通过调用 @dag
修饰函数(或在 with DAG(...)
上下文管理器中使用)创建的 DAG 会自动注册,不再需要存储在全局变量中。
优化执行期间的 DAG 解析延迟¶
2.4 版中的新增功能。
这是一个 实验性功能。
有时,当您从单个 DAG 文件生成大量动态 DAG 时,在任务执行期间解析 DAG 文件时可能会造成不必要的延迟。影响是任务开始前的延迟。
为什么会发生这种情况?您可能不知道,但在任务执行之前,Airflow 会解析 DAG 所在的 Python 文件。
Airflow 调度程序(或 DAG 文件处理器)需要加载完整的 DAG 文件才能处理所有元数据。但是,任务执行只需要一个 DAG 对象来执行任务。了解这一点,我们可以在执行任务时跳过不必要的 DAG 对象的生成,从而缩短解析时间。当生成的 DAG 数量较多时,此优化最有效。
您可以采用一种实验性方法来优化此行为。请注意,它并不总是可用的(例如,当后续 DAG 的生成依赖于之前的 DAG 时)或当您的 DAG 生成存在某些副作用时。此外,以下代码片段非常复杂,虽然我们对其进行了测试并且它在大多数情况下都能正常工作,但在某些情况下,当前解析的 DAG 的检测可能会失败,并且它将恢复为创建所有 DAG 或失败。请谨慎使用此解决方案并对其进行彻底测试。
您可以在 Airflow 的 Magic Loop 博客文章中看到性能改进的一个很好的示例,该文章描述了如何将任务执行期间的解析从 120 秒减少到 200 毫秒。(该示例是在 Airflow 2.4 之前编写的,因此它使用了 Airflow 的未记录行为。)
在 Airflow 2.4 中,您可以使用 get_parsing_context()
方法以可记录且可预测的方式检索当前上下文。
在对要为其生成 DAG 的事物的集合进行迭代时,你可以使用上下文来确定是否需要生成所有 DAG 对象(在 DAG 文件处理器中进行解析时),或仅生成单个 DAG 对象(在执行任务时)。
get_parsing_context()
返回当前解析上下文。上下文为 AirflowParsingContext
,如果仅需要单个 DAG/任务,则它包含已设置的 dag_id
和 task_id
字段。如果需要“完全”解析(例如在 DAG 文件处理器中),则上下文的 dag_id
和 task_id
设置为 None
。
from airflow.models.dag import DAG
from airflow.utils.dag_parsing_context import get_parsing_context
current_dag_id = get_parsing_context().dag_id
for thing in list_of_things:
dag_id = f"generated_dag_{thing}"
if current_dag_id is not None and current_dag_id != dag_id:
continue # skip generation of non-selected DAG
with DAG(dag_id=dag_id, ...):
...