对象存储¶
本教程演示如何使用对象存储 API 来管理位于对象存储上的对象,例如 S3、GCS 和 Azure Blob 存储。该 API 作为 Airflow 2.8 的一部分引入。
本教程涵盖了数据工程和数据科学工作流程中常用的简单模式:访问 Web API,保存并分析结果。
先决条件¶
要完成本教程,您需要以下几项:
DuckDB,一个进程内分析数据库,可以通过运行
pip install duckdb
进行安装。一个 S3 存储桶,以及包含
s3fs
的 Amazon 提供程序。您可以通过运行pip install apache-airflow-providers-amazon[s3fs]
来安装提供程序包。或者,您可以通过在create_object_storage_path
函数中将 URL 更改为您提供程序的相应 URL,来使用不同的存储提供程序,例如将s3://
替换为 Google Cloud Storage 的gs://
,并安装不同的提供程序。pandas
,您可以通过运行pip install pandas
来安装。
创建 ObjectStoragePath¶
ObjectStoragePath 是一个类似路径的对象,表示对象存储上的路径。它是对象存储 API 的基本构建块。
base = ObjectStoragePath("s3://aws_default@airflow-tutorial-data/")
提供给 ObjectStoragePath 的 URL 的用户名部分应该是连接 ID。指定的连接将用于获取访问后端的正确凭据。如果省略,将使用后端的默认连接。
连接 ID 也可以通过关键字参数传入
ObjectStoragePath("s3://airflow-tutorial-data/", conn_id="aws_default")
当重用为其他目的定义的 URL(例如数据集)时,这很有用,这些 URL 通常不包含用户名部分。如果同时指定了 URL 的用户名值和显式关键字参数,则显式关键字参数优先。
在 DAG 的根目录中实例化 ObjectStoragePath 是安全的。在路径被使用之前,不会创建连接。这意味着您可以在 DAG 的全局范围内创建路径,并在多个任务中使用它。
将数据保存到对象存储¶
ObjectStoragePath 的行为方式与 pathlib.Path 对象类似。您可以使用它直接将数据保存到对象存储并从中加载数据。因此,典型的流程可能如下所示:
@task
def get_air_quality_data(**kwargs) -> ObjectStoragePath:
"""
#### Get Air Quality Data
This task gets air quality data from the Finnish Meteorological Institute's
open data API. The data is saved as parquet.
"""
import pandas as pd
execution_date = kwargs["logical_date"]
start_time = kwargs["data_interval_start"]
params = {
"format": "json",
"precision": "double",
"groupareas": "0",
"producer": "airquality_urban",
"area": "Uusimaa",
"param": ",".join(aq_fields.keys()),
"starttime": start_time.isoformat(timespec="seconds"),
"endtime": execution_date.isoformat(timespec="seconds"),
"tz": "UTC",
}
response = requests.get(API, params=params)
response.raise_for_status()
# ensure the bucket exists
base.mkdir(exist_ok=True)
formatted_date = execution_date.format("YYYYMMDD")
path = base / f"air_quality_{formatted_date}.parquet"
df = pd.DataFrame(response.json()).astype(aq_fields)
with path.open("wb") as file:
df.to_parquet(file)
return path
get_air_quality_data
调用芬兰气象研究所的 API,以获取赫尔辛基地区的空气质量数据。它从生成的 JSON 创建 Pandas DataFrame。然后,它将数据保存到对象存储,并在运行时将其转换为 parquet 格式。
对象的键是从任务的逻辑日期自动生成的,因此我们可以每天运行此任务,并且每天都会创建一个新对象。我们将此键与基本路径连接起来,以创建对象的完整路径。最后,在将对象写入存储后,我们返回对象的路径。这允许我们在下一个任务中使用该路径。
分析数据¶
在理解数据的过程中,您通常需要对其进行分析。Duck DB 是一个很好的工具。它是一个进程内分析数据库,允许您在内存中的数据上运行 SQL 查询。
由于数据已经是 parquet 格式,我们可以使用 read_parquet
,并且由于 Duck DB 和 ObjectStoragePath 都使用 fsspec
,我们可以将 ObjectStoragePath 的后端注册到 Duck DB。ObjectStoragePath 公开了 fs
属性以实现此目的。然后,我们可以使用 Duck DB 中的 register_filesystem
函数将后端注册到 Duck DB。
在 Duck DB 中,我们可以根据数据创建表,并在其上运行查询。查询将以 dataframe 的形式返回,该 dataframe 可用于进一步分析或保存到对象存储。
@task
def analyze(path: ObjectStoragePath, **kwargs):
"""
#### Analyze
This task analyzes the air quality data, prints the results
"""
import duckdb
conn = duckdb.connect(database=":memory:")
conn.register_filesystem(path.fs)
conn.execute(f"CREATE OR REPLACE TABLE airquality_urban AS SELECT * FROM read_parquet('{path}')")
df2 = conn.execute("SELECT * FROM airquality_urban").fetchdf()
print(df2.head())
您可能会注意到,analyze
函数不知道对象的原始路径,但它是作为参数传入的并通过 XCom 获取的。您不需要重新实例化 Path 对象。连接详细信息也会透明地处理。
整合所有内容¶
最终的 DAG 如下所示,它包装了一些东西,以便我们可以运行它
import pendulum
import requests
from airflow.decorators import dag, task
from airflow.io.path import ObjectStoragePath
API = "https://opendata.fmi.fi/timeseries"
aq_fields = {
"fmisid": "int32",
"time": "datetime64[ns]",
"AQINDEX_PT1H_avg": "float64",
"PM10_PT1H_avg": "float64",
"PM25_PT1H_avg": "float64",
"O3_PT1H_avg": "float64",
"CO_PT1H_avg": "float64",
"SO2_PT1H_avg": "float64",
"NO2_PT1H_avg": "float64",
"TRSC_PT1H_avg": "float64",
}
base = ObjectStoragePath("s3://aws_default@airflow-tutorial-data/")
@dag(
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["example"],
)
def tutorial_objectstorage():
"""
### Object Storage Tutorial Documentation
This is a tutorial DAG to showcase the usage of the Object Storage API.
Documentation that goes along with the Airflow Object Storage tutorial is
located
[here](https://airflow.apache.org/docs/apache-airflow/stable/tutorial/objectstorage.html)
"""
@task
def get_air_quality_data(**kwargs) -> ObjectStoragePath:
"""
#### Get Air Quality Data
This task gets air quality data from the Finnish Meteorological Institute's
open data API. The data is saved as parquet.
"""
import pandas as pd
execution_date = kwargs["logical_date"]
start_time = kwargs["data_interval_start"]
params = {
"format": "json",
"precision": "double",
"groupareas": "0",
"producer": "airquality_urban",
"area": "Uusimaa",
"param": ",".join(aq_fields.keys()),
"starttime": start_time.isoformat(timespec="seconds"),
"endtime": execution_date.isoformat(timespec="seconds"),
"tz": "UTC",
}
response = requests.get(API, params=params)
response.raise_for_status()
# ensure the bucket exists
base.mkdir(exist_ok=True)
formatted_date = execution_date.format("YYYYMMDD")
path = base / f"air_quality_{formatted_date}.parquet"
df = pd.DataFrame(response.json()).astype(aq_fields)
with path.open("wb") as file:
df.to_parquet(file)
return path
@task
def analyze(path: ObjectStoragePath, **kwargs):
"""
#### Analyze
This task analyzes the air quality data, prints the results
"""
import duckdb
conn = duckdb.connect(database=":memory:")
conn.register_filesystem(path.fs)
conn.execute(f"CREATE OR REPLACE TABLE airquality_urban AS SELECT * FROM read_parquet('{path}')")
df2 = conn.execute("SELECT * FROM airquality_urban").fetchdf()
print(df2.head())
obj_path = get_air_quality_data()
analyze(obj_path)
tutorial_objectstorage()