Spark SQL Movie Analysis Case Study

Time: 2021-06-05 12:30:29

Using Spark SQL to compute the TopN of the most popular movies

1. Data Structure

The data can be downloaded here: https://pan.baidu.com/s/1eSNt6E2#list/path=%2FshareData
The folder contains three data files plus one README.
The data is structured as follows:

users.dat
5220::M::25::7::91436
5221::F::56::1::96734
5222::M::25::12::94501
5223::M::56::10::11361
5224::M::35::1::10016

movies.dat
5::Father of the Bride Part II (1995)::Comedy
6::Heat (1995)::Action|Crime|Thriller
7::Sabrina (1995)::Comedy|Romance
8::Tom and Huck (1995)::Adventure|Children's
9::Sudden Death (1995)::Action

ratings.dat
1::1193::5::978300760
1::661::3::978302109
1::914::3::978301968
1::3408::4::978300275
1::2355::5::978824291

README is the documentation file; it explains the meaning of each field.

2. Computing the TopN

Computing the TopN actually needs only the data in ratings.dat.
Steps:

  1. Compute each movie's average rating (for example, a movie rated 5, 4, and 5 averages 4.67)
  2. Sort by that average in descending order (a minimal sketch follows this list)
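
As a quick, self-contained illustration of these two steps, here is a minimal sketch over a few hard-coded (movieID, rating) pairs; the toy data is invented for illustration, and the same (movieID, (rating, 1)) pattern reappears in the full code below.

import org.apache.spark.{SparkConf, SparkContext}

object TopNSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("topNSketch").setMaster("local"))
    //toy stand-in for ratings.dat: (movieID, rating)
    val ratings = sc.parallelize(Seq((1L, 5), (1L, 4), (2L, 3), (2L, 5), (1L, 5)))
    ratings
      .map { case (movieID, rating) => (movieID, (rating, 1)) }//attach a count of 1
      .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))//sum the ratings and the counts
      .mapValues { case (sum, cnt) => sum.toDouble / cnt }//step 1: average per movie
      .sortBy(_._2, ascending = false)//step 2: sort descending
      .collect()
      .foreach(println)//prints (1,4.666...) then (2,4.0)
    sc.stop()
  }
}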

3. Code

package main.scala.com.hlf.project.moguyun.movies

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

/**
* Compute the movie TopN with RDDs and with DataFrames
* Created by hlf on 2016/11/1.
*/

object MoviesTopN {

def main(args: Array[String]) {

val conf = new SparkConf()
.setAppName("moviesRatings")
.setMaster("local")
//tunable parameters (see section 4; the size-related properties expect byte counts)
.set("spark.sql.shuffle.partitions", "4")//shuffle parallelism
.set("spark.sql.files.maxPartitionBytes", (256L * 1024 * 1024).toString)//max bytes per partition when reading files
.set("spark.sql.files.openCostInBytes", (4L * 1024 * 1024).toString)//estimated file-open cost, used to pack small files together
.set("spark.sql.autoBroadcastJoinThreshold", (100L * 1024 * 1024).toString)//max size of a table that gets broadcast in a join
val sc = new SparkContext(conf)
sc.setLogLevel("OFF")

val path = """D:\蘑菇云\项目\data\moviesData\"""

//load the data
val moviesRDD = sc.textFile(path + """movies.dat""")
val usersRDD = sc.textFile(path + """users.dat""")
val ratingsRDD = sc.textFile(path + """ratings.dat""")

/* Compute the TopN of the highest-rated movies. Idea: compare every movie's average rating, so only ratings.dat
is needed. Each line carries a movie ID and a rating; pull those two out into a tuple, and because we need an
average, attach a count of 1 so each record has the shape (movieID, (rating, 1)) */

//compute it with the RDD API
val ratingsArr = ratingsRDD.map(line => line.split("""::"""))
val ratingsRow = ratingsArr.map(line => Ratings(line(0).toLong, line(1).toLong, line(2).toInt, line(3).trim))
ratingsRow.map(row => (row.movieID, (row.rating, 1)))
.reduceByKey((t1, t2) => (t1._1 + t2._1, t1._2 + t2._2))//sum the ratings and the counts in one pass
.map(t => (t._2._1.toFloat / t._2._2.toFloat, t._1))
.map(t => (f"${t._1}%1.2f".toFloat, t._2))//keep two decimal places; formatting to a string and back is roundabout, but it works
.sortByKey(false)
.take(20).foreach(println)

/*
With the DataFrame API, either build a schema and Rows explicitly, or let the schema be inferred by reflection from a case class
*/

val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
val ratingsDF = ratingsRow.toDF
ratingsDF.registerTempTable("ratingsTable")

//compute it with SQL
//val sql = "select movieID, ROUND(CAST(sum(rating) AS DOUBLE)/count('rating'), 4) as ratings from ratingsTable group by movieID order by ratings desc"
val sql = "select movieID, ROUND(avg(rating), 3) as ratings from ratingsTable group by movieID order by ratings desc"
sqlContext.sql(sql).show()
//compute it with the built-in aggregate functions
import org.apache.spark.sql.functions._
ratingsDF.groupBy("movieID").agg(avg(ratingsDF("rating")).as("ratings")).sort($"ratings".desc).show()

/*
* Break down one movie's raters by age and gender.
* This takes both Users and Ratings, combined with a join.
*/

val usersDF = usersRDD.map(line => line.split("""::""")).map(line => Users(line(0).toLong, line(1).trim, line(2).toInt, line(3).trim, line(4).trim)).toDF()
usersDF.registerTempTable("usersTable")

ratingsDF.filter(ratingsDF("movieID") === 24)//pick out one movie
//.join(usersDF, ratingsDF("userID") === usersDF("userID"))
.join(usersDF, "userID")
.select(ratingsDF("movieID"),usersDF("gender"), usersDF("age"))
.show()

}
}
/**
* movies.dat maps onto a case class
* Format:
* MovieID::Title::Genres
* 1::Toy Story (1995)::Animation|Children's|Comedy
* 2::Jumanji (1995)::Adventure|Children's|Fantasy
* 3::Grumpier Old Men (1995)::Comedy|Romance
* 4::Waiting to Exhale (1995)::Comedy|Drama
*/

case class Movies(movieID: Long, title: String, genres: String)

/**
* users.dat maps onto a case class
* Format:
* UserID::Gender::Age::Occupation::Zip-code
* 1::F::1::10::48067
* 2::M::56::16::70072
* 3::M::25::15::55117
* 4::M::45::7::02460
*/

case class Users(userID: Long, gender: String, age: Int, occupation: String, zipCode: String)//zip codes keep leading zeros, so parse as String

/**
* ratings.dat maps onto a case class
* Format:
* UserID::MovieID::Rating::Timestamp
* 1::1193::5::978300760
* 1::661::3::978302109
* 1::914::3::978301968
* 1::3408::4::978300275
*/

case class Ratings(userID: Long, movieID: Long, rating: Int, timestamp: String)
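
SQLContext and registerTempTable are the Spark 1.x API. On Spark 2.x and later the same TopN query can be written against a SparkSession; a minimal sketch, assuming the same ratings.dat path and the Ratings case class above:

import org.apache.spark.sql.SparkSession

object MoviesTopNModern {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("moviesRatings").master("local").getOrCreate()
    import spark.implicits._

    //same parsing as above, but ending in a Dataset[Ratings]
    val ratingsDS = spark.sparkContext
      .textFile("""D:\蘑菇云\项目\data\moviesData\ratings.dat""")
      .map(_.split("""::"""))
      .map(f => Ratings(f(0).toLong, f(1).toLong, f(2).toInt, f(3).trim))
      .toDS()

    ratingsDS.createOrReplaceTempView("ratingsTable")//the 2.x replacement for registerTempTable
    spark.sql("select movieID, ROUND(avg(rating), 3) as ratings from ratingsTable group by movieID order by ratings desc").show()

    spark.stop()
  }
}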

The results are as follows

(5.0,3172)
(5.0,3881)
(5.0,3656)
(5.0,3233)
(5.0,3382)
(5.0,3607)
(5.0,989)
(5.0,1830)
(5.0,787)
(5.0,3280)
(4.8,3245)
(4.75,53)
(4.67,2503)
(4.61,2905)
(4.56,2019)
(4.55,318)
(4.52,858)
(4.52,50)
(4.52,745)
(4.51,1148)
+-------+-------+
|movieID|ratings|
+-------+-------+
|   1830|    5.0|
|    787|    5.0|
|   3233|    5.0|
|   3382|    5.0|
|   3172|    5.0|
|   3607|    5.0|
|   3656|    5.0|
|   3881|    5.0|
|   3280|    5.0|
|    989|    5.0|
|   3245|    4.8|
|     53|   4.75|
|   2503|  4.667|
|   2905|  4.609|
|   2019|  4.561|
|    318|  4.555|
|    858|  4.525|
|    745|  4.521|
|     50|  4.517|
|    527|   4.51|
+-------+-------+
only showing top 20 rows

+-------+-----------------+
|movieID|          ratings|
+-------+-----------------+
|   1830|              5.0|
|    787|              5.0|
|   3233|              5.0|
|   3382|              5.0|
|   3172|              5.0|
|   3607|              5.0|
|   3656|              5.0|
|   3881|              5.0|
|   3280|              5.0|
|    989|              5.0|
|   3245|              4.8|
|     53|             4.75|
|   2503|4.666666666666667|
|   2905|4.608695652173913|
|   2019|4.560509554140127|
|    318|4.554557700942973|
|    858|4.524966261808367|
|    745| 4.52054794520548|
|     50|4.517106001121705|
|    527|4.510416666666667|
+-------+-----------------+
only showing top 20 rows
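
Note that the head of the list is a block of ties at 5.0, which are mostly movies with only a handful of ratings; a single 5-star vote is enough to outrank widely watched films. A common refinement, not part of the original code, is to require a minimum number of ratings before a movie qualifies. A sketch against the same ratingsDF (the threshold of 100 is an arbitrary assumption to tune):

import org.apache.spark.sql.functions.{avg, count, desc}

ratingsDF.groupBy("movieID")
  .agg(avg("rating").as("ratings"), count("rating").as("cnt"))//average and number of ratings per movie
  .filter("cnt >= 100")//hypothetical cutoff: ignore rarely rated movies
  .sort(desc("ratings"))
  .show()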

+-------+------+---+
|movieID|gender|age|
+-------+------+---+
|     24|     M| 18|
|     24|     M| 18|
|     24|     F| 25|
|     24|     M| 18|
|     24|     M| 25|
|     24|     M| 35|
|     24|     M| 25|
|     24|     M| 35|
|     24|     M| 18|
|     24|     F| 50|
|     24|     M| 25|
|     24|     M| 18|
|     24|     M| 18|
|     24|     F| 25|
|     24|     M| 25|
|     24|     M| 50|
|     24|     M| 35|
|     24|     F| 25|
|     24|     F|  1|
|     24|     M| 25|
+-------+------+---+
only showing top 20 rows

4. Tuning

Because DataFrames are used here, Catalyst has already optimized the query plan, so few knobs are left to turn by hand. Four settings matter:
1. Shuffle parallelism. Tune it to your cluster's configuration; the default is 200.
spark.sql.shuffle.partitions=200
2. The maximum size of each partition when reading files. The default is 128MB and can be raised somewhat.
spark.sql.files.maxPartitionBytes=256MB
3. Small-file packing. The estimated cost of opening a file defaults to 4MB; raising it packs more small files into a single partition, otherwise every small file becomes its own task.
spark.sql.files.openCostInBytes=4MB
4. Shuffles between two tables, as in a join. This is the most useful one in practice:
spark.sql.autoBroadcastJoinThreshold defaults to 10MB; raising it to 100MB or even 1GB lets bigger "small" tables be broadcast instead of shuffled.
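
The size-related properties take byte counts on older Spark versions, so writing the intended MB-scale values as explicit byte arithmetic avoids accidentally setting them to a few hundred bytes. A sketch, with the values above as assumptions to tune per cluster:

import org.apache.spark.SparkConf

val tunedConf = new SparkConf()
  .setAppName("moviesRatings")
  .set("spark.sql.shuffle.partitions", "200")//shuffle parallelism
  .set("spark.sql.files.maxPartitionBytes", (256L * 1024 * 1024).toString)//256MB per read partition
  .set("spark.sql.files.openCostInBytes", (4L * 1024 * 1024).toString)//pack small files up to this estimated open cost
  .set("spark.sql.autoBroadcastJoinThreshold", (100L * 1024 * 1024).toString)//broadcast tables up to 100MB in joins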

Some of the above content comes from a course shared by Wang Jialin, chief Spark expert at DT大数据梦工厂 (DT Big Data Dream Factory). Thanks to Mr. Wang for sharing; for more, follow the WeChat public account DT_Spark.