Airflow Summit 2025 将于 10 月 07-09 日举行。立即注册以获取早鸟票!

Airflow 101:构建你的第一个工作流

欢迎来到 Apache Airflow 的世界!在本教程中,我们将引导您了解 Airflow 的基本概念,帮助您理解如何编写您的第一个 DAG。无论您是否熟悉 Python,或者刚刚入门,我们都会让这段旅程变得愉快而简单。

什么是 DAG?

其核心是,DAG 是按反映任务关系和依赖关系的方式组织的任务集合。它就像您工作流的路线图,展示了每个任务如何相互连接。如果这听起来有点复杂,请不要担心;我们将逐步分解它。

流水线定义示例

让我们从一个简单的流水线定义示例开始。尽管一开始可能看起来令人不知所措,但我们将详细解释每一行。

src/airflow/example_dags/tutorial.py


import textwrap
from datetime import datetime, timedelta

# Operators; we need this to operate!
from airflow.providers.standard.operators.bash import BashOperator

# The DAG object; we'll need this to instantiate a DAG
from airflow.sdk import DAG
with DAG(
    "tutorial",
    # These args will get passed on to each operator
    # You can override them on a per-task basis during operator initialization
    default_args={
        "depends_on_past": False,
        "retries": 1,
        "retry_delay": timedelta(minutes=5),
        # 'queue': 'bash_queue',
        # 'pool': 'backfill',
        # 'priority_weight': 10,
        # 'end_date': datetime(2016, 1, 1),
        # 'wait_for_downstream': False,
        # 'execution_timeout': timedelta(seconds=300),
        # 'on_failure_callback': some_function, # or list of functions
        # 'on_success_callback': some_other_function, # or list of functions
        # 'on_retry_callback': another_function, # or list of functions
        # 'sla_miss_callback': yet_another_function, # or list of functions
        # 'on_skipped_callback': another_function, #or list of functions
        # 'trigger_rule': 'all_success'
    },
    description="A simple tutorial DAG",
    schedule=timedelta(days=1),
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=["example"],
) as dag:

    # t1, t2 and t3 are examples of tasks created by instantiating operators
    t1 = BashOperator(
        task_id="print_date",
        bash_command="date",
    )

    t2 = BashOperator(
        task_id="sleep",
        depends_on_past=False,
        bash_command="sleep 5",
        retries=3,
    )
    t1.doc_md = textwrap.dedent(
        """\
    #### Task Documentation
    You can document your task using the attributes `doc_md` (markdown),
    `doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
    rendered in the UI's Task Instance Details page.
    ![img](https://imgs.xkcd.com/comics/fixing_problems.png)
    **Image Credit:** Randall Munroe, [XKCD](https://xkcd.com/license.html)
    """
    )

    dag.doc_md = __doc__  # providing that you have a docstring at the beginning of the DAG; OR
    dag.doc_md = """
    This is a documentation placed anywhere
    """  # otherwise, type it like this
    templated_command = textwrap.dedent(
        """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
    {% endfor %}
    """
    )

    t3 = BashOperator(
        task_id="templated",
        depends_on_past=False,
        bash_command=templated_command,
    )

    t1 >> [t2, t3]

理解 DAG 定义文件

将 Airflow Python 脚本视为一个配置文件,它用代码定义了 DAG 的结构。您在此处定义的实际任务在不同的环境中运行,这意味着此脚本并非用于数据处理。它的主要作用是定义 DAG 对象,并且需要快速评估,因为 DAG 文件处理器会定期检查它是否有任何更改。

导入模块

要开始使用,我们需要导入必要的库。这是任何 Python 脚本中典型的第一步。

src/airflow/example_dags/tutorial.py

import textwrap
from datetime import datetime, timedelta

# Operators; we need this to operate!
from airflow.providers.standard.operators.bash import BashOperator

# The DAG object; we'll need this to instantiate a DAG
from airflow.sdk import DAG


有关 Python 和 Airflow 如何处理模块的更多详细信息,请查看模块管理

设置默认参数

创建 DAG 及其任务时,您可以直接将参数传递给每个任务,也可以在字典中定义一组默认参数。后一种方法通常更高效、更简洁。

src/airflow/example_dags/tutorial.py

# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args={
    "depends_on_past": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function, # or list of functions
    # 'on_success_callback': some_other_function, # or list of functions
    # 'on_retry_callback': another_function, # or list of functions
    # 'sla_miss_callback': yet_another_function, # or list of functions
    # 'on_skipped_callback': another_function, #or list of functions
    # 'trigger_rule': 'all_success'
},

如果您想深入了解 BaseOperator 的参数,请查看 airflow.sdk.BaseOperator 文档。

创建 DAG

接下来,我们需要创建一个 DAG 对象来容纳我们的任务。我们将为 DAG 提供一个唯一的标识符,称为 dag_id,并指定我们刚刚定义的默认参数。我们还将为我们的 DAG 设置每天运行的调度。

src/airflow/example_dags/tutorial.py

with DAG(
    "tutorial",
    # These args will get passed on to each operator
    # You can override them on a per-task basis during operator initialization
    default_args={
        "depends_on_past": False,
        "retries": 1,
        "retry_delay": timedelta(minutes=5),
        # 'queue': 'bash_queue',
        # 'pool': 'backfill',
        # 'priority_weight': 10,
        # 'end_date': datetime(2016, 1, 1),
        # 'wait_for_downstream': False,
        # 'execution_timeout': timedelta(seconds=300),
        # 'on_failure_callback': some_function, # or list of functions
        # 'on_success_callback': some_other_function, # or list of functions
        # 'on_retry_callback': another_function, # or list of functions
        # 'sla_miss_callback': yet_another_function, # or list of functions
        # 'on_skipped_callback': another_function, #or list of functions
        # 'trigger_rule': 'all_success'
    },
    description="A simple tutorial DAG",
    schedule=timedelta(days=1),
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=["example"],
) as dag:

理解 Operator

Operator 在 Airflow 中代表一个工作单元。它们是工作流的构建块,允许您定义将执行哪些任务。虽然我们可以将 Operator 用于许多任务,但 Airflow 也提供了Taskflow API,这是一种更 Python 式的方式来定义工作流,我们稍后将对此进行介绍。

所有 Operator 都派生自 BaseOperator,它包含在 Airflow 中运行任务所需的基本参数。一些流行的 Operator 包括 PythonOperatorBashOperatorKubernetesPodOperator。在本教程中,我们将重点介绍 BashOperator 来执行一些简单的 bash 命令。

定义任务

要使用 Operator,必须将其实例化为一个任务。任务规定了 Operator 将如何在 DAG 的上下文中执行其工作。在下面的示例中,我们两次实例化了 BashOperator 来运行两个不同的 bash 脚本。task_id 作为每个任务的唯一标识符。

src/airflow/example_dags/tutorial.py

t1 = BashOperator(
    task_id="print_date",
    bash_command="date",
)

t2 = BashOperator(
    task_id="sleep",
    depends_on_past=False,
    bash_command="sleep 5",
    retries=3,
)

请注意我们如何将特定于 Operator 的参数(如 bash_command)与从 BaseOperator 继承的通用参数(如 retries)混合使用。这种方法简化了我们的代码。在第二个任务中,我们甚至覆盖了 retries 参数,将其设置为 3

任务参数的优先级如下

  1. 显式传递的参数

  2. default_args 字典中的值

  3. Operator 的默认值(如果可用)


注意

请记住,每个任务都必须包含或继承参数 task_idowner。否则,Airflow 将抛出错误。幸运的是,全新的 Airflow 安装将 owner 默认设置为 airflow,因此您主要需要确保设置了 task_id

使用 Jinja 进行模板化

Airflow 利用了 Jinja 模板化 的强大功能,让您可以访问内置参数和宏来增强您的工作流。本节将向您介绍 Airflow 中模板化的基础知识,重点关注常用的模板变量:{{ ds }},它代表今天的日期戳。

src/airflow/example_dags/tutorial.py

templated_command = textwrap.dedent(
    """
{% for i in range(5) %}
    echo "{{ ds }}"
    echo "{{ macros.ds_add(ds, 7)}}"
{% endfor %}
"""
)

t3 = BashOperator(
    task_id="templated",
    depends_on_past=False,
    bash_command=templated_command,
)

您会注意到 templated_command 包含 {% %} 块中的逻辑,并引用 {{ ds }} 等参数。您还可以将文件传递给 bash_command,例如 bash_command='templated_command.sh',以便更好地组织代码。您甚至可以定义 user_defined_macrosuser_defined_filters 来创建自己的变量和过滤器用于模板。有关自定义过滤器的更多信息,请参阅 Jinja 文档

有关可在模板中引用的变量和宏的更多信息,请阅读模板参考

添加 DAG 和任务文档

您可以为您的 DAG 或单个任务添加文档。虽然 DAG 文档目前支持 markdown,但任务文档可以是纯文本、markdown、reStructuredText、JSON 或 YAML。一个好的做法是在 DAG 文件的开头包含文档。

src/airflow/example_dags/tutorial.py

t1.doc_md = textwrap.dedent(
    """\
#### Task Documentation
You can document your task using the attributes `doc_md` (markdown),
`doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
rendered in the UI's Task Instance Details page.
![img](https://imgs.xkcd.com/comics/fixing_problems.png)
**Image Credit:** Randall Munroe, [XKCD](https://xkcd.com/license.html)
"""
)

dag.doc_md = __doc__  # providing that you have a docstring at the beginning of the DAG; OR
dag.doc_md = """
This is a documentation placed anywhere
"""  # otherwise, type it like this

../_images/task_doc.png

../_images/dag_doc.png

设置依赖关系

在 Airflow 中,任务可以相互依赖。例如,如果您有任务 t1t2t3,您可以通过几种方式定义它们的依赖关系

t1.set_downstream(t2)

# This means that t2 will depend on t1
# running successfully to run.
# It is equivalent to:
t2.set_upstream(t1)

# The bit shift operator can also be
# used to chain operations:
t1 >> t2

# And the upstream dependency with the
# bit shift operator:
t2 << t1

# Chaining multiple dependencies becomes
# concise with the bit shift operator:
t1 >> t2 >> t3

# A list of tasks can also be set as
# dependencies. These operations
# all have the same effect:
t1.set_downstream([t2, t3])
t1 >> [t2, t3]
[t2, t3] << t1

请注意,如果 Airflow 检测到 DAG 中的循环或某个依赖关系被多次引用,它将抛出错误。

处理时区

创建感知时区的 DAG 很简单。只需确保使用 pendulum 处理感知时区日期即可。避免使用标准库中的 timezone,因为它们存在已知限制。

回顾

恭喜!现在,您应该对如何在 Airflow 中创建 DAG、定义任务及其依赖关系以及使用模板有了基本的了解。您的代码应该类似于以下内容

src/airflow/example_dags/tutorial.py


import textwrap
from datetime import datetime, timedelta

# Operators; we need this to operate!
from airflow.providers.standard.operators.bash import BashOperator

# The DAG object; we'll need this to instantiate a DAG
from airflow.sdk import DAG
with DAG(
    "tutorial",
    # These args will get passed on to each operator
    # You can override them on a per-task basis during operator initialization
    default_args={
        "depends_on_past": False,
        "retries": 1,
        "retry_delay": timedelta(minutes=5),
        # 'queue': 'bash_queue',
        # 'pool': 'backfill',
        # 'priority_weight': 10,
        # 'end_date': datetime(2016, 1, 1),
        # 'wait_for_downstream': False,
        # 'execution_timeout': timedelta(seconds=300),
        # 'on_failure_callback': some_function, # or list of functions
        # 'on_success_callback': some_other_function, # or list of functions
        # 'on_retry_callback': another_function, # or list of functions
        # 'sla_miss_callback': yet_another_function, # or list of functions
        # 'on_skipped_callback': another_function, #or list of functions
        # 'trigger_rule': 'all_success'
    },
    description="A simple tutorial DAG",
    schedule=timedelta(days=1),
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=["example"],
) as dag:

    # t1, t2 and t3 are examples of tasks created by instantiating operators
    t1 = BashOperator(
        task_id="print_date",
        bash_command="date",
    )

    t2 = BashOperator(
        task_id="sleep",
        depends_on_past=False,
        bash_command="sleep 5",
        retries=3,
    )
    t1.doc_md = textwrap.dedent(
        """\
    #### Task Documentation
    You can document your task using the attributes `doc_md` (markdown),
    `doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
    rendered in the UI's Task Instance Details page.
    ![img](https://imgs.xkcd.com/comics/fixing_problems.png)
    **Image Credit:** Randall Munroe, [XKCD](https://xkcd.com/license.html)
    """
    )

    dag.doc_md = __doc__  # providing that you have a docstring at the beginning of the DAG; OR
    dag.doc_md = """
    This is a documentation placed anywhere
    """  # otherwise, type it like this
    templated_command = textwrap.dedent(
        """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
    {% endfor %}
    """
    )

    t3 = BashOperator(
        task_id="templated",
        depends_on_past=False,
        bash_command=templated_command,
    )

    t1 >> [t2, t3]

测试你的流水线

现在是时候测试您的流水线了!首先,确保您的脚本能成功解析。如果您将代码保存在 airflow.cfg 中指定的 dags 文件夹下的 tutorial.py 中,您可以运行

python ~/airflow/dags/tutorial.py

如果脚本运行无误,恭喜!您的 DAG 已正确设置。

命令行元数据验证

让我们通过运行一些命令进一步验证您的脚本

# initialize the database tables
airflow db migrate

# print the list of active dags
airflow dags list

# prints the list of tasks in the "tutorial" DAG
airflow tasks list tutorial

# prints the graphviz representation of "tutorial" DAG
airflow dags show tutorial

测试任务实例和 DAG 运行

您可以针对指定的 逻辑日期 测试特定的任务实例。这模拟了调度程序在特定日期和时间运行您的任务。

注意

请注意,调度程序是 针对 特定的日期和时间运行您的任务,而不是一定 该日期或时间运行。逻辑日期 是 DAG 运行的命名所依据的时间戳,它通常对应于您的工作流操作的时间段的 结束 时间 — 或 DAG 运行手动触发的时间。

Airflow 使用此逻辑日期来组织和跟踪每次运行;这是您在 UI、日志和代码中引用特定执行的方式。通过 UI 或 API 触发 DAG 时,您可以提供自己的逻辑日期,以便在特定时间点运行工作流。

# command layout: command subcommand [dag_id] [task_id] [(optional) date]

# testing print_date
airflow tasks test tutorial print_date 2015-06-01

# testing sleep
airflow tasks test tutorial sleep 2015-06-01

您还可以通过运行以下命令查看您的模板是如何渲染的

# testing templated
airflow tasks test tutorial templated 2015-06-01

此命令将提供详细日志并执行您的 bash 命令。

请记住,airflow tasks test 命令在本地运行任务实例,将其日志输出到 stdout,并且不在数据库中跟踪状态。这是测试单个任务实例的便捷方法。

类似地,airflow dags test 运行单个 DAG 运行,而不在数据库中注册任何状态,这对于在本地测试整个 DAG 非常有用。

下一步是什么?

本教程到此结束!您已成功编写并测试了您的第一个 Airflow 流水线。在您继续您的旅程时,可以考虑将您的代码合并到一个配置了 Scheduler 的仓库中,这将允许您的 DAG 每天被触发和执行。

以下是关于您下一步的一些建议

另请参阅

这篇内容有帮助吗?