DAG

DAG(有向无环图)是 Airflow 的核心概念,它收集了任务,并通过依赖关系和关系进行组织,以说明它们应该如何运行。

这是一个基本的示例 DAG

../_images/basic-dag.png

它定义了四个任务 - A、B、C 和 D - 并规定了它们必须运行的顺序,以及哪些任务依赖于其他任务。它还将说明 DAG 的运行频率 - 也许是“从明天开始每 5 分钟一次”,或“自 2020 年 1 月 1 日以来每天一次”。

DAG 本身并不关心任务内部发生什么;它仅仅关注如何执行它们——运行它们的顺序、重试它们的次数、它们是否有超时,等等。

声明 DAG

有三种方法可以声明 DAG——你可以使用上下文管理器,它会将 DAG 隐式添加到其中的任何内容

 import datetime

 from airflow import DAG
 from airflow.operators.empty import EmptyOperator

 with DAG(
     dag_id="my_dag_name",
     start_date=datetime.datetime(2021, 1, 1),
     schedule="@daily",
 ):
     EmptyOperator(task_id="task")

或者,你可以使用标准构造函数,将 DAG 传递给使用的任何运算符

 import datetime

 from airflow import DAG
 from airflow.operators.empty import EmptyOperator

 my_dag = DAG(
     dag_id="my_dag_name",
     start_date=datetime.datetime(2021, 1, 1),
     schedule="@daily",
 )
 EmptyOperator(task_id="task", dag=my_dag)

或者,你可以使用 @dag 装饰器将函数转换为 DAG 生成器将函数转换为 DAG 生成器

import datetime

from airflow.decorators import dag
from airflow.operators.empty import EmptyOperator


@dag(start_date=datetime.datetime(2021, 1, 1), schedule="@daily")
def generate_dag():
    EmptyOperator(task_id="task")


generate_dag()

没有要运行的任务,DAG 毫无用处,这些任务通常以运算符传感器TaskFlow的形式出现。

任务依赖项

任务/运算符通常不是独立存在的;它依赖于其他任务(它上游的任务),其他任务也依赖于它(它下游的任务)。声明任务之间的这些依赖项构成了 DAG 结构(有向无环图)。

有两种主要方法可以声明单个任务依赖项。建议使用 >><< 运算符

first_task >> [second_task, third_task]
third_task << fourth_task

或者,你还可以使用更明确的 set_upstreamset_downstream 方法

first_task.set_downstream([second_task, third_task])
third_task.set_upstream(fourth_task)

还有一些快捷方式可以声明更复杂的依赖项。如果你想让两个任务列表相互依赖,则不能使用上述任何一种方法,因此你需要使用 cross_downstream

from airflow.models.baseoperator import cross_downstream

# Replaces
# [op1, op2] >> op3
# [op1, op2] >> op4
cross_downstream([op1, op2], [op3, op4])

如果你想将依赖项链接在一起,则可以使用 chain

from airflow.models.baseoperator import chain

# Replaces op1 >> op2 >> op3 >> op4
chain(op1, op2, op3, op4)

# You can also do it dynamically
chain(*[EmptyOperator(task_id='op' + i) for i in range(1, 6)])

Chain 还可以对大小相同的列表进行成对依赖项(这不同于 cross_downstream 创建的交叉依赖项!)

from airflow.models.baseoperator import chain

# Replaces
# op1 >> op2 >> op4 >> op6
# op1 >> op3 >> op5 >> op6
chain(op1, [op2, op3], [op4, op5], op6)

加载 DAG

Airflow 从 Python 源文件中加载 DAG,它在配置的 DAG_FOLDER 中查找这些文件。它将获取每个文件,执行它,然后从该文件中加载任何 DAG 对象。

这意味着您可以在每个 Python 文件中定义多个 DAG,或使用导入将一个非常复杂的 DAG 分散到多个 Python 文件中。

但是,请注意,当 Airflow 从 Python 文件加载 DAG 时,它只会提取顶级的任何对象,这些对象是 DAG 实例。例如,采用此 DAG 文件

dag_1 = DAG('this_dag_will_be_discovered')

def my_function():
    dag_2 = DAG('but_this_dag_will_not')

my_function()

当访问该文件时,虽然两个 DAG 构造函数都将被调用,但只有 dag_1 位于顶级(在 globals() 中),因此只有它被添加到 Airflow。 dag_2 未加载。

注意

DAG_FOLDER 中搜索 DAG 时,Airflow 仅将包含字符串 airflowdag(不区分大小写)的 Python 文件作为优化项考虑在内。

要考虑所有 Python 文件,请禁用 DAG_DISCOVERY_SAFE_MODE 配置标志。

您还可以在 DAG_FOLDER 或其任何子文件夹中提供一个 .airflowignore 文件,该文件描述了加载器要忽略的文件模式。它涵盖了它所在的目录以及其下的所有子文件夹。有关文件语法的详细信息,请参阅下面的 .airflowignore

.airflowignore 无法满足您的需求,并且您希望以更灵活的方式控制 Airflow 是否需要解析 Python 文件的情况下,您可以通过在配置文件中设置 might_contain_dag_callable 来插入您的可调用项。请注意,此可调用项将替换默认的 Airflow 启发式方法,即检查 Python 文件中是否存在字符串 airflowdag(不区分大小写)。

def might_contain_dag(file_path: str, zip_file: zipfile.ZipFile | None = None) -> bool:
    # Your logic to check if there are DAGs defined in the file_path
    # Return True if the file_path needs to be parsed, otherwise False

运行 DAG

DAG 将以两种方式之一运行

  • 当它们被触发,无论是手动触发还是通过 API 触发

  • 在定义的时间表上,该时间表定义为 DAG 的一部分

DAG 不需要时间表,但定义时间表非常常见。您可以通过 schedule 参数进行定义,如下所示

with DAG("my_daily_dag", schedule="@daily"):
    ...

对于 schedule 参数,有各种有效值

with DAG("my_daily_dag", schedule="0 0 * * *"):
    ...

with DAG("my_one_time_dag", schedule="@once"):
    ...

with DAG("my_continuous_dag", schedule="@continuous"):
    ...

提示

有关不同类型调度的更多信息,请参阅 编写和调度

每次运行 DAG 时,您都在创建该 DAG 的新实例,Airflow 称之为 DAG 运行。DAG 运行可以针对同一 DAG 并行运行,并且每个运行都具有定义的数据间隔,用于标识任务应操作的数据周期。

作为此功能有用的原因的一个示例,考虑编写一个处理每日一组实验数据的 DAG。它已被重写,您希望在过去 3 个月的数据上运行它——没问题,因为 Airflow 可以回填DAG 并同时为过去 3 个月中的每一天运行它的副本。

这些 DAG 运行都将在同一天实际启动,但每个 DAG 运行将有一个数据间隔,涵盖该 3 个月期间的某一天,并且该数据间隔是 DAG 中的所有任务、运算符和传感器在运行时查看的所有内容。

与 DAG 在每次运行时实例化为 DAG 运行的方式非常相似,DAG 中指定的 Task 也实例化为 Task 实例

DAG 运行将在其启动时有一个启动日期,并在其结束时有一个结束日期。此周期描述了 DAG 实际“运行”的时间。除了 DAG 运行的开始和结束日期之外,还有另一个称为逻辑日期(正式称为执行日期)的日期,它描述了 DAG 运行计划或触发的预期时间。之所以将其称为逻辑日期,是因为它具有抽象的本质,具有多种含义,具体取决于 DAG 运行本身的上下文。

例如,如果 DAG 运行由用户手动触发,则其逻辑日期将为触发 DAG 运行的日期和时间,并且该值应等于 DAG 运行的开始日期。但是,当 DAG 被自动调度时,并设置了特定的调度间隔,逻辑日期将指示它标记数据间隔开始的时间,其中 DAG 运行的开始日期将是逻辑日期 + 调度间隔。

提示

有关 logical date 的更多信息,请参阅 数据间隔execution_date 的含义是什么?

DAG 分配

请注意,每个操作员/任务都必须分配给一个 DAG 才能运行。Airflow 有几种方法来计算 DAG,而无需您显式传递它

  • 如果您在 with DAG 块中声明您的操作员

  • 如果您在 @dag 装饰器中声明您的操作员

  • 如果您将您的操作员置于具有 DAG 的操作员的上游或下游

否则,您必须使用 dag= 将其传递给每个操作员。

默认参数

通常,DAG 中的许多操作员需要相同的默认参数集(例如它们的 retries)。与其为每个操作员单独指定此项,您可以在创建 DAG 时将 default_args 传递给 DAG,它将自动将它们应用于与之绑定的任何操作员

import pendulum

with DAG(
    dag_id="my_dag",
    start_date=pendulum.datetime(2016, 1, 1),
    schedule="@daily",
    default_args={"retries": 2},
):
    op = BashOperator(task_id="hello_world", bash_command="Hello World!")
    print(op.retries)  # 2

DAG 装饰器

2.0 版中的新增功能。

除了使用上下文管理器或 DAG() 构造函数声明单个 DAG 的传统方法之外,您还可以使用 @dag 装饰函数,将其转换为 DAG 生成器函数

airflow/example_dags/example_dag_decorator.py[源代码]

@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
)
def example_dag_decorator(email: str = "[email protected]"):
    """
    DAG to send server IP to email.

    :param email: Email to send IP to. Defaults to [email protected].
    """
    get_ip = GetRequestOperator(task_id="get_ip", url="http://httpbin.org/get")

    @task(multiple_outputs=True)
    def prepare_email(raw_json: dict[str, Any]) -> dict[str, str]:
        external_ip = raw_json["origin"]
        return {
            "subject": f"Server connected from {external_ip}",
            "body": f"Seems like today your server executing Airflow is connected from IP {external_ip}<br>",
        }

    email_info = prepare_email(get_ip.output)

    EmailOperator(
        task_id="send_email", to=email, subject=email_info["subject"], html_content=email_info["body"]
    )


example_dag = example_dag_decorator()

除了作为一种创建 DAG 的新方法之外,该装饰器还会将函数中的任何参数设置为 DAG 参数,让您 在触发 DAG 时设置这些参数。然后,您可以从 Python 代码或 {{ context.params }} 中访问这些参数,这些代码位于 Jinja 模板 中。

注意

Airflow 仅加载 出现在 DAG 文件顶层 的 DAG。这意味着您不能仅仅使用 @dag 声明一个函数 - 您还必须在 DAG 文件中至少调用它一次,并将其分配给顶层对象,如您在上面的示例中所见。

控制流

默认情况下,DAG 仅在所有依赖任务成功时才运行任务。但是,有几种方法可以修改此设置

  • 分支 - 根据条件选择要继续执行的任务

  • 触发规则 - 设置 DAG 运行任务的条件

  • 设置和拆除 - 定义设置和拆除关系

  • 仅最新 - 一种特殊形式的分支,仅在针对当前运行的 DAG 运行

  • 依赖过去 - 任务可以依赖于自己从之前的运行

分支

您可以利用分支来告知 DAG 运行所有依赖任务,而是选择一个或多个路径进行执行。这就是 @task.branch 装饰器发挥作用的地方。

@task.branch 装饰器与 @task 非常相似,不同之处在于它期望装饰的函数返回一个任务 ID(或 ID 列表)。将执行指定的任务,而所有其他路径都将被跳过。它还可以返回 None 以跳过所有下游任务。

Python 函数返回的 task_id 必须引用 @task.branch 装饰的任务直接下游的一个任务。

注意

当一个任务既是分支操作符的下游,又是所选任务之一或多个的下游时,它将不会被跳过。

../_images/branch_note.png

分支任务的路径是 branch_ajoinbranch_b。由于 joinbranch_a 的下游任务,因此它仍将运行,即使它未作为分支决策的一部分返回。

@task.branch 还可以与 XCom 一起使用,允许分支上下文根据上游任务动态决定要遵循哪个分支。例如

@task.branch(task_id="branch_task")
def branch_func(ti=None):
    xcom_value = int(ti.xcom_pull(task_ids="start_task"))
    if xcom_value >= 5:
        return "continue_task"
    elif xcom_value >= 3:
        return "stop_task"
    else:
        return None


start_op = BashOperator(
    task_id="start_task",
    bash_command="echo 5",
    do_xcom_push=True,
    dag=dag,
)

branch_op = branch_func()

continue_op = EmptyOperator(task_id="continue_task", dag=dag)
stop_op = EmptyOperator(task_id="stop_task", dag=dag)

start_op >> branch_op >> [continue_op, stop_op]

如果您希望使用分支功能实现自己的运算符,则可以从 BaseBranchOperator 继承,它的行为类似于 @task.branch 装饰器,但要求您提供方法 choose_branch 的实现。

注意

@task.branch 装饰器比在 DAG 中直接实例化 BranchPythonOperator 更为推荐。后者通常只应被子类化以实现自定义运算符。

@task.branch 的可调用对象一样,此方法可以返回下游任务的 ID 或任务 ID 列表,这些任务将被运行,而所有其他任务都将被跳过。它还可以返回 None 以跳过所有下游任务

class MyBranchOperator(BaseBranchOperator):
    def choose_branch(self, context):
        """
        Run an extra branch on the first day of the month
        """
        if context['data_interval_start'].day == 1:
            return ['daily_task_id', 'monthly_task_id']
        elif context['data_interval_start'].day == 2:
            return 'daily_task_id'
        else:
            return None

类似于常规 Python 代码的 @task.branch 装饰器,还有一些分支装饰器,它们使用名为 @task.branch_virtualenv 的虚拟环境或名为 @task.branch_external_python 的外部 python。

仅最新

Airflow 的 DAG 运行通常针对的日期与当前日期不同 - 例如,为上个月的每一天运行一个 DAG 副本以回填一些数据。

但是,在某些情况下,您希望让 DAG 的某些(或全部)部分针对以前日期运行;在这种情况下,您可以使用 LatestOnlyOperator

如果您不在“最新”DAG 运行中(如果当前的时钟时间介于其 execution_time 和下一个计划的 execution_time 之间,并且它不是外部触发的运行),此特殊操作符将跳过其下游的所有任务。

以下是一个示例

airflow/example_dags/example_latest_only_with_trigger.py[源代码]

import datetime

import pendulum

from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.latest_only import LatestOnlyOperator
from airflow.utils.trigger_rule import TriggerRule

with DAG(
    dag_id="latest_only_with_trigger",
    schedule=datetime.timedelta(hours=4),
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example3"],
) as dag:
    latest_only = LatestOnlyOperator(task_id="latest_only")
    task1 = EmptyOperator(task_id="task1")
    task2 = EmptyOperator(task_id="task2")
    task3 = EmptyOperator(task_id="task3")
    task4 = EmptyOperator(task_id="task4", trigger_rule=TriggerRule.ALL_DONE)

    latest_only >> task1 >> [task3, task4]
    task2 >> [task3, task4]

对于此 DAG

  • task1latest_only 的直接下游,并且除了最新版本之外,所有运行都将跳过它。

  • task2 完全独立于 latest_only,并且将在所有计划周期中运行

  • task3task1task2 的下游,并且由于默认 触发规则all_success,因此将从 task1 接收级联跳过。

  • task4task1task2 的下游,但它不会被跳过,因为它的 trigger_rule 设置为 all_done

../_images/latest_only_with_trigger.png

依赖于过去

您还可以说,只有当上一个 DAG 运行中的任务成功运行后,任务才能运行。要使用此功能,您只需将任务上的 depends_on_past 参数设置为 True 即可。

请注意,如果您在 DAG 生命周期的开始阶段(具体来说,是其第一个自动运行)运行 DAG,则任务仍将运行,因为没有上一个运行可依赖。

触发规则

默认情况下,Airflow 将等待任务的所有上游(直接父级)任务成功,然后才运行该任务。

但是,这只是默认行为,您可以使用任务的 trigger_rule 参数对其进行控制。 trigger_rule 的选项为

  • all_success(默认):所有上游任务都已成功

  • all_failed:所有上游任务都处于 failedupstream_failed 状态

  • all_done:所有上游任务都已完成执行

  • all_skipped:所有上游任务都处于 skipped 状态

  • one_failed:至少有一个上游任务失败(不等待所有上游任务完成)

  • one_success:至少有一个上游任务成功(不等待所有上游任务完成)

  • one_done:至少有一个上游任务成功或失败

  • none_failed:所有上游任务均未失败上游失败 - 即,所有上游任务均已成功或已跳过

  • none_failed_min_one_success:所有上游任务均未失败上游失败,且至少有一个上游任务已成功。

  • none_skipped:没有上游任务处于已跳过状态 - 即,所有上游任务均处于成功失败上游失败状态

  • always:没有任何依赖项,随时运行此任务

如果您愿意,还可以将其与取决于过去功能结合使用。

注意

了解触发规则与已跳过任务之间的交互非常重要,尤其是作为分支操作的一部分而被跳过的任务。几乎从不希望在分支操作的下游使用 all_success 或 all_failed。

已跳过的任务将通过触发规则all_successall_failed级联,并导致它们也被跳过。考虑以下 DAG

# dags/branch_without_trigger.py
import pendulum

from airflow.decorators import task
from airflow.models import DAG
from airflow.operators.empty import EmptyOperator

dag = DAG(
    dag_id="branch_without_trigger",
    schedule="@once",
    start_date=pendulum.datetime(2019, 2, 28, tz="UTC"),
)

run_this_first = EmptyOperator(task_id="run_this_first", dag=dag)


@task.branch(task_id="branching")
def do_branching():
    return "branch_a"


branching = do_branching()

branch_a = EmptyOperator(task_id="branch_a", dag=dag)
follow_branch_a = EmptyOperator(task_id="follow_branch_a", dag=dag)

branch_false = EmptyOperator(task_id="branch_false", dag=dag)

join = EmptyOperator(task_id="join", dag=dag)

run_this_first >> branching
branching >> branch_a >> follow_branch_a >> join
branching >> branch_false >> join

join位于follow_branch_abranch_false的下游。join任务将显示为已跳过,因为其trigger_rule默认设置为all_success,并且由分支操作导致的跳过会级联跳过标记为all_success的任务。

../_images/branch_without_trigger.png

通过在join任务中将trigger_rule设置为none_failed_min_one_success,我们可以获得预期的行为

../_images/branch_with_trigger.png

设置和拆除

在数据工作流中,通常会创建资源(例如计算资源),使用它来完成一些工作,然后将其拆除。Airflow 提供设置和拆除任务来支持此需求。

有关如何使用此功能的详细信息,请参阅主文章 设置和拆除

动态 DAG

由于 DAG 由 Python 代码定义,因此无需使其完全声明性;您可以自由使用循环、函数等来定义 DAG。

例如,这是一个使用 for 循环来定义一些任务的 DAG

 with DAG("loop_example", ...):
     first = EmptyOperator(task_id="first")
     last = EmptyOperator(task_id="last")

     options = ["branch_a", "branch_b", "branch_c", "branch_d"]
     for option in options:
         t = EmptyOperator(task_id=option)
         first >> t >> last

一般来说,我们建议您尝试保持 DAG 任务的拓扑(布局)相对稳定;动态 DAG 通常更适合用于动态加载配置选项或更改操作符选项。

DAG 可视化

如果您想查看 DAG 的可视化表示,您有两个选择

  • 您可以加载 Airflow UI,导航到您的 DAG,然后选择“图形”

  • 您可以运行 airflow dags show,它会将其呈现为图像文件

我们通常建议您使用图形视图,因为它还会向您显示您在所选 DAG 运行中所有 任务实例 的状态。

当然,随着您开发 DAG,它们会变得越来越复杂,因此我们提供了一些方法来修改这些 DAG 视图,以便更易于理解。

任务组

任务组可用于在图形视图中将任务组织成层次组。它对于创建重复模式和减少视觉混乱非常有用。

子 DAG 不同,任务组纯粹是 UI 分组概念。任务组中的任务位于同一原始 DAG 上,并遵循所有 DAG 设置和池配置。

../_images/task_group.gif

依赖关系可以使用 >><< 操作符应用到任务组中的所有任务。例如,以下代码将 task1task2 放入任务组 group1 中,然后将这两个任务放在 task3 的上游

 from airflow.decorators import task_group


 @task_group()
 def group1():
     task1 = EmptyOperator(task_id="task1")
     task2 = EmptyOperator(task_id="task2")


 task3 = EmptyOperator(task_id="task3")

 group1() >> task3

TaskGroup 也支持 default_args,类似于 DAG,它将覆盖 DAG 级别中的 default_args

import datetime

from airflow import DAG
from airflow.decorators import task_group
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator

with DAG(
    dag_id="dag1",
    start_date=datetime.datetime(2016, 1, 1),
    schedule="@daily",
    default_args={"retries": 1},
):

    @task_group(default_args={"retries": 3})
    def group1():
        """This docstring will become the tooltip for the TaskGroup."""
        task1 = EmptyOperator(task_id="task1")
        task2 = BashOperator(task_id="task2", bash_command="echo Hello World!", retries=2)
        print(task1.retries)  # 3
        print(task2.retries)  # 2

如果您希望看到 TaskGroup 的更高级用法,可以查看随 Airflow 提供的 example_task_group_decorator.py 示例 DAG。

注意

默认情况下,子任务/TaskGroup 的 ID 以其父 TaskGroup 的 group_id 为前缀。这有助于确保整个 DAG 中 group_id 和 task_id 的唯一性。

要禁用前缀,请在创建 TaskGroup 时传递 prefix_group_id=False,但请注意,您现在将负责确保每个任务和组都有自己唯一的 ID。

注意

使用 @task_group 装饰器时,装饰函数的文档字符串将用作 UI 中的 TaskGroup 工具提示,但前提是未明确提供 tooltip 值。

边缘标签

除了将任务分组到组中之外,您还可以在图形视图中标记不同任务之间的依赖关系边缘 - 这对于 DAG 的分支区域特别有用,因此您可以标记某些分支可能运行的条件。

要添加标签,您可以直接使用 >><< 运算符内联使用它们

from airflow.utils.edgemodifier import Label

my_task >> Label("When empty") >> other_task

或者,您可以将 Label 对象传递给 set_upstream/set_downstream

from airflow.utils.edgemodifier import Label

my_task.set_downstream(other_task, Label("When empty"))

以下是一个示例 DAG,说明了如何标记不同的分支

../_images/edge_label_example.png

airflow/example_dags/example_branch_labels.py[源代码]


with DAG(
    "example_branch_labels",
    schedule="@daily",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
) as dag:
    ingest = EmptyOperator(task_id="ingest")
    analyse = EmptyOperator(task_id="analyze")
    check = EmptyOperator(task_id="check_integrity")
    describe = EmptyOperator(task_id="describe_integrity")
    error = EmptyOperator(task_id="email_error")
    save = EmptyOperator(task_id="save")
    report = EmptyOperator(task_id="report")

    ingest >> analyse >> check
    check >> Label("No errors") >> save >> report
    check >> Label("Errors found") >> describe >> error >> report

DAG 和任务文档

可以向 DAG 和任务对象添加文档或注释,这些文档或注释在 Web 界面中可见(DAG 的“图形”和“树”,任务的“任务实例详细信息”)。

有一组特殊任务属性,如果已定义,则会呈现为丰富的内容

属性

呈现为

doc

等宽字体

doc_json

json

doc_yaml

yaml

doc_md

markdown

doc_rst

reStructuredText

请注意,对于 DAG,doc_md 是唯一解释的属性。对于 DAG,它可以包含字符串或对模板文件的引用。模板引用由以 .md 结尾的字符串识别。如果提供了相对路径,它将从 DAG 文件的文件夹开始。此外,模板文件必须存在,否则 Airflow 将抛出 jinja2.exceptions.TemplateNotFound 异常。

如果您的任务是从配置文件动态构建的,这将特别有用,因为它允许您公开导致 Airflow 中相关任务的配置

"""
### My great DAG
"""
import pendulum

dag = DAG(
    "my_dag",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule="@daily",
    catchup=False,
)
dag.doc_md = __doc__

t = BashOperator("foo", dag=dag)
t.doc_md = """\
#Title"
Here's a [url](www.airbnb.com)
"""

子 DAG

注意

子 DAG 已弃用,因此 TaskGroup 始终是首选。

有时,您会发现您会定期向每个 DAG 添加完全相同的任务集,或者您希望将大量任务分组到一个单一的逻辑单元中。这就是子 DAG 的用途。

例如,这里有一个 DAG,它在两个部分中有很多并行任务

../_images/subdag_before.png

我们可以将所有并行的 task-* 运算符合并到一个子 DAG 中,这样生成的 DAG 类似于以下内容

../_images/subdag_after.png

请注意,子 DAG 运算符应包含返回 DAG 对象的工厂方法。这将防止子 DAG 在主 UI 中被视为单独的 DAG - 请记住,如果 Airflow 在 Python 文件的顶层看到一个 DAG,它将 将其加载为其自己的 DAG。例如

airflow/example_dags/subdags/subdag.py[源代码]

import pendulum

from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator


def subdag(parent_dag_name, child_dag_name, args) -> DAG:
    """
    Generate a DAG to be used as a subdag.

    :param str parent_dag_name: Id of the parent DAG
    :param str child_dag_name: Id of the child DAG
    :param dict args: Default arguments to provide to the subdag
    :return: DAG to use as a subdag
    """
    dag_subdag = DAG(
        dag_id=f"{parent_dag_name}.{child_dag_name}",
        default_args=args,
        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
        catchup=False,
        schedule="@daily",
    )

    for i in range(5):
        EmptyOperator(
            task_id=f"{child_dag_name}-task-{i + 1}",
            default_args=args,
            dag=dag_subdag,
        )

    return dag_subdag


然后可以在您的主 DAG 文件中引用此子 DAG

airflow/example_dags/example_subdag_operator.py[源代码]

import datetime

from airflow.example_dags.subdags.subdag import subdag
from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.subdag import SubDagOperator

DAG_NAME = "example_subdag_operator"

with DAG(
    dag_id=DAG_NAME,
    default_args={"retries": 2},
    start_date=datetime.datetime(2022, 1, 1),
    schedule="@once",
    tags=["example"],
) as dag:
    start = EmptyOperator(
        task_id="start",
    )

    section_1 = SubDagOperator(
        task_id="section-1",
        subdag=subdag(DAG_NAME, "section-1", dag.default_args),
    )

    some_other_task = EmptyOperator(
        task_id="some-other-task",
    )

    section_2 = SubDagOperator(
        task_id="section-2",
        subdag=subdag(DAG_NAME, "section-2", dag.default_args),
    )

    end = EmptyOperator(
        task_id="end",
    )

    start >> section_1 >> some_other_task >> section_2 >> end

你可以从主 DAG 的图形视图中放大 SubDagOperator 以显示 SubDAG 中包含的任务

../_images/subdag_zoom.png

使用 SubDAG 时的一些其他提示

  • 根据惯例,SubDAG 的 dag_id 应以其父 DAG 的名称和一个点作为前缀 (parent.child)

  • 你应通过向 SubDAG 运算符传递参数(如上所示)来在主 DAG 和 SubDAG 之间共享参数

  • SubDAG 必须有计划并启用。如果 SubDAG 的计划设置为 None@once,则 SubDAG 将在未执行任何操作的情况下成功。

  • 清除 SubDagOperator 也将清除其中任务的状态。

  • SubDagOperator 上标记成功不会影响其中任务的状态。

  • 避免在 SubDAG 中的任务中使用 依赖于过去,因为这可能会造成混淆。

  • 你可以为 SubDAG 指定一个执行器。如果你想在进程中运行 SubDAG 并有效地将其并行性限制为 1,则通常使用 SequentialExecutor。使用 LocalExecutor 可能会出现问题,因为它可能会过度订阅你的工作进程,在单个插槽中运行多个任务。

请参阅 airflow/example_dags 以获取演示。

注意

SubDagOperator 不遵守 并行性,因此 SubdagOperator 可能会消耗超出你可能设置的任何限制的资源。

任务组与子 DAG

子 DAG 虽然与任务组具有类似的目的,但由于其实现,引入了性能和功能问题。

  • SubDagOperator 启动一个 BackfillJob,它会忽略现有的并行配置,从而可能过度订阅工作器环境。

  • 子 DAG 具有自己的 DAG 属性。当子 DAG DAG 属性与其父 DAG 不一致时,可能会出现意外行为。

  • 无法在一个视图中看到“完整”的 DAG,因为子 DAG 作为一个完全成熟的 DAG 存在。

  • 子 DAG 引入了各种边缘情况和注意事项。这可能会破坏用户体验和期望。

另一方面,任务组是一个更好的选择,因为它纯粹是一个 UI 分组概念。任务组中的所有任务仍然像任务组外的任何其他任务一样运行。

你可以看到这两个构造之间的核心差异。

任务组

子 DAG

重复模式作为同一 DAG 的一部分

重复模式作为单独的 DAG

DAG 的一组视图和统计信息

父 DAG 和子 DAG 之间的一组独立的视图和统计信息

一组 DAG 配置

多组 DAG 配置

通过现有的 SchedulerJob 尊重并行配置

由于新产生的 BackfillJob,不尊重并行配置

使用上下文管理器进行简单的构造声明

具有命名限制的复杂 DAG 工厂

打包 DAG

虽然更简单的 DAG 通常仅在一个 Python 文件中,但更复杂的 DAG 可能分布在多个文件中并具有应该与它们一起发布的依赖项(“供应商”)的情况并不少见。

你可以将所有内容都放在标准文件系统布局的 DAG_FOLDER 中,也可以将 DAG 及其所有 Python 文件打包为一个 zip 文件。例如,你可以将两个 DAG 以及它们需要的依赖项打包为一个 zip 文件,内容如下

my_dag1.py
my_dag2.py
package1/__init__.py
package1/functions.py

请注意,打包的 DAG 有一些注意事项

  • 如果启用了序列化,则无法使用它们

  • 它们不能包含编译库(例如 libz.so),只能包含纯 Python

  • 它们将被插入 Python 的 sys.path 中,并且可以通过 Airflow 进程中的任何其他代码导入,因此请确保包名称不会与系统上已安装的其他包冲突。

一般来说,如果你有一组复杂的编译依赖项和模块,那么最好使用 Python virtualenv 系统,并使用 pip 在目标系统上安装必要的包。

.airflowignore

.airflowignore 文件指定了 DAG_FOLDERPLUGINS_FOLDER 中 Airflow 应有意忽略的目录或文件。Airflow 支持文件中模式的两种语法风格,如 DAG_IGNORE_FILE_SYNTAX 配置参数(在 Airflow 2.3 中添加)所指定:regexpglob

注意

默认的 DAG_IGNORE_FILE_SYNTAXregexp,以确保向后兼容性。

对于 regexp 模式语法(默认),.airflowignore 中的每一行都指定一个正则表达式模式,并且名称(不是 DAG ID)与任何模式匹配的目录或文件都将被忽略(本质上,Pattern.search() 用于匹配模式)。使用 # 字符表示注释;以 # 开头的行上的所有字符都将被忽略。

与 Airflow 中大多数正则表达式匹配一样,正则表达式引擎是 re2,它明确不支持许多高级功能,请查看其 文档 了解更多信息。

使用 glob 语法,模式的工作方式就像 .gitignore 文件中的模式一样

  • 字符 * 将匹配任意数量的字符,除了 /

  • 字符 ? 将匹配任何单个字符,除了 /

  • 范围符号,例如 [a-zA-Z],可用于匹配范围内的某个字符

  • 可以通过添加前缀 ! 来否定模式。模式按顺序评估,因此否定可以覆盖同一文件中先前定义的模式或父目录中定义的模式。

  • 双星 (**) 可用于跨目录匹配。例如,**/__pycache__/ 将忽略每个子目录中无限深度的 __pycache__ 目录。

  • 如果模式的开头或中间(或两者)有 /,则该模式相对于 .airflowignore 文件本身的目录级别。否则,该模式还可以在 .airflowignore 级别以下的任何级别匹配。

应该将 .airflowignore 文件放在 DAG_FOLDER 中。例如,可以使用 regexp 语法准备一个 .airflowignore 文件,内容如下

project_a
tenant_[\d]

或者,等效地,使用 glob 语法

**/*project_a*
tenant_[0-9]*

然后, DAG_FOLDER 中的 project_a_dag_1.pyTESTING_project_a.pytenant_1.pyproject_a/dag_1.pytenant_1/dag_1.py 等文件将被忽略(如果目录的名称与任何模式匹配,则 Airflow 根本不会扫描此目录及其所有子文件夹。这提高了查找 DAG 的效率)。

一个 .airflowignore 文件的作用域是它所在的目录及其所有子文件夹。你还可以为 DAG_FOLDER 中的子文件夹准备 .airflowignore 文件,它仅适用于该子文件夹。

DAG 依赖

在 Airflow 2.1 中添加

虽然 DAG 中任务之间的依赖关系通过上游和下游关系明确定义,但 DAG 之间的依赖关系则稍微复杂一些。通常,一个 DAG 可以通过以下两种方式依赖另一个 DAG

另一个困难在于,一个 DAG 可以等待或触发另一个 DAG 的多次运行,并且数据间隔不同。DAG 依赖视图 菜单 -> 浏览 -> DAG 依赖 有助于可视化 DAG 之间的依赖关系。依赖关系由调度程序在 DAG 序列化期间计算,Web 服务器使用它们来构建依赖关系图。

依赖关系检测器是可配置的,因此你可以实现与 DependencyDetector 中的默认值不同的逻辑

DAG 暂停、停用和删除

在“未运行”时,DAG 具有多种状态。可以暂停、停用 DAG,最后可以删除 DAG 的所有元数据。

当 DAG 存在于 DAGS_FOLDER 中时,可以通过 UI 暂停,并且调度程序将它存储在数据库中,但用户选择通过 UI 禁用它。“暂停”和“取消暂停”操作可以通过 UI 和 API 获得。已暂停的 DAG 不会由调度程序调度,但您可以通过 UI 触发它们以进行手动运行。在 UI 中,您可以看到已暂停的 DAG(在 Paused 选项卡中)。可以在 Active 选项卡中找到已取消暂停的 DAG。暂停 DAG 时,允许任何正在运行的任务完成,并且所有下游任务都将置于“已调度”状态。取消暂停 DAG 时,任何“已调度”任务都将根据 DAG 逻辑开始运行。没有“已调度”任务的 DAG 将根据其时间表开始运行。

可以通过从 DAGS_FOLDER 中删除 DAG 来停用 DAG(不要将其与 UI 中的 Active 标记混淆)。当调度程序解析 DAGS_FOLDER 并错过了它之前看到并存储在数据库中的 DAG 时,它将被设置为已停用。停用的 DAG 的元数据和历史记录将被保留,当 DAG 重新添加到 DAGS_FOLDER 中时,它将再次被激活,并且历史记录将可见。您无法通过 UI 或 API 激活/停用 DAG,这只能通过从 DAGS_FOLDER 中删除文件来完成。再次强调 - 当调度程序停用 DAG 时,不会丢失其历史运行的任何数据。请注意,Airflow UI 中的 Active 选项卡指的是既不是 Activated 也不是 Not paused 的 DAG,因此这最初可能会有点令人困惑。

您无法在 UI 中看到已停用的 DAG - 有时您可能会看到历史运行,但当您尝试查看有关这些运行的信息时,您会看到 DAG 丢失的错误。

您还可以使用 UI 或 API 从元数据数据库中删除 DAG 元数据,但它并不总是会导致 DAG 从 UI 中消失,这最初也可能有点令人困惑。如果您在删除元数据时 DAG 仍位于 DAGS_FOLDER 中,则 DAG 将重新出现,因为调度程序将解析该文件夹,只会删除 DAG 的历史运行信息。

所有这些意味着,如果您实际上想要删除 DAG 及其所有历史元数据,则需要分三步进行

  • 暂停 DAG

  • 通过 UI 或 API 从数据库中删除历史元数据

  • DAGS_FOLDER 中删除 DAG 文件,并等到它变为非活动状态

DAG 自动暂停(实验性)

还可以将 DAG 配置为自动暂停。有一个 Airflow 配置,如果 DAG 连续失败 N 次,则允许自动禁用该 DAG。

我们还可以提供并覆盖 DAG 参数中的这些配置

此条目是否有帮助?