Kafka 0.8: receiver-based consumption of a Kafka topic (with a demo of reduceByKeyAndWindow)
Dependency
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    <version>2.1.3</version>
</dependency>
Code
package com.zy.kafka2streaming

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.immutable

/**
  * Spark Streaming + Kafka: receiver-based consumption of a Kafka topic using the
  * high-level consumer API (offsets are maintained in ZooKeeper).
  * createStream is only available in the 0.8 integration; the 0.10 integration no longer provides it.
  * Also demonstrates the reduceByKeyAndWindow (window) operator.
  *
  * Prerequisite: a producer must be writing data into the Kafka topic.
  */
object SparkStreamingKafkaReceiver {
  def main(args: Array[String]): Unit = {
    // SparkConf: enable the write-ahead log (WAL) so received data can be recovered
    val sparkConf: SparkConf = new SparkConf()
      .setAppName("SparkStreamingKafkaReceiver")
      .setMaster("local[4]")
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
    // SparkContext
    val sc: SparkContext = new SparkContext(sparkConf)
    sc.setLogLevel("WARN")
    // StreamingContext with a 5-second batch interval
    val ssc: StreamingContext = new StreamingContext(sc, Seconds(5))
    // checkpoint directory (needed by the WAL and the window operation)
    ssc.checkpoint("./spark-receiver")

    // Receive data from Kafka
    // 1. ZooKeeper quorum
    val zkQuorum = "bigdata-01:2181,bigdata-02:2181,bigdata-03:2181"
    // 2. consumer group id
    val groupId = "zyTest"
    // Topic info: key = topic name, value = number of threads each receiver uses to consume that topic
    val topic = Map("sparkDemo" -> 1)

    // Use several receivers (create them in a loop and collect them)
    val receiverList: immutable.IndexedSeq[ReceiverInputDStream[(String, String)]] = (1 to 3).map(x => {
      val stream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, zkQuorum, groupId, topic)
      stream
    })
    // Union the DStreams from all receivers into a single DStream
    val unionStream: DStream[(String, String)] = ssc.union(receiverList)

    // Extract the message value (the second element of the tuple)
    val data: DStream[String] = unionStream.map(_._2)
    // split
    val words: DStream[String] = data.flatMap(_.split(","))
    // map to (word, 1)
    val wordsAndOne: DStream[(String, Int)] = words.map((_, 1))
    // per-batch aggregation
    //val result: DStream[(String, Int)] = wordsAndOne.reduceByKey(_ + _)

    /**
      * Window operator reduceByKeyAndWindow, three parameters:
      *   1. the reduce function
      *   2. the window length
      *   3. the slide interval, i.e. how often the window is evaluated
      *
      * Every 5 seconds, aggregate the previous 15 seconds of data.
      */
    val result: DStream[(String, Int)] = wordsAndOne.reduceByKeyAndWindow((x: Int, y: Int) => x + y, Seconds(15), Seconds(5))
    // print
    result.print()

    // start the streaming computation
    ssc.start()
    ssc.awaitTermination()
  }
}
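Because the window (15 s) is longer than the slide interval (5 s), consecutive windows overlap and the same batches are re-reduced several times. As a minimal sketch (not part of the original example), reduceByKeyAndWindow also has an overload that takes an inverse reduce function, so each evaluation only adds the data entering the window and subtracts the data leaving it; it reuses the wordsAndOne stream above and relies on the checkpoint directory the job already sets:

    // Sketch only: the incremental form of reduceByKeyAndWindow.
    // Spark keeps the previous window's result, adds the counts of the new batch,
    // and removes the counts of the batch that slid out via the inverse function.
    val incrementalResult: DStream[(String, Int)] = wordsAndOne.reduceByKeyAndWindow(
      (x: Int, y: Int) => x + y, // reduce: combine counts entering the window
      (x: Int, y: Int) => x - y, // inverse reduce: remove counts leaving the window
      Seconds(15),               // window length
      Seconds(5)                 // slide interval
    )
    incrementalResult.print()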
Kafka 0.10: direct-stream integration with Kafka (with a demo of updateStateByKey)
Dependency
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.1.3</version>
</dependency>
Code
package com.zy.kafka2streaming

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Spark Streaming + Kafka via the direct (low-level) API: offsets are no longer kept in
  * ZooKeeper; the streaming application tracks them itself after processing the data.
  * Also demonstrates the updateStateByKey operator (running totals across batches).
  *
  * Prerequisite: a producer must be writing data into the Kafka topic.
  */
object SparkStreamingKafkaDirect {
  def main(args: Array[String]): Unit = {
    // SparkConf
    val sparkConf: SparkConf = new SparkConf().setAppName("SparkStreamingKafkaDirect").setMaster("local[4]")
    // SparkContext
    val sc: SparkContext = new SparkContext(sparkConf)
    sc.setLogLevel("WARN")
    // StreamingContext with a 5-second batch interval
    val ssc: StreamingContext = new StreamingContext(sc, Seconds(5))
    // checkpoint directory (required by updateStateByKey)
    ssc.checkpoint("./spark-direct")

    //---------------- read data from Kafka -------------------
    // Kafka 0.8 direct-stream style, kept for comparison.
    // Note: the 0.8 receiver approach connects through ZooKeeper, whereas the direct
    // approach connects to the Kafka brokers themselves.
    // val kafkaParams = Map("bootstrap.servers" -> "bigdata-01:9092,bigdata-02:9092,bigdata-03:9092", "group.id" -> "sparkDirect")
    // Multiple topics can be passed in the set.
    // val topics = Set("sparkDemo")
    // 0.8-style createDirectStream call:
    // val dstream: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

    /**
      * "Creating a Direct Stream" example for the 0.10 integration from the official docs:
      *
      * import org.apache.kafka.clients.consumer.ConsumerRecord
      * import org.apache.kafka.common.serialization.StringDeserializer
      * import org.apache.spark.streaming.kafka010._
      * import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
      * import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
      *
      * val kafkaParams = Map[String, Object](
      *   "bootstrap.servers" -> "localhost:9092,anotherhost:9092",
      *   "key.deserializer" -> classOf[StringDeserializer],
      *   "value.deserializer" -> classOf[StringDeserializer],
      *   "group.id" -> "use_a_separate_group_id_for_each_stream",
      *   "auto.offset.reset" -> "latest",
      *   "enable.auto.commit" -> (false: java.lang.Boolean)
      * )
      *
      * val topics = Array("topicA", "topicB")
      * val stream = KafkaUtils.createDirectStream[String, String](
      *   streamingContext,
      *   PreferConsistent,
      *   Subscribe[String, String](topics, kafkaParams)
      * )
      *
      * stream.map(record => (record.key, record.value))
      */

    // 0.10-style call
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "bigdata-01:9092,bigdata-02:9092,bigdata-03:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "zy_test_direct",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val topics = Set("sparkDemo")
    val dstream = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))

    //-------------------------- extract the topic data ----------------------------
    // The 0.8 integration returned (key, value) tuples, so you could write dstream.map(_._2);
    // the 0.10 integration returns ConsumerRecord objects instead.
    // val data: DStream[String] = dstream.map(_._2)
    val data: DStream[String] = dstream.map(_.value())

    // split
    val words: DStream[String] = data.flatMap(_.split(","))
    // map to (word, 1)
    val wordsAndOne: DStream[(String, Int)] = words.map((_, 1))
    // per-batch aggregation
    //val result: DStream[(String, Int)] = wordsAndOne.reduceByKey(_ + _)

    /**
      * Use updateStateByKey to keep a running count of each word across batches.
      * It takes an update function (defined below).
      */
    val result: DStream[(String, Int)] = wordsAndOne.updateStateByKey(updateFunction)
    // print
    result.print()

    // start the streaming computation
    ssc.start()
    ssc.awaitTermination()
  }

  /**
    * @param newValues    all the 1s for the same word in the current batch
    * @param historyCount the accumulated count for this key from previous batches
    * @return the updated count
    */
  def updateFunction(newValues: Seq[Int], historyCount: Option[Int]): Option[Int] = {
    // new count = previous count + the sum of this batch's values
    val newCount: Int = historyCount.getOrElse(0) + newValues.sum
    // wrap the accumulated result in Some and return it
    Some(newCount)
  }
}
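Since enable.auto.commit is set to false, the job above never stores its offsets in Kafka and relies on the checkpoint alone. A minimal sketch (not part of the original example) of committing offsets back to Kafka after each batch with the 0.10 integration's CanCommitOffsets API is shown below; it assumes the same dstream as in the code above:

    import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, OffsetRange}

    // Sketch only: commit offsets back to Kafka once a batch has been processed,
    // so the consumer group's position survives independently of the checkpoint directory.
    dstream.foreachRDD { rdd =>
      // The RDD produced by the direct stream carries the offset ranges it read.
      val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      // ... process the batch here (e.g. the word count above) ...

      // Commit asynchronously after the work for this batch is done.
      dstream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }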