前言
大家好,我是DJ丶小哪吒,我又来跟你们分享知识了。对软件开发有着浓厚的兴趣。喜欢与人分享知识。做博客的目的就是为了能与 他 人知识共享。由于水平有限。博客中难免会有一些错误。如有 纰漏 之处,欢迎大家在留言区指正。小编也会及时改正。
DJ丶小哪吒又来与各位分享知识了。今天我们不飙车,今天就静静的坐下来,我们来聊一聊关于sparkSQL。准备好茶水,听老朽与你娓娓道来。
Spark SQL是什么
Spark SQL 是一个用来处理结构化数据的spark组件。它提供了一个叫做DataFrames的可编程抽象数据模型,并且可被视为一个分布式的SQL查询引擎。
1、使用IDEA开发Spark SQL
Spark会根据文件信息尝试着去推断DataFrame/DataSet的Schema,当然我们也可以手动指定,手动指定的方式有以下几种:
- 第1种:指定列名添加Schema
- 第2种:通过StructType指定Schema
- 第3种:编写样例类,利用反射机制推断Schema
1.1、指定列名添加Schema
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
package cn.itcast.sql
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
object CreateDFDS {
def main(args: Array[String]): Unit = {
//1.创建SparkSession
val spark: SparkSession = SparkSession.builder().master( "local[*]" ).appName( "SparkSQL" ).getOrCreate()
val sc: SparkContext = spark.sparkContext
sc.setLogLevel( "WARN" )
//2.读取文件
val fileRDD: RDD[String] = sc.textFile( "D:\\data\\person.txt" )
val linesRDD: RDD[Array[String]] = fileRDD.map(_.split( " " ))
val rowRDD: RDD[(Int, String, Int)] = linesRDD.map(line =>(line( 0 ).toInt,line( 1 ),line( 2 ).toInt))
//3.将RDD转成DF
//注意:RDD中原本没有toDF方法,新版本中要给它增加一个方法,可以使用隐式转换
import spark.implicits._
val personDF: DataFrame = rowRDD.toDF( "id" , "name" , "age" )
personDF.show( 10 )
personDF.printSchema()
sc.stop()
spark.stop()
}
}
|
1.2、通过StructType指定Schema
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
package cn.itcast.sql
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
object CreateDFDS2 {
def main(args: Array[String]): Unit = {
//1.创建SparkSession
val spark: SparkSession = SparkSession.builder().master( "local[*]" ).appName( "SparkSQL" ).getOrCreate()
val sc: SparkContext = spark.sparkContext
sc.setLogLevel( "WARN" )
//2.读取文件
val fileRDD: RDD[String] = sc.textFile( "D:\\data\\person.txt" )
val linesRDD: RDD[Array[String]] = fileRDD.map(_.split( " " ))
val rowRDD: RDD[Row] = linesRDD.map(line =>Row(line( 0 ).toInt,line( 1 ),line( 2 ).toInt))
//3.将RDD转成DF
//注意:RDD中原本没有toDF方法,新版本中要给它增加一个方法,可以使用隐式转换
//import spark.implicits._
val schema: StructType = StructType(Seq(
StructField( "id" , IntegerType, true ), //允许为空
StructField( "name" , StringType, true ),
StructField( "age" , IntegerType, true ))
)
val personDF: DataFrame = spark.createDataFrame(rowRDD,schema)
personDF.show( 10 )
personDF.printSchema()
sc.stop()
spark.stop()
}
}
|
1.3、反射推断Schema–掌握
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
package cn.itcast.sql
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
object CreateDFDS3 {
case class Person(id:Int,name:String,age:Int)
def main(args: Array[String]): Unit = {
//1.创建SparkSession
val spark: SparkSession = SparkSession.builder().master( "local[*]" ).appName( "SparkSQL" )
.getOrCreate()
val sc: SparkContext = spark.sparkContext
sc.setLogLevel( "WARN" )
//2.读取文件
val fileRDD: RDD[String] = sc.textFile( "D:\\data\\person.txt" )
val linesRDD: RDD[Array[String]] = fileRDD.map(_.split( " " ))
val rowRDD: RDD[Person] = linesRDD.map(line =>Person(line( 0 ).toInt,line( 1 ),line( 2 ).toInt))
//3.将RDD转成DF
//注意:RDD中原本没有toDF方法,新版本中要给它增加一个方法,可以使用隐式转换
import spark.implicits._
//注意:上面的rowRDD的泛型是Person,里面包含了Schema信息
//所以SparkSQL可以通过反射自动获取到并添加给DF
val personDF: DataFrame = rowRDD.toDF
personDF.show( 10 )
personDF.printSchema()
sc.stop()
spark.stop()
}
}
|
1.4、花式查询
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
|
package cn.itcast.sql
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
object QueryDemo {
case class Person(id:Int,name:String,age:Int)
def main(args: Array[String]): Unit = {
//1.创建SparkSession
val spark: SparkSession = SparkSession.builder().master( "local[*]" ).appName( "SparkSQL" )
.getOrCreate()
val sc: SparkContext = spark.sparkContext
sc.setLogLevel( "WARN" )
//2.读取文件
val fileRDD: RDD[String] = sc.textFile( "D:\\data\\person.txt" )
val linesRDD: RDD[Array[String]] = fileRDD.map(_.split( " " ))
val rowRDD: RDD[Person] = linesRDD.map(line =>Person(line( 0 ).toInt,line( 1 ),line( 2 ).toInt))
//3.将RDD转成DF
//注意:RDD中原本没有toDF方法,新版本中要给它增加一个方法,可以使用隐式转换
import spark.implicits._
//注意:上面的rowRDD的泛型是Person,里面包含了Schema信息
//所以SparkSQL可以通过反射自动获取到并添加给DF
val personDF: DataFrame = rowRDD.toDF
personDF.show( 10 )
personDF.printSchema()
//=======================SQL方式查询=======================
//0.注册表
personDF.createOrReplaceTempView( "t_person" )
//1.查询所有数据
spark.sql( "select * from t_person" ).show()
//2.查询age+1
spark.sql( "select age,age+1 from t_person" ).show()
//3.查询age最大的两人
spark.sql( "select name,age from t_person order by age desc limit 2" ).show()
//4.查询各个年龄的人数
spark.sql( "select age,count(*) from t_person group by age" ).show()
//5.查询年龄大于30的
spark.sql( "select * from t_person where age > 30" ).show()
//=======================DSL方式查询=======================
//1.查询所有数据
personDF.select( "name" , "age" )
//2.查询age+1
personDF.select($ "name" ,$ "age" + 1 )
//3.查询age最大的两人
personDF.sort($ "age" .desc).show( 2 )
//4.查询各个年龄的人数
personDF.groupBy( "age" ).count().show()
//5.查询年龄大于30的
personDF.filter($ "age" > 30 ).show()
sc.stop()
spark.stop()
}
}
|
1.5、 相互转化
RDD、DF、DS之间的相互转换有很多(6种),但是我们实际操作就只有2类:
1)使用RDD算子操作
2)使用DSL/SQL对表操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
package cn.itcast.sql
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
object TransformDemo {
case class Person(id:Int,name:String,age:Int)
def main(args: Array[String]): Unit = {
//1.创建SparkSession
val spark: SparkSession = SparkSession.builder().master( "local[*]" ).appName( "SparkSQL" ).getOrCreate()
val sc: SparkContext = spark.sparkContext
sc.setLogLevel( "WARN" )
//2.读取文件
val fileRDD: RDD[String] = sc.textFile( "D:\\data\\person.txt" )
val linesRDD: RDD[Array[String]] = fileRDD.map(_.split( " " ))
val personRDD: RDD[Person] = linesRDD.map(line =>Person(line( 0 ).toInt,line( 1 ),line( 2 ).toInt))
//3.将RDD转成DF
//注意:RDD中原本没有toDF方法,新版本中要给它增加一个方法,可以使用隐式转换
import spark.implicits._
//注意:上面的rowRDD的泛型是Person,里面包含了Schema信息
//所以SparkSQL可以通过反射自动获取到并添加给DF
//=========================相互转换======================
//1.RDD-->DF
val personDF: DataFrame = personRDD.toDF
//2.DF-->RDD
val rdd: RDD[Row] = personDF.rdd
//3.RDD-->DS
val DS: Dataset[Person] = personRDD.toDS()
//4.DS-->RDD
val rdd2: RDD[Person] = DS.rdd
//5.DF-->DS
val DS2: Dataset[Person] = personDF.as[Person]
//6.DS-->DF
val DF: DataFrame = DS2.toDF()
sc.stop()
spark.stop()
}
}
|
1.6、Spark SQL完成WordCount(案例)
1.6.1、SQL风格
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
|
package cn.itcast.sql
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
object WordCount {
def main(args: Array[String]): Unit = {
//1.创建SparkSession
val spark: SparkSession = SparkSession.builder().master( "local[*]" ).appName( "SparkSQL" ).getOrCreate()
val sc: SparkContext = spark.sparkContext
sc.setLogLevel( "WARN" )
//2.读取文件
val fileDF: DataFrame = spark.read.text( "D:\\data\\words.txt" )
val fileDS: Dataset[String] = spark.read.textFile( "D:\\data\\words.txt" )
//fileDF.show()
//fileDS.show()
//3.对每一行按照空格进行切分并压平
//fileDF.flatMap(_.split(" ")) //注意:错误,因为DF没有泛型,不知道_是String
import spark.implicits._
val wordDS: Dataset[String] = fileDS.flatMap(_.split( " " )) //注意:正确,因为DS有泛型,知道_是String
//wordDS.show()
/*
+-----+
|value|
+-----+
|hello|
| me|
|hello|
| you|
...
*/
//4.对上面的数据进行WordCount
wordDS.createOrReplaceTempView( "t_word" )
val sql =
"" "
|select value ,count(value) as count
|from t_word
|group by value
|order by count desc
"" ".stripMargin
spark.sql(sql).show()
sc.stop()
spark.stop()
}
}
|
1.6.2、DQL风格
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
package cn.itcast.sql
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
object WordCount2 {
def main(args: Array[String]): Unit = {
//1.创建SparkSession
val spark: SparkSession = SparkSession.builder().master( "local[*]" ).appName( "SparkSQL" ).getOrCreate()
val sc: SparkContext = spark.sparkContext
sc.setLogLevel( "WARN" )
//2.读取文件
val fileDF: DataFrame = spark.read.text( "D:\\data\\words.txt" )
val fileDS: Dataset[String] = spark.read.textFile( "D:\\data\\words.txt" )
//fileDF.show()
//fileDS.show()
//3.对每一行按照空格进行切分并压平
//fileDF.flatMap(_.split(" ")) //注意:错误,因为DF没有泛型,不知道_是String
import spark.implicits._
val wordDS: Dataset[String] = fileDS.flatMap(_.split( " " )) //注意:正确,因为DS有泛型,知道_是String
//wordDS.show()
/*
+-----+
|value|
+-----+
|hello|
| me|
|hello|
| you|
...
*/
//4.对上面的数据进行WordCount
wordDS.groupBy( "value" ).count().orderBy($ "count" .desc).show()
sc.stop()
spark.stop()
}
}
|
好了,以上内容就到这里了。你学到了吗。
到此这篇关于如何使用IDEA开发Spark SQL程序(一文搞懂)的文章就介绍到这了,更多相关IDEA开发Spark SQL内容请搜索服务器之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持服务器之家!
原文链接:https://blog.csdn.net/mr_yang888/article/details/105820316