模板参考

变量、宏和过滤器可以在模板中使用(请参阅 Jinja 模板 部分)

以下内容在 Airflow 中开箱即用。可以通过 插件 全局添加其他自定义宏,或者通过 DAG.user_defined_macros 参数在 DAG 级别添加。

变量

Airflow 引擎默认传递一些变量,这些变量在所有模板中都可以访问

变量

类型

描述

{{ data_interval_start }}

pendulum.DateTime

数据间隔的开始时间。在版本 2.2 中添加。

{{ data_interval_end }}

pendulum.DateTime

pendulum.DateTime

数据间隔的结束时间。在版本 2.2 中添加。

pendulum.DateTime

{{ logical_date }}
pendulum.DateTime
逻辑上标识当前 DAG 运行的日期时间。此值不包含任何语义,而只是一个用于标识的值。

如果您想要一个具有现实世界语义的值,请改用 data_interval_startdata_interval_end

例如,根据时间戳从数据库中获取一行切片。

{{ ds }}
str

DAG 运行的逻辑日期,格式为 YYYY-MM-DD

例如,根据时间戳从数据库中获取一行切片。

{{ logical_date | ds }} 相同。

{{ ds_nodash }}

str

DAG 运行的逻辑日期,格式为 YYYYMMDD


{{ logical_date | ds_nodash }} 相同。

例如,根据时间戳从数据库中获取一行切片。

{{ exception }}
无 | str | 异常 KeyboardInterrupt

运行任务实例时发生错误。

例如,根据时间戳从数据库中获取一行切片。

{{ ts }}
str

DAG 运行的逻辑日期时间,格式为 YYYY-MM-DDTHH:MM:SS+00:00

例如,根据时间戳从数据库中获取一行切片。

{{ logical_date | ts }} 相同。
例如:2018-01-01T00:00:00+00:00

{{ ts_nodash_with_tz }}

str

DAG 运行的逻辑日期时间,格式为 YYYYMMDDTHHMMSS+0000
{{ logical_date | ts_nodash_with_tz }} 相同。

例如:20180101T000000+0000

str

{{ ts_nodash }}
{{ logical_date | ts_nodash_with_tz }} 相同。

str

str

DAG 运行的逻辑日期时间,格式为 YYYYMMDDTHHMMSS

{{ logical_date | ts_nodash }} 相同。

str

例如:20180101T000000

{{ prev_data_interval_start_success }}

pendulum.DateTime | None

先前成功的 DagRun 的数据间隔的开始时间。

在版本 2.2 中添加。

pendulum.DateTime | None

{{ prev_data_interval_end_success }}

pendulum.DateTime | None

先前成功的 DagRun 的数据间隔的结束时间。

在版本 2.2 中添加。

{{ prev_start_date_success }}

pendulum.DateTime | None

先前成功的 DagRun 的开始日期(如果可用)。

{{ prev_end_date_success }}

先前成功的 DagRun 的结束日期(如果可用)。

{{ inlets }}

列表

在任务上声明的入口列表。

{{ inlets }}

{{ outlets }}

列表

在任务上声明的出口列表。

{{ dag }}
DAG
当前正在运行的 DAG。您可以在 DAG 中阅读有关 DAG 的更多信息。

{{ task }}

BaseOperator

当前正在运行的 BaseOperator。您可以在 操作符 中阅读有关任务的更多信息

BaseOperator

{{ macros }}

对宏包的引用。请参阅下面的

{{ task_instance }}

例如,根据时间戳从数据库中获取一行切片。

TaskInstance
当前正在运行的 TaskInstance.

{{ ti }}

TaskInstance

{{ task_instance }} 相同。
{{ params }}

dict[str, Any]

例如,根据时间戳从数据库中获取一行切片。

用户定义的参数。如果在 airflow.cfg 中启用了 dag_run_conf_overrides_params,则可以通过传递给 trigger_dag -c 的映射来覆盖此参数。

{{ var.value }}

任何

Airflow 变量。请参阅下面的 模板中的 Airflow 变量

{{ var.json }}

任何

Airflow 变量,以 JSON 格式加载。请参阅下面的 模板中的 Airflow 变量

{{ conn }}

连接

Airflow 连接。请参阅下面的 模板中的 Airflow 连接

{{ task_instance_key_str }}

str

任务实例的唯一、人类可读的键。格式为
{dag_id}__{task_id}__{ds_nodash}
{{ conf }}

AirflowConfigParser

表示 airflow.cfg 内容的完整配置对象。请参阅 airflow.configuration.conf

{{ run_id }}
str
当前正在运行的 DagRun 运行 ID。
{{ dag_run }}

DagRun

当前正在运行的 DagRun

{{ test_mode }}

虽然使用 @task 装饰的任务不支持渲染作为参数传递的 jinja 模板,但上面列出的所有变量都可以直接从任务中访问。以下代码块是一个从其任务访问 task_instance 对象的示例

from airflow.models.taskinstance import TaskInstance
from airflow.models.dagrun import DagRun


@task
def print_ti_info(task_instance: TaskInstance | None = None, dag_run: DagRun | None = None):
    print(f"Run ID: {task_instance.run_id}")  # Run ID: scheduled__2023-08-09T00:00:00+00:00
    print(f"Duration: {task_instance.duration}")  # Duration: 0.972019
    print(f"DAG Run queued at: {dag_run.queued_at}")  # 2023-08-10 00:00:01+02:20

已弃用的变量

以下变量已弃用。它们保留是为了向后兼容,但您应该将现有代码转换为使用其他变量。

已弃用的变量

描述

{{ execution_date }}

执行日期(逻辑日期),与 logical_date 相同

{{ next_execution_date }}

下次计划运行的逻辑日期(如果适用);您可以改用 data_interval_end

{{ next_ds }}

如果存在,则为 YYYY-MM-DD 格式的下一个执行日期,否则为 None

{{ next_ds_nodash }}

如果存在,则为 YYYYMMDD 格式的下一个执行日期,否则为 None

{{ prev_execution_date }}

上次计划运行的逻辑日期(如果适用)

{{ prev_ds }}

如果存在,则为 YYYY-MM-DD 格式的上一个执行日期,否则为 None

{{ prev_ds_nodash }}

如果存在,则为 YYYYMMDD 格式的上一个执行日期,否则为 None

{{ yesterday_ds }}

执行日期前一天,格式为 YYYY-MM-DD

{{ yesterday_ds_nodash }}

执行日期前一天,格式为 YYYYMMDD

{{ tomorrow_ds }}

执行日期后一天,格式为 YYYY-MM-DD

{{ tomorrow_ds_nodash }}

执行日期后一天,格式为 YYYYMMDD

{{ prev_execution_date_success }}

先前成功运行 DAG 的执行日期;如果 DAG 使用的时间表/计划定义的 data_interval_start 与旧的 execution_date 兼容,则您可以改用 prev_data_interval_start_success

请注意,您可以使用简单的点表示法访问对象的属性和方法。以下是一些可能的示例:{{ task.owner }}{{ task.task_id }}{{ ti.hostname }},…有关对象属性和方法的更多信息,请参阅模型文档。

模板中的 Airflow 变量

var 模板变量允许您访问 Airflow 变量。您可以以纯文本或 JSON 格式访问它们。如果您使用 JSON,您还可以遍历嵌套结构,例如字典:{{ var.json.my_dict_var.key1 }}

如果需要,也可以通过字符串获取变量(例如,您的变量键包含点),使用 {{ var.value.get('my.var', 'fallback') }}{{ var.json.get('my.dict.var', {'key1': 'val1'}) }}。如果变量不存在,则可以提供默认值。

模板中的 Airflow 连接

类似地,可以通过 conn 模板变量访问 Airflow 连接数据。例如,您可以在模板中使用表达式,例如 {{ conn.my_conn_id.login }}{{ conn.my_conn_id.password }} 等。

就像使用 var 一样,可以通过字符串获取连接(例如 {{ conn.get('my_conn_id_'+index).host }})或提供默认值(例如 {{ conn.get('my_conn_id', {"host": "host1", "login": "user1"}).host }})。

此外,可以使用 extra_dejson 字段将连接的 extras 字段作为 Python 字典获取,例如,conn.my_aws_conn_id.extra_dejson.region_name 将从 extras 中获取 region_name。这样,也可以在 extras 中提供默认值(例如 {{ conn.my_aws_conn_id.extra_dejson.get('region_name', '欧洲 (法兰克福)') }})。

过滤器

Airflow 定义了一些可用于格式化值的 Jinja 过滤器。

例如,使用 {{ logical_date | ds }} 将以 YYYY-MM-DD 格式输出 logical_date。

过滤器

操作对象

描述

ds

日期时间

将日期时间格式化为 YYYY-MM-DD

ds_nodash

日期时间

将日期时间格式化为 YYYYMMDD

ts

日期时间

.isoformat() 相同,例如:2018-01-01T00:00:00+00:00

ts_nodash

日期时间

ts 过滤器相同,但没有 -: 或时区信息。例如:20180101T000000

ts_nodash_with_tz

日期时间

作为 ts 过滤器,但没有 -:。例如 20180101T000000+0000

宏是一种将对象公开给模板的方法,并在模板中位于 macros 命名空间下。

提供了一些常用的库和方法。

变量

描述

macros.datetime

标准库的 datetime.datetime

macros.timedelta

标准库的 datetime.timedelta

macros.dateutil

dateutil 包的引用

macros.time

标准库的 time

macros.uuid

标准库的 uuid

macros.random

标准库的 random.random

还定义了一些 Airflow 特定的宏

airflow.macros.datetime_diff_for_humans(dt, since=None)[源代码]

返回日期时间之间的人类可读/近似差异。

如果只提供一个日期时间,则比较将基于当前时间。

参数
  • dt (任何) – 要显示差异的日期时间

  • since (日期时间 | ) – 要显示日期的起始时间。如果为 None,则差异在 dt 和当前时间之间。

airflow.macros.ds_add(ds, days)[源代码]

从 YYYY-MM-DD 添加或减去天数。

参数
  • ds (str) – 要添加到的 YYYY-MM-DD 格式的锚定日期

  • daysint)– 要添加到 ds 的天数,您可以使用负值

>>> ds_add("2015-01-01", 5)
'2015-01-06'
>>> ds_add("2015-01-06", -5)
'2015-01-01'
airflow.macros.ds_format(ds, input_format, output_format)[源代码]

以给定格式输出日期时间字符串。

参数
  • dsstr)– 包含日期的输入字符串

  • input_formatstr)– 输入字符串格式。例如 %Y-%m-%d

  • output_formatstr)– 输出字符串格式。例如 %Y-%m-%d

>>> ds_format("2015-01-01", "%Y-%m-%d", "%m-%d-%y")
'01-01-15'
>>> ds_format("1/5/2015", "%m/%d/%Y", "%Y-%m-%d")
'2015-01-05'
airflow.macros.random() x 区间 [0, 1).

此条目有帮助吗?