Apache Spark 算子

先决条件

SparkJDBCOperator

在 Apache Spark 服务器上启动应用程序,它使用 SparkSubmitOperator 执行与基于 JDBC 的数据库之间的数据传输。

有关参数定义,请查看 SparkJDBCOperator

使用算子

使用 cmd_type 参数,可以从 Spark 向数据库传输数据 (spark_to_jdbc) 或从数据库向 Spark 传输数据 (jdbc_to_spark),这将使用 Spark 命令 saveAsTable 写入表。

tests/system/apache/spark/example_spark_dag.py[源代码]

jdbc_to_spark_job = SparkJDBCOperator(
    cmd_type="jdbc_to_spark",
    jdbc_table="foo",
    spark_jars="${SPARK_HOME}/jars/postgresql-42.2.12.jar",
    jdbc_driver="org.postgresql.Driver",
    metastore_table="bar",
    save_mode="overwrite",
    save_format="JSON",
    task_id="jdbc_to_spark_job",
)

spark_to_jdbc_job = SparkJDBCOperator(
    cmd_type="spark_to_jdbc",
    jdbc_table="foo",
    spark_jars="${SPARK_HOME}/jars/postgresql-42.2.12.jar",
    jdbc_driver="org.postgresql.Driver",
    metastore_table="bar",
    save_mode="append",
    task_id="spark_to_jdbc_job",
)

参考

有关更多信息,请查看 Apache Spark DataFrameWriter 文档

SparkSqlOperator

在 Apache Spark 服务器上启动应用程序,它要求 spark-sql 脚本位于 PATH 中。该算子将在 Spark Hive 元数据服务上运行 SQL 查询,sql 参数可以是模板化的,并且可以是 .sql.hql 文件。

有关参数定义,请查看 SparkSqlOperator

使用算子

tests/system/apache/spark/example_spark_dag.py[源代码]

spark_sql_job = SparkSqlOperator(
    sql="SELECT COUNT(1) as cnt FROM temp_table", master="local", task_id="spark_sql_job"
)

参考

有关更多信息,请查看 运行 Spark SQL CLI

SparkSubmitOperator

在 Apache Spark 服务器上启动应用程序,它使用 spark-submit 脚本,该脚本负责设置 Spark 及其依赖项的类路径,并且可以支持 Spark 支持的不同集群管理器和部署模式。

有关参数定义,请查看 SparkSubmitOperator

使用算子

tests/system/apache/spark/example_spark_dag.py[源代码]

submit_job = SparkSubmitOperator(
    application="${SPARK_HOME}/examples/src/main/python/pi.py", task_id="submit_job"
)

参考

有关更多信息,请查看 Apache Spark 提交应用程序

此条目是否有帮助?