模板参考

变量、宏和过滤器可以在模板中使用(请参阅Jinja 模板部分)

以下内容是 Airflow 开箱即用的。可以通过插件全局添加其他自定义宏,或者通过 DAG.user_defined_macros 参数在 DAG 级别添加。

变量

Airflow 引擎默认传递一些变量,这些变量可以在所有模板中访问

变量

类型

描述

{{ data_interval_start }}

pendulum.DateTime

数据间隔的开始时间。在 2.2 版本中添加。

{{ data_interval_end }}

pendulum.DateTime

数据间隔的结束时间。在 2.2 版本中添加。

{{ logical_date }}

pendulum.DateTime

逻辑上标识当前 DAG 运行的日期时间。此值不包含任何语义,而只是一个用于标识的值。
如果要获取具有实际语义的值,请使用 data_interval_startdata_interval_end
例如,根据时间戳从数据库中获取行切片。

{{ ds }}

str

DAG 运行的逻辑日期,格式为 YYYY-MM-DD
{{ logical_date | ds }} 相同。

{{ ds_nodash }}

str

{{ logical_date | ds_nodash }} 相同。

{{ exception }}

None | str | 异常 KeyboardInterrupt

运行任务实例时发生错误。


{{ ts }}

str

{{ logical_date | ts }} 相同。
示例:2018-01-01T00:00:00+00:00

{{ ts_nodash_with_tz }}

str

{{ logical_date | ts_nodash_with_tz }} 相同。
示例:20180101T000000+0000

{{ ts_nodash }}

str

{{ logical_date | ts_nodash }} 相同。
示例:20180101T000000

{{ prev_data_interval_start_success }}

pendulum.DateTime | None

先前成功DagRun的数据间隔的开始时间。
在 2.2 版本中添加。

{{ prev_data_interval_end_success }}

pendulum.DateTime | None

先前成功DagRun的数据间隔的结束时间。
在 2.2 版本中添加。

{{ prev_start_date_success }}

pendulum.DateTime | None

先前成功DagRun的开始日期(如果可用)。

{{ prev_end_date_success }}

pendulum.DateTime | None

先前成功DagRun的结束日期(如果可用)。

{{ inlets }}

list

任务上声明的入口列表。

{{ inlet_events }}

dict[str, …]

访问入口数据集的过去事件。请参阅数据集。在 2.10 版本中添加。

{{ outlets }}

list

任务上声明的出口列表。

{{ outlet_events }}

dict[str, …]

用于将信息附加到当前任务将发出的数据集事件的访问器。
请参阅数据集。在 2.10 版本中添加。

{{ dag }}

DAG

当前正在运行的DAG。您可以在DAG中阅读有关 DAG 的更多信息。

{{ task }}

BaseOperator

当前正在运行的BaseOperator。您可以在运算符中阅读有关任务的更多信息

{{ macros }}

对宏包的引用。请参阅下面的

{{ task_instance }}

TaskInstance

当前正在运行的TaskInstance

{{ ti }}

TaskInstance

{{ task_instance }} 相同。

{{ params }}

dict[str, Any]

用户定义的参数。如果 dag_run_conf_overrides_params
airflow.cfg 中启用,则可以通过传递给 trigger_dag -c 的映射覆盖此参数。
airflow.cfg 中启用。

{{ var.value }}

Airflow 变量。请参阅下面的模板中的 Airflow 变量

{{ var.json }}

Airflow 变量。请参阅下面的模板中的 Airflow 变量

{{ conn }}

Airflow 连接。请参阅下面的模板中的 Airflow 连接

{{ task_instance_key_str }}

str

任务实例的唯一、人类可读的键。格式为
{dag_id}__{task_id}__{ds_nodash}.

{{ conf }}

AirflowConfigParser

表示您的airflow.cfg内容的完整配置对象。请参阅 airflow.configuration.conf
airflow.cfg

{{ run_id }}

str

当前正在运行的DagRun运行 ID。

{{ dag_run }}

DagRun

当前正在运行的DagRun

{{ test_mode }}

bool

任务实例是否由 airflow test CLI 运行。

{{ map_index_template }}

None | str

用于呈现映射任务的展开任务实例的模板。设置此值将反映在呈现的结果中。

{{ expanded_ti_count }}

int | None

映射任务展开为的任务实例数。如果
当前任务未映射,则应为 None
在 2.5 版本中添加。

{{ triggering_dataset_events }}

dict[str, list[DatasetEvent]]

如果在数据集调度 DAG 中,则将数据集 URI 映射到触发DatasetEvent的列表
(如果存在多个具有不同频率的数据集,则可能存在多个)。
在此处阅读更多信息 数据集
在 2.4 版本中添加。

注意

DAG 运行的逻辑日期以及由此派生的值,例如 dsts不应被视为 DAG 中的唯一标识。请改用 run_id

从 TaskFlow 任务访问 Airflow 上下文变量

虽然使用 @task 装饰的任务不支持渲染作为参数传递的 Jinja 模板,但上述所有变量都可以直接从任务中访问。以下代码块示例展示了如何从任务访问 task_instance 对象。

from airflow.models.taskinstance import TaskInstance
from airflow.models.dagrun import DagRun


@task
def print_ti_info(task_instance: TaskInstance | None = None, dag_run: DagRun | None = None):
    print(f"Run ID: {task_instance.run_id}")  # Run ID: scheduled__2023-08-09T00:00:00+00:00
    print(f"Duration: {task_instance.duration}")  # Duration: 0.972019
    print(f"DAG Run queued at: {dag_run.queued_at}")  # 2023-08-10 00:00:01+02:20

已弃用的变量

以下变量已弃用。保留它们是为了向后兼容,但您应该将现有代码转换为使用其他变量。

已弃用的变量

描述

{{ execution_date }}

执行日期(逻辑日期),与 logical_date 相同

{{ next_execution_date }}

下一个计划运行的逻辑日期(如果适用);您可以使用 data_interval_end 来代替

{{ next_ds }}

下一个执行日期,格式为 YYYY-MM-DD,如果存在,否则为 None

{{ next_ds_nodash }}

下一个执行日期,格式为 YYYYMMDD,如果存在,否则为 None

{{ prev_execution_date }}

上一个计划运行的逻辑日期(如果适用)

{{ prev_ds }}

上一个执行日期,格式为 YYYY-MM-DD,如果存在,否则为 None

{{ prev_ds_nodash }}

上一个执行日期,格式为 YYYYMMDD,如果存在,否则为 None

{{ yesterday_ds }}

执行日期的前一天,格式为 YYYY-MM-DD

{{ yesterday_ds_nodash }}

执行日期的前一天,格式为 YYYYMMDD

{{ tomorrow_ds }}

执行日期的后一天,格式为 YYYY-MM-DD

{{ tomorrow_ds_nodash }}

执行日期的后一天,格式为 YYYYMMDD

{{ prev_execution_date_success }}

先前成功 DAG 运行的执行日期;如果您用于 DAG 的时间表/计划定义了与旧版 execution_date 兼容的 data_interval_start,则可以使用 prev_data_interval_start_success 来代替。

请注意,您可以使用简单的点表示法访问对象的属性和方法。以下是一些可能的示例:{{ task.owner }}{{ task.task_id }}{{ ti.hostname }},… 有关对象属性和方法的更多信息,请参阅模型文档。

模板中的 Airflow 变量

var 模板变量允许您访问 Airflow 变量。您可以将其作为纯文本或 JSON 访问。如果您使用 JSON,您还可以遍历嵌套结构,例如字典:{{ var.json.my_dict_var.key1 }}

如果需要,也可以使用字符串获取变量(例如,您的变量键包含点),方法是使用 {{ var.value.get('my.var', 'fallback') }}{{ var.json.get('my.dict.var', {'key1': 'val1'}) }}。如果变量不存在,可以提供默认值。

模板中的 Airflow 连接

类似地,可以通过 conn 模板变量访问 Airflow 连接数据。例如,您可以在模板中使用如下表达式:{{ conn.my_conn_id.login }}{{ conn.my_conn_id.password }} 等。

就像使用 var 一样,可以使用字符串获取连接(例如 {{ conn.get('my_conn_id_'+index).host }} )或提供默认值(例如 {{ conn.get('my_conn_id', {"host": "host1", "login": "user1"}).host }})。

此外,可以使用 extra_dejson 字段将连接的 extras 字段作为 Python 字典获取,例如 conn.my_aws_conn_id.extra_dejson.region_name 将从 extras 中获取 region_name。这样,也可以在 extras 中提供默认值(例如 {{ conn.my_aws_conn_id.extra_dejson.get('region_name', 'Europe (Frankfurt)') }})。

过滤器

Airflow 定义了一些 Jinja 过滤器,可用于格式化值。

例如,使用 {{ logical_date | ds }} 将以 YYYY-MM-DD 格式输出 logical_date。

过滤器

操作于

描述

ds

datetime

将 datetime 格式化为 YYYY-MM-DD

ds_nodash

datetime

将 datetime 格式化为 YYYYMMDD

ts

datetime

.isoformat() 相同,例如:2018-01-01T00:00:00+00:00

ts_nodash

datetime

ts 过滤器相同,但不包含 -: 或时区信息。例如:20180101T000000

ts_nodash_with_tz

datetime

ts 过滤器相同,但不包含 -:。例如 20180101T000000+0000

宏是将对象公开到模板并在模板中的 macros 命名空间下使用的工具。

提供了一些常用的库和方法。

变量

描述

macros.datetime

标准库的 datetime.datetime

macros.timedelta

标准库的 datetime.timedelta

macros.dateutil

dateutil 包的引用

macros.time

标准库的 time

macros.uuid

标准库的 uuid

macros.random

标准库的 random.random

还定义了一些 Airflow 特有的宏

airflow.macros.datetime_diff_for_humans(dt, since=None)[源代码]

返回 datetime 之间的人类可读/近似差异。

如果仅提供一个 datetime,则比较将基于当前时间。

参数
  • dt (Any) – 要显示差异的 datetime

  • since (DateTime | None) – 从哪个时间显示日期。如果为 None,则差异为 dt 和当前时间之间的差异。

airflow.macros.ds_add(ds, days)[源代码]

从 YYYY-MM-DD 添加或减去天数。

参数
  • ds (str) – 用于添加的基准日期,格式为 YYYY-MM-DD

  • days (int) – 要添加到 ds 的天数,可以使用负值

>>> ds_add("2015-01-01", 5)
'2015-01-06'
>>> ds_add("2015-01-06", -5)
'2015-01-01'
airflow.macros.ds_format(ds, input_format, output_format)[源代码]

以给定格式输出日期时间字符串。

参数
  • ds (str) – 包含日期的输入字符串。

  • input_format (str) – 输入字符串格式(例如,‘%Y-%m-%d’)。

  • output_format (str) – 输出字符串格式(例如,‘%Y-%m-%d’)。

>>> ds_format("2015-01-01", "%Y-%m-%d", "%m-%d-%y")
'01-01-15'
>>> ds_format("1/5/2015", "%m/%d/%Y", "%Y-%m-%d")
'2015-01-05'
>>> ds_format("12/07/2024", "%d/%m/%Y", "%A %d %B %Y", "en_US")
'Friday 12 July 2024'
airflow.macros.ds_format_locale(ds, input_format, output_format, locale=None)[源代码]

以给定的 Babel 格式输出本地化的日期时间字符串。

参数
  • ds (str) – 包含日期的输入字符串。

  • input_format (str) – 输入字符串格式(例如,‘%Y-%m-%d’)。

  • output_format (str) – 输出字符串 Babel 格式(例如,yyyy-MM-dd)。

  • locale (Locale | str | None) – 用于格式化输出字符串的区域设置(例如,‘en_US’)。 如果未指定区域设置,将使用默认的 LC_TIME,如果该设置也不可用,则将使用 'en_US'。

>>> ds_format("2015-01-01", "%Y-%m-%d", "MM-dd-yy")
'01-01-15'
>>> ds_format("1/5/2015", "%m/%d/%Y", "yyyy-MM-dd")
'2015-01-05'
>>> ds_format("12/07/2024", "%d/%m/%Y", "EEEE dd MMMM yyyy", "en_US")
'Friday 12 July 2024'

2.10.0 版本新增。

airflow.macros.random() x 在区间 [0, 1) 内。

此条目是否有帮助?