对象存储 XCom 后端

默认的 XCom 后端是 BaseXCom 类,它将 XCom 存储在 Airflow 数据库中。这对于较小的值来说是可以的,但对于较大的值或大量的 XCom 来说可能会出现问题。

要启用将 XCom 存储在对象存储中,您可以将 xcom_backend 配置选项设置为 airflow.providers.common.io.xcom.backend.XComObjectStorageBackend。您还需要将 xcom_objectstorage_path 设置为所需的位置。连接 ID 从您将提供的 URL 的用户部分获取,例如 xcom_objectstorage_path = s3://conn_id@mybucket/key。此外,xcom_objectstorage_threshold 必须大于 -1。任何小于阈值(以字节为单位)的对象都将存储在数据库中,任何大于阈值的对象都将存储在对象存储中。这将允许混合设置。如果 XCom 存储在对象存储上,则会在数据库中保存一个引用。最后,您可以将 xcom_objectstorage_compression 设置为 fsspec 支持的压缩方法,例如 zipsnappy,以便在将数据存储到对象存储之前对其进行压缩。

因此,例如,以下配置会将任何大于 1MB 的内容存储在 S3 中,并使用 gzip 对其进行压缩

[core]
xcom_backend = airflow.providers.common.io.xcom.backend.XComObjectStorageBackend

[common.io]
xcom_objectstorage_path = s3://conn_id@mybucket/key
xcom_objectstorage_threshold = 1048576
xcom_objectstorage_compression = gzip

注意

压缩需要在您的 Python 环境中安装对它的支持。例如,要使用 snappy 压缩,您需要安装 python-snappy。Zip、gzip 和 bz2 开箱即用。

此条目有帮助吗?