对象存储¶
2.8.0 版本新增。
这是一个 实验性功能。
所有主要的云提供商都在对象存储中提供持久数据存储。这些不是经典的 “POSIX” 文件系统。为了存储数百 PB 的数据而没有任何单点故障,对象存储将经典文件系统目录树替换为更简单的对象名称 => 数据模型。为了实现远程访问,对对象的操作通常以(缓慢的)HTTP REST 操作的形式提供。
Airflow 在对象存储之上提供了一个通用抽象,例如 s3、gcs 和 Azure Blob 存储。此抽象允许您在 DAG 中使用各种对象存储系统,而无需更改代码来处理每个不同的对象存储系统。此外,它还允许您使用大多数标准 Python 模块,例如可以处理类文件对象的 shutil
。
对特定对象存储系统的支持取决于您安装的提供程序。例如,如果您安装了 apache-airflow-providers-google
提供程序,您将可以使用 gcs
方案进行对象存储。开箱即用,Airflow 提供对 file
方案的支持。
注意
对 s3 的支持需要您安装 apache-airflow-providers-amazon[s3fs]
。这是因为它依赖于 aiobotocore
,默认情况下不安装该库,因为它可能会在 botocore
中造成依赖项挑战。
云对象存储不是真正的文件系统¶
对象存储不是真正的文件系统,尽管它们看起来是这样。它们不支持真实文件系统执行的所有操作。主要区别是
不保证原子重命名操作。这意味着如果您将文件从一个位置移动到另一个位置,它将被复制然后删除。如果复制失败,您将丢失该文件。
目录是模拟的,可能会使处理它们的速度变慢。例如,列出目录可能需要列出存储桶中的所有对象,并按前缀进行筛选。
在文件中查找可能需要大量的调用开销,从而影响性能,或者可能根本不支持。
Airflow 依赖于 fsspec 来提供跨不同对象存储系统的一致体验。它实现了本地文件缓存以加快访问速度。但是,在设计 DAG 时,您应该了解对象存储的局限性。
基本用法¶
要使用对象存储,您需要使用要与之交互的对象的 URI 实例化一个 Path(见下文)对象。例如,要指向 s3 中的存储桶,您将执行以下操作
from airflow.io.path import ObjectStoragePath
base = ObjectStoragePath("s3://aws_default@my-bucket/")
URI 的用户名部分表示 Airflow 连接 ID,是可选的。它也可以作为单独的关键字参数传入
# Equivalent to the previous example.
base = ObjectStoragePath("s3://my-bucket/", conn_id="aws_default")
列出文件对象
@task
def list_files() -> list[ObjectStoragePath]:
files = [f for f in base.iterdir() if f.is_file()]
return files
在目录树中导航
base = ObjectStoragePath("s3://my-bucket/")
subdir = base / "subdir"
# prints ObjectStoragePath("s3://my-bucket/subdir")
print(subdir)
打开文件
@task
def read_file(path: ObjectStoragePath) -> str:
with path.open() as f:
return f.read()
利用 XCOM,您可以在任务之间传递路径
@task
def create(path: ObjectStoragePath) -> ObjectStoragePath:
return path / "new_file.txt"
@task
def write_file(path: ObjectStoragePath, content: str):
with path.open("wb") as f:
f.write(content)
new_file = create(base)
write = write_file(new_file, b"data")
read >> write
配置¶
在其基本用法中,对象存储抽象不需要太多配置,并且依赖于标准的 Airflow 连接机制。这意味着您可以使用 conn_id
参数来指定要使用的连接。连接的任何设置都会下推到基础实现。例如,如果您使用 s3,您可以指定 aws_access_key_id
和 aws_secret_access_key
,还可以添加额外的参数,例如 endpoint_url
来指定自定义端点。
替代后端¶
可以为方案或协议配置替代后端。这是通过将 backend
附加到方案来完成的。例如,要为 dbfs
方案启用 databricks 后端,您将执行以下操作
from airflow.io.path import ObjectStoragePath
from airflow.io.store import attach
from fsspec.implementations.dbfs import DBFSFileSystem
attach(protocol="dbfs", fs=DBFSFileSystem(instance="myinstance", token="mytoken"))
base = ObjectStoragePath("dbfs://my-location/")
注意
要跨任务重用注册,请确保在 DAG 的顶层附加后端。否则,该后端将无法在多个任务之间使用。
路径 API¶
对象存储抽象实现为 Path API,并基于 Universal Pathlib 构建。这意味着您可以使用几乎相同的 API 与对象存储进行交互,就像使用本地文件系统一样。在本节中,我们仅列出两个 API 之间的差异。超出标准 Path API 的扩展操作(例如复制和移动)将在下一节中列出。有关每个操作的详细信息(例如它们采用的参数),请参阅 ObjectStoragePath
类的文档。
mkdir¶
在指定路径或存储桶/容器中创建目录条目。对于没有真正目录的系统,它可能仅为此实例创建目录条目,而不影响真实的文件系统。
如果 parents
为 True
,则根据需要创建此路径的任何缺失父级。
touch¶
在此给定路径创建文件或更新时间戳。如果 truncate
为 True
,则会截断该文件,这是默认设置。如果文件已存在,则如果 exists_ok
为 true,则函数成功(并且其修改时间会更新为当前时间),否则会引发 FileExistsError
。
stat¶
返回一个类似于 stat_result
的对象,该对象支持以下属性:st_size
、st_mtime
、st_mode
,但也充当一个字典,可以提供有关对象的其他元数据。例如,对于 s3,它将返回其他键,例如:['ETag', 'ContentType']
。如果您的代码需要在不同的对象存储之间移植,请不要依赖扩展的元数据。
扩展¶
以下操作不是标准 Path API 的一部分,但对象存储抽象支持这些操作。
bucket¶
返回存储桶名称。
checksum¶
返回文件的校验和。
container¶
bucket 的别名
fs¶
用于访问实例化的文件系统的便捷属性
key¶
返回对象键。
namespace¶
返回对象的命名空间。通常这是协议,例如带有存储桶名称的 s3://
。
path¶
与 fsspec
兼容的路径,用于文件系统实例
protocol¶
filesystem_spec 协议。
read_block¶
从指定路径的文件中读取一个字节块。
从文件的偏移量位置开始,读取长度为 length 的字节。如果设置了分隔符,则确保读取操作从偏移量和偏移量 + length 位置之后的分隔符边界开始和结束。如果偏移量为零,则从零开始。返回的字节串将包含末尾分隔符字符串。
如果偏移量 + length 超出文件末尾 (eof),则读取到 eof。
sign¶
创建一个表示给定路径的签名 URL。某些实现允许生成临时 URL,作为委托凭据的一种方式。
size¶
返回指定路径的文件大小(以字节为单位)。
storage_options¶
用于实例化底层文件系统的存储选项。
ukey¶
文件属性的哈希值,用于判断文件是否已更改。
复制和移动¶
本文档介绍了 copy
和 move
操作的预期行为,特别是跨对象存储(例如,文件 -> s3)的行为。每个方法将文件或目录从 source
复制或移动到 target
位置。预期行为与 fsspec
指定的行为相同。对于跨对象存储目录复制,Airflow 需要遍历目录树并单独复制每个文件。这是通过将每个文件从源流式传输到目标来完成的。
外部集成¶
许多其他项目,如 DuckDB、Apache Iceberg 等,都可以使用对象存储抽象。通常,这是通过传递底层的 fsspec
实现来完成的。为此,ObjectStoragePath
暴露了 fs
属性。例如,以下代码与 duckdb
一起使用,以便使用来自 Airflow 的连接详细信息连接到 s3,并读取由 ObjectStoragePath
指示的 parquet 文件
import duckdb
from airflow.io.path import ObjectStoragePath
path = ObjectStoragePath("s3://my-bucket/my-table.parquet", conn_id="aws_default")
conn = duckdb.connect(database=":memory:")
conn.register_filesystem(path.fs)
conn.execute(f"CREATE OR REPLACE TABLE my_table AS SELECT * FROM read_parquet('{path}');")