集群策略¶
如果你想在集群级别检查或修改 DAG 或任务,集群策略可以实现这一点。它们有三个主要目的:
检查 DAG / 任务是否符合特定标准
为 DAG / 任务设置默认参数
执行自定义路由逻辑
集群策略主要有三种类型:
task_policy
:接收一个名为task
的BaseOperator
参数。该策略在加载时从 DagBag 解析任务创建任务时执行。这意味着整个任务定义可以在任务策略中修改。它与在 DagRun 中运行的特定任务无关。定义的task_policy
将应用于将来执行的所有任务实例。task_instance_mutation_hook
:接收一个名为task_instance
的TaskInstance
参数。`task_instance_mutation_hook` 不应用于任务本身,而是应用于与特定 DagRun 相关的任务实例。它在“工作器”中执行,而不是在 DAG 文件处理器中执行,恰好在任务实例执行之前。该策略仅应用于该任务当前执行的运行(即实例)。
DAG 和任务集群策略可以抛出 AirflowClusterPolicyViolation
异常,以指示传递给它们的 DAG/任务不合规,不应被加载。
当有意跳过某个 DAG 时,它们也可以抛出 AirflowClusterPolicySkipDag
异常。与 AirflowClusterPolicyViolation
不同,此异常不会显示在 Airflow Web UI 上(在内部,它不会记录在元数据库的 import_error
表中)。
集群策略设置的任何额外属性优先于你在 DAG 文件中定义的属性;例如,如果你在 DAG 文件中的任务上设置了 sla
,然后集群策略也设置了 sla
,则集群策略的值将优先。
如何定义策略函数¶
有两种配置集群策略的方法:
在 Python 搜索路径中的某个位置创建一个
airflow_local_settings.py
文件($AIRFLOW_HOME 下的config/
文件夹是一个不错的“默认”位置),然后将与上述一个或多个集群策略名称(例如dag_policy
)匹配的可调用对象添加到文件中。
有关如何配置本地设置的详细信息,请参阅 配置本地设置。
通过在自定义模块中使用 Pluggy 接口的 setuptools entrypoint。
在 2.6 版本中添加。
此方法更高级,适用于已经熟悉 Python 打包的人员。
首先在模块中创建你的策略函数
from airflow.policies import hookimpl @hookimpl def task_policy(task) -> None: # Mutate task in place # ... print(f"Hello from {__file__}")
然后将 entrypoint 添加到你的项目规范中。例如,使用
pyproject.toml
和setuptools
[build-system] requires = ["setuptools", "wheel"] build-backend = "setuptools.build_meta" [project] name = "my-airflow-plugin" version = "0.0.1" # ... dependencies = ["apache-airflow>=2.6"] [project.entry-points.'airflow.policy'] _ = 'my_airflow_plugin.policies'
entrypoint 组必须是
airflow.policy
,名称将被忽略。值应该是使用@hookimpl
标记装饰的模块(或类)。完成这些步骤并将你的分发包安装到你的 Airflow 环境后,各种 Airflow 组件将调用策略函数。(具体的调用顺序未定义,因此如果你有多个插件,请不要依赖任何特定的调用顺序)。
需要注意的一点(无论使用哪种方式定义策略函数)是参数名称必须与以下文档中所示的完全匹配。
可用策略函数¶
- airflow.policies.task_policy(task)[source]¶
允许在任务加载到 DagBag 后对其进行修改。
它允许管理员重新配置某些任务的参数。或者,你可以抛出
AirflowClusterPolicyViolation
异常来阻止 DAG 执行。以下是一些展示其用途的示例:
你可以为使用
SparkOperator
的任务强制使用特定的队列(例如spark
队列),以确保这些任务被分配到正确的工作器。你可以强制执行任务超时策略,确保没有任务运行超过 48 小时。
- 参数:
task (airflow.models.baseoperator.BaseOperator) – 要修改的任务
- airflow.policies.dag_policy(dag)[source]¶
允许在 DAG 加载到 DagBag 后对其进行修改。
它允许管理员重新配置某些 DAG 的参数。或者,你可以抛出
AirflowClusterPolicyViolation
异常来阻止 DAG 执行。以下是一些展示其用途的示例:
你可以为 DAG 强制指定默认用户
检查每个 DAG 是否配置了标签
- 参数:
dag (airflow.models.dag.DAG) – 要修改的 DAG
- airflow.policies.task_instance_mutation_hook(task_instance)[source]¶
允许在任务实例被 Airflow 调度器排队之前对其进行修改。
例如,这可以用于在重试期间修改任务实例。
- 参数:
task_instance (airflow.models.taskinstance.TaskInstance) – 要修改的任务实例
示例¶
DAG 策略¶
此策略检查每个 DAG 是否至少定义了一个标签
def dag_policy(dag: DAG):
"""Ensure that DAG has at least one tag and skip the DAG with `only_for_beta` tag."""
if not dag.tags:
raise AirflowClusterPolicyViolation(
f"DAG {dag.dag_id} has no tags. At least one tag required. File path: {dag.fileloc}"
)
if "only_for_beta" in dag.tags:
raise AirflowClusterPolicySkipDag(
f"DAG {dag.dag_id} is not loaded on the production cluster, due to `only_for_beta` tag."
)
注意
为避免循环导入,如果在集群策略的类型注解中使用了 DAG
,请确保从 airflow.models
导入,而不是从 airflow
导入。
注意
DAG 策略在 DAG 完全加载后应用,因此覆盖 default_args
参数无效。如果你想覆盖默认的 operator 设置,请改用任务策略。
任务策略¶
以下是强制对每个任务应用最大超时策略的示例
class TimedOperator(BaseOperator, ABC):
timeout: timedelta
def task_policy(task: TimedOperator):
if task.task_type == "HivePartitionSensor":
task.queue = "sensor_queue"
if task.timeout > timedelta(hours=48):
task.timeout = timedelta(hours=48)
你也可以实现保护机制来防止常见错误,而不仅仅是作为技术安全控制。例如,不允许运行没有 Airflow 所有者的任务。
def task_must_have_owners(task: BaseOperator):
if task.owner and not isinstance(task.owner, str):
raise AirflowClusterPolicyViolation(f"""owner should be a string. Current value: {task.owner!r}""")
if not task.owner or task.owner.lower() == conf.get("operators", "default_owner"):
raise AirflowClusterPolicyViolation(
f"""Task must have non-None non-default owner. Current value: {task.owner}"""
)
如果你有多个要应用的检查,最佳实践是将这些规则整理在一个单独的 Python 模块中,并拥有一个执行多个此类自定义检查并聚合各种错误消息的单一策略/任务修改 hook,以便在 UI(和数据库中的 import_errors 表)中报告单个 AirflowClusterPolicyViolation
。
例如,你的 airflow_local_settings.py
可以遵循此模式
TASK_RULES: list[Callable[[BaseOperator], None]] = [
task_must_have_owners,
]
def _check_task_rules(current_task: BaseOperator):
"""Check task rules for given task."""
notices = []
for rule in TASK_RULES:
try:
rule(current_task)
except AirflowClusterPolicyViolation as ex:
notices.append(str(ex))
if notices:
notices_list = " * " + "\n * ".join(notices)
raise AirflowClusterPolicyViolation(
f"DAG policy violation (DAG ID: {current_task.dag_id}, Path: {current_task.dag.fileloc}):\n"
f"Notices:\n"
f"{notices_list}"
)
def example_task_policy(task: BaseOperator):
"""Ensure Tasks have non-default owners."""
_check_task_rules(task)
有关如何配置本地设置的详细信息,请参阅 配置本地设置。
任务实例修改¶
以下是将正在进行第二次(或更多次)重试的任务重新路由到不同队列的示例
def task_instance_mutation_hook(task_instance: TaskInstance):
if task_instance.try_number >= 1:
task_instance.queue = "retry_queue"
请注意,由于优先级权重是使用权重规则动态确定的,因此你无法在修改 hook 中更改任务实例的 priority_weight
。