数据加载与保存-通用方式‌ 使用df.write.save方法保存数据,同样可通过format指定数据类型。 save方法后需传入保存路径(针对csv、orc、parquet、textFile格式)。 option方法用于设置特定格式的参数。 保存操作可使用SaveMode来指明如何处理数据,如覆盖(overwrite)、追加(append)等,通过mode方法设置。 特定格式保存‌ 与加载类似,Parquet、JSON、CSV等格式均可通过指定format进行保存。 MySQL等关系型数据库的写入也通过JDBC实现,需指定format为jdbc,并传入数据库连接信息及表名。 注意事项

时间:2025-04-17 17:24:29

在处理JSON数据时,需确保文件格式符合Spark的要求,即每行一个JSON串。

在读取CSV文件时,可通过设置option来指定分隔符、是否推断schema等信息,以便正确解析文件内容。

在通过JDBC连接数据库时,需确保数据库驱动已正确导入,并正确配置数据库连接信息。

在保存数据时,需根据实际需求选择合适的SaveMode,以避免数据覆盖或丢失。

Spark SQLHive的集成

Spark SQL可以编译时包含Hive支持,从而提供对Hive表访问、UDF(用户自定义函数)、Hive查询语言(HQL)等特性的支持。在使用时,无需事先安装Hive,但最好在编译Spark SQL时引入Hive支持。

IDEA通过JDBC对MySQL进行操作:

读取数据

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SQL")

val spark:SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

import spark.implicits._

//通用的load方式读取

spark.read.format("jdbc")

  .option("url","jdbc:mysql://localhost:3306/system")

  .option("driver","com.mysql.jdbc.Driver")//com.mysql.cj.jdbc.Driver

  .option("user","root")

  .option("password","123456")

  .option("dbtable","user")

  .load().show()

spark.stop()

//通用的load方法的另一种形式

spark.read.format("jdbc")

  .options(

    Map("url"->"jdbc:mysql://localhost:3306/system?user=root&password=123456","dbtable"->"user","driver"->"com.mysql.jdbc.Driver"))

  .load().show()

//通过JDBC

val pros :Properties = new Properties()

pros.setProperty("user","root")

pros.setProperty("password","123456")

val df :DataFrame = spark.read.jdbc("jdbc:mysql://localhost:3306/system","user",pros)

df.show()

 写入数据

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SQL")

val spark:SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

import spark.implicits._

val rdd: RDD[Stu] = spark.sparkContext.makeRDD(List(Stu("lisi", 20),

  Stu("zs", 30)))

val ds:Dataset[Stu] = rdd.toDS()

ds.write.format("jdbc")

  .option("url","jdbc:mysql://localhost:3306/system")

  .option("driver","com.mysql.jdbc.Driver")

  .option("user","root")

  .option("password","123456")

  .option("dbtable","user2")

  .mode(SaveMode.Append)

  .save()

spark.stop()