模板参考¶
变量、宏和过滤器可以在模板中使用(请参阅 Jinja 模板 部分)
以下内容在 Airflow 中开箱即用。可以通过 插件 全局添加其他自定义宏,或者通过 DAG.user_defined_macros
参数在 DAG 级别添加。
变量¶
Airflow 引擎默认传递一些变量,这些变量在所有模板中都可以访问
变量 |
类型 |
描述 |
---|---|---|
|
数据间隔的开始时间。在版本 2.2 中添加。 |
|
|
pendulum.DateTime |
|
|
{{ logical_date }}
pendulum.DateTime
逻辑上标识当前 DAG 运行的日期时间。此值不包含任何语义,而只是一个用于标识的值。
|
|
|
例如,根据时间戳从数据库中获取一行切片。 |
{{ ds }}
str
|
|
例如,根据时间戳从数据库中获取一行切片。 |
与 |
|
str |
DAG 运行的逻辑日期,格式为
YYYYMMDD 。 |
|
例如,根据时间戳从数据库中获取一行切片。 |
{{ exception }}
无 | str | 异常 KeyboardInterrupt
|
|
例如,根据时间戳从数据库中获取一行切片。 |
{{ ts }}
str
|
|
例如,根据时间戳从数据库中获取一行切片。 |
与
{{ logical_date | ts }} 相同。例如:
2018-01-01T00:00:00+00:00 。 |
|
str |
DAG 运行的逻辑日期时间,格式为
YYYYMMDDTHHMMSS+0000 。与
{{ logical_date | ts_nodash_with_tz }} 相同。 |
|
str |
{{ ts_nodash }}
与
{{ logical_date | ts_nodash_with_tz }} 相同。 |
|
str |
DAG 运行的逻辑日期时间,格式为 |
|
str |
例如: |
|
pendulum.DateTime | |
先前成功的 |
|
pendulum.DateTime | |
{{ prev_data_interval_end_success }} |
|
先前成功的 |
在版本 2.2 中添加。 |
|
pendulum.DateTime | |
先前成功的
DagRun 的开始日期(如果可用)。 |
|
pendulum.DateTime |
None |
|
|
{{ inlets }} |
列表 |
|
{{ inlets }} |
{{ outlets }} |
|
在任务上声明的出口列表。 |
|
|
BaseOperator |
|
|
BaseOperator |
|
|
对宏包的引用。请参阅下面的 宏。 |
|
|
例如,根据时间戳从数据库中获取一行切片。 |
TaskInstance
当前正在运行的 . |
|
TaskInstance |
与
{{ task_instance }} 相同。{{ params }}
|
|
例如,根据时间戳从数据库中获取一行切片。 |
用户定义的参数。如果在 |
|
任何 |
Airflow 变量。请参阅下面的 模板中的 Airflow 变量。 |
|
任何 |
Airflow 变量,以 JSON 格式加载。请参阅下面的 模板中的 Airflow 变量。 |
|
连接 |
Airflow 连接。请参阅下面的 模板中的 Airflow 连接。 |
|
str |
任务实例的唯一、人类可读的键。格式为
{dag_id}__{task_id}__{ds_nodash}
{{ conf }}
|
|
表示 |
DagRun
当前正在运行的 DagRun
。
{{ test_mode }}
虽然使用 @task
装饰的任务不支持渲染作为参数传递的 jinja 模板,但上面列出的所有变量都可以直接从任务中访问。以下代码块是一个从其任务访问 task_instance
对象的示例
from airflow.models.taskinstance import TaskInstance from airflow.models.dagrun import DagRun @task def print_ti_info(task_instance: TaskInstance | None = None, dag_run: DagRun | None = None): print(f"Run ID: {task_instance.run_id}") # Run ID: scheduled__2023-08-09T00:00:00+00:00 print(f"Duration: {task_instance.duration}") # Duration: 0.972019 print(f"DAG Run queued at: {dag_run.queued_at}") # 2023-08-10 00:00:01+02:20
已弃用的变量¶
以下变量已弃用。它们保留是为了向后兼容,但您应该将现有代码转换为使用其他变量。
已弃用的变量 |
描述 |
---|---|
|
执行日期(逻辑日期),与 |
|
下次计划运行的逻辑日期(如果适用);您可以改用 |
|
如果存在,则为 |
|
如果存在,则为 |
|
上次计划运行的逻辑日期(如果适用) |
|
如果存在,则为 |
|
如果存在,则为 |
|
执行日期前一天,格式为 |
|
执行日期前一天,格式为 |
|
执行日期后一天,格式为 |
|
执行日期后一天,格式为 |
|
先前成功运行 DAG 的执行日期;如果 DAG 使用的时间表/计划定义的 |
请注意,您可以使用简单的点表示法访问对象的属性和方法。以下是一些可能的示例:{{ task.owner }}
、{{ task.task_id }}
、{{ ti.hostname }}
,…有关对象属性和方法的更多信息,请参阅模型文档。
模板中的 Airflow 变量¶
var
模板变量允许您访问 Airflow 变量。您可以以纯文本或 JSON 格式访问它们。如果您使用 JSON,您还可以遍历嵌套结构,例如字典:{{ var.json.my_dict_var.key1 }}
。
如果需要,也可以通过字符串获取变量(例如,您的变量键包含点),使用 {{ var.value.get('my.var', 'fallback') }}
或 {{ var.json.get('my.dict.var', {'key1': 'val1'}) }}
。如果变量不存在,则可以提供默认值。
模板中的 Airflow 连接¶
类似地,可以通过 conn
模板变量访问 Airflow 连接数据。例如,您可以在模板中使用表达式,例如 {{ conn.my_conn_id.login }}
、{{ conn.my_conn_id.password }}
等。
就像使用 var
一样,可以通过字符串获取连接(例如 {{ conn.get('my_conn_id_'+index).host }}
)或提供默认值(例如 {{ conn.get('my_conn_id', {"host": "host1", "login": "user1"}).host }}
)。
此外,可以使用 extra_dejson
字段将连接的 extras
字段作为 Python 字典获取,例如,conn.my_aws_conn_id.extra_dejson.region_name
将从 extras
中获取 region_name
。这样,也可以在 extras
中提供默认值(例如 {{ conn.my_aws_conn_id.extra_dejson.get('region_name', '欧洲 (法兰克福)') }}
)。
过滤器¶
Airflow 定义了一些可用于格式化值的 Jinja 过滤器。
例如,使用 {{ logical_date | ds }}
将以 YYYY-MM-DD
格式输出 logical_date。
过滤器 |
操作对象 |
描述 |
---|---|---|
|
日期时间 |
将日期时间格式化为 |
|
日期时间 |
将日期时间格式化为 |
|
日期时间 |
与 |
|
日期时间 |
与 |
|
日期时间 |
作为 |
宏¶
宏是一种将对象公开给模板的方法,并在模板中位于 macros
命名空间下。
提供了一些常用的库和方法。
变量 |
描述 |
---|---|
|
标准库的 |
|
标准库的 |
|
对 |
|
标准库的 |
|
标准库的 |
|
标准库的 |
还定义了一些 Airflow 特定的宏
- airflow.macros.datetime_diff_for_humans(dt, since=None)[源代码]¶
返回日期时间之间的人类可读/近似差异。
如果只提供一个日期时间,则比较将基于当前时间。
- 参数
dt (任何) – 要显示差异的日期时间
since (日期时间 | 无) – 要显示日期的起始时间。如果为
None
,则差异在dt
和当前时间之间。
- airflow.macros.ds_add(ds, days)[源代码]¶
从 YYYY-MM-DD 添加或减去天数。
>>> ds_add("2015-01-01", 5) '2015-01-06' >>> ds_add("2015-01-06", -5) '2015-01-01'
- airflow.macros.ds_format(ds, input_format, output_format)[源代码]¶
以给定格式输出日期时间字符串。
- 参数
>>> ds_format("2015-01-01", "%Y-%m-%d", "%m-%d-%y") '01-01-15' >>> ds_format("1/5/2015", "%m/%d/%Y", "%Y-%m-%d") '2015-01-05'
- airflow.macros.random() x 在 区间 [0, 1). ¶