Finding the Top N Popular Movies with Spark SQL
1. Data Structure
The data can be downloaded here: https://pan.baidu.com/s/1eSNt6E2#list/path=%2FshareData
The folder contains three data files and one README.
The data is structured as follows:
users.dat
5220::M::25::7::91436
5221::F::56::1::96734
5222::M::25::12::94501
5223::M::56::10::11361
5224::M::35::1::10016
movies.dat
5::Father of the Bride Part II (1995)::Comedy
6::Heat (1995)::Action|Crime|Thriller
7::Sabrina (1995)::Comedy|Romance
8::Tom and Huck (1995)::Adventure|Children's
9::Sudden Death (1995)::Action
ratings.dat
1::1193::5::978300760
1::661::3::978302109
1::914::3::978301968
1::3408::4::978300275
1::2355::5::978824291
README is the documentation file; it explains what each field means.
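Each record is a "::"-delimited string, so a single split recovers the fields. A quick illustrative one-liner (not part of the project code):
val fields = "1::1193::5::978300760".split("::")
// fields: Array(1, 1193, 5, 978300760) -- UserID, MovieID, Rating, Timestamp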
2. Computing the Top N
Computing the Top N actually needs only the data in ratings.dat.
Steps (a minimal local sketch follows the list):
- Compute each movie's average rating
- Sort by average rating in descending order
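The average is accumulated as a (sum, count) pair per movie, the same pattern the RDD code in section 3 uses. A minimal local Scala sketch of the idea on made-up data:
// (movieID, rating) pairs; fold each movie's ratings into (sum, count), then divide
val ratings = Seq((1193L, 5), (1193L, 4), (661L, 3))
val avgByMovie = ratings.groupBy(_._1).map { case (id, rs) =>
  val (sum, count) = rs.foldLeft((0, 0)) { case ((s, c), (_, r)) => (s + r, c + 1) }
  (id, sum.toFloat / count)
}
avgByMovie.toSeq.sortBy(-_._2).foreach(println) // highest average first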
3. Code
package main.scala.com.hlf.project.moguyun.movies
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
/**
* Compute the movie Top N with RDDs and DataFrames
* Created by hlf on 2016/11/1.
*/
object MoviesTopN {
def main(args: Array[String]) {
val conf = new SparkConf()
.setAppName("moviesRatings")
.setMaster("local")
//Tunable parameters (the byte-valued settings are set in bytes; see section 4)
.set("spark.sql.shuffle.partitions","4")//shuffle parallelism (default 200)
.set("spark.sql.files.maxPartitionBytes","268435456")//max bytes per partition when reading files (256 MB)
.set("spark.sql.files.openCostInBytes","4194304")//file-open cost (4 MB); packs small files together
.set("spark.sql.autoBroadcastJoinThreshold","104857600")//broadcast tables up to this size in joins (100 MB)
val sc = new SparkContext(conf)
sc.setLogLevel("OFF")
val path = """D:\蘑菇云\项目\data\moviesData\"""
//Load the data
val moviesRDD = sc.textFile(path + """movies.dat""")
val usersRDD = sc.textFile(path + """users.dat""")
val ratingsRDD = sc.textFile(path + """ratings.dat""")
/* Compute the Top N of the highest-rated movies. Idea: compare every movie's average
rating, which needs only ratings.dat. Each line holds a movie ID and a rating; pull
those two out and attach a count of 1, giving tuples of the form (movieID, (rating, 1))
so the average can be computed by summing both parts. */
//Compute with the RDD API
val ratingsArr = ratingsRDD.map(line => line.split("""::"""))
val ratingsRow = ratingsArr.map(line => Ratings(line(0).toLong, line(1).toLong, line(2).toInt, line(3).trim))
ratingsRow.map(row => (row.movieID, (row.rating, 1)))
//.reduceByKey { case ((r1, c1), (r2, c2)) => (r1 + r2, c1 + c2) }//equivalent, with pattern matching
.reduceByKey((t1, t2) => (t1._1 + t2._1, t1._2 + t2._2))
.map(t => (t._2._1.toFloat / t._2._2.toFloat, t._1))
.map(t => (f"${t._1}%1.2f".toFloat, t._2))//keep two decimal places; a bit roundabout (an alternative is sketched after this pipeline)
.sortByKey(false)
.take(20).foreach(println)
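// A more direct two-decimal rounding alternative (a sketch; math.round on a Float returns an Int):
//.map { case (r, id) => (math.round(r * 100) / 100f, id) }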
/*
With DataFrames you either build an explicit schema with Rows, or use reflection on
case classes (used below; an explicit-schema sketch follows the case classes at the end).
*/
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
val ratingsDF = ratingsRow.toDF
ratingsDF.registerTempTable("ratingsTable")
//Compute with SQL
//val sql = "select movieID, ROUND(CAST(sum(rating) AS DOUBLE)/count('rating'), 4) as ratings from ratingsTable group by movieID order by ratings desc"
val sql = "select movieID, ROUND(avg(rating), 3) as ratings from ratingsTable group by movieID order by ratings desc"
sqlContext.sql(sql).show()
//Compute with DataFrame built-in functions
import org.apache.spark.sql.functions._
ratingsDF.groupBy("movieID").agg(avg(ratingsDF("rating")).as("ratings")).sort($"ratings".desc).show()
/*
* Gender/age breakdown of the audience for one movie:
* join Users with Ratings
*/
val usersDF = usersRDD.map(line => line.split("""::"""))
.map(line => Users(line(0).toLong, line(1).trim, line(2).toInt, line(3).trim, line(4).toLong))
.toDF()
usersDF.registerTempTable("usersTable")
ratingsDF.filter(ratingsDF("movieID") === 24)//pick out one movie
//.join(usersDF, ratingsDF("userID") === usersDF("userID"))//equivalent, but keeps duplicate userID columns
.join(usersDF, "userID")
.select(ratingsDF("movieID"),usersDF("gender"), usersDF("age"))
.show()
}
}
/**
* movies.dat, modeled as a case class
* Format:
* MovieID::Title::Genres
* 1::Toy Story (1995)::Animation|Children's|Comedy
* 2::Jumanji (1995)::Adventure|Children's|Fantasy
* 3::Grumpier Old Men (1995)::Comedy|Romance
* 4::Waiting to Exhale (1995)::Comedy|Drama
*/
case class Movies(movieID: Long, title: String, genres: String)
/**
* users.dat, modeled as a case class
* Format:
* UserID::Gender::Age::Occupation::Zip-code
* 1::F::1::10::48067
* 2::M::56::16::70072
* 3::M::25::15::55117
* 4::M::45::7::02460
*/
case class Users(userID: Long, gender: String, age: Int, occupation: String, zipCode: Long)
/**
* ratings.dat, modeled as a case class
* Format:
* UserID::MovieID::Rating::Timestamp
* 1::1193::5::978300760
* 1::661::3::978302109
* 1::914::3::978301968
* 1::3408::4::978300275
*/
case class Ratings(userID: Long, movieID: Long, rating: Int, timestamp: String)
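For the explicit schema-plus-Row alternative mentioned before the DataFrame code, here is a minimal sketch (assuming the same sqlContext and ratingsRDD from the main method are in scope):
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, IntegerType, StringType, StructField, StructType}
// Declare the schema by hand instead of relying on case-class reflection
val ratingsSchema = StructType(Seq(
StructField("userID", LongType, nullable = false),
StructField("movieID", LongType, nullable = false),
StructField("rating", IntegerType, nullable = false),
StructField("timestamp", StringType, nullable = false)))
val rowRDD = ratingsRDD.map(_.split("""::""")).map(a => Row(a(0).toLong, a(1).toLong, a(2).toInt, a(3).trim))
val ratingsDF2 = sqlContext.createDataFrame(rowRDD, ratingsSchema)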
The results are as follows:
(5.0,3172)
(5.0,3881)
(5.0,3656)
(5.0,3233)
(5.0,3382)
(5.0,3607)
(5.0,989)
(5.0,1830)
(5.0,787)
(5.0,3280)
(4.8,3245)
(4.75,53)
(4.67,2503)
(4.61,2905)
(4.56,2019)
(4.55,318)
(4.52,858)
(4.52,50)
(4.52,745)
(4.51,1148)
+-------+-------+
|movieID|ratings|
+-------+-------+
| 1830| 5.0|
| 787| 5.0|
| 3233| 5.0|
| 3382| 5.0|
| 3172| 5.0|
| 3607| 5.0|
| 3656| 5.0|
| 3881| 5.0|
| 3280| 5.0|
| 989| 5.0|
| 3245| 4.8|
| 53| 4.75|
| 2503| 4.667|
| 2905| 4.609|
| 2019| 4.561|
| 318| 4.555|
| 858| 4.525|
| 745| 4.521|
| 50| 4.517|
| 527| 4.51|
+-------+-------+
only showing top 20 rows
+-------+-----------------+
|movieID| ratings|
+-------+-----------------+
| 1830| 5.0|
| 787| 5.0|
| 3233| 5.0|
| 3382| 5.0|
| 3172| 5.0|
| 3607| 5.0|
| 3656| 5.0|
| 3881| 5.0|
| 3280| 5.0|
| 989| 5.0|
| 3245| 4.8|
| 53| 4.75|
| 2503|4.666666666666667|
| 2905|4.608695652173913|
| 2019|4.560509554140127|
| 318|4.554557700942973|
| 858|4.524966261808367|
| 745| 4.52054794520548|
| 50|4.517106001121705|
| 527|4.510416666666667|
+-------+-----------------+
only showing top 20 rows
+-------+------+---+
|movieID|gender|age|
+-------+------+---+
| 24| M| 18|
| 24| M| 18|
| 24| F| 25|
| 24| M| 18|
| 24| M| 25|
| 24| M| 35|
| 24| M| 25|
| 24| M| 35|
| 24| M| 18|
| 24| F| 50|
| 24| M| 25|
| 24| M| 18|
| 24| M| 18|
| 24| F| 25|
| 24| M| 25|
| 24| M| 50|
| 24| M| 35|
| 24| F| 25|
| 24| F| 1|
| 24| M| 25|
+-------+------+---+
only showing top 20 rows
4. Tuning
Because we use DataFrames and Catalyst has already optimized the query plan, few knobs remain. The four main ones are:
1. Parallelism: tune this to your cluster's configuration; the default is 200.
spark.sql.shuffle.partitions=200
2. Maximum bytes packed into one partition when reading files: the default is 128 MB and can be raised somewhat, e.g. to 256 MB (the value is set in bytes):
spark.sql.files.maxPartitionBytes=268435456
3. Small-file packing: the estimated cost, in bytes, of opening a file; the default is 4 MB. Raise it so that many small files are packed into one partition instead of each tiny file becoming its own task:
spark.sql.files.openCostInBytes=4194304
4. Shuffles between two tables, e.g. joins. This is the most useful and most frequently used setting. spark.sql.autoBroadcastJoinThreshold defaults to 10 MB; raise it to 100 MB or even 1 GB so larger "small" tables are broadcast rather than shuffled (a broadcast-hint sketch follows below).
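Besides the global threshold, Spark exposes an explicit hint: wrapping the small side of a join in broadcast() from org.apache.spark.sql.functions forces a broadcast (map-side) join regardless of the threshold. A minimal sketch reusing usersDF and ratingsDF from section 3:
import org.apache.spark.sql.functions.broadcast
// Ship the small users table to every executor; the large ratings table is not shuffled
ratingsDF.join(broadcast(usersDF), "userID")
.select("movieID", "gender", "age")
.show()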
Part of the above content comes from the course shared by Spark expert Wang Jialin, chief expert at [DT大数据梦工厂] (DT Big Data Dream Factory). Thanks to Mr. Wang for sharing; for more, follow the [DT大数据梦工厂] WeChat official account DT_Spark.