2025 年 Airflow 峰会将于 10 月 07-09 日举行。立即注册获取早鸟票!

airflow.models.xcom

属性

log

XComModel

XCom 模型类。包含表和一些工具方法。

函数

__getattr__(name)

模块内容

airflow.models.xcom.log[源代码]
class airflow.models.xcom.XComModel[源代码]

Bases: airflow.models.base.TaskInstanceDependencies

XCom 模型类。包含表和一些工具方法。

__tablename__ = 'xcom'[源代码]
dag_run_id[源代码]
task_id[源代码]
map_index[源代码]
key[源代码]
dag_id[源代码]
run_id[源代码]
value[源代码]
timestamp[源代码]
__table_args__[源代码]
dag_run[源代码]
logical_date[源代码]
classmethod clear(*, dag_id, task_id, run_id, map_index=None, session=NEW_SESSION)[源代码]

清除给定任务实例在数据库中的所有 XCom 数据。

注意

不会清除任何自定义 XCom 后端的数据。

参数:
  • dag_id (str) – 要清除 XCom 的 DAG ID。

  • task_id (str) – 要清除 XCom 的任务 ID。

  • run_id (str) – 要清除 XCom 的 DAG 运行 ID。

  • map_index (int | None) – 如果提供,仅清除此特定映射任务的 XCom。默认值 None 清除该任务的所有 XCom。

  • session (sqlalchemy.orm.Session) – 数据库会话。如果未提供,将为该函数创建一个新会话。

classmethod set(key, value, *, dag_id, task_id, run_id, map_index=-1, session=NEW_SESSION)[源代码]

存储一个 XCom 值。

参数:
  • key (str) – 存储 XCom 的键。

  • value (Any) – 要存储的 XCom 值。

  • dag_id (str) – DAG ID。

  • task_id (str) – 任务 ID。

  • run_id (str) – 任务的 DAG 运行 ID。

  • map_index (int) – 可选的映射索引,用于为映射任务分配 XCom。默认值为 -1(为非映射任务设置)。

  • session (sqlalchemy.orm.Session) – 数据库会话。如果未提供,将为该函数创建一个新会话。

classmethod get_many(*, run_id, key=None, task_ids=None, dag_ids=None, map_indexes=None, include_prior_dates=False, limit=None, session=NEW_SESSION)[源代码]

构建一个查询以获取一个或多个 XCom 条目。

此函数返回一个包含完整 XCom 对象的 SQLAlchemy 查询。如果您只需要一个存储值,请改用 get_one()

参数:
  • run_id (str) – 任务的 DAG 运行 ID。

  • key (str | None) – XCom 的键。如果提供,仅返回键匹配的 XCom。传入 None(默认)以移除过滤。

  • task_ids (str | collections.abc.Iterable[str] | None) – 仅拉取 ID 匹配的任务的 XCom。传入 None(默认)以移除过滤。

  • dag_ids (str | collections.abc.Iterable[str] | None) – 仅拉取指定 DAG 的 XCom。传入 None(默认)以移除过滤。

  • map_indexes (int | collections.abc.Iterable[int] | None) – 仅拉取匹配映射索引的 XCom。传入 None(默认)以移除过滤。

  • include_prior_dates (bool) – 如果为 False(默认),则仅返回指定 DAG 运行的 XCom。如果为 True,无论属于哪个运行,都返回所有匹配的 XCom。

  • session (sqlalchemy.orm.Session) – 数据库会话。如果未提供,将为该函数创建一个新会话。

  • limit (int | None) – 限制返回的 XCom 数量

static serialize_value(value, *, key=None, task_id=None, dag_id=None, run_id=None, map_index=None)[源代码]

将 XCom 值序列化为 JSON 字符串。

static deserialize_value(result)[源代码]

从数据库结果反序列化 XCom 值。

如果反序列化失败,则返回原始值,该值仍必须是有效的 Python JSON 兼容类型(例如,dictliststrintfloatbool)。

XCom 值以 JSON 格式存储在数据库中,SQLAlchemy 会自动处理序列化(json.dumps)和反序列化(json.loads)。但是,我们使用自定义编码器(serialize_value 和反序列化)来处理特殊情况,例如通过 Airflow 序列化模块对元组进行编码。必须使用 XComDecoder 解码这些值以恢复原始类型。

某些 XCom 值,例如通过任务执行 API 设置的值,会绕过 serialize_value 并直接以 JSON 格式存储。由于这些值已由 SQLAlchemy 反序列化,因此会原样返回。

示例:处理元组:

original_value = (1, 2, 3)
serialized_value = XComModel.serialize_value(original_value)
print(serialized_value)
# '{"__classname__": "builtins.tuple", "__version__": 1, "__data__": [1, 2, 3]}'

这个序列化的值存储在数据库中。反序列化时,该值将恢复为原始元组。

参数:

result – 包含 value 属性的 XCom 数据库行或对象。

返回:

反序列化的 Python 对象。

返回类型:

Any

airflow.models.xcom.__getattr__(name)[源代码]

此条目是否有帮助?