PySpark 装饰器

使用 @task.pyspark 装饰器包装的 Python 可调用对象会被注入 SparkSession 和 SparkContext 对象(如果可用)。

参数

可以将以下参数传递给装饰器

conn_id: str

用于连接 Spark 集群的连接 ID。如果未指定,则 Spark 主机设置为 local[*]

config_kwargs: dict

用于初始化 SparkConf 对象的 kwargs。这将覆盖连接中设置的 Spark 配置选项。

示例

以下示例显示了如何使用 @task.pyspark 装饰器。请注意,sparksc 对象被注入到函数中。

tests/system/providers/apache/spark/example_pyspark.py[源代码]

@task.pyspark(conn_id="spark-local")
def spark_task(spark: SparkSession, sc: SparkContext) -> pd.DataFrame:
    df = spark.createDataFrame(
        [
            (1, "John Doe", 21),
            (2, "Jane Doe", 22),
            (3, "Joe Bloggs", 23),
        ],
        ["id", "name", "age"],
    )
    df.show()

    return df.toPandas()

Spark Connect

Apache Spark 3.4 中,Spark Connect 引入了一种解耦的客户端-服务器架构,允许使用 DataFrame API 远程连接到 Spark 集群。使用 Spark Connect 是 Airflow 中使用 PySpark 装饰器的首选方式,因为它不需要在与 Airflow 相同的主机上运行 Spark 驱动程序。要使用 Spark Connect,请在主机 URL 前面加上 sc://。例如,sc://spark-cluster:15002

身份验证

Spark Connect 没有内置的身份验证。但是,gRPC HTTP/2 接口允许使用身份验证来通过身份验证代理与 Spark Connect 服务器进行通信。要使用身份验证,请确保创建 Spark Connect 连接并设置正确的凭据。

此条目有帮助吗?