集群策略

如果您想在集群范围内检查或修改 DAG 或任务,则可以使用集群策略。它们有三个主要目的

  • 检查 DAG/任务是否符合特定标准

  • 在 DAG/任务上设置默认参数

  • 执行自定义路由逻辑

集群策略主要有三种类型

  • dag_policy:接受一个名为 dagDAG 参数。在从 DagBag DagBag 加载 DAG 时运行。

  • task_policy:接受一个名为 taskBaseOperator 参数。当在加载时从 DagBag 解析任务创建任务时,会执行此策略。这意味着可以在任务策略中更改整个任务定义。它与 DagRun 中运行的特定任务无关。task_policy 定义应用于将来执行的所有任务实例。

  • task_instance_mutation_hook:接受一个名为 task_instanceTaskInstance 参数。task_instance_mutation_hook 不是应用于任务,而是应用于与特定 DagRun 相关的任务实例。它在“worker”中执行,而不是在 dag 文件处理器中,在执行任务实例之前执行。该策略仅应用于该任务的当前执行运行(即实例)。

DAG 和任务集群策略可以引发 AirflowClusterPolicyViolation 异常,以表明传递给它们的 dag/task 不符合规范,不应加载。

当需要有意跳过 DAG 时,它们也可以引发 AirflowClusterPolicySkipDag 异常。与 AirflowClusterPolicyViolation 不同,此异常不会显示在 Airflow Web UI 上(在内部,它不会记录在元数据库的 import_error 表中。)

集群策略设置的任何额外属性都优先于 DAG 文件中定义的属性;例如,如果在 DAG 文件中为任务设置了 sla,然后您的集群策略也设置了 sla,则集群策略的值将优先。

如何定义策略函数

有两种方法可以配置集群策略

  1. 在 python 搜索路径中的某个位置创建一个 airflow_local_settings.py 文件($AIRFLOW_HOME 下的 config/ 文件夹是一个很好的“默认”位置),然后将可调用对象添加到与上述一个或多个集群策略名称匹配的文件中(例如 dag_policy)。

有关如何配置本地设置的详细信息,请参阅 配置本地设置

  1. 通过使用自定义模块中的 setuptools 入口点,使用 Pluggy 接口。

    2.6 版本中的新功能。

    此方法更高级,适用于已经熟悉 python 打包的人员。

    首先在模块中创建您的策略函数

    from airflow.policies import hookimpl
    
    
    @hookimpl
    def task_policy(task) -> None:
        # Mutate task in place
        # ...
        print(f"Hello from {__file__}")
    

    然后将入口点添加到您的项目规范中。例如,使用 pyproject.tomlsetuptools

    [build-system]
    requires = ["setuptools", "wheel"]
    build-backend = "setuptools.build_meta"
    
    [project]
    name = "my-airflow-plugin"
    version = "0.0.1"
    # ...
    
    dependencies = ["apache-airflow>=2.6"]
    [project.entry-points.'airflow.policy']
    _ = 'my_airflow_plugin.policies'
    

    入口点组必须是 airflow.policy,并且名称将被忽略。值应该是用 @hookimpl 标记装饰的模块(或类)。

    完成此操作并将您的发行版安装到 Airflow 环境中后,策略函数将由各种 Airflow 组件调用。(确切的调用顺序是未定义的,因此如果您有多个插件,请不要依赖任何特定的调用顺序)。

需要注意的一件重要事情(对于定义策略函数的任何一种方式)是参数名称必须与下面记录的完全匹配。

可用的策略函数

airflow.policies.task_policy(task)[源代码]

允许在 DagBag 中加载任务后修改任务。

它允许管理员重新连接某些任务的参数。或者,您可以引发 AirflowClusterPolicyViolation 异常以阻止 DAG 被执行。

以下是一些说明其用途的示例

  • 您可以为使用 SparkOperator 的任务强制执行特定的队列(例如 spark 队列),以确保这些任务连接到正确的工作人员

  • 您可以强制执行任务超时策略,确保没有任务运行超过 48 小时

参数

task (airflow.models.baseoperator.BaseOperator) – 要修改的任务

airflow.policies.dag_policy(dag)[源代码]

允许在 DagBag 中加载 DAG 后修改 DAG。

它允许管理员重新连接某些 DAG 的参数。或者,您可以引发 AirflowClusterPolicyViolation 异常以阻止 DAG 被执行。

以下是一些说明其用途的示例

  • 您可以为 DAG 强制执行默认用户

  • 检查每个 DAG 是否都配置了标签

参数

dag (airflow.models.dag.DAG) – 要修改的 dag

airflow.policies.task_instance_mutation_hook(task_instance)[源代码]

允许在 Airflow 调度程序排队之前修改任务实例。

例如,这可以用于在重试期间修改任务实例。

参数

task_instance (airflow.models.taskinstance.TaskInstance) – 要修改的任务实例

airflow.policies.pod_mutation_hook(pod)[源代码]

在调度之前修改 Pod。

此设置允许修改 kubernetes.client.models.V1Pod 对象,然后再将它们传递给 Kubernetes 客户端进行调度。

例如,这可以用于向 KubernetesExecutor 或 KubernetesPodOperator 启动的每个工作器 Pod 添加 Sidecar 或 Init 容器。

airflow.policies.get_airflow_context_vars(context)[源代码]

将 Airflow 上下文变量注入默认的 Airflow 上下文变量中。

此设置允许获取 Airflow 上下文变量,这些变量是键值对。然后将它们注入到默认的 Airflow 上下文变量中,最终在运行任务时作为环境变量可用。dag_id、task_id、execution_date、dag_run_id、try_number 是保留的键。

参数

context – 感兴趣的任务实例的上下文。

示例

DAG 策略

此策略检查每个 DAG 是否至少定义了一个标签

def dag_policy(dag: DAG):
    """Ensure that DAG has at least one tag and skip the DAG with `only_for_beta` tag."""
    if not dag.tags:
        raise AirflowClusterPolicyViolation(
            f"DAG {dag.dag_id} has no tags. At least one tag required. File path: {dag.fileloc}"
        )

    if "only_for_beta" in dag.tags:
        raise AirflowClusterPolicySkipDag(
            f"DAG {dag.dag_id} is not loaded on the production cluster, due to `only_for_beta` tag."
        )


注意

为了避免导入循环,如果您在集群策略的类型注释中使用 DAG,请务必从 airflow.models 而不是 airflow 导入。

注意

DAG 策略在 DAG 完全加载后应用,因此覆盖 default_args 参数不起作用。如果要覆盖默认的运算符设置,请改用任务策略。

任务策略

这是一个在每个任务上强制执行最大超时策略的示例

class TimedOperator(BaseOperator, ABC):
    timeout: timedelta


def task_policy(task: TimedOperator):
    if task.task_type == "HivePartitionSensor":
        task.queue = "sensor_queue"
    if task.timeout > timedelta(hours=48):
        task.timeout = timedelta(hours=48)


您还可以实施保护措施,以防止常见错误,而不是作为技术安全控制。例如,不要在没有 Airflow 所有者的情况下运行任务

def task_must_have_owners(task: BaseOperator):
    if task.owner and not isinstance(task.owner, str):
        raise AirflowClusterPolicyViolation(f"""owner should be a string. Current value: {task.owner!r}""")

    if not task.owner or task.owner.lower() == conf.get("operators", "default_owner"):
        raise AirflowClusterPolicyViolation(
            f"""Task must have non-None non-default owner. Current value: {task.owner}"""
        )


如果您有多个检查要应用,最佳实践是在单独的 Python 模块中管理这些规则,并使用单个策略/任务修改钩子来执行多个自定义检查并聚合各种错误消息,以便可以在 UI 中报告单个 AirflowClusterPolicyViolation(以及数据库中的导入错误表)。

例如,您的 airflow_local_settings.py 可能遵循以下模式

TASK_RULES: list[Callable[[BaseOperator], None]] = [
    task_must_have_owners,
]


def _check_task_rules(current_task: BaseOperator):
    """Check task rules for given task."""
    notices = []
    for rule in TASK_RULES:
        try:
            rule(current_task)
        except AirflowClusterPolicyViolation as ex:
            notices.append(str(ex))
    if notices:
        notices_list = " * " + "\n * ".join(notices)
        raise AirflowClusterPolicyViolation(
            f"DAG policy violation (DAG ID: {current_task.dag_id}, Path: {current_task.dag.fileloc}):\n"
            f"Notices:\n"
            f"{notices_list}"
        )


def example_task_policy(task: BaseOperator):
    """Ensure Tasks have non-default owners."""
    _check_task_rules(task)


有关如何配置本地设置的详细信息,请参阅 配置本地设置

任务实例修改

这是一个将第二次(或更高次)重试的任务重新路由到不同队列的示例

def task_instance_mutation_hook(task_instance: TaskInstance):
    if task_instance.try_number >= 1:
        task_instance.queue = "retry_queue"


请注意,由于优先级权重是使用权重规则动态确定的,因此您无法在修改钩子中更改任务实例的 priority_weight

此条目是否有帮助?