airflow.models.xcom¶
属性¶
类¶
XCom 模型类。包含表和一些工具方法。 |
函数¶
|
模块内容¶
- class airflow.models.xcom.XComModel[源代码]¶
Bases:
airflow.models.base.TaskInstanceDependencies
XCom 模型类。包含表和一些工具方法。
- classmethod clear(*, dag_id, task_id, run_id, map_index=None, session=NEW_SESSION)[源代码]¶
清除给定任务实例在数据库中的所有 XCom 数据。
注意
这不会清除任何自定义 XCom 后端的数据。
- 参数:
dag_id (str) – 要清除 XCom 的 DAG ID。
task_id (str) – 要清除 XCom 的任务 ID。
run_id (str) – 要清除 XCom 的 DAG 运行 ID。
map_index (int | None) – 如果提供,仅清除此特定映射任务的 XCom。默认值
None
清除该任务的所有 XCom。session (sqlalchemy.orm.Session) – 数据库会话。如果未提供,将为该函数创建一个新会话。
- classmethod set(key, value, *, dag_id, task_id, run_id, map_index=-1, session=NEW_SESSION)[源代码]¶
存储一个 XCom 值。
- classmethod get_many(*, run_id, key=None, task_ids=None, dag_ids=None, map_indexes=None, include_prior_dates=False, limit=None, session=NEW_SESSION)[源代码]¶
构建一个查询以获取一个或多个 XCom 条目。
此函数返回一个包含完整 XCom 对象的 SQLAlchemy 查询。如果您只需要一个存储值,请改用
get_one()
。- 参数:
run_id (str) – 任务的 DAG 运行 ID。
key (str | None) – XCom 的键。如果提供,仅返回键匹配的 XCom。传入 None(默认)以移除过滤。
task_ids (str | collections.abc.Iterable[str] | None) – 仅拉取 ID 匹配的任务的 XCom。传入 None(默认)以移除过滤。
dag_ids (str | collections.abc.Iterable[str] | None) – 仅拉取指定 DAG 的 XCom。传入 None(默认)以移除过滤。
map_indexes (int | collections.abc.Iterable[int] | None) – 仅拉取匹配映射索引的 XCom。传入 None(默认)以移除过滤。
include_prior_dates (bool) – 如果为 False(默认),则仅返回指定 DAG 运行的 XCom。如果为 True,无论属于哪个运行,都返回所有匹配的 XCom。
session (sqlalchemy.orm.Session) – 数据库会话。如果未提供,将为该函数创建一个新会话。
limit (int | None) – 限制返回的 XCom 数量
- static serialize_value(value, *, key=None, task_id=None, dag_id=None, run_id=None, map_index=None)[源代码]¶
将 XCom 值序列化为 JSON 字符串。
- static deserialize_value(result)[源代码]¶
从数据库结果反序列化 XCom 值。
如果反序列化失败,则返回原始值,该值仍必须是有效的 Python JSON 兼容类型(例如,
dict
、list
、str
、int
、float
或bool
)。XCom 值以 JSON 格式存储在数据库中,SQLAlchemy 会自动处理序列化(
json.dumps
)和反序列化(json.loads
)。但是,我们使用自定义编码器(serialize_value
和反序列化)来处理特殊情况,例如通过 Airflow 序列化模块对元组进行编码。必须使用XComDecoder
解码这些值以恢复原始类型。某些 XCom 值,例如通过任务执行 API 设置的值,会绕过
serialize_value
并直接以 JSON 格式存储。由于这些值已由 SQLAlchemy 反序列化,因此会原样返回。示例:处理元组:
original_value = (1, 2, 3) serialized_value = XComModel.serialize_value(original_value) print(serialized_value) # '{"__classname__": "builtins.tuple", "__version__": 1, "__data__": [1, 2, 3]}'
这个序列化的值存储在数据库中。反序列化时,该值将恢复为原始元组。
- 参数:
result – 包含
value
属性的 XCom 数据库行或对象。- 返回:
反序列化的 Python 对象。
- 返回类型:
Any