本次实践使用kafka console作为消息的生产者,Spark Streaming作为消息的消费者,具体实践代码如下
首先启动kafka server
.\bin\windows\kafka-server-start.bat .\config\server.properties
创建一个Topic
此处topic名以test为例
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
创建一个producer
kafka-console-consumer.bat --zookeeper localhost:2181 --topic test
创建一个Consumer
package spark.examples.streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
object SparkStreamingKakfaWordCount {
def main(args: Array[String]) {
println("Start to run SparkStreamingKakfaWordCount")
val conf = new SparkConf().setMaster("local[3]")setAppName("SparkStreamingKakfaWordCount")
val ssc = new StreamingContext(conf, Seconds(4))
val topicMap=Map("test" -> 1)
// zookeeper quorums server list
val zkQuorum = "localhost:2181";
// consumer group
val group = "test-consumer-group01"
//下面的处理方式假设topic test只有一个分区
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
lines.print()
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x,1L)).reduceByKey(_+_)
wordCounts.print()
// 下面的处理方式假设topic test有2个分区,spark streaming 创建2个Input DStream,并行读2个分区
// Spark Streaming将RDD重新分区为4个RDD,进行并行处理,处理逻辑的并行度是读取并行的度的2倍
// val streams = (1 to 2).map( _ => KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2))
// 将2个stream进行union
// val partitions = ssc.union(streams).repartition(4).map("DataReceived: " + _)
// partitions.print()
// val partitions = ssc.union(streams).repartition(2) //partition个数根据spark并行处理能力而定
// val words = partitions.flatMap(_.split(" "))
// val wordCounts = words.map(x => (x,1L)).reduceByKey(_+_)
// wordCounts.print()
ssc.start() //Start the computation
ssc.awaitTermination() //Wait for the computation to termination
}
}