基本概念¶
本教程将引导您了解一些基本的 Airflow 概念、对象及其在编写第一个 DAG 时的用法。
示例管道定义¶
以下是基本管道定义的示例。如果这看起来很复杂,请不要担心,下面将逐行进行解释。
import textwrap
from datetime import datetime, timedelta
# The DAG object; we'll need this to instantiate a DAG
from airflow.models.dag import DAG
# Operators; we need this to operate!
from airflow.operators.bash import BashOperator
with DAG(
"tutorial",
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args={
"depends_on_past": False,
"email": ["[email protected]"],
"email_on_failure": False,
"email_on_retry": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
# 'wait_for_downstream': False,
# 'sla': timedelta(hours=2),
# 'execution_timeout': timedelta(seconds=300),
# 'on_failure_callback': some_function, # or list of functions
# 'on_success_callback': some_other_function, # or list of functions
# 'on_retry_callback': another_function, # or list of functions
# 'sla_miss_callback': yet_another_function, # or list of functions
# 'on_skipped_callback': another_function, #or list of functions
# 'trigger_rule': 'all_success'
},
description="A simple tutorial DAG",
schedule=timedelta(days=1),
start_date=datetime(2021, 1, 1),
catchup=False,
tags=["example"],
) as dag:
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
task_id="print_date",
bash_command="date",
)
t2 = BashOperator(
task_id="sleep",
depends_on_past=False,
bash_command="sleep 5",
retries=3,
)
t1.doc_md = textwrap.dedent(
"""\
#### Task Documentation
You can document your task using the attributes `doc_md` (markdown),
`doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
rendered in the UI's Task Instance Details page.
![img](https://imgs.xkcd.com/comics/fixing_problems.png)
**Image Credit:** Randall Munroe, [XKCD](https://xkcd.com/license.html)
"""
)
dag.doc_md = __doc__ # providing that you have a docstring at the beginning of the DAG; OR
dag.doc_md = """
This is a documentation placed anywhere
""" # otherwise, type it like this
templated_command = textwrap.dedent(
"""
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
{% endfor %}
"""
)
t3 = BashOperator(
task_id="templated",
depends_on_past=False,
bash_command=templated_command,
)
t1 >> [t2, t3]
它是一个 DAG 定义文件¶
需要理解的一件事是(一开始可能不是每个人都觉得直观),这个 Airflow Python 脚本实际上只是一个配置文件,它以代码的形式指定了 DAG 的结构。这里定义的实际任务将在与此脚本不同的上下文中运行。不同的任务在不同的时间点在不同的工作器上运行,这意味着此脚本不能用于在任务之间进行交叉通信。请注意,为此,我们有一个更高级的功能,称为 XComs。
人们有时认为 DAG 定义文件是他们可以进行一些实际数据处理的地方 - 根本不是这样!该脚本的目的是定义一个 DAG 对象。它需要快速评估(以秒为单位,而不是分钟),因为调度程序将定期执行它以反映任何更改。
导入模块¶
Airflow 管道只是一个碰巧定义了 Airflow DAG 对象的 Python 脚本。让我们从导入我们将需要的库开始。
import textwrap
from datetime import datetime, timedelta
# The DAG object; we'll need this to instantiate a DAG
from airflow.models.dag import DAG
# Operators; we need this to operate!
from airflow.operators.bash import BashOperator
有关 Python 和 Airflow 如何管理模块的详细信息,请参阅 模块管理。
默认参数¶
我们将要创建一个 DAG 和一些任务,我们可以选择显式地将一组参数传递给每个任务的构造函数(这将变得冗余),或者(更好的是!)我们可以定义一个默认参数字典,我们可以在创建任务时使用它。
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args={
"depends_on_past": False,
"email": ["[email protected]"],
"email_on_failure": False,
"email_on_retry": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
# 'wait_for_downstream': False,
# 'sla': timedelta(hours=2),
# 'execution_timeout': timedelta(seconds=300),
# 'on_failure_callback': some_function, # or list of functions
# 'on_success_callback': some_other_function, # or list of functions
# 'on_retry_callback': another_function, # or list of functions
# 'sla_miss_callback': yet_another_function, # or list of functions
# 'on_skipped_callback': another_function, #or list of functions
# 'trigger_rule': 'all_success'
},
有关 BaseOperator 参数及其作用的更多信息,请参阅 airflow.models.baseoperator.BaseOperator
文档。
此外,请注意,您可以轻松地定义不同的参数集,以服务于不同的目的。例如,在生产环境和开发环境之间使用不同的设置。
实例化 DAG¶
我们需要一个 DAG 对象来嵌套我们的任务。在这里,我们传递一个字符串,该字符串定义了 dag_id
,它充当 DAG 的唯一标识符。我们还传递了我们刚刚定义的默认参数字典,并为 DAG 定义了 1 天的 schedule
。
with DAG(
"tutorial",
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args={
"depends_on_past": False,
"email": ["[email protected]"],
"email_on_failure": False,
"email_on_retry": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
# 'wait_for_downstream': False,
# 'sla': timedelta(hours=2),
# 'execution_timeout': timedelta(seconds=300),
# 'on_failure_callback': some_function, # or list of functions
# 'on_success_callback': some_other_function, # or list of functions
# 'on_retry_callback': another_function, # or list of functions
# 'sla_miss_callback': yet_another_function, # or list of functions
# 'on_skipped_callback': another_function, #or list of functions
# 'trigger_rule': 'all_success'
},
description="A simple tutorial DAG",
schedule=timedelta(days=1),
start_date=datetime(2021, 1, 1),
catchup=False,
tags=["example"],
) as dag:
操作符¶
操作符定义了 Airflow 要完成的工作单元。使用操作符是在 Airflow 中定义工作的经典方法。对于某些用例,最好使用 TaskFlow API 在 Pythonic 上下文中定义工作,如 使用 TaskFlow 中所述。现在,使用操作符有助于可视化我们 DAG 代码中的任务依赖关系。
所有操作符都继承自 BaseOperator,它包括在 Airflow 中运行工作所需的所有参数。从这里开始,每个操作符都包含用于其正在完成的工作类型的唯一参数。一些最受欢迎的操作符是 PythonOperator、BashOperator 和 KubernetesPodOperator。
Airflow 根据您传递给操作符的参数完成工作。在本教程中,我们使用 BashOperator 来运行一些 bash 脚本。
任务¶
要在 DAG 中使用操作符,您必须将其实例化为一个任务。任务确定如何在 DAG 的上下文中执行操作符的工作。
在以下示例中,我们将 BashOperator 实例化为两个独立的任务,以便运行两个独立的 bash 脚本。每个实例化的第一个参数 task_id
充当任务的唯一标识符。
t1 = BashOperator(
task_id="print_date",
bash_command="date",
)
t2 = BashOperator(
task_id="sleep",
depends_on_past=False,
bash_command="sleep 5",
retries=3,
)
请注意,我们如何将操作符特定参数(bash_command
)和从 BaseOperator 继承的所有操作符共有的参数(retries
)的混合传递给操作符的构造函数。这比为每个构造函数调用传递每个参数更简单。另外,请注意,在第二个任务中,我们使用 3
覆盖了 retries
参数。
任务的优先级规则如下
显式传递的参数
default_args
字典中存在的值操作符的默认值(如果存在)
注意
任务必须包含或继承参数 task_id
和 owner
,否则 Airflow 将引发异常。全新安装的 Airflow 将为 owner
设置默认值“airflow”,因此您实际上只需要确保 task_id
有一个值即可。
使用 Jinja 进行模板化¶
Airflow 利用了 Jinja 模板 的强大功能,并为管道作者提供了一组内置参数和宏。Airflow 还为管道作者提供了定义自己的参数、宏和模板的钩子。
本教程只是简单介绍了您可以在 Airflow 中使用模板化完成的工作,但本节的目标是让您了解此功能的存在,熟悉双大括号,并指向最常见的模板变量:{{ ds }}
(今天的“日期戳”)。
templated_command = textwrap.dedent(
"""
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
{% endfor %}
"""
)
t3 = BashOperator(
task_id="templated",
depends_on_past=False,
bash_command=templated_command,
)
请注意,templated_command
在 {% %}
块中包含代码逻辑,引用了 {{ ds }}
等参数,并调用了 {{ macros.ds_add(ds, 7)}}
中的函数。
文件也可以传递给 bash_command
参数,例如 bash_command='templated_command.sh'
,其中文件位置相对于包含管道文件(在本例中为 tutorial.py
)的目录。出于多种原因,这可能是可取的,例如分离脚本的逻辑和管道代码,允许在由不同语言组成的文件中进行正确的代码突出显示,以及在构建管道时具有通用灵活性。也可以在 DAG 构造函数调用中将 template_searchpath
定义为指向文件夹位置。
使用相同的 DAG 构造函数调用,可以定义 user_defined_macros
,它允许您指定自己的变量。例如,将 dict(foo='bar')
传递给此参数允许您在模板中使用 {{ foo }}
。此外,指定 user_defined_filters
允许您注册自己的过滤器。例如,将 dict(hello=lambda name: 'Hello %s' % name)
传递给此参数允许您在模板中使用 {{ 'world' | hello }}
。有关自定义过滤器的更多信息,请查看 Jinja 文档。
有关可以在模板中引用的变量和宏的更多信息,请务必通读 模板参考。
添加 DAG 和任务文档¶
我们可以为 DAG 或每个任务添加文档。目前,DAG 文档仅支持 markdown 格式,而任务文档支持纯文本、markdown、reStructuredText、json 和 yaml 格式。DAG 文档可以作为文档字符串写在 DAG 文件的开头(推荐),也可以写在文件的任何其他位置。您可以在下面找到一些关于如何实现任务和 DAG 文档的示例,以及屏幕截图。
t1.doc_md = textwrap.dedent(
"""\
#### Task Documentation
You can document your task using the attributes `doc_md` (markdown),
`doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
rendered in the UI's Task Instance Details page.
![img](https://imgs.xkcd.com/comics/fixing_problems.png)
**Image Credit:** Randall Munroe, [XKCD](https://xkcd.com/license.html)
"""
)
dag.doc_md = __doc__ # providing that you have a docstring at the beginning of the DAG; OR
dag.doc_md = """
This is a documentation placed anywhere
""" # otherwise, type it like this
设置依赖关系¶
我们有任务 t1
、t2
和 t3
,它们之间相互依赖。以下是一些定义它们之间依赖关系的方法。
t1.set_downstream(t2)
# This means that t2 will depend on t1
# running successfully to run.
# It is equivalent to:
t2.set_upstream(t1)
# The bit shift operator can also be
# used to chain operations:
t1 >> t2
# And the upstream dependency with the
# bit shift operator:
t2 << t1
# Chaining multiple dependencies becomes
# concise with the bit shift operator:
t1 >> t2 >> t3
# A list of tasks can also be set as
# dependencies. These operations
# all have the same effect:
t1.set_downstream([t2, t3])
t1 >> [t2, t3]
[t2, t3] << t1
请注意,在执行脚本时,如果 Airflow 在 DAG 中发现循环或引用了多次依赖关系,则会引发异常。
使用时区¶
创建一个时区感知的 DAG 非常简单。只需确保使用 pendulum
提供时区感知的日期。不要尝试使用标准库 timezone,因为它们已知存在限制,并且我们有意禁止在 DAG 中使用它们。
回顾¶
好了,我们现在有了一个非常基本的 DAG。此时,您的代码应该类似于以下内容。
import textwrap
from datetime import datetime, timedelta
# The DAG object; we'll need this to instantiate a DAG
from airflow.models.dag import DAG
# Operators; we need this to operate!
from airflow.operators.bash import BashOperator
with DAG(
"tutorial",
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args={
"depends_on_past": False,
"email": ["[email protected]"],
"email_on_failure": False,
"email_on_retry": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
# 'wait_for_downstream': False,
# 'sla': timedelta(hours=2),
# 'execution_timeout': timedelta(seconds=300),
# 'on_failure_callback': some_function, # or list of functions
# 'on_success_callback': some_other_function, # or list of functions
# 'on_retry_callback': another_function, # or list of functions
# 'sla_miss_callback': yet_another_function, # or list of functions
# 'on_skipped_callback': another_function, #or list of functions
# 'trigger_rule': 'all_success'
},
description="A simple tutorial DAG",
schedule=timedelta(days=1),
start_date=datetime(2021, 1, 1),
catchup=False,
tags=["example"],
) as dag:
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
task_id="print_date",
bash_command="date",
)
t2 = BashOperator(
task_id="sleep",
depends_on_past=False,
bash_command="sleep 5",
retries=3,
)
t1.doc_md = textwrap.dedent(
"""\
#### Task Documentation
You can document your task using the attributes `doc_md` (markdown),
`doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
rendered in the UI's Task Instance Details page.
![img](https://imgs.xkcd.com/comics/fixing_problems.png)
**Image Credit:** Randall Munroe, [XKCD](https://xkcd.com/license.html)
"""
)
dag.doc_md = __doc__ # providing that you have a docstring at the beginning of the DAG; OR
dag.doc_md = """
This is a documentation placed anywhere
""" # otherwise, type it like this
templated_command = textwrap.dedent(
"""
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
{% endfor %}
"""
)
t3 = BashOperator(
task_id="templated",
depends_on_past=False,
bash_command=templated_command,
)
t1 >> [t2, t3]
测试¶
运行脚本¶
是时候进行一些测试了。首先,让我们确保管道已成功解析。
假设我们将上一步中的代码保存在 tutorial.py
中,该文件位于 airflow.cfg
中引用的 DAGs 文件夹中。DAG 的默认位置是 ~/airflow/dags
。
python ~/airflow/dags/tutorial.py
如果脚本没有引发异常,则表示您没有做错任何事情,并且您的 Airflow 环境基本正常。
命令行元数据验证¶
让我们运行一些命令来进一步验证此脚本。
# initialize the database tables
airflow db migrate
# print the list of active DAGs
airflow dags list
# prints the list of tasks in the "tutorial" DAG
airflow tasks list tutorial
# prints the hierarchy of tasks in the "tutorial" DAG
airflow tasks list tutorial --tree
测试¶
让我们通过针对特定日期运行实际任务实例来进行测试。在此上下文中指定的日期称为*逻辑日期*(出于历史原因也称为*执行日期*),它模拟调度程序针对特定日期和时间运行您的任务或 DAG,即使它在*物理上*将在现在(或在其依赖项满足后立即)运行。
我们说调度程序针对特定日期和时间运行您的任务,而不是在特定日期和时间运行。这是因为每次运行 DAG 在概念上并不代表特定的日期和时间,而是代表两个时间之间的时间间隔,称为 数据间隔。DAG 运行的逻辑日期是其数据间隔的开始时间。
# command layout: command subcommand [dag_id] [task_id] [(optional) date]
# testing print_date
airflow tasks test tutorial print_date 2015-06-01
# testing sleep
airflow tasks test tutorial sleep 2015-06-01
现在还记得我们之前对模板做了什么吗?通过运行以下命令,查看此模板如何呈现和执行。
# testing templated
airflow tasks test tutorial templated 2015-06-01
这应该会显示详细的事件日志,并最终运行您的 bash 命令并打印结果。
请注意,airflow tasks test
命令在本地运行任务实例,将其日志输出到 stdout(屏幕上),不考虑依赖关系,也不将状态(运行中、成功、失败...)传递给数据库。它只是允许测试单个任务实例。
airflow dags test
也是如此,但它是在 DAG 级别上进行的。它执行给定 DAG ID 的单个 DAG 运行。虽然它确实考虑了任务依赖关系,但不会在数据库中注册任何状态。如果您的某个任务期望在某个位置获取数据,并且该数据可用,那么它对于在本地测试 DAG 的完整运行非常方便。
回填¶
一切看起来都运行良好,所以让我们运行一次回填。backfill
将遵循您的依赖关系,将日志发送到文件中,并与数据库通信以记录状态。如果您确实启动了 Web 服务器,您将能够跟踪进度。airflow webserver
将启动一个 Web 服务器,如果您有兴趣在回填过程中直观地跟踪进度。
请注意,如果您使用 depends_on_past=True
,则各个任务实例将依赖于其先前任务实例的成功(即,根据逻辑日期的先前任务实例)。逻辑日期等于 start_date
的任务实例将忽略此依赖关系,因为不会为其创建过去的实例。
在使用 depends_on_past=True
时,您可能还需要考虑 wait_for_downstream=True
。虽然 depends_on_past=True
会导致任务实例依赖于其先前 task_instance 的成功,但 wait_for_downstream=True
会导致任务实例还等待先前任务实例*紧邻下游*的所有任务实例都成功。
在此上下文中,日期范围是一个 start_date
和一个可选的 end_date
,它们用于使用此 DAG 中的任务实例填充运行计划。
# optional, start a web server in debug mode in the background
# airflow webserver --debug &
# start your backfill on a date range
airflow dags backfill tutorial \
--start-date 2015-06-01 \
--end-date 2015-06-07
下一步是什么?¶
就是这样!您已经编写、测试并回填了您的第一个 Airflow 管道。将您的代码合并到一个运行着调度程序的存储库中,应该会导致该管道每天都被触发和运行。
以下是一些您接下来可能想做的事情。
另请参阅
继续学习本教程的下一步:使用 TaskFlow
跳至 核心概念 部分,详细了解 Airflow 概念,例如 DAG、任务、操作符等。