Apache Spark 算子¶
先决条件¶
要使用
SparkSubmitOperator
,您必须配置 Spark 连接。要使用
SparkJDBCOperator
,您必须同时配置 Spark 连接 和 JDBC 连接。SparkSqlOperator
从算子参数获取所有配置。
SparkJDBCOperator¶
在 Apache Spark 服务器上启动应用程序,它使用 SparkSubmitOperator
执行与基于 JDBC 的数据库之间的数据传输。
有关参数定义,请查看 SparkJDBCOperator
。
使用算子¶
使用 cmd_type
参数,可以将数据从 Spark 传输到数据库 (spark_to_jdbc
) 或从数据库传输到 Spark (jdbc_to_spark
),这将使用 Spark 命令 saveAsTable
写入表。
jdbc_to_spark_job = SparkJDBCOperator(
cmd_type="jdbc_to_spark",
jdbc_table="foo",
spark_jars="${SPARK_HOME}/jars/postgresql-42.2.12.jar",
jdbc_driver="org.postgresql.Driver",
metastore_table="bar",
save_mode="overwrite",
save_format="JSON",
task_id="jdbc_to_spark_job",
)
spark_to_jdbc_job = SparkJDBCOperator(
cmd_type="spark_to_jdbc",
jdbc_table="foo",
spark_jars="${SPARK_HOME}/jars/postgresql-42.2.12.jar",
jdbc_driver="org.postgresql.Driver",
metastore_table="bar",
save_mode="append",
task_id="spark_to_jdbc_job",
)
参考¶
有关更多信息,请查看 Apache Spark DataFrameWriter 文档。
SparkSqlOperator¶
在 Apache Spark 服务器上启动应用程序,它要求 spark-sql
脚本位于 PATH 中。该算子将在 Spark Hive 元存储服务上运行 SQL 查询,sql
参数可以使用模板,并且可以是 .sql
或 .hql
文件。
有关参数定义,请查看 SparkSqlOperator
。
使用算子¶
spark_sql_job = SparkSqlOperator(
sql="SELECT COUNT(1) as cnt FROM temp_table", master="local", task_id="spark_sql_job"
)
参考¶
有关更多信息,请查看 运行 Spark SQL CLI。
SparkSubmitOperator¶
在 Apache Spark 服务器上启动应用程序,它使用 spark-submit
脚本,该脚本负责使用 Spark 及其依赖项设置类路径,并且可以支持 Spark 支持的不同集群管理器和部署模式。
有关参数定义,请查看 SparkSubmitOperator
。
使用算子¶
submit_job = SparkSubmitOperator(
application="${SPARK_HOME}/examples/src/main/python/pi.py", task_id="submit_job"
)
参考¶
有关更多信息,请查看 Apache Spark 提交应用程序。