案例1:使用Java实现spark的wordCount
案例需求:
单词计数
第一步:创建maven工程,引入依赖
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.3.3</version>
</dependency>
</dependencies>
第二步:代码开发
说明:
- 使用Java编写spark程序,其实跟scala的步骤是一样的,只不过写法有点变化而已。
- scala的RDD对应Java中的JavaRDD
- scala的SparkContext对应Java中的JavaSparkContext
- scala方法中的参数为函数时,在Java中要改成对象,因为Java是面向对象的,这是scala相对于Java非常不同的地方。
- 编写spark程序的大致步骤如下:
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
public class MyJavaSpark {
public static void main(String[] args) {
//1、创建spark Conf
SparkConf sparkConf = new SparkConf().setAppName("WordCount").setMaster("local");
//2、创建spark Context
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
//3、读取数据
JavaRDD<String> stringJavaRDD = javaSparkContext.textFile("F:\\test\\aa.txt", 2);
//4、切分每一行数据为一个个单词
final JavaRDD<String> wordsRDD = stringJavaRDD.flatMap(new FlatMapFunction<String, String>() {
public Iterator<String> call(String s) throws Exception {
String[] s1 = s.split(" ");
return Arrays.asList(s1).iterator();
}
});
//5、每个单词计为1
JavaPairRDD<String, Integer> wordAndOneRDD = wordsRDD.mapToPair(new PairFunction<String, String, Integer>() {
public Tuple2<String, Integer> call(String s) throws Exception {
return new Tuple2<String, Integer>(s, 1);
}
});
//6、相同的单词累加1
JavaPairRDD<String, Integer> resultRDD = wordAndOneRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});
//7、收集数据
List<Tuple2<String, Integer>> collectRDD = resultRDD.collect();
//8、打印数据
for(Tuple2<String,Integer> t:collectRDD){
System.out.println("单词:"+t._1+"次数:"+t._2);
}
//9、关闭资源
javaSparkContext.stop();
}
}
运行结果为:
单词:hive次数:1
单词:flink次数:1
单词:spark次数:4
单词:hadoop次数:3
单词:flume次数:1
单词:hbase次数:1
案例2:实现点击流日志数据分析
点击流日志数据:用户在网站的浏览行为记录
案例数据
资料中的access.log文件,文件里一行数据的格式大致如下:
60.208.6.156 - - [18/Sep/2013:06:49:48 +0000] "GET /wp-content/uploads/2013/07/rcassandra.png HTTP/1.0" 200 185524 "http://cos.name/category/software/packages/" "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/29.0.1547.66 Safari/537.36"
- 数据中的横杠-也是一个字段, 表示无
- 一行数据代表一次访问。
- 第1个字段是用户的ip地址
统计PV
PV:页面浏览量,是网站各个网页被浏览的总次数。对应于access.log中的数据,一行数据就是一条浏览记录。
因此,要获取PV,实质是要统计access.log文件中行数。
import org.apache.spark.{SparkConf, SparkContext}
object PV {
def main(args: Array[String]): Unit = {
val sparkconf=new SparkConf().setAppName("PV").setMaster("local[2]")
val sc=new SparkContext(sparkconf)
val data=sc.textFile("E:\\LearningAll\\8-HadoopEcosystem-Video\\spark下载资料\\spark课程录播资料\\案例数据\\access.log")
val pv=data.count();
println(pv)
sc.stop()
}
}
运行结果为:
14619
统计UV
UV(Unique Visitor)是独立访客数。放在这里就是有多少个不同的ip地址的访客访问过网站,相同ip地址的访客,无论访问网站多少次,都只算入UV一次。
因此,spark程序的大致步骤是:加载每一行数据,获取每一行数据的ip地址,对ip地址去重,然后统计ip数量。
代码开发:
import org.apache.spark.{SparkConf, SparkContext}
object PV {
def main(args: Array[String]): Unit = {
val sparkconf=new SparkConf().setAppName("PV").setMaster("local[2]")
val sc=new SparkContext(sparkconf)
val data=sc.textFile("E:\\LearningAll\\8-HadoopEcosystem-Video\\spark下载资料\\spark课程录播资料\\案例数据\\access.log")
//获取每一行的ip地址:
val rdd2=data.map(x=>x.split(" ")(0))
//去重:
val rdd3=rdd2.distinct()
val uv=rdd3.count();
println(uv)
sc.stop()
}
}
1050
获取被访问的TopN页面地址
数据文件里每一行数据代表一次访问,每一行数据的第11个字段是被访问的页面地址,如
60.208.6.156 - - [18/Sep/2013:06:49:48 +0000] "GET /wp-content/uploads/2013/07/rcassandra.png HTTP/1.0" 200 185524 "http://cos.name/category/software/packages/" "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/29.0.1547.66 Safari/537.36"
中的"http://cos.name/category/software/packages/"。
但是,有些行的数据是不完整的,可能没有第11个字段,或者第11个字段的值是 "-" ,因此,我们首先要进行数据的处理,然后再分析数据。
注意,"-"中的双引号是包括在数据里面的,千万别少写了,特别注意下面代码块中的第11行代码。
代码开发
import org.apache.spark.{SparkConf, SparkContext}
object PV {
def main(args: Array[String]): Unit = {
val sparkconf=new SparkConf().setAppName("PV").setMaster("local[2]")
val sc=new SparkContext(sparkconf)
val data=sc.textFile("E:\\LearningAll\\8-HadoopEcosystem-Video\\spark下载资料\\spark课程录播资料\\案例数据\\access.log")
//处理数据,使得每一行数据至少有11个字段
val data2=data.filter(x=>x.split(" ").length>10)
//处理数据,使得每一行数据的第11个字段都不为 "-",注意,双引号也包括在里面
val data3=data2.filter(x=>x.split(" ")(10)!="\"-\"")
//获取第11个字段
val rdd10=data3.map(x=>x.split(" ")(10))
//每个计1
val result1=rdd10.map(x=>(x,1))
//统计
val result2=result1.reduceByKey(_+_)
//排序
val sortRDD=result2.sortBy(x=>x._2,false)
//获取Top5
val finalRes=sortRDD.take(5)
//打印:
finalRes.foreach(println)
sc.stop()
}
}
运行结果为:
("http://blog.fens.me/category/hadoop-action/",547)
("http://blog.fens.me/",377)
("http://blog.fens.me/wp-admin/post.php?post=2445&action=edit&message=10",360)
("http://blog.fens.me/r-json-rjson/",274)
("http://blog.fens.me/angularjs-webstorm-ide/",271)
案例3:读取文件数据写入到mysql表中
创建maven工程
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.3.3</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.38</version>
</dependency>
</dependencies>
案例数据
1,tony,18
2,xiaoqiang,20
3,xiaoming,15
4,laowang,45
创建mysql表
在node03登录mysql,创建一个表,Person
mysql> create database demo1;
mysql> use demo1
mysql> create table person(id int,name varchar(15),age int);
通过foreach算子实现
大致步骤:加载数据--》处理数据后将数据封装到RDD中--》foreach遍历数据,创建mysql连接,写入数据
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Data2MysqlForeach {
def main(args: Array[String]): Unit = {
val sparkkconf=new SparkConf().setAppName("ForeachMysql").setMaster("local[2]")
val sc=new SparkContext(sparkkconf)
val data=sc.textFile("E:\\LearningAll\\8-HadoopEcosystem-Video\\spark下载资料\\spark课程录播资料\\案例数据\\person.txt")
val data2:RDD[Array[String]]=data.map(x=>x.split(","))
data2.foreach(t=> {
var conne: Connection = null
try {
//创建连接
conne = DriverManager.getConnection("jdbc:mysql://node03:3306/demo1", "root", "123456")
//定义sql语句,?是占位符
val sql1 = "insert into person(id,name,age) values(?,?,?)"
//获取prepareStatement对象,这个对象可以对sql语句进行预编译
val ps = conne.prepareStatement(sql1)
//给sql语句的问号?赋值,1代表第一个问号,2代表第二个问号...
ps.setString(1, t(0))
ps.setString(2, t(1))
ps.setString(3, t(2))
//执行sql语句
ps.execute()
}catch {
case ex:Exception =>println(ex.getMessage)
}finally {
if(conne!=null){conne.close()}
}
})
sc.stop()
}
}
查看mysql的person表,如下,已经写入成功:
mysql> select * from person;
+------+-----------+------+
| id | name | age |
+------+-----------+------+
| 4 | laowang | 45 |
| 1 | tony | 18 |
| 2 | xiaoqiang | 20 |
| 3 | xiaoming | 15 |
+------+-----------+------+
说明:
- 通过foreach算子来实现的话,观察代码,会发现,foreach每遍历一条数据,就会创建一个mysql连接,如果存在大量数据的话,无疑是很耗时的。
- 从person表可看到,插入的数据的顺序并不是跟源数据一致的,这是因为受到了多个分区并行执行的影响。
通过foreachPartition算子实现
通过foreachPartition来实现与foreach来实现的源代码差不多,只需要修改几个地方,代码如下:
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Data2MysqlForeachPartition {
def main(args: Array[String]): Unit = {
val sparkkconf=new SparkConf().setAppName("ForeachMysql").setMaster("local[2]")
val sc=new SparkContext(sparkkconf)
val data=sc.textFile("E:\\LearningAll\\8-HadoopEcosystem-Video\\spark下载资料\\spark课程录播资料\\案例数据\\person.txt")
val data2:RDD[Array[String]]=data.map(x=>x.split(","))
data2.foreachPartition(iter=> {
var conne: Connection = null
try {
//创建连接
conne = DriverManager.getConnection("jdbc:mysql://node03:3306/demo1", "root", "123456")
//定义sql语句,?是占位符
val sql1 = "insert into person(id,name,age) values(?,?,?)"
//获取prepareStatement对象,这个对象可以对sql语句进行预编译
val ps = conne.prepareStatement(sql1)
//给sql语句的问号?赋值,1代表第一个问号,2代表第二个问号...
iter.foreach(t=>{
ps.setString(1, t(0))
ps.setString(2, t(1))
ps.setString(3, t(2))
ps.addBatch()
})
//执行sql语句
ps.executeBatch()
}catch {
case ex:Exception =>println(ex.getMessage)
}finally {
if(conne!=null){conne.close()}
}
})
sc.stop()
}
}
再次查看person表:
mysql> select * from person;
+------+-----------+------+
| id | name | age |
+------+-----------+------+
| 4 | laowang | 45 |
| 1 | tony | 18 |
| 2 | xiaoqiang | 20 |
| 3 | xiaoming | 15 |
| 1 | tony | 18 |
| 4 | laowang | 45 |
| 2 | xiaoqiang | 20 |
| 3 | xiaoming | 15 |
+------+-----------+------+
小结
-
foreach算子实现获取得到一条一条的数据之后,然后进行获取对应的数据库连接,实现把数据插入到mysql表中,这里rdd中有N条数据,这里就需要与mysql数据库创建N次连接,它是比较浪费资源
-
foreachPartition算子实现以分区为单位与mysql数据库来创建数据库连接,可以大大减少与mysql数据创建的连接数,有助于程序的性能提升。所以后期推荐大家使用foreachPartition算子
案例4:读取文件数据写入到hbase表中
案例数据
数据是资料中的users.dat文件,数据的大致格式如下,以::为分隔符,一共5个字段,分别是id,gender,age,position,code
1::F::1::10::48067
2::M::56::16::70072
3::M::25::15::55117
4::M::45::7::02460
5::M::25::20::55455
6::F::50::9::55117
7::M::35::1::06810
添加pom依赖
在之前案例的pom的基础上,添加以下依赖:
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.2.1</version>
</dependency>
创建hbase表
确保hbase、hadoop、zookeeper都正常开启,进入hbase shell,创建表person
start-hbase.sh
hbase shell
hbase(main):001:0> create \'person\',\'f1\',\'f2\'
代码开发
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put}
import org.apache.spark.{SparkConf, SparkContext}
object Data2Hbase {
def main(args: Array[String]): Unit = {
val sparkConf=new SparkConf().setAppName("hbase").setMaster("local[2]")
val sc=new SparkContext(sparkConf)
val data=sc.textFile("E:\\LearningAll\\8-HadoopEcosystem-Video\\spark下载资料\\spark课程录播资料\\案例数据\\users.dat")
val data2=data.map(x=>x.split("::"))
data2.foreachPartition(iter=>{
var conne:Connection=null
try{
val conf=HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum","node01:2181,node02:2181,node03:2181")
conne=ConnectionFactory.createConnection(conf)
val tablePerson=conne.getTable(TableName.valueOf("person"))
iter.foreach(x=>{
val put=new Put(x(0).getBytes())
put.addColumn("f1".getBytes(),"gender".getBytes(),x(1).getBytes())
put.addColumn("f1".getBytes(),"age".getBytes(),x(2).getBytes())
put.addColumn("f1".getBytes(),"position".getBytes(),x(3).getBytes())
put.addColumn("f1".getBytes(),"code".getBytes(),x(4).getBytes())
tablePerson.put(put)
})
}catch {
case ex:Exception =>println(ex.getMessage)
}finally {if (conne!=null){conne.close()}}
})
}
}
查看hbase中的person表,部分数据如下:
scan \'person\'
999 column=f1:age, timestamp=1586981613489, value=25
999 column=f1:code, timestamp=1586981613489, value=62558
999 column=f1:gender, timestamp=1586981613489, value=M
999 column=f1:position, timestamp=1586981613489, value=15
案例5:实现ip地址查询
需求分析
在互联网中,我们经常会见到城市热点图这样的报表数据,例如在百度统计中,会统计今年的热门旅游城市、热门报考学校等,会将这样的信息显示在热点图中。
要想实现上面热点图效果,我们需要通过日志信息(运行商或者网站自己生成)和城市ip段信息来判断用户的ip段,统计热点经纬度。
示意图:基站下放给用户可以上网的ip地址,通过这个ip地址就可以定位用户的坐标(经纬度)。
案例数据
1、日志信息数据: 20090121000132.394251.http.format
各字段分别表示:时间戳|ip地址|.......,只需要留意前2个字段
2、城市ip段信息数据: ip.txt,类似于码表数据
开始数字和结束数字分别是开始ip和结束ip经过算法计算得到的值。
开发思路
1、 加载城市ip段信息,获取ip起始数字和结束数字,经度,维度
2、 加载日志数据,获取ip信息,然后使用相同的算法将ip转换为数字,和ip段比较
3、 比较的时候采用二分法查找,找到对应的经度和维度
4、 然后对经度和维度做单词计数
广播变量
在本次的ip案例中,要将日志数据中的每个ip都拿去跟城市ip信息数据进行匹配,为每个日志数据中的ip匹配对应的经纬度,而如果每个task都加载一份城市ip信息数据到内存中的话,无疑是非常消耗内存的,因此需要将城市ip信息数据封装在广播变量里,作为共享数据。
代码实现
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object CityIp {
def ip2Long(ip:String):Long={
val ipSpl:Array[String]=ip.split("\\.")
var ipLong: Long = 0L
for(i<-ipSpl){
ipLong=i.toLong | ipLong<<8L
}
ipLong
}
def binarySearch(ipLong: Long, cityIp: Array[(String, String, String, String)]): Long ={
//定义码表数组的起始下标:
var startIndex=0
//定义码表数组的结束下标:
var endIndex=cityIp.length-1
while(startIndex<=endIndex){
val middleIndex=(startIndex+endIndex)/2
//如果正好满足中间的元组的IP数值
if(ipLong >= cityIp(middleIndex)._1.toLong && ipLong<=cityIp(middleIndex)._2.toLong){
return middleIndex
}
if(ipLong > cityIp(middleIndex)._1.toLong){
startIndex=middleIndex+1
}
if(ipLong<cityIp(middleIndex)._2.toLong){
endIndex=middleIndex
}
}
-1 //-1表示下标没有找到
}
def main(args: Array[String]): Unit = {
val sparkconf=new SparkConf().setAppName("Ip").setMaster("local")
val sc=new SparkContext(sparkconf)
//加载ip码表数据
val ipData=sc.textFile("E:\\LearningAll\\8-HadoopEcosystem-Video\\spark下载资料\\spark课程录播资料\\案例数据\\ip.txt")
//处理ip码表数据
val ipData2=ipData.map(x=>x.split("\\|")).map(x=>(x(2),x(3),x(x.length-2),x(x.length-2)))
//创建广播变量
val bdIP=sc.broadcast(ipData2.collect())
//加载运营商日志数据
val logData=sc.textFile("E:\\LearningAll\\8-HadoopEcosystem-Video\\spark下载资料\\spark课程录播资料\\案例数据\\20090121000132.394251.http.format")
//处理运营商日志数据
val logIps=logData.map(x=>x.split("\\|")(1))
//遍历日志数据中的每个ip,将ip转为Long类型数值
val andOneRDD:RDD[((String, String), Int)] =logIps.mapPartitions(iter=>{
//获取广播变量的数据
val cityIp:Array[(String,String,String,String)]=bdIP.value
//遍历日志的ip,将ip转为数值
iter.map(x=>{
val ipLong=ip2Long(x)
//获取ipLong在ip码表对应的索引数值(获取ipLong处在城市ipx信息表的第几行的ip数值区间)
val index:Int=binarySearch(ipLong,cityIp).toInt
//获取下标对应的经纬度等信息
val resultJW: (String, String, String, String)=cityIp(index)
//封装数据,作为返回值
((resultJW._3,resultJW._4),1)
})
})
//相同的经纬度出现累加1
val finalResult=andOneRDD.reduceByKey(_+_)
//打印数据:
finalResult.foreach(println)
}
}
运行结果为:
((106.51107,106.51107),91)
((108.948024,108.948024),1824)
((114.502461,114.502461),383)
((106.27633,106.27633),36)
((102.712251,102.712251),126)
((107.08166,107.08166),29)
((116.405285,116.405285),1535)
((107.7601,107.7601),85)
((107.39007,107.39007),47)
((106.57434,106.57434),177)
((106.56347,106.56347),3)
((106.504962,106.504962),400)
小结
该案例比较贴近实际的真实需求,含金量是比较高,这里使用了广播变量知识点、二分查询、ip地址转成Long类型数字,大家多多练习!掌握spark中的RDD编程。