基本概念

本教程将引导您了解一些基本的 Airflow 概念、对象及其在编写第一个 DAG 时的用法。

示例管道定义

以下是基本管道定义的示例。如果这看起来很复杂,请不要担心,下面将逐行进行解释。

airflow/example_dags/tutorial.py[源代码]


import textwrap
from datetime import datetime, timedelta

# The DAG object; we'll need this to instantiate a DAG
from airflow.models.dag import DAG

# Operators; we need this to operate!
from airflow.operators.bash import BashOperator
with DAG(
    "tutorial",
    # These args will get passed on to each operator
    # You can override them on a per-task basis during operator initialization
    default_args={
        "depends_on_past": False,
        "email": ["[email protected]"],
        "email_on_failure": False,
        "email_on_retry": False,
        "retries": 1,
        "retry_delay": timedelta(minutes=5),
        # 'queue': 'bash_queue',
        # 'pool': 'backfill',
        # 'priority_weight': 10,
        # 'end_date': datetime(2016, 1, 1),
        # 'wait_for_downstream': False,
        # 'sla': timedelta(hours=2),
        # 'execution_timeout': timedelta(seconds=300),
        # 'on_failure_callback': some_function, # or list of functions
        # 'on_success_callback': some_other_function, # or list of functions
        # 'on_retry_callback': another_function, # or list of functions
        # 'sla_miss_callback': yet_another_function, # or list of functions
        # 'on_skipped_callback': another_function, #or list of functions
        # 'trigger_rule': 'all_success'
    },
    description="A simple tutorial DAG",
    schedule=timedelta(days=1),
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=["example"],
) as dag:

    # t1, t2 and t3 are examples of tasks created by instantiating operators
    t1 = BashOperator(
        task_id="print_date",
        bash_command="date",
    )

    t2 = BashOperator(
        task_id="sleep",
        depends_on_past=False,
        bash_command="sleep 5",
        retries=3,
    )
    t1.doc_md = textwrap.dedent(
        """\
    #### Task Documentation
    You can document your task using the attributes `doc_md` (markdown),
    `doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
    rendered in the UI's Task Instance Details page.
    ![img](https://imgs.xkcd.com/comics/fixing_problems.png)
    **Image Credit:** Randall Munroe, [XKCD](https://xkcd.com/license.html)
    """
    )

    dag.doc_md = __doc__  # providing that you have a docstring at the beginning of the DAG; OR
    dag.doc_md = """
    This is a documentation placed anywhere
    """  # otherwise, type it like this
    templated_command = textwrap.dedent(
        """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
    {% endfor %}
    """
    )

    t3 = BashOperator(
        task_id="templated",
        depends_on_past=False,
        bash_command=templated_command,
    )

    t1 >> [t2, t3]

它是一个 DAG 定义文件

需要理解的一件事是(一开始可能不是每个人都觉得直观),这个 Airflow Python 脚本实际上只是一个配置文件,它以代码的形式指定了 DAG 的结构。这里定义的实际任务将在与此脚本不同的上下文中运行。不同的任务在不同的时间点在不同的工作器上运行,这意味着此脚本不能用于在任务之间进行交叉通信。请注意,为此,我们有一个更高级的功能,称为 XComs

人们有时认为 DAG 定义文件是他们可以进行一些实际数据处理的地方 - 根本不是这样!该脚本的目的是定义一个 DAG 对象。它需要快速评估(以秒为单位,而不是分钟),因为调度程序将定期执行它以反映任何更改。

导入模块

Airflow 管道只是一个碰巧定义了 Airflow DAG 对象的 Python 脚本。让我们从导入我们将需要的库开始。

airflow/example_dags/tutorial.py[源代码]

import textwrap
from datetime import datetime, timedelta

# The DAG object; we'll need this to instantiate a DAG
from airflow.models.dag import DAG

# Operators; we need this to operate!
from airflow.operators.bash import BashOperator

有关 Python 和 Airflow 如何管理模块的详细信息,请参阅 模块管理

默认参数

我们将要创建一个 DAG 和一些任务,我们可以选择显式地将一组参数传递给每个任务的构造函数(这将变得冗余),或者(更好的是!)我们可以定义一个默认参数字典,我们可以在创建任务时使用它。

airflow/example_dags/tutorial.py[源代码]

# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args={
    "depends_on_past": False,
    "email": ["[email protected]"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function, # or list of functions
    # 'on_success_callback': some_other_function, # or list of functions
    # 'on_retry_callback': another_function, # or list of functions
    # 'sla_miss_callback': yet_another_function, # or list of functions
    # 'on_skipped_callback': another_function, #or list of functions
    # 'trigger_rule': 'all_success'
},

有关 BaseOperator 参数及其作用的更多信息,请参阅 airflow.models.baseoperator.BaseOperator 文档。

此外,请注意,您可以轻松地定义不同的参数集,以服务于不同的目的。例如,在生产环境和开发环境之间使用不同的设置。

实例化 DAG

我们需要一个 DAG 对象来嵌套我们的任务。在这里,我们传递一个字符串,该字符串定义了 dag_id,它充当 DAG 的唯一标识符。我们还传递了我们刚刚定义的默认参数字典,并为 DAG 定义了 1 天的 schedule

airflow/example_dags/tutorial.py[源代码]

with DAG(
    "tutorial",
    # These args will get passed on to each operator
    # You can override them on a per-task basis during operator initialization
    default_args={
        "depends_on_past": False,
        "email": ["[email protected]"],
        "email_on_failure": False,
        "email_on_retry": False,
        "retries": 1,
        "retry_delay": timedelta(minutes=5),
        # 'queue': 'bash_queue',
        # 'pool': 'backfill',
        # 'priority_weight': 10,
        # 'end_date': datetime(2016, 1, 1),
        # 'wait_for_downstream': False,
        # 'sla': timedelta(hours=2),
        # 'execution_timeout': timedelta(seconds=300),
        # 'on_failure_callback': some_function, # or list of functions
        # 'on_success_callback': some_other_function, # or list of functions
        # 'on_retry_callback': another_function, # or list of functions
        # 'sla_miss_callback': yet_another_function, # or list of functions
        # 'on_skipped_callback': another_function, #or list of functions
        # 'trigger_rule': 'all_success'
    },
    description="A simple tutorial DAG",
    schedule=timedelta(days=1),
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=["example"],
) as dag:

操作符

操作符定义了 Airflow 要完成的工作单元。使用操作符是在 Airflow 中定义工作的经典方法。对于某些用例,最好使用 TaskFlow API 在 Pythonic 上下文中定义工作,如 使用 TaskFlow 中所述。现在,使用操作符有助于可视化我们 DAG 代码中的任务依赖关系。

所有操作符都继承自 BaseOperator,它包括在 Airflow 中运行工作所需的所有参数。从这里开始,每个操作符都包含用于其正在完成的工作类型的唯一参数。一些最受欢迎的操作符是 PythonOperator、BashOperator 和 KubernetesPodOperator。

Airflow 根据您传递给操作符的参数完成工作。在本教程中,我们使用 BashOperator 来运行一些 bash 脚本。

任务

要在 DAG 中使用操作符,您必须将其实例化为一个任务。任务确定如何在 DAG 的上下文中执行操作符的工作。

在以下示例中,我们将 BashOperator 实例化为两个独立的任务,以便运行两个独立的 bash 脚本。每个实例化的第一个参数 task_id 充当任务的唯一标识符。

airflow/example_dags/tutorial.py[源代码]

t1 = BashOperator(
    task_id="print_date",
    bash_command="date",
)

t2 = BashOperator(
    task_id="sleep",
    depends_on_past=False,
    bash_command="sleep 5",
    retries=3,
)

请注意,我们如何将操作符特定参数(bash_command)和从 BaseOperator 继承的所有操作符共有的参数(retries)的混合传递给操作符的构造函数。这比为每个构造函数调用传递每个参数更简单。另外,请注意,在第二个任务中,我们使用 3 覆盖了 retries 参数。

任务的优先级规则如下

  1. 显式传递的参数

  2. default_args 字典中存在的值

  3. 操作符的默认值(如果存在)

注意

任务必须包含或继承参数 task_idowner,否则 Airflow 将引发异常。全新安装的 Airflow 将为 owner 设置默认值“airflow”,因此您实际上只需要确保 task_id 有一个值即可。

使用 Jinja 进行模板化

Airflow 利用了 Jinja 模板 的强大功能,并为管道作者提供了一组内置参数和宏。Airflow 还为管道作者提供了定义自己的参数、宏和模板的钩子。

本教程只是简单介绍了您可以在 Airflow 中使用模板化完成的工作,但本节的目标是让您了解此功能的存在,熟悉双大括号,并指向最常见的模板变量:{{ ds }}(今天的“日期戳”)。

airflow/example_dags/tutorial.py[源代码]

templated_command = textwrap.dedent(
    """
{% for i in range(5) %}
    echo "{{ ds }}"
    echo "{{ macros.ds_add(ds, 7)}}"
{% endfor %}
"""
)

t3 = BashOperator(
    task_id="templated",
    depends_on_past=False,
    bash_command=templated_command,
)

请注意,templated_command{% %} 块中包含代码逻辑,引用了 {{ ds }} 等参数,并调用了 {{ macros.ds_add(ds, 7)}} 中的函数。

文件也可以传递给 bash_command 参数,例如 bash_command='templated_command.sh',其中文件位置相对于包含管道文件(在本例中为 tutorial.py)的目录。出于多种原因,这可能是可取的,例如分离脚本的逻辑和管道代码,允许在由不同语言组成的文件中进行正确的代码突出显示,以及在构建管道时具有通用灵活性。也可以在 DAG 构造函数调用中将 template_searchpath 定义为指向文件夹位置。

使用相同的 DAG 构造函数调用,可以定义 user_defined_macros,它允许您指定自己的变量。例如,将 dict(foo='bar') 传递给此参数允许您在模板中使用 {{ foo }}。此外,指定 user_defined_filters 允许您注册自己的过滤器。例如,将 dict(hello=lambda name: 'Hello %s' % name) 传递给此参数允许您在模板中使用 {{ 'world' | hello }}。有关自定义过滤器的更多信息,请查看 Jinja 文档

有关可以在模板中引用的变量和宏的更多信息,请务必通读 模板参考

添加 DAG 和任务文档

我们可以为 DAG 或每个任务添加文档。目前,DAG 文档仅支持 markdown 格式,而任务文档支持纯文本、markdown、reStructuredText、json 和 yaml 格式。DAG 文档可以作为文档字符串写在 DAG 文件的开头(推荐),也可以写在文件的任何其他位置。您可以在下面找到一些关于如何实现任务和 DAG 文档的示例,以及屏幕截图。

airflow/example_dags/tutorial.py[源代码]

t1.doc_md = textwrap.dedent(
    """\
#### Task Documentation
You can document your task using the attributes `doc_md` (markdown),
`doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
rendered in the UI's Task Instance Details page.
![img](https://imgs.xkcd.com/comics/fixing_problems.png)
**Image Credit:** Randall Munroe, [XKCD](https://xkcd.com/license.html)
"""
)

dag.doc_md = __doc__  # providing that you have a docstring at the beginning of the DAG; OR
dag.doc_md = """
This is a documentation placed anywhere
"""  # otherwise, type it like this
../_images/task_doc.png ../_images/dag_doc.png

设置依赖关系

我们有任务 t1t2t3,它们之间相互依赖。以下是一些定义它们之间依赖关系的方法。

t1.set_downstream(t2)

# This means that t2 will depend on t1
# running successfully to run.
# It is equivalent to:
t2.set_upstream(t1)

# The bit shift operator can also be
# used to chain operations:
t1 >> t2

# And the upstream dependency with the
# bit shift operator:
t2 << t1

# Chaining multiple dependencies becomes
# concise with the bit shift operator:
t1 >> t2 >> t3

# A list of tasks can also be set as
# dependencies. These operations
# all have the same effect:
t1.set_downstream([t2, t3])
t1 >> [t2, t3]
[t2, t3] << t1

请注意,在执行脚本时,如果 Airflow 在 DAG 中发现循环或引用了多次依赖关系,则会引发异常。

使用时区

创建一个时区感知的 DAG 非常简单。只需确保使用 pendulum 提供时区感知的日期。不要尝试使用标准库 timezone,因为它们已知存在限制,并且我们有意禁止在 DAG 中使用它们。

回顾

好了,我们现在有了一个非常基本的 DAG。此时,您的代码应该类似于以下内容。

airflow/example_dags/tutorial.py[源代码]


import textwrap
from datetime import datetime, timedelta

# The DAG object; we'll need this to instantiate a DAG
from airflow.models.dag import DAG

# Operators; we need this to operate!
from airflow.operators.bash import BashOperator
with DAG(
    "tutorial",
    # These args will get passed on to each operator
    # You can override them on a per-task basis during operator initialization
    default_args={
        "depends_on_past": False,
        "email": ["[email protected]"],
        "email_on_failure": False,
        "email_on_retry": False,
        "retries": 1,
        "retry_delay": timedelta(minutes=5),
        # 'queue': 'bash_queue',
        # 'pool': 'backfill',
        # 'priority_weight': 10,
        # 'end_date': datetime(2016, 1, 1),
        # 'wait_for_downstream': False,
        # 'sla': timedelta(hours=2),
        # 'execution_timeout': timedelta(seconds=300),
        # 'on_failure_callback': some_function, # or list of functions
        # 'on_success_callback': some_other_function, # or list of functions
        # 'on_retry_callback': another_function, # or list of functions
        # 'sla_miss_callback': yet_another_function, # or list of functions
        # 'on_skipped_callback': another_function, #or list of functions
        # 'trigger_rule': 'all_success'
    },
    description="A simple tutorial DAG",
    schedule=timedelta(days=1),
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=["example"],
) as dag:

    # t1, t2 and t3 are examples of tasks created by instantiating operators
    t1 = BashOperator(
        task_id="print_date",
        bash_command="date",
    )

    t2 = BashOperator(
        task_id="sleep",
        depends_on_past=False,
        bash_command="sleep 5",
        retries=3,
    )
    t1.doc_md = textwrap.dedent(
        """\
    #### Task Documentation
    You can document your task using the attributes `doc_md` (markdown),
    `doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
    rendered in the UI's Task Instance Details page.
    ![img](https://imgs.xkcd.com/comics/fixing_problems.png)
    **Image Credit:** Randall Munroe, [XKCD](https://xkcd.com/license.html)
    """
    )

    dag.doc_md = __doc__  # providing that you have a docstring at the beginning of the DAG; OR
    dag.doc_md = """
    This is a documentation placed anywhere
    """  # otherwise, type it like this
    templated_command = textwrap.dedent(
        """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
    {% endfor %}
    """
    )

    t3 = BashOperator(
        task_id="templated",
        depends_on_past=False,
        bash_command=templated_command,
    )

    t1 >> [t2, t3]

测试

运行脚本

是时候进行一些测试了。首先,让我们确保管道已成功解析。

假设我们将上一步中的代码保存在 tutorial.py 中,该文件位于 airflow.cfg 中引用的 DAGs 文件夹中。DAG 的默认位置是 ~/airflow/dags

python ~/airflow/dags/tutorial.py

如果脚本没有引发异常,则表示您没有做错任何事情,并且您的 Airflow 环境基本正常。

命令行元数据验证

让我们运行一些命令来进一步验证此脚本。

# initialize the database tables
airflow db migrate

# print the list of active DAGs
airflow dags list

# prints the list of tasks in the "tutorial" DAG
airflow tasks list tutorial

# prints the hierarchy of tasks in the "tutorial" DAG
airflow tasks list tutorial --tree

测试

让我们通过针对特定日期运行实际任务实例来进行测试。在此上下文中指定的日期称为*逻辑日期*(出于历史原因也称为*执行日期*),它模拟调度程序针对特定日期和时间运行您的任务或 DAG,即使它在*物理上*将在现在(或在其依赖项满足后立即)运行。

我们说调度程序针对特定日期和时间运行您的任务,而不是在特定日期和时间运行。这是因为每次运行 DAG 在概念上并不代表特定的日期和时间,而是代表两个时间之间的时间间隔,称为 数据间隔。DAG 运行的逻辑日期是其数据间隔的开始时间。

# command layout: command subcommand [dag_id] [task_id] [(optional) date]

# testing print_date
airflow tasks test tutorial print_date 2015-06-01

# testing sleep
airflow tasks test tutorial sleep 2015-06-01

现在还记得我们之前对模板做了什么吗?通过运行以下命令,查看此模板如何呈现和执行。

# testing templated
airflow tasks test tutorial templated 2015-06-01

这应该会显示详细的事件日志,并最终运行您的 bash 命令并打印结果。

请注意,airflow tasks test 命令在本地运行任务实例,将其日志输出到 stdout(屏幕上),不考虑依赖关系,也不将状态(运行中、成功、失败...)传递给数据库。它只是允许测试单个任务实例。

airflow dags test 也是如此,但它是在 DAG 级别上进行的。它执行给定 DAG ID 的单个 DAG 运行。虽然它确实考虑了任务依赖关系,但不会在数据库中注册任何状态。如果您的某个任务期望在某个位置获取数据,并且该数据可用,那么它对于在本地测试 DAG 的完整运行非常方便。

回填

一切看起来都运行良好,所以让我们运行一次回填。backfill 将遵循您的依赖关系,将日志发送到文件中,并与数据库通信以记录状态。如果您确实启动了 Web 服务器,您将能够跟踪进度。airflow webserver 将启动一个 Web 服务器,如果您有兴趣在回填过程中直观地跟踪进度。

请注意,如果您使用 depends_on_past=True,则各个任务实例将依赖于其先前任务实例的成功(即,根据逻辑日期的先前任务实例)。逻辑日期等于 start_date 的任务实例将忽略此依赖关系,因为不会为其创建过去的实例。

在使用 depends_on_past=True 时,您可能还需要考虑 wait_for_downstream=True。虽然 depends_on_past=True 会导致任务实例依赖于其先前 task_instance 的成功,但 wait_for_downstream=True 会导致任务实例还等待先前任务实例*紧邻下游*的所有任务实例都成功。

在此上下文中,日期范围是一个 start_date 和一个可选的 end_date,它们用于使用此 DAG 中的任务实例填充运行计划。

# optional, start a web server in debug mode in the background
# airflow webserver --debug &

# start your backfill on a date range
airflow dags backfill tutorial \
    --start-date 2015-06-01 \
    --end-date 2015-06-07

下一步是什么?

就是这样!您已经编写、测试并回填了您的第一个 Airflow 管道。将您的代码合并到一个运行着调度程序的存储库中,应该会导致该管道每天都被触发和运行。

以下是一些您接下来可能想做的事情。

另请参阅

  • 继续学习本教程的下一步:使用 TaskFlow

  • 跳至 核心概念 部分,详细了解 Airflow 概念,例如 DAG、任务、操作符等。

此条目对您有帮助吗?