java8实现spark streaming的wordcount

时间:2022-06-19 06:17:47

概念这里就不说了,从案例开始,惯例,hellowrod,哦不,wordcount。
要计算从一个监听 TCP socket 的数据服务器接收到的文本数据(text data)中的字数。
主体代码部分跟spark相差不大,毕竟DStream是RDD产生的模板(或者说类)。

1.导入了 Spark Streaming 类

 <dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.0.0</version>
</dependency>

2.代码示例

//注意本地调试,master必须为local[n],n>1,表示一个线程接收数据,n-1个线程处理数据
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("streaming word count");
JavaSparkContext sc = new JavaSparkContext(conf);
//设置日志运行级别
sc.setLogLevel("WARN");
JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(1));
//创建一个将要连接到hostname:port 的离散流
JavaReceiverInputDStream<String> lines =
ssc.socketTextStream("master1", 9999);
JavaPairDStream<String, Integer> counts =
lines.flatMap(x->Arrays.asList(x.split(" ")).iterator())
.mapToPair(x -> new Tuple2<String, Integer>(x, 1))
.reduceByKey((x, y) -> x + y);

// 在控制台打印出在这个离散流(DStream)中生成的每个 RDD 的前十个元素
counts.print();
// 启动计算
ssc.start();
ssc.awaitTermination();

3.建立服务端
找台linux服务器,运行netcat小工具:
nc -lk 9999
也就是上面代码里socketTextStream的参数.

4.运行测试
本地启动java代码后,控制台会循环打印时间戳。
在nc那边随意输入,本地即可实时看到统计结果。
java8实现spark streaming的wordcount