The Simplest Use of Spark Streaming

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.10</artifactId>
    <version>${spark.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka_2.10</artifactId>
    <version>1.6.1</version>
</dependency>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-flume_2.10</artifactId>
    <version>${spark.version}</version>
</dependency>
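These snippets assume a spark.version property is defined elsewhere in the POM. A minimal sketch of that property block (the 1.6.1 value is an assumption, chosen only to match the hard-coded Kafka artifact version above):

<properties>
    <!-- Assumed value; align it with the Spark artifacts you actually use -->
    <spark.version>1.6.1</spark.version>
</properties>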

Program 1:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Created by Administrator on 2017/6/6.
 */
object StreamingWordCount {
  def main(args: Array[String]) {

    //LoggerLevels.setStreamingLogLevels()
    // StreamingContext; "local[2]" gives at least two threads: one for the receiver, one for processing
    val conf = new SparkConf().setAppName("StreamingWordCount").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(5))
    // Receive data from a socket
    val ds = ssc.socketTextStream("192.168.33.62", 9999)
    // A DStream is a continuous sequence of RDDs, one per batch interval
    // Input lines look like: hello tom hello jerry
    val result = ds.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    // Print each batch's word counts
    result.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
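Since the POM also pulls in spark-streaming-kafka_2.10, here is a minimal sketch of the same word count reading from Kafka instead of a socket, using the receiver-based KafkaUtils.createStream API from that artifact. The ZooKeeper address, consumer group, and topic name are assumptions for illustration; replace them with your own.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object KafkaWordCount {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))
    // Assumed ZooKeeper quorum, consumer group, and topic for this sketch
    val zkQuorum = "192.168.33.62:2181"
    val topics = Map("wordcount" -> 1) // topic -> number of receiver threads
    // createStream yields (key, message) pairs; we only need the message
    val lines = KafkaUtils.createStream(ssc, zkQuorum, "wc-group", topics).map(_._2)
    val result = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    result.print()
    ssc.start()
    ssc.awaitTermination()
  }
}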

In another shell, use the nc command to keep sending data to the port:

yum -y install nc

nc -lk 9999
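Here -l makes nc listen on port 9999 and -k keeps it listening after a client disconnects. Type lines such as "hello tom hello jerry" into the nc session; every 5 seconds the Spark job prints that batch's counts, something like (hello,2), (tom,1), (jerry,1).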