序列化¶
为了支持任务间的数据交换(例如参数),Airflow 需要对要交换的数据进行序列化,并在下游任务需要时再次反序列化。序列化也发生在 Web 服务器和调度器(而非 DAG 处理器)无需读取 DAG 文件时。这样做是为了安全和效率。
序列化是一项出奇困难的工作。Python 本身仅支持对原始类型(如 str
和 int
)进行序列化,并遍历可迭代对象。当情况变得更复杂时,就需要自定义序列化。
Airflow 本身支持三种自定义序列化方式。原始类型按原样返回,无需额外编码,例如 str
仍然是 str
。当它不是原始类型(或其可迭代对象)时,Airflow 会在 airflow.serialization.serializers
命名空间中查找已注册的序列化器和反序列化器。如果找不到,它会在类中查找 serialize()
方法,或者在反序列化时查找 deserialize(data, version: int)
方法。最后,如果类使用 @dataclass
或 @attr.define
装饰器装饰,它将使用这些装饰器提供的公共方法。
如果您想扩展 Airflow 以支持新的序列化器,了解何时选择哪种序列化方式非常有用。由 Airflow 控制的对象,即位于 airflow.*
命名空间下的对象(如 airflow.model.dag.DAG
)或由开发人员控制的对象(如 my.company.Foo
),应首先检查它们是否可以使用 @attr.define
或 @dataclass
进行装饰。如果不可能,则应实现 serialize
和 deserialize
方法。serialize
方法应返回一个原始类型或一个字典。它无需序列化字典中的值(这会由 Airflow 处理),但键必须是原始类型形式。
不受 Airflow 控制的对象(例如 numpy.int16
)将需要注册的序列化器和反序列化器。需要版本控制。原始类型(不包括 bytes
)可以作为字典返回。同样,dict
的值不需要序列化,但其键需要是原始类型形式。如果您正在实现已注册的序列化器,请特别注意避免循环导入。通常,可以通过使用 str
来填充序列化器列表来避免这种情况。例如:serializers = ["my.company.Foo"]
而不是 serializers = [Foo]
。
注意
序列化和反序列化依赖于速度。尽可能使用内置函数(如 dict
),避免使用类和其他复杂结构。
Airflow 对象¶
from typing import Any, ClassVar
class Foo:
__version__: ClassVar[int] = 1
def __init__(self, a, v) -> None:
self.a = a
self.b = {"x": v}
def serialize(self) -> dict[str, Any]:
return {
"a": self.a,
"b": self.b,
}
@staticmethod
def deserialize(data: dict[str, Any], version: int):
f = Foo(a=data["a"])
f.b = data["b"]
return f
已注册¶
from __future__ import annotations
from decimal import Decimal
from typing import TYPE_CHECKING
from airflow.utils.module_loading import qualname
if TYPE_CHECKING:
from airflow.serialization.serde import U
serializers = [
Decimal
] # this can be a type or a fully qualified str. Str can be used to prevent circular imports
deserializers = serializers # in some cases you might not have a deserializer (e.g. k8s pod)
__version__ = 1 # required
# the serializer expects output, classname, version, is_serialized?
def serialize(o: object) -> tuple[U, str, int, bool]:
if isinstance(o, Decimal):
name = qualname(o)
_, _, exponent = o.as_tuple()
if exponent >= 0: # No digits after the decimal point.
return int(o), name, __version__, True
# Technically lossy due to floating point errors, but the best we
# can do without implementing a custom encode function.
return float(o), name, __version__, True
return "", "", 0, False
# the deserializer sanitizes the data for you, so you do not need to deserialize values yourself
def deserialize(classname: str, version: int, data: object) -> Decimal:
# always check version compatibility
if version > __version__:
raise TypeError(f"serialized {version} of {classname} > {__version__}")
if classname != qualname(Decimal):
raise TypeError(f"{classname} != {qualname(Decimal)}")
return Decimal(str(data))