对象存储¶
2.8.0 版本的新功能。
这是一个 实验性功能。
所有主要的云提供商都在对象存储中提供持久数据存储。这些不是经典的“POSIX”文件系统。为了在没有任何单点故障的情况下存储数百 PB 的数据,对象存储用更简单的对象名称 => 数据模型取代了经典的文件系统目录树。为了启用远程访问,对对象的访问操作通常作为(缓慢的)HTTP REST 操作提供。
Airflow 在对象存储(如 s3、gcs 和 azure blob 存储)之上提供了一个通用的抽象层。这种抽象允许您在 DAG 中使用各种对象存储系统,而无需更改代码来处理每个不同的对象存储系统。此外,它还允许您使用大多数标准 Python 模块,例如可以处理类文件对象的 shutil
。
对特定对象存储系统的支持取决于您安装的提供程序。例如,如果您安装了 apache-airflow-providers-google
提供程序,您将能够将 gcs
方案用于对象存储。开箱即用,Airflow 提供对 file
方案的支持。
注意
对 s3 的支持要求您安装 apache-airflow-providers-amazon[s3fs]
。这是因为它依赖于 aiobotocore
,而默认情况下不安装它,因为它可能会与 botocore
产生依赖关系冲突。
云对象存储不是真正的文件系统¶
对象存储不是真正的文件系统,尽管它们看起来像。它们不支持真实文件系统所做的所有操作。主要区别在于
不保证原子重命名操作。这意味着,如果您将文件从一个位置移动到另一个位置,它将被复制然后删除。如果复制失败,您将丢失该文件。
目录是模拟的,使用它们可能会很慢。例如,列出目录可能需要列出存储桶中的所有对象并按前缀过滤它们。
在文件中查找可能需要大量的调用开销,从而影响性能,或者根本不支持。
Airflow 依靠 fsspec 在不同的对象存储系统中提供一致的体验。它实现了本地文件缓存以加快访问速度。但是,在设计 DAG 时,您应该注意对象存储的局限性。
基本用法¶
要使用对象存储,您需要使用要交互的对象的 URI 实例化一个 Path(见下文)对象。例如,要指向 s3 中的存储桶,您可以执行以下操作
from airflow.io.path import ObjectStoragePath
base = ObjectStoragePath("s3://aws_default@my-bucket/")
URI 的用户名部分表示 Airflow 连接 ID,并且是可选的。它也可以作为单独的关键字参数传入
# Equivalent to the previous example.
base = ObjectStoragePath("s3://my-bucket/", conn_id="aws_default")
列出文件对象
@task
def list_files() -> list[ObjectStoragePath]:
files = [f for f in base.iterdir() if f.is_file()]
return files
在目录树中导航
base = ObjectStoragePath("s3://my-bucket/")
subdir = base / "subdir"
# prints ObjectStoragePath("s3://my-bucket/subdir")
print(subdir)
打开文件
@task
def read_file(path: ObjectStoragePath) -> str:
with path.open() as f:
return f.read()
利用 XCOM,您可以在任务之间传递路径
@task
def create(path: ObjectStoragePath) -> ObjectStoragePath:
return path / "new_file.txt"
@task
def write_file(path: ObjectStoragePath, content: str):
with path.open("wb") as f:
f.write(content)
new_file = create(base)
write = write_file(new_file, b"data")
read >> write
配置¶
在其基本用法中,对象存储抽象不需要太多配置,并且依赖于标准的 Airflow 连接机制。这意味着您可以使用 conn_id
参数来指定要使用的连接。连接的任何设置都会被推送到底层实现。例如,如果您使用的是 s3,则可以指定 aws_access_key_id
和 aws_secret_access_key
,但也可以添加额外的参数,例如 endpoint_url
来指定自定义端点。
备用后端¶
可以为方案或协议配置备用后端。这是通过将 backend
附加到方案来完成的。例如,要为 dbfs
方案启用 databricks 后端,您可以执行以下操作
from airflow.io.path import ObjectStoragePath
from airflow.io.store import attach
from fsspec.implementations.dbfs import DBFSFileSystem
attach(protocol="dbfs", fs=DBFSFileSystem(instance="myinstance", token="mytoken"))
base = ObjectStoragePath("dbfs://my-location/")
注意
要在任务之间重复使用注册,请确保将后端附加到 DAG 的顶层。否则,后端将无法跨多个任务使用。
路径 API¶
对象存储抽象是作为 路径 API 实现的,并建立在 通用路径库 之上。这意味着您可以像使用本地文件系统一样使用相同的 API 来与对象存储进行交互。在本节中,我们仅列出两个 API 之间的差异。标准路径 API 之外的扩展操作(例如复制和移动)将在下一节中列出。有关每个操作的详细信息(例如它们采用的参数),请参阅 ObjectStoragePath
类的文档。
mkdir¶
在指定路径或存储桶/容器内创建目录条目。对于没有真正目录的系统,它可能仅为此实例创建一个目录条目,而不会影响真正的文件系统。
如果 parents
为 True
,则会根据需要创建此路径的任何缺失父级。
touch¶
在此给定路径创建文件,或更新时间戳。如果 truncate
为 True
,则文件将被截断,这是默认行为。如果文件已存在,则如果 exists_ok
为 true(并且其修改时间更新为当前时间),则该函数将成功,否则将引发 FileExistsError
。
stat¶
返回一个类似 stat_result
的对象,该对象支持以下属性:st_size
、st_mtime
、st_mode
,但也像字典一样可以提供有关该对象的附加元数据。例如,对于 s3,它将返回其他键,例如:['ETag', 'ContentType']
。如果您的代码需要在不同的对象存储之间移植,请不要依赖扩展元数据。
扩展¶
以下操作不是标准路径 API 的一部分,但受对象存储抽象的支持。
bucket¶
返回存储桶名称。
checksum¶
返回文件的校验和。
container¶
bucket 的别名
fs¶
用于访问实例化文件系统的便捷属性
key¶
返回对象键。
namespace¶
返回对象的命名空间。通常,这是协议,例如带有存储桶名称的 s3://
。
path¶
与文件系统实例一起使用的 fsspec
兼容路径
protocol¶
filesystem_spec 协议。
read_block¶
从此给定路径的文件中读取一个字节块。
从文件的偏移量开始,读取长度字节。如果设置了分隔符,则我们确保读取在偏移量和偏移量 + 长度之后的分隔符边界处开始和停止。如果偏移量为零,则我们从零开始。返回的字节串将包含结束分隔符字符串。
如果偏移量 + 长度超出文件末尾,则读取到文件末尾。
sign¶
创建表示给定路径的签名 URL。某些实现允许生成临时 URL,作为一种委托凭证的方式。
size¶
返回给定路径的文件大小(以字节为单位)。
storage_options¶
用于实例化底层文件系统的存储选项。
ukey¶
文件属性的哈希值,用于判断它是否已更改。
复制和移动¶
本文档描述了 copy
和 move
操作的预期行为,特别是跨对象存储(例如,文件 -> s3)的行为。每个方法都将文件或目录从 source
复制或移动到 target
位置。预期行为与 fsspec
中指定的相同。对于跨对象存储的目录复制,Airflow 需要遍历目录树并分别复制每个文件。这是通过将每个文件从源流式传输到目标来完成的。
外部集成¶
许多其他项目,如 DuckDB、Apache Iceberg 等,都可以利用对象存储抽象。这通常是通过传递底层的 fsspec
实现来完成的。为此,ObjectStoragePath
公开了 fs
属性。例如,以下代码适用于 duckdb
,以便使用来自 Airflow 的连接详细信息连接到 s3,并读取由 ObjectStoragePath
指示的 parquet 文件
import duckdb
from airflow.io.path import ObjectStoragePath
path = ObjectStoragePath("s3://my-bucket/my-table.parquet", conn_id="aws_default")
conn = duckdb.connect(database=":memory:")
conn.register_filesystem(path.fs)
conn.execute(f"CREATE OR REPLACE TABLE my_table AS SELECT * FROM read_parquet('{path}');")