@task.snowpark
¶
使用 @task.snowpark
在 Snowpark Python 代码在 Snowflake 数据库中运行。
警告
Snowpark 尚不支持 Python 3.12。
目前,此装饰器不支持 Snowpark pandas API,因为 Airflow 中使用了冲突的 pandas 版本。 请考虑将 Snowpark pandas API 与其他 Snowpark 装饰器或操作符一起使用。
先决条件任务¶
要使用此装饰器,您必须执行以下几项操作
通过 pip 安装提供程序包。
pip install 'apache-airflow-providers-snowflake'安装提供了详细信息。
使用操作符¶
使用 snowflake_conn_id
参数指定使用的连接。 如果未指定,则将使用 snowflake_default
。
以下是 @task.snowpark
的示例用法
@task.snowpark
def setup_data(session: Session):
# The Snowpark session object is injected as an argument
data = [
(1, 0, 5, "Product 1", "prod-1", 1, 10),
(2, 1, 5, "Product 1A", "prod-1-A", 1, 20),
(3, 1, 5, "Product 1B", "prod-1-B", 1, 30),
(4, 0, 10, "Product 2", "prod-2", 2, 40),
(5, 4, 10, "Product 2A", "prod-2-A", 2, 50),
(6, 4, 10, "Product 2B", "prod-2-B", 2, 60),
(7, 0, 20, "Product 3", "prod-3", 3, 70),
(8, 7, 20, "Product 3A", "prod-3-A", 3, 80),
(9, 7, 20, "Product 3B", "prod-3-B", 3, 90),
(10, 0, 50, "Product 4", "prod-4", 4, 100),
(11, 10, 50, "Product 4A", "prod-4-A", 4, 100),
(12, 10, 50, "Product 4B", "prod-4-B", 4, 100),
]
columns = ["id", "parent_id", "category_id", "name", "serial_number", "key", "3rd"]
df = session.create_dataframe(data, schema=columns)
table_name = "sample_product_data"
df.write.save_as_table(table_name, mode="overwrite")
return table_name
table_name = setup_data() # type: ignore[call-arg, misc]
@task.snowpark
def check_num_rows(table_name: str):
# Alternatively, retrieve the Snowpark session object using `get_active_session`
from snowflake.snowpark.context import get_active_session
session = get_active_session()
df = session.table(table_name)
assert df.count() == 12
check_num_rows(table_name)
正如示例所示,在您的 Python 函数中使用 Snowpark 会话对象有两种方法
将 Snowpark 会话对象作为名为
session
的关键字参数传递给函数。 Snowpark 会话将自动注入到函数中,使您可以像平常一样使用它。使用 Snowpark 中的 get_active_session 函数来检索函数内部的 Snowpark 会话对象。
注意
可以传递到装饰器的参数将优先于 Airflow 连接元数据中已给定的参数(例如 schema
、role
、database
等)。