Airflow Summit 2025 将于 10 月 7 日至 9 日举行。立即注册享早鸟票!

XComs

XComs(“cross-communications” 的缩写,意为跨任务通信)是一种机制,允许任务之间相互通信,因为默认情况下任务是完全隔离的,并且可能在完全不同的机器上运行。

XCom 通过一个 key(本质上是它的名称)以及它来自的 task_iddag_id 进行标识。它们可以拥有任何可序列化的值(包括使用 @dataclass@attr.define 装饰的对象,参阅TaskFlow 参数),但它们仅设计用于传递少量数据;不要使用它们来传递大量值,例如数据框。

XCom 通过 Task 实例上的 xcom_pushxcom_pull 方法被明确地“推送”(push)和“拉取”(pull)到/从其存储中。

在名为 “task-1” 的任务中推送一个将被另一个任务使用的值

# pushes data in any_serializable_value into xcom with key "identifier as string"
task_instance.xcom_push(key="identifier as a string", value=any_serializable_value)

在不同的任务中拉取上述代码中推送的值

# pulls the xcom variable with key "identifier as string" that was pushed from within task-1
task_instance.xcom_pull(key="identifier as string", task_ids="task-1")

如果 do_xcom_push 参数设置为 True(默认即如此),许多操作符会自动将其结果推送到名为 return_value 的 XCom 键中,@task 函数也是如此。xcom_pull 在没有传入键时,默认使用 return_value 作为键,这意味着可以像这样编写代码

# Pulls the return_value XCOM from "pushing_task"
value = task_instance.xcom_pull(task_ids='pushing_task')

您也可以在模板中使用 XComs

SELECT * FROM {{ task_instance.xcom_pull(task_ids='foo', key='table_name') }}

XComs 与变量相关,主要区别在于 XComs 是每个 Task 实例的,设计用于在 DAG 运行内部进行通信,而变量是全局的,设计用于整体配置和值共享。

如果您想一次推送多个 XComs,可以将 do_xcom_pushmultiple_outputs 参数设置为 True,然后返回一个包含多个值的字典。

推送多个 XComs 并单独拉取的示例

# A task returning a dictionary
@task(do_xcom_push=True, multiple_outputs=True)
def push_multiple(**context):
    return {"key1": "value1", "key2": "value2"}


@task
def xcom_pull_with_multiple_outputs(**context):
    # Pulling a specific key from the multiple outputs
    key1 = context["ti"].xcom_pull(task_ids="push_multiple", key="key1")  # to pull key1
    key2 = context["ti"].xcom_pull(task_ids="push_multiple", key="key2")  # to pull key2

    # Pulling entire xcom data from push_multiple task
    data = context["ti"].xcom_pull(task_ids="push_multiple", key="return_value")

注意

如果第一次任务运行未成功,那么在每次重试任务时,XComs 将被清除,以使任务运行具有幂等性。

对象存储 XCom 后端

默认的 XCom 后端 BaseXCom 将 XCom 存储在 Airflow 数据库中,这对于小值效果很好,但处理大值或大量 XCom 时可能导致问题。为克服此限制,推荐使用对象存储来高效处理更大规模数据。详细概述请参考文档

自定义 XCom 后端

XCom 系统具有可互换的后端,您可以通过 xcom_backend 配置选项来设置使用哪个后端。

如果您想实现自己的后端,应该继承 BaseXCom 类,并重写 serialize_valuedeserialize_value 方法。

您可以重写 BaseXCom 类中的 purge 方法,以便控制从自定义后端清除 XCom 数据。此方法会在 delete 过程中被调用。

在容器中验证自定义 XCom 后端的使用

根据 Airflow 的部署环境,例如本地、Docker、K8s 等,确认自定义 XCom 后端是否实际初始化非常有用。例如,容器环境的复杂性使得在容器部署期间更难确定后端是否正确加载。幸运的是,可以使用以下指导来帮助您增强对自定义 XCom 实现的信心。

如果您可以 exec 进入 Airflow 容器的终端,您就可以打印出正在使用的实际 XCom 类

from airflow.models.xcom import XCom

print(XCom.__name__)

此条目有帮助吗?