最佳实践

创建新的 DAG 是一个三步过程

  • 编写 Python 代码以创建 DAG 对象,

  • 测试代码是否符合您的预期,

  • 配置环境依赖项以运行您的 DAG

本教程将向您介绍这三个步骤的最佳实践。

编写 DAG

在 Airflow 中创建新的 DAG 非常简单。但是,您需要注意许多事项,以确保 DAG 运行或失败不会产生意外结果。

创建自定义操作符/钩子

请遵循我们关于 自定义操作符 的指南。

创建任务

您应该将 Airflow 中的任务视为等同于数据库中的事务。这意味着您永远不应该从任务中产生不完整的结果。例如,不要在任务结束时在 HDFSS3 中生成不完整的数据。

如果任务失败,Airflow 可以重试该任务。因此,任务应该在每次重新运行时产生相同的结果。您可以通过以下几种方法来避免产生不同的结果 -

  • 不要在任务重新运行期间使用 INSERT,INSERT 语句可能会导致数据库中出现重复行。将其替换为 UPSERT。

  • 在特定分区中读取和写入。切勿在任务中读取最新的可用数据。有人可能会在重新运行之间更新输入数据,从而导致不同的输出。更好的方法是从特定分区读取输入数据。您可以使用 data_interval_start 作为分区。在将数据写入 S3/HDFS 时,您也应该遵循这种分区方法。

  • Python datetime now() 函数返回当前的日期时间对象。永远不应该在任务内部使用此函数,尤其是在进行关键计算时,因为它会导致每次运行的结果不同。例如,使用它来生成临时日志是可以的。

提示

您应该在 default_args 中定义重复的参数,例如 connection_id 或 S3 路径,而不是为每个任务声明它们。 default_args 有助于避免错误,例如拼写错误。此外,大多数连接类型在任务中都有唯一的参数名称,因此您可以在 default_args 中只声明一次连接(例如 gcp_conn_id),并且所有使用此连接类型的操作符都会自动使用它。

删除任务

从 DAG 中删除任务时要小心。您将无法在图形视图、网格视图等中看到该任务,这使得从 Web 服务器检查该任务的日志变得困难。如果不需要这样做,请创建一个新的 DAG。

通信

如果您使用的是 Kubernetes 执行器Celery 执行器,则 Airflow 会在不同的服务器上执行 DAG 的任务。因此,您不应该在本地文件系统中存储任何文件或配置,因为下一个任务很可能会在没有访问权限的不同服务器上运行 - 例如,一个下载下一个任务要处理的数据文件的任务。对于 本地 执行器,将文件存储在磁盘上会使重试变得更加困难,例如,您的任务需要一个由 DAG 中的另一个任务删除的配置文件。

如果可能,请使用 XCom 在任务之间传递小消息,而在任务之间传递较大数据的良好方法是使用远程存储,例如 S3/HDFS。例如,如果我们有一个将处理后的数据存储在 S3 中的任务,则该任务可以将输出数据的 S3 路径推送到 Xcom 中,下游任务可以从 XCom 中拉取该路径并使用它来读取数据。

任务也不应存储任何身份验证参数,例如密码或令牌。在任何可能的情况下,请使用 连接 将数据安全地存储在 Airflow 后端,并使用唯一的连接 ID 检索它们。

顶级 Python 代码

您应该避免编写顶级代码,这些代码对于创建操作符和构建它们之间的 DAG 关系不是必需的。这是因为 Airflow 调度器的设计决策以及顶级代码解析速度对 Airflow 性能和可扩展性的影响。

Airflow 调度器以最短 min_file_process_interval 秒的间隔执行操作符 execute 方法之外的代码。这样做是为了允许动态调度 DAG - 其中调度和依赖关系可能会随着时间的推移而改变,并影响 DAG 的下一次调度。Airflow 调度器会尝试持续确保 DAG 中的内容正确反映在已调度的任务中。

特别是,您不应该运行任何数据库访问、繁重的计算和网络操作。

Python 开发人员可能会忽略的一个影响 DAG 加载时间的重要因素是,顶级导入可能需要相当长的时间,并且它们可能会产生大量的开销,而这可以通过将它们转换为 Python 可调用对象内的本地导入来轻松避免,例如。

请考虑以下两个示例。在第一个示例中,与功能等效的第二个示例相比,DAG 将需要额外的 1000 秒来解析,在第二个示例中,expensive_api_call 是从其任务的上下文中执行的。

不避免顶级 DAG 代码

import pendulum

from airflow import DAG
from airflow.decorators import task


def expensive_api_call():
    print("Hello from Airflow!")
    sleep(1000)


my_expensive_response = expensive_api_call()

with DAG(
    dag_id="example_python_operator",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
) as dag:

    @task()
    def print_expensive_api_call():
        print(my_expensive_response)

避免顶级 DAG 代码

import pendulum

from airflow import DAG
from airflow.decorators import task


def expensive_api_call():
    sleep(1000)
    return "Hello from Airflow!"


with DAG(
    dag_id="example_python_operator",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
) as dag:

    @task()
    def print_expensive_api_call():
        my_expensive_response = expensive_api_call()
        print(my_expensive_response)

在第一个示例中,每次解析 DAG 文件时都会执行 expensive_api_call,这将导致 DAG 文件处理的性能欠佳。在第二个示例中,expensive_api_call 仅在任务运行时调用,因此能够在不影响任何性能的情况下进行解析。要亲自测试,请实现第一个 DAG,并在调度器日志中查看“Hello from Airflow!”!

请注意,导入语句也算作顶级代码。因此,如果您有一个导入语句需要很长时间,或者导入的模块本身在顶级执行代码,那么这也可能会影响调度器的性能。以下示例说明了如何处理昂贵的导入。

# It's ok to import modules that are not expensive to load at top-level of a DAG file
import random
import pendulum

# Expensive imports should be avoided as top level imports, because DAG files are parsed frequently, resulting in top-level code being executed.
#
# import pandas
# import torch
# import tensorflow
#

...


@task()
def do_stuff_with_pandas_and_torch():
    import pandas
    import torch

    # do some operations using pandas and torch


@task()
def do_stuff_with_tensorflow():
    import tensorflow

    # do some operations using tensorflow

如何检查我的代码是否是“顶级”代码

为了理解您的代码是否是“顶级”代码,您需要了解很多关于 Python 解析工作原理的复杂性。通常,当 Python 解析 python 文件时,它会执行它看到的代码,但(通常)它不会执行方法的内部代码。

它有一些不明显的特例 - 例如,顶级代码还表示用于确定方法默认值的任何代码。

但是,有一种简单的方法可以检查您的代码是否是“顶级”代码。您只需要解析您的代码,看看这部分代码是否被执行了。

想象一下这段代码

from airflow import DAG
from airflow.operators.python import PythonOperator
import pendulum


def get_task_id():
    return "print_array_task"  # <- is that code going to be executed?


def get_array():
    return [1, 2, 3]  # <- is that code going to be executed?


with DAG(
    dag_id="example_python_operator",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
) as dag:
    operator = PythonOperator(
        task_id=get_task_id(),
        python_callable=get_array,
        dag=dag,
    )

您可以做的是在要检查的代码中添加一些打印语句,然后运行 python <my_dag_file>.py

from airflow import DAG
from airflow.operators.python import PythonOperator
import pendulum


def get_task_id():
    print("Executing 1")
    return "print_array_task"  # <- is that code going to be executed? YES


def get_array():
    print("Executing 2")
    return [1, 2, 3]  # <- is that code going to be executed? NO


with DAG(
    dag_id="example_python_operator",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
) as dag:
    operator = PythonOperator(
        task_id=get_task_id(),
        python_callable=get_array,
        dag=dag,
    )

当您执行该代码时,您将看到

root@cf85ab34571e:/opt/airflow# python /files/test_python.py
Executing 1

这意味着 get_array 不是作为顶级代码执行的,而 get_task_id 则是。

动态 DAG 生成

有时手动编写 DAG 是不切实际的。也许您有很多 DAG 做着类似的事情,只是它们之间的参数不同。或者,您可能需要一组 DAG 来加载表,但不想在每次表更改时手动更新 DAG。在这些情况下以及其他情况下,动态生成 DAG 可能更有用。

避免在上一章中描述的顶级代码中进行过度处理在动态 DAG 配置的情况下尤其重要,动态 DAG 配置基本上可以通过以下一种方式进行配置

  • 通过 环境变量(不要与 Airflow 变量 混淆)

  • 通过外部提供的、生成的 Python 代码,其中包含 DAG 文件夹中的元数据

  • 通过外部提供的、生成的配置元数据文件,该文件位于 DAG 文件夹中

动态 DAG 生成 部分介绍了动态 DAG 生成的一些案例。

Airflow 变量

使用 Airflow 变量会产生网络调用和数据库访问,因此应尽可能避免在 DAG 的顶级 Python 代码中使用它们,如上一章 顶级 Python 代码 中所述。如果必须在顶级 DAG 代码中使用 Airflow 变量,则可以通过 启用实验性缓存 来减轻它们对 DAG 解析的影响,该缓存配置了合理的 ttl

您可以在运算符的 execute() 方法中自由使用 Airflow 变量,但您也可以通过 Jinja 模板将 Airflow 变量传递给现有运算符,这将延迟读取值,直到任务执行。

执行此操作的模板语法是

{{ var.value.<variable_name> }}

或者,如果您需要从变量中反序列化 JSON 对象,则可以使用

{{ var.json.<variable_name> }}

在顶级代码中,使用 Jinja 模板的变量在任务运行之前不会生成请求,而如果未启用缓存,则 Variable.get() 会在每次调度程序解析 dag 文件时生成请求。在未 启用缓存 的情况下使用 Variable.get() 将导致 dag 文件处理性能欠佳。在某些情况下,这可能会导致 dag 文件在完全解析之前超时。

错误示例

from airflow.models import Variable

foo_var = Variable.get("foo")  # AVOID THAT
bash_use_variable_bad_1 = BashOperator(
    task_id="bash_use_variable_bad_1", bash_command="echo variable foo=${foo_env}", env={"foo_env": foo_var}
)

bash_use_variable_bad_2 = BashOperator(
    task_id="bash_use_variable_bad_2",
    bash_command=f"echo variable foo=${Variable.get('foo')}",  # AVOID THAT
)

bash_use_variable_bad_3 = BashOperator(
    task_id="bash_use_variable_bad_3",
    bash_command="echo variable foo=${foo_env}",
    env={"foo_env": Variable.get("foo")},  # AVOID THAT
)

正确示例

bash_use_variable_good = BashOperator(
    task_id="bash_use_variable_good",
    bash_command="echo variable foo=${foo_env}",
    env={"foo_env": "{{ var.value.get('foo') }}"},
)
@task
def my_task():
    var = Variable.get("foo")  # this is fine, because func my_task called only run task, not scan DAGs.
    print(var)

出于安全目的,建议您对包含敏感数据的任何变量使用 Secrets 后端

时间表

避免在时间表代码的顶层使用 Airflow 变量/连接或访问 Airflow 数据库。数据库访问应延迟到 DAG 的执行时间。这意味着您不应将变量/连接检索作为时间表类初始化的参数,也不应在自定义时间表模块的顶层使用变量/连接。

错误示例

from airflow.models.variable import Variable
from airflow.timetables.interval import CronDataIntervalTimetable


class CustomTimetable(CronDataIntervalTimetable):
    def __init__(self, *args, something=Variable.get("something"), **kwargs):
        self._something = something
        super().__init__(*args, **kwargs)

正确示例

from airflow.models.variable import Variable
from airflow.timetables.interval import CronDataIntervalTimetable


class CustomTimetable(CronDataIntervalTimetable):
    def __init__(self, *args, something="something", **kwargs):
        self._something = Variable.get(something)
        super().__init__(*args, **kwargs)

更改后触发 DAG

避免在更改 DAG 或 DAG 文件夹中更改的任何其他伴随文件后立即触发 DAG。

您应该给系统足够的时间来处理更改后的文件。这需要几个步骤。首先,必须将文件分发到调度程序 - 通常通过分布式文件系统或 Git-Sync,然后调度程序必须解析 Python 文件并将它们存储在数据库中。根据您的配置、分布式文件系统的速度、文件数量、DAG 数量、文件中的更改数量、文件大小、调度程序数量、CPU 速度,这可能需要几秒钟到几分钟,在极端情况下可能需要几分钟。您应该等待 DAG 出现在 UI 中才能触发它。

如果您发现更新它与它准备好被触发之间存在很长的延迟,您可以查看以下配置参数并根据您的需要对其进行微调(每个参数的详细信息请点击链接)

带有触发规则的观察者模式示例

观察者模式是指我们如何调用一个 DAG,该 DAG 中有一个任务正在“观察”其他任务的状态。它的主要目的是在任何其他任务失败时使 DAG 运行失败。这种需求来自 Airflow 系统测试,这些测试是具有不同任务的 DAG(类似于包含步骤的测试)。

通常,当任何任务失败时,所有其他任务都不会执行,并且整个 DAG 运行也会获得失败状态。但是,当我们使用触发规则时,我们可以中断正在运行的任务的正常流程,并且整个 DAG 可能会呈现出我们期望的不同状态。例如,我们可以有一个拆卸任务(触发规则设置为 TriggerRule.ALL_DONE),无论其他任务的状态如何,该任务都将执行(例如,清理资源)。在这种情况下,DAG 将始终运行此任务,并且 DAG 运行将获得此特定任务的状态,因此我们可能会丢失有关失败任务的信息。如果我们想确保带有拆卸任务的 DAG 在任何任务失败时都会失败,我们需要使用观察者模式。观察者任务是一个在被触发时始终会失败的任务,但它只需要在任何其他任务失败时才被触发。它需要将触发规则设置为 TriggerRule.ONE_FAILED,并且它还需要是 DAG 中所有其他任务的下游任务。因此,如果所有其他任务都通过,则观察者将被跳过,但是当某些任务失败时,观察者任务将被执行并失败,从而使 DAG 运行也失败。

注意

请注意,触发规则仅依赖于直接上游(父)任务,例如,TriggerRule.ONE_FAILED 将忽略不是参数化任务的直接父任务的任何失败(或 upstream_failed)任务。

举个例子更容易理解这个概念。假设我们有以下 DAG

from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.exceptions import AirflowException
from airflow.operators.bash import BashOperator
from airflow.utils.trigger_rule import TriggerRule


@task(trigger_rule=TriggerRule.ONE_FAILED, retries=0)
def watcher():
    raise AirflowException("Failing task because one or more upstream tasks failed.")


with DAG(
    dag_id="watcher_example",
    schedule="@once",
    start_date=datetime(2021, 1, 1),
    catchup=False,
) as dag:
    failing_task = BashOperator(task_id="failing_task", bash_command="exit 1", retries=0)
    passing_task = BashOperator(task_id="passing_task", bash_command="echo passing_task")
    teardown = BashOperator(
        task_id="teardown",
        bash_command="echo teardown",
        trigger_rule=TriggerRule.ALL_DONE,
    )

    failing_task >> passing_task >> teardown
    list(dag.tasks) >> watcher()

执行后此 DAG 的可视化表示如下所示

_images/watcher.png

我们有几个服务于不同目的的任务

  • failing_task 总是失败,

  • passing_task 总是成功(如果执行),

  • teardown 总是被触发(无论其他任务的状态如何),并且它应该总是成功,

  • watcher 是每个其他任务的下游任务,即,当任何任务失败时,它将被触发,从而使整个 DAG 运行失败,因为它是一个叶任务。

需要注意的是,如果没有 watcher 任务,则整个 DAG 运行将获得 success 状态,因为唯一失败的任务不是叶任务,并且 teardown 任务将以 success 结束。如果我们希望 watcher 监视所有任务的状态,我们需要使其分别依赖于所有任务。因此,如果任何任务失败,我们可以使 DAG 运行失败。请注意,观察者任务的触发规则设置为 "one_failed"。另一方面,如果没有 teardown 任务,则不需要 watcher 任务,因为 failing_task 会将其 failed 状态传播到下游任务 passed_task,并且整个 DAG 运行也将获得 failed 状态。

在集群策略中使用 AirflowClusterPolicySkipDag 异常跳过特定的 DAG

2.7 版中的新功能。

Airflow DAG 通常可以通过 git-sync 使用 Git 存储库的特定分支进行部署和更新。但是,当您出于某些操作原因必须运行多个 Airflow 集群时,维护多个 Git 分支非常麻烦。特别是,当您需要使用适当的分支策略定期同步两个独立的分支(如 prodbeta)时,您会遇到一些困难。

  • cherry-pick 对于维护 Git 存储库来说太麻烦了。

  • 不建议对 GitOps 使用 hard-reset

因此,您可以考虑将多个 Airflow 集群与同一个 Git 分支(如 main)连接起来,并使用不同的环境变量和具有相同 connection_id 的不同连接配置来维护它们。如果需要,您还可以在集群策略上引发 AirflowClusterPolicySkipDag 异常,以便仅在特定的 Airflow 部署上将特定的 DAG 加载到 DagBag 中。

def dag_policy(dag: DAG):
    """Skipping the DAG with `only_for_beta` tag."""

    if "only_for_beta" in dag.tags:
        raise AirflowClusterPolicySkipDag(
            f"DAG {dag.dag_id} is not loaded on the production cluster, due to `only_for_beta` tag."
        )

上面的示例显示了 dag_policy 代码片段,用于根据 DAG 具有的标签跳过 DAG。

降低 DAG 的复杂性

虽然 Airflow 能够很好地处理具有大量任务及其之间依赖关系的大量 DAG,但是当您拥有许多复杂的 DAG 时,它们的复杂性可能会影响调度的性能。为了保持 Airflow 实例的高效运行和充分利用,您应该尽可能地简化和优化您的 DAG - 您必须记住,DAG 解析过程和创建只是在执行 Python 代码,并且由您来使其尽可能高效。没有神奇的配方可以使您的 DAG“不那么复杂” - 因为这是 Python 代码,所以控制代码复杂性的是 DAG 编写者。

没有用于衡量 DAG 复杂性的“指标”,尤其是没有指标可以告诉您您的 DAG 是否“足够简单”。但是,与任何 Python 代码一样,当您的 DAG 代码经过优化后,您绝对可以判断它是“更简单”还是“更快”。如果您想优化您的 DAG,可以采取以下措施

  • 让您的 DAG 加载更快。 这是一条可以以多种方式实现的单一改进建议,但它对调度程序性能的影响最大。 如果您的目标是提高性能,那么只要有机会让您的 DAG 加载更快,就去做吧。 查看顶级 Python 代码,获取有关如何实现此目标的一些技巧。 另请参阅DAG 加载器测试,了解如何评估您的 DAG 加载时间。

  • 让您的 DAG 生成更简单的结构。 每个任务依赖项都会增加调度和执行的额外处理开销。 例如,具有简单线性结构A -> B -> C的 DAG 在任务调度中的延迟将少于具有深度嵌套树结构且依赖任务数量呈指数增长的 DAG。 如果您可以使 DAG 更线性 - 在执行的单个点上,任务中只有少数几个潜在的候选者可以运行,这可能会提高整体调度性能。

  • 每个文件创建更少的 DAG。 虽然 Airflow 2 针对在一个文件中包含多个 DAG 的情况进行了优化,但系统中的一些部分有时会使其性能下降,或者引入比将这些 DAG 拆分到多个文件中更多的延迟。 例如,只有一个文件只能由一个 FileProcessor 解析,这使得它的可扩展性降低。 如果您有许多从一个文件生成的 DAG,并且您观察到在 Airflow 的 UI 中反映 DAG 文件的更改需要很长时间,请考虑拆分它们。

  • 编写高效的 Python 代码。 如上所述,必须在每个文件更少的 DAG 和总体上编写更少的代码之间取得平衡。 创建描述 DAG 的 Python 文件应遵循最佳编程实践,而不是将其视为配置。 如果您的 DAG 共享相似的代码,则不应将它们一遍又一遍地复制到大量几乎相同的源文件中,因为这会导致对相同资源进行大量不必要的重复导入。 相反,您的目标应该是最大限度地减少所有 DAG 中的重复代码,以便应用程序可以高效运行并且易于调试。 有关如何使用类似代码创建多个 DAG,请参阅动态 DAG 生成

测试 DAG

Airflow 用户应将 DAG 视为生产级代码,并且 DAG 应具有各种关联的测试,以确保它们产生预期的结果。 您可以为 DAG 编写各种测试。 让我们来看看其中的一些。

DAG 加载器测试

此测试应确保您的 DAG 不包含在加载时引发错误的代码段。 用户无需编写任何其他代码即可运行此测试。

python your-dag-file.py

在没有任何错误的情况下运行上述命令可确保您的 DAG 不包含任何未安装的依赖项、语法错误等。确保您在与调度程序环境相对应的环境中加载 DAG - 具有相同的依赖项、环境变量、从 DAG 引用的公共代码。

如果您想尝试优化 DAG 加载时间,这也是在优化后检查 DAG 是否加载更快的好方法。 只需运行 DAG 并测量所需的时间,但同样您必须确保您的 DAG 使用相同的依赖项、环境变量、公共代码运行。

有很多方法可以测量处理时间,其中一种在 Linux 环境中是使用内置的time命令。 确保连续运行多次以考虑缓存的影响。 比较优化前后(在相同条件下 - 使用相同的机器、环境等)的结果,以评估优化的影响。

time python airflow/example_dags/example_python_operator.py

结果

real    0m0.699s
user    0m0.590s
sys     0m0.108s

重要的指标是“实际时间” - 它告诉您处理 DAG 花费了多长时间。 请注意,以这种方式加载文件时,您正在启动一个新的解释器,因此在 Airflow 解析 DAG 时不存在初始加载时间。 您可以通过运行以下命令来评估初始化时间

time python -c ''

结果

real    0m0.073s
user    0m0.037s
sys     0m0.039s

在这种情况下,初始解释器启动时间约为 0.07 秒,约为解析上述 example_python_operator.py 所需时间的 10%,因此示例 DAG 的实际解析时间约为 0.62 秒。

您可以查看测试 DAG,详细了解如何测试各个运算符。

单元测试

单元测试可确保您的 DAG 中没有错误代码。 您可以为任务和 DAG 编写单元测试。

用于加载 DAG 的单元测试

import pytest

from airflow.models import DagBag


@pytest.fixture()
def dagbag():
    return DagBag()


def test_dag_loaded(dagbag):
    dag = dagbag.get_dag(dag_id="hello_world")
    assert dagbag.import_errors == {}
    assert dag is not None
    assert len(dag.tasks) == 1

**单元测试 DAG 结构:**这是一个示例测试,希望根据字典对象验证代码生成的 DAG 的结构

def assert_dag_dict_equal(source, dag):
    assert dag.task_dict.keys() == source.keys()
    for task_id, downstream_list in source.items():
        assert dag.has_task(task_id)
        task = dag.get_task(task_id)
        assert task.downstream_task_ids == set(downstream_list)


def test_dag():
    assert_dag_dict_equal(
        {
            "DummyInstruction_0": ["DummyInstruction_1"],
            "DummyInstruction_1": ["DummyInstruction_2"],
            "DummyInstruction_2": ["DummyInstruction_3"],
            "DummyInstruction_3": [],
        },
        dag,
    )

自定义运算符的单元测试

import datetime

import pendulum
import pytest

from airflow import DAG
from airflow.utils.state import DagRunState, TaskInstanceState
from airflow.utils.types import DagRunType

DATA_INTERVAL_START = pendulum.datetime(2021, 9, 13, tz="UTC")
DATA_INTERVAL_END = DATA_INTERVAL_START + datetime.timedelta(days=1)

TEST_DAG_ID = "my_custom_operator_dag"
TEST_TASK_ID = "my_custom_operator_task"


@pytest.fixture()
def dag():
    with DAG(
        dag_id=TEST_DAG_ID,
        schedule="@daily",
        start_date=DATA_INTERVAL_START,
    ) as dag:
        MyCustomOperator(
            task_id=TEST_TASK_ID,
            prefix="s3://bucket/some/prefix",
        )
    return dag


def test_my_custom_operator_execute_no_trigger(dag):
    dagrun = dag.create_dagrun(
        state=DagRunState.RUNNING,
        execution_date=DATA_INTERVAL_START,
        data_interval=(DATA_INTERVAL_START, DATA_INTERVAL_END),
        start_date=DATA_INTERVAL_END,
        run_type=DagRunType.MANUAL,
    )
    ti = dagrun.get_task_instance(task_id=TEST_TASK_ID)
    ti.task = dag.get_task(task_id=TEST_TASK_ID)
    ti.run(ignore_ti_state=True)
    assert ti.state == TaskInstanceState.SUCCESS
    # Assert something related to tasks results.

自检

您还可以在 DAG 中实施检查,以确保任务按预期生成结果。 例如,如果您有一个将数据推送到 S3 的任务,则可以在下一个任务中实施检查。 例如,该检查可以确保在 S3 中创建了分区,并执行一些简单的检查以确定数据是否正确。

同样,如果您有一个在 Kubernetes 或 Mesos 中启动微服务的任务,则应使用airflow.providers.http.sensors.http.HttpSensor检查服务是否已启动。

task = PushToS3(...)
check = S3KeySensor(
    task_id="check_parquet_exists",
    bucket_key="s3://bucket/key/foo.parquet",
    poke_interval=0,
    timeout=0,
)
task >> check

暂存环境

如果可能,请保留一个暂存环境,以便在生产环境中部署之前测试完整的 DAG 运行。 确保您的 DAG 已参数化以更改变量,例如 S3 操作的输出路径或用于读取配置的数据库。 不要在 DAG 中硬编码值,然后根据环境手动更改它们。

您可以使用环境变量来参数化 DAG。

import os

dest = os.environ.get("MY_DAG_DEST_PATH", "s3://default-target/path/")

模拟变量和连接

当您为使用变量或连接的代码编写测试时,必须确保它们在运行测试时存在。 显而易见的解决方案是将这些对象保存到数据库中,以便在您的代码执行时可以读取它们。 但是,向数据库读取和写入对象会增加额外的时间开销。 为了加快测试执行速度,值得在不将这些对象保存到数据库的情况下模拟它们的存在。 为此,您可以使用unittest.mock.patch.dict()创建带有模拟os.environ的环境变量。

对于变量,请使用AIRFLOW_VAR_{KEY}

with mock.patch.dict("os.environ", AIRFLOW_VAR_KEY="env-value"):
    assert "env-value" == Variable.get("key")

对于连接,请使用AIRFLOW_CONN_{CONN_ID}

conn = Connection(
    conn_type="gcpssh",
    login="cat",
    host="conn-host",
)
conn_uri = conn.get_uri()
with mock.patch.dict("os.environ", AIRFLOW_CONN_MY_CONN=conn_uri):
    assert "cat" == Connection.get_connection_from_secrets("my_conn").login

元数据数据库维护

随着时间的推移,随着更多 DAG 和任务运行以及事件日志的累积,元数据数据库将增加其存储空间。

您可以使用 Airflow CLI 通过命令airflow db clean清除旧数据。

有关更多详细信息,请参阅db clean 使用方法

升级和降级

备份您的数据库

在执行任何修改数据库的操作之前备份元数据数据库始终是一个明智的想法。

禁用调度程序

您可能会考虑在执行此类维护时禁用 Airflow 集群。

一种方法是将参数[scheduler] > use_job_schedule设置为False并等待任何正在运行的 DAG 完成; 在此之后,除非外部触发,否则不会创建新的 DAG 运行。

一种*更好的*方法(尽管它有点手动)是使用dags pause命令。 您需要跟踪在开始此操作之前已暂停的 DAG,以便您知道在维护完成后要取消暂停哪些 DAG。 首先运行airflow dags list并存储未暂停的 DAG 列表。 然后使用相同的列表在维护之前为每个 DAG 运行dags pause,并在维护之后运行dags unpause。 这样做的好处是,您可以在升级后尝试仅取消暂停一两个 DAG(可能是专用的测试 DAG),以确保一切正常,然后再重新打开所有内容。

添加“集成测试”DAG

添加几个“集成测试”DAG 会很有帮助,这些 DAG 使用您生态系统中的所有常见服务(例如 S3、Snowflake、Vault),但使用测试资源或“开发”帐户。 这些测试 DAG 可以是您在升级后*首先*打开的 DAG,因为如果它们失败了,也没关系,您可以恢复到备份而不会产生负面后果。 但是,如果它们成功了,它们应该证明您的集群能够使用您需要使用的库和服务运行任务。

例如,如果您使用外部密钥后端,请确保您有一个检索连接的任务。 如果您使用 KubernetesPodOperator,请添加一个运行sleep 30; echo "hello"的任务。 如果您需要写入 s3,请在测试任务中执行此操作。 如果您需要访问数据库,请添加一个从服务器执行select 1的任务。

在升级之前修剪数据

某些数据库迁移可能非常耗时。 如果您的元数据数据库非常大,请考虑在执行升级之前使用db clean命令修剪一些旧数据。 *谨慎使用。*

处理冲突/复杂的 Python 依赖项

Airflow 有许多 Python 依赖项,有时 Airflow 依赖项与您的任务代码期望的依赖项冲突。由于默认情况下 Airflow 环境只是一组 Python 依赖项和单个 Python 环境,因此通常也可能出现某些任务需要与其他任务不同的依赖项,并且这些依赖项在这些任务之间基本上发生冲突的情况。

如果您使用预定义的 Airflow Operator 与外部服务通信,则没有太多选择,但通常这些 Operator 的依赖项不会与基本的 Airflow 依赖项冲突。Airflow 使用约束机制,这意味着您有一组“固定”的依赖项,社区保证可以使用这些依赖项安装 Airflow(包括所有社区提供程序)而不会触发冲突。但是,您可以独立升级提供程序,并且它们的约束不会限制您,因此依赖项冲突的可能性较低(您仍然需要测试这些依赖项)。因此,当您使用预定义的 Operator 时,您几乎不会遇到依赖项冲突的问题。

但是,当您以更“现代的方式”使用 Airflow 时,例如您使用 TaskFlow Api 并且您的大多数 Operator 都是使用自定义 Python 代码编写的,或者当您想编写自己的自定义 Operator 时,您可能会遇到这种情况:您的自定义代码所需的依赖项与 Airflow 的依赖项冲突,或者甚至您的多个自定义 Operator 的依赖项之间也存在冲突。

可以使用多种策略来缓解此问题。虽然处理自定义 Operator 中的依赖项冲突很困难,但在使用 airflow.operators.python.PythonVirtualenvOperatorairflow.operators.python.ExternalPythonOperator 时,这实际上要容易得多 - 无论是直接使用经典的“Operator”方法,还是使用装饰有 @task.virtualenv@task.external_python 装饰器的任务(如果您使用 TaskFlow)。

让我们从最容易实现的策略开始(有一些限制和开销),我们将逐步介绍那些需要对您的 Airflow 部署进行一些更改的策略。

使用 PythonVirtualenvOperator

这是最容易使用但限制最多的策略。PythonVirtualenvOperator 允许您动态创建一个虚拟环境,您的 Python 可调用函数将在其中执行。在 使用 TaskFlow 中描述的现代 TaskFlow 方法中,这也可以通过使用 @task.virtualenv 装饰器装饰您的可调用对象来完成(推荐的 Operator 使用方式)。每个 airflow.operators.python.PythonVirtualenvOperator 任务都可以拥有自己独立的 Python 虚拟环境(每次运行任务时都会动态创建),并且可以指定需要为该任务执行安装的细粒度需求集。

Operator 负责

  • 根据您的环境创建虚拟环境

  • 序列化您的 Python 可调用对象,并将其传递给虚拟环境 Python 解释器执行

  • 执行它并检索可调用对象的结果,并在指定的情况下通过 xcom 推送它

Operator 的优点是

  • 无需预先准备虚拟环境。它将在任务运行之前动态创建,并在任务完成后删除,因此除了在您的 Airflow 依赖项中包含 virtualenv 包之外,无需任何特殊操作即可使用多个虚拟环境

  • 您可以在同一工作器上运行具有不同依赖项集的任务 - 因此内存资源被重用(尽管请参阅下文关于创建虚拟环境所涉及的 CPU 开销)。

  • 在更大的安装中,DAG 作者不需要请求任何人为您创建虚拟环境。作为 DAG 作者,您只需要安装 virtualenv 依赖项,就可以根据需要指定和修改环境。

  • 部署要求没有变化 - 无论您使用本地虚拟环境、Docker 还是 Kubernetes,任务都可以在不向部署添加任何内容的情况下工作。

  • 作为 DAG 作者,无需了解有关容器、Kubernetes 的更多信息。以这种方式编写 DAG 只需要了解 Python 需求。

此 Operator 引入了一些限制和开销

  • 您的 Python 可调用对象必须是可序列化的。有许多 Python 对象无法使用标准 pickle 库进行序列化。您可以使用 dill 库来缓解其中一些限制,但即使是该库也无法解决所有序列化限制。

  • Airflow 环境中不可用的所有依赖项都必须在您使用的可调用对象中本地导入,并且 DAG 的顶级 Python 代码不应导入/使用这些库。

  • 虚拟环境在同一操作系统中运行,因此它们不能具有冲突的系统级依赖项(aptyum 可安装软件包)。只有 Python 依赖项可以在这些环境中独立安装。

  • Operator 为运行每个任务增加了 CPU、网络和运行时间开销 - Airflow 必须为每个任务从头开始重新创建虚拟环境

  • 工作器需要能够访问 PyPI 或私有存储库才能安装依赖项

  • 虚拟环境的动态创建容易出现瞬态故障(例如,当您的存储库不可用或在连接存储库时出现网络问题时)

  • 很容易陷入“过于”动态的环境 - 因为您安装的依赖项可能会升级,并且它们的传递依赖项可能会进行独立升级,您最终可能会遇到任务停止工作的情况,因为有人发布了新版本的依赖项,或者您可能会成为“供应链”攻击的受害者,其中新版本的依赖项可能会变成恶意软件

  • 任务仅通过在不同环境中运行来彼此隔离。这使得运行的任务仍然有可能相互干扰 - 例如,在同一工作器上执行的后续任务可能会受到先前任务创建/修改文件等的影响。

您可以在 Taskflow 虚拟环境示例 中查看使用 airflow.operators.python.PythonVirtualenvOperator 的详细示例

使用 ExternalPythonOperator

2.4 版中的新功能。

使用 airflow.operators.python.ExternalPythonOperator` 更复杂一些,但开销、安全性和稳定性问题要少得多。在 使用 TaskFlow 中描述的现代 TaskFlow 方法中,这也可以通过使用 @task.external_python 装饰器装饰您的可调用对象来完成(推荐的 Operator 使用方式)。但是,它要求您有一个预先存在的、不可变的 Python 环境,该环境是预先准备好的。与 airflow.operators.python.PythonVirtualenvOperator 不同,您不能向此类预先存在的环境中添加新的依赖项。如果您在分布式环境中运行 Airflow,则需要在环境中预先添加所有需要的依赖项,并在所有工作器中都可用。

这样,您就避免了重新创建虚拟环境的开销和问题,但必须将它们与 Airflow 安装一起准备和部署。通常,管理 Airflow 安装的人员需要参与其中,而在更大的安装中,这些人通常与 DAG 作者(DevOps/系统管理员)不同。

这些虚拟环境可以通过各种方式准备 - 如果您使用 LocalExecutor,则只需将它们安装在运行调度程序的机器上;如果您使用的是分布式 Celery 虚拟环境安装,则应该有一个管道在多台机器上安装这些虚拟环境;最后,如果您使用的是 Docker 镜像(例如,通过 Kubernetes),则应将虚拟环境创建添加到自定义镜像构建的管道中。

Operator 的优点是

  • 运行任务时没有设置开销。当您开始运行任务时,虚拟环境已准备就绪。

  • 您可以在同一工作器上运行具有不同依赖项集的任务 - 因此所有资源都被重用。

  • 工作器无需访问 PyPI 或私有存储库。由网络导致的瞬态错误的可能性较小。

  • 依赖项可以由管理员和您的安全团队预先审查,不会动态添加新的意外代码。这对安全性和稳定性都有好处。

  • 对部署的影响有限 - 您不需要切换到 Docker 容器或 Kubernetes 就可以充分利用 Operator。

  • 作为 DAG 作者,无需了解有关容器、Kubernetes 的更多信息。以这种方式编写 DAG 只需要了解 Python 需求。

缺点

  • 您的环境需要预先准备好虚拟环境。这通常意味着您无法动态更改它,添加新的或更改需求至少需要重新部署 Airflow,并且当您处理新版本时,迭代时间可能会更长。

  • 您的 Python 可调用对象必须是可序列化的。有许多 Python 对象无法使用标准 pickle 库进行序列化。您可以使用 dill 库来缓解其中一些限制,但即使是该库也无法解决所有序列化限制。

  • Airflow 环境中不可用的所有依赖项都必须在您使用的可调用对象中本地导入,并且 DAG 的顶级 Python 代码不应导入/使用这些库。

  • 虚拟环境在同一操作系统中运行,因此它们不能具有冲突的系统级依赖项(aptyum 可安装软件包)。只有 Python 依赖项可以在这些环境中独立安装

  • 任务仅通过在不同环境中运行来彼此隔离。这使得运行的任务仍然有可能相互干扰 - 例如,在同一工作器上执行的后续任务可能会受到先前任务创建/修改文件等的影响。

您可以将 PythonVirtualenvOperatorExternalPythonOperator 视为对应物 - 它们可以更顺畅地从开发阶段过渡到生产阶段。作为 DAG 作者,您通常会使用 PythonVirtualenvOperator 迭代依赖项并开发 DAG(从而使用 @task.virtualenv 装饰器装饰您的任务),而在迭代和更改之后,您可能希望在生产中将其更改为 ExternalPythonOperator(以及 @task.external_python),在您的 DevOps/系统管理员团队将您的新依赖项部署到生产中预先存在的虚拟环境中之后。这样做的好处是您可以随时切换回装饰器,并使用 PythonVirtualenvOperator“动态地”继续开发它。

您可以在 Taskflow External Python 示例 中查看使用 airflow.operators.python.ExternalPythonOperator 的详细示例。

使用 DockerOperator 或 Kubernetes Pod Operator

另一种策略是使用 airflow.providers.docker.operators.docker.DockerOperator airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator。这些操作符要求 Airflow 可以访问 Docker 引擎或 Kubernetes 集群。

与 Python 操作符的情况类似,如果您想使用这些操作符来执行可调用的 Python 代码,那么 taskflow 装饰器对您来说非常方便。

但是,这涉及的内容要多得多 - 如果您想使用这种方法,您需要了解 Docker/Kubernetes Pod 的工作原理,但任务彼此完全隔离,您甚至不限于运行 Python 代码。您可以使用任何您想要的编程语言编写任务。此外,您的依赖项完全独立于 Airflow 的依赖项(包括系统级依赖项),因此如果您的任务需要非常不同的环境,那么这就是您要走的路。

版本 2.2 中的新功能。

从 Airflow 2.2 版本开始,您可以使用 @task.docker 装饰器使用 DockerOperator 运行您的函数。

2.4 版中的新功能。

从 Airflow 2.2 版本开始,您可以使用 @task.kubernetes 装饰器使用 KubernetesPodOperator 运行您的函数。

使用这些操作符的好处是:

  • 您可以使用不同的 Python 和系统级依赖项集运行任务,甚至可以使用完全不同的语言甚至不同的处理器架构(x86 与 arm)编写的任务。

  • 用于运行任务的环境享有容器的优化和不可变性,其中一组类似的依赖项可以有效地重用图像的多个缓存层,因此该环境针对您具有多个相似但不同的环境的情况进行了优化。

  • 依赖项可以由管理员和您的安全团队预先审查,不会动态添加新的意外代码。这对安全性和稳定性都有好处。

  • 任务之间完全隔离。除了使用标准的 Airflow XCom 机制外,它们不能以其他方式相互影响。

缺点

  • 启动任务会有一定的开销。通常不像动态创建虚拟环境时那么大,但仍然很明显(尤其是对于 KubernetesPodOperator)。

  • 在 TaskFlow 装饰器的情况下,需要序列化整个要调用的方法并将其发送到 Docker 容器或 Kubernetes Pod,并且对方法的大小有系统级限制。在远程端序列化、发送和最终反序列化该方法也会增加开销。

  • 需要多个进程,这会导致资源开销。在这两种操作符的情况下运行任务至少需要两个进程 - 一个进程(在 Docker 容器或 Kubernetes Pod 中运行)执行任务,另一个进程在 Airflow worker 中监督将作业提交到 Docker/Kubernetes 并监视执行。

  • 您的环境需要预先准备好容器镜像。这通常意味着您不能动态地更改它们。添加系统依赖项、修改或更改 Python 需求需要重新构建和发布镜像(通常在您的私有注册表中)。当您处理新的依赖项时,迭代时间通常更长,并且如果开发人员更改了依赖项,则需要他们在迭代期间构建和使用自己的镜像。这里,一个合适的部署管道对于可靠地维护您的部署至关重要。

  • 如果要通过装饰器运行 Python 可调用对象,则它必须是可序列化的,在这种情况下,Airflow 环境中不可用的所有依赖项都必须在您使用的可调用对象中本地导入,并且 DAG 的顶级 Python 代码不应导入/使用这些库。

  • 您需要了解有关 Docker 容器或 Kubernetes 如何工作的更多详细信息。这两个操作符提供的抽象是“泄漏的”,因此您需要更多地了解资源、网络、容器等,以便编写使用这些操作符的 DAG。

您可以在 Taskflow Docker 示例airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator Taskflow Kubernetes 示例 中查看使用 airflow.operators.providers.Docker 的详细示例。

使用多个 Docker 镜像和 Celery 队列

可以使用多个独立的 Docker 镜像运行 Airflow 任务(尽管这需要对 Airflow 部署有深入的了解)。这可以通过将不同的任务分配给不同的队列,并将您的 Celery worker 配置为对不同的队列使用不同的镜像来实现。但是,这(至少目前)需要大量的手动部署配置和对 Airflow、Celery 和 Kubernetes 如何工作的内在了解。此外,它还为运行任务带来了相当大的开销 - 资源重用的机会更少,而且在不影响性能和稳定性的情况下,很难微调这样的部署以降低资源成本。

使它更有用的可能方法之一是 AIP-46 Airflow 任务和 DAG 解析的运行时隔离 以及完成 AIP-43 DAG 处理器分离。在实现这些 AIP 之前,使用这种方法的好处很少,因此不推荐使用。

但是,当这些 AIP 实现后,这将为更多多租户方法打开大门,多个团队将能够拥有完全隔离的依赖项集,这些依赖项将在 DAG 的整个生命周期中使用 - 从解析到执行。

此条目有帮助吗?