对象存储

本教程介绍如何使用对象存储 API 来管理驻留在对象存储(如 S3、gcs 和 azure blob 存储)上的对象。该 API 作为 Airflow 2.8 的一部分引入。

本教程涵盖了数据工程和数据科学工作流程中经常使用的一种简单模式:访问 Web api、保存和分析结果。

先决条件

要完成本教程,您需要准备以下内容

  • DuckDB,一个进程内分析数据库,可以通过运行 pip install duckdb 来安装。

  • 一个 S3 存储桶,以及包含 s3fs 的 Amazon 提供程序。您可以通过运行 pip install apache-airflow-providers-amazon[s3fs] 来安装提供程序包。或者,您可以通过将 create_object_storage_path 函数中的 URL 更改为您的提供程序的相应 URL 来使用其他存储提供程序,例如,将 s3:// 替换为 Google Cloud Storage 的 gs://,并安装其他提供程序。

  • pandas,您可以通过运行 pip install pandas 来安装。

创建 ObjectStoragePath

ObjectStoragePath 是一个类似于路径的对象,表示对象存储上的路径。它是对象存储 API 的基本构建块。

airflow/example_dags/tutorial_objectstorage.py[源代码]

base = ObjectStoragePath("s3://aws_default@airflow-tutorial-data/")

提供给 ObjectStoragePath 的 URL 的用户名部分应该是连接 ID。指定的连接将用于获取访问后端的正确凭据。如果省略,则将使用后端的默认连接。

连接 ID 也可以通过关键字参数传递

ObjectStoragePath("s3://airflow-tutorial-data/", conn_id="aws_default")

当重用为其他目的(例如数据集)定义的 URL 时,这很有用,该 URL 通常不包含用户名部分。如果同时指定了显式关键字参数和 URL 的用户名值,则前者优先。

在 DAG 的根目录下实例化 ObjectStoragePath 是安全的。在使用路径之前,不会创建连接。这意味着您可以在 DAG 的全局范围内创建路径,并在多个任务中使用它。

将数据保存到对象存储

ObjectStoragePath 的行为与 pathlib.Path 对象非常相似。您可以使用它直接将数据保存到对象存储并从对象存储加载数据。因此,典型的流程可能如下所示

airflow/example_dags/tutorial_objectstorage.py[源代码]

    @task
    def get_air_quality_data(**kwargs) -> ObjectStoragePath:
        """
        #### Get Air Quality Data
        This task gets air quality data from the Finnish Meteorological Institute's
        open data API. The data is saved as parquet.
        """
        import pandas as pd

        execution_date = kwargs["logical_date"]
        start_time = kwargs["data_interval_start"]

        params = {
            "format": "json",
            "precision": "double",
            "groupareas": "0",
            "producer": "airquality_urban",
            "area": "Uusimaa",
            "param": ",".join(aq_fields.keys()),
            "starttime": start_time.isoformat(timespec="seconds"),
            "endtime": execution_date.isoformat(timespec="seconds"),
            "tz": "UTC",
        }

        response = requests.get(API, params=params)
        response.raise_for_status()

        # ensure the bucket exists
        base.mkdir(exist_ok=True)

        formatted_date = execution_date.format("YYYYMMDD")
        path = base / f"air_quality_{formatted_date}.parquet"

        df = pd.DataFrame(response.json()).astype(aq_fields)
        with path.open("wb") as file:
            df.to_parquet(file)

        return path

get_air_quality_data 调用芬兰气象研究所的 API 来获取赫尔辛基地区的空气质量数据。它根据生成的 json 创建一个 Pandas DataFrame。然后,它将数据保存到对象存储,并将其动态转换为 parquet 格式。

对象的键是根据任务的逻辑日期自动生成的,因此我们可以每天运行它,它会为每天创建一个新对象。我们将此键与基本路径连接起来,以创建对象的完整路径。最后,在将对象写入存储后,我们返回对象的路径。这允许我们在下一个任务中使用该路径。

分析数据

在理解数据时,您通常希望对其进行分析。Duck DB 是一个很好的工具。它是一个进程内分析数据库,允许您对内存中的数据运行 SQL 查询。

因为数据已经是 parquet 格式,我们可以使用 read_parquet,并且因为 Duck DB 和 ObjectStoragePath 都使用 fsspec,所以我们可以使用 Duck DB 注册 ObjectStoragePath 的后端。ObjectStoragePath 为此公开了 fs 属性。然后,我们可以使用 Duck DB 中的 register_filesystem 函数向 Duck DB 注册后端。

在 Duck DB 中,我们可以从数据创建表并对其运行查询。查询以数据帧的形式返回,可以用于进一步分析或保存到对象存储。

airflow/example_dags/tutorial_objectstorage.py[源代码]

    @task
    def analyze(path: ObjectStoragePath, **kwargs):
        """
        #### Analyze
        This task analyzes the air quality data, prints the results
        """
        import duckdb

        conn = duckdb.connect(database=":memory:")
        conn.register_filesystem(path.fs)
        conn.execute(f"CREATE OR REPLACE TABLE airquality_urban AS SELECT * FROM read_parquet('{path}')")

        df2 = conn.execute("SELECT * FROM airquality_urban").fetchdf()

        print(df2.head())

您可能会注意到,analyze 函数不知道对象的原始路径,而是将其作为参数传递并通过 XCom 获取。您不需要重新实例化 Path 对象。此外,连接详细信息也会透明地处理。

整合所有内容

最终的 DAG 如下所示,它包装了所有内容,以便我们可以运行它

airflow/example_dags/tutorial_objectstorage.py[源代码]


import pendulum
import requests

from airflow.decorators import dag, task
from airflow.io.path import ObjectStoragePath

API = "https://opendata.fmi.fi/timeseries"

aq_fields = {
    "fmisid": "int32",
    "time": "datetime64[ns]",
    "AQINDEX_PT1H_avg": "float64",
    "PM10_PT1H_avg": "float64",
    "PM25_PT1H_avg": "float64",
    "O3_PT1H_avg": "float64",
    "CO_PT1H_avg": "float64",
    "SO2_PT1H_avg": "float64",
    "NO2_PT1H_avg": "float64",
    "TRSC_PT1H_avg": "float64",
}
base = ObjectStoragePath("s3://aws_default@airflow-tutorial-data/")


@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
)
def tutorial_objectstorage():
    """
    ### Object Storage Tutorial Documentation
    This is a tutorial DAG to showcase the usage of the Object Storage API.
    Documentation that goes along with the Airflow Object Storage tutorial is
    located
    [here](https://airflow.apache.org/docs/apache-airflow/stable/tutorial/objectstorage.html)
    """
    @task
    def get_air_quality_data(**kwargs) -> ObjectStoragePath:
        """
        #### Get Air Quality Data
        This task gets air quality data from the Finnish Meteorological Institute's
        open data API. The data is saved as parquet.
        """
        import pandas as pd

        execution_date = kwargs["logical_date"]
        start_time = kwargs["data_interval_start"]

        params = {
            "format": "json",
            "precision": "double",
            "groupareas": "0",
            "producer": "airquality_urban",
            "area": "Uusimaa",
            "param": ",".join(aq_fields.keys()),
            "starttime": start_time.isoformat(timespec="seconds"),
            "endtime": execution_date.isoformat(timespec="seconds"),
            "tz": "UTC",
        }

        response = requests.get(API, params=params)
        response.raise_for_status()

        # ensure the bucket exists
        base.mkdir(exist_ok=True)

        formatted_date = execution_date.format("YYYYMMDD")
        path = base / f"air_quality_{formatted_date}.parquet"

        df = pd.DataFrame(response.json()).astype(aq_fields)
        with path.open("wb") as file:
            df.to_parquet(file)

        return path
    @task
    def analyze(path: ObjectStoragePath, **kwargs):
        """
        #### Analyze
        This task analyzes the air quality data, prints the results
        """
        import duckdb

        conn = duckdb.connect(database=":memory:")
        conn.register_filesystem(path.fs)
        conn.execute(f"CREATE OR REPLACE TABLE airquality_urban AS SELECT * FROM read_parquet('{path}')")

        df2 = conn.execute("SELECT * FROM airquality_urban").fetchdf()

        print(df2.head())
    obj_path = get_air_quality_data()
    analyze(obj_path)
tutorial_objectstorage()

此条目有帮助吗?