@task.snowpark

使用 @task.snowparkSnowpark Python 代码在 Snowflake 数据库中运行。

警告

  • Snowpark 尚不支持 Python 3.12。

  • 目前,此装饰器不支持 Snowpark pandas API,因为 Airflow 中使用了冲突的 pandas 版本。 请考虑将 Snowpark pandas API 与其他 Snowpark 装饰器或操作符一起使用。

先决条件任务

要使用此装饰器,您必须执行以下几项操作

使用操作符

使用 snowflake_conn_id 参数指定使用的连接。 如果未指定,则将使用 snowflake_default

以下是 @task.snowpark 的示例用法

tests/system/snowflake/example_snowpark_decorator.py[源代码]

    @task.snowpark
    def setup_data(session: Session):
        # The Snowpark session object is injected as an argument
        data = [
            (1, 0, 5, "Product 1", "prod-1", 1, 10),
            (2, 1, 5, "Product 1A", "prod-1-A", 1, 20),
            (3, 1, 5, "Product 1B", "prod-1-B", 1, 30),
            (4, 0, 10, "Product 2", "prod-2", 2, 40),
            (5, 4, 10, "Product 2A", "prod-2-A", 2, 50),
            (6, 4, 10, "Product 2B", "prod-2-B", 2, 60),
            (7, 0, 20, "Product 3", "prod-3", 3, 70),
            (8, 7, 20, "Product 3A", "prod-3-A", 3, 80),
            (9, 7, 20, "Product 3B", "prod-3-B", 3, 90),
            (10, 0, 50, "Product 4", "prod-4", 4, 100),
            (11, 10, 50, "Product 4A", "prod-4-A", 4, 100),
            (12, 10, 50, "Product 4B", "prod-4-B", 4, 100),
        ]
        columns = ["id", "parent_id", "category_id", "name", "serial_number", "key", "3rd"]
        df = session.create_dataframe(data, schema=columns)
        table_name = "sample_product_data"
        df.write.save_as_table(table_name, mode="overwrite")
        return table_name

    table_name = setup_data()  # type: ignore[call-arg, misc]

    @task.snowpark
    def check_num_rows(table_name: str):
        # Alternatively, retrieve the Snowpark session object using `get_active_session`
        from snowflake.snowpark.context import get_active_session

        session = get_active_session()
        df = session.table(table_name)
        assert df.count() == 12

    check_num_rows(table_name)

正如示例所示,在您的 Python 函数中使用 Snowpark 会话对象有两种方法

  • 将 Snowpark 会话对象作为名为 session 的关键字参数传递给函数。 Snowpark 会话将自动注入到函数中,使您可以像平常一样使用它。

  • 使用 Snowpark 中的 get_active_session 函数来检索函数内部的 Snowpark 会话对象。

注意

可以传递到装饰器的参数将优先于 Airflow 连接元数据中已给定的参数(例如 schemaroledatabase 等)。

此条目是否有帮助?