Spark之 Spark Streaming整合kafka(并演示reduceByKeyAndWindow、updateStateByKey算子使用)

时间:2022-04-20 23:52:49

Kafka0.8版本基于receiver接受器去接受kafka topic中的数据(并演示reduceByKeyAndWindow的使用)

依赖

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    <version>2.1.3</version>
</dependency>

代码

package com.zy.kafka2streaming


import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.immutable

/**
  * sparkStreaming整合kafka: 基于receiver接受器去接受kafka topic中的数据,使用高级api(消息的偏移量由zk维护)
  * kafka0.8版本才有createStream 1.0就没有了
  * reduceByKeyAndWindow算子使用(开窗函数)
  *
  * 前提:需要开启生产者往kafka中写入数据
  */
object SparkStreamingKafkaReceiver {
  def main(args: Array[String]): Unit = {
    //sparkConf  开启WAL日志,保证数据源的安全性
    val sparkConf: SparkConf = new SparkConf().setAppName("SparkStreamingKafkaReceiver").setMaster("local[4]").set("spark.streaming.receiver.writeAheadLog.enable", "true")
    //sc
    val sc: SparkContext = new SparkContext(sparkConf)
    sc.setLogLevel("WARN")

    //构建ssc
    val ssc: StreamingContext = new StreamingContext(sc, Seconds(5))
    //设置checkpoint目录
    ssc.checkpoint("./spark-receiver")

    //接收kafka数据
    //1 指定zk地址
    val zkQuorum = "bigdata-01:2181,bigdata-02:2181,bigdata-03:2181"
    //2 消费者groupid
    val groupId = "zyTest"
    // 指定topic有关信息 key:表示topic的名称,value:表示每一个receiver接收器使用多少个线程去消费topic数据
    val topic = Map("sparkDemo" -> 1)

    //使用多个receiver接收(循环创建 并放到集合中)
    val receiverList: immutable.IndexedSeq[ReceiverInputDStream[(String, String)]] = (1 to 3).map(x => {
      val stream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, zkQuorum, groupId, topic)
      stream
    })
    //把一个集合中多个Dstream数据汇总成一个Dstream
    val unionStream: DStream[(String, String)] = ssc.union(receiverList)

    //获取topic数据 第二个是value
    val data: DStream[String] = unionStream.map(_._2)
    //切分
    val words: DStream[String] = data.flatMap(_.split(","))
    //计数
    val wordsAndOne: DStream[(String, Int)] = words.map((_, 1))
    //聚合
    //val result: DStream[(String, Int)] = wordsAndOne.reduceByKey(_ + _)

    /**
      * 开窗函数 reduceByKeyAndWindow  三个参数
      * 第一个:逻辑函数
      * 第二个:表示窗口的长度
      * 第三个:表示窗口的滑动时间间隔,每隔多久计算一次
      *
      * 每5秒统计前15秒的结果
      */
    val result: DStream[(String, Int)] = wordsAndOne.reduceByKeyAndWindow((x: Int, y: Int) => x + y, Seconds(15), Seconds(5))

    //打印
    result.print()

    //开启流计算
    ssc.start()
    ssc.awaitTermination()

  }
}

 

Kafka1.0版本整合Kafka(并演示updateStateByKey的使用)

依赖

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.1.3</version>
</dependency>

代码

package com.zy.kafka2streaming

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * sparkStreaming整合kafka:利用低级api(消息的offset不再由zk去维护,有streaming处理完数据去维护)
  * updateStateByKey算子的使用(历史累计)
  * 前提:需要开启生产者往kafka中写入数据
  */
object SparkStreamingKafkaDirect {
  def main(args: Array[String]): Unit = {
    //sparkConf
    val sparkConf: SparkConf = new SparkConf().setAppName("SparkStreamingKafkaDirect").setMaster("local[4]")
    //sc
    val sc: SparkContext = new SparkContext(sparkConf)
    sc.setLogLevel("WARN")

    //sparkStreaming
    val ssc: StreamingContext = new StreamingContext(sc, Seconds(5))

    //checkpoint
    ssc.checkpoint("./spark-direct")


    //----------------获取kafka中的数据-------------------


    //kafka0.8版本写法
    //注意 这里0.8版本的参数是zk的地址 1.0版本的是kafka的地址
    //    val kafkaParams = Map("bootstrap.servers" -> "bigdata-01:9092,bigdata-02:9092,bigdata-03:9092", "groupId" -> "sparkDirect")
    //topic  可以设置多个topic
    //    val topics = Set("sparkDemo")

    // KafkaUtils.createDirectStream 0.8版本的写法
    //    val dstream: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)


    /**
      * 官网kafka1.0版本  Creating a Direct Stream  示例
      * import org.apache.kafka.clients.consumer.ConsumerRecord
      * import org.apache.kafka.common.serialization.StringDeserializer
      * import org.apache.spark.streaming.kafka010._
      * import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
      * import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
      * *
      * val kafkaParams = Map[String, Object](
      * "bootstrap.servers" -> "localhost:9092,anotherhost:9092",
      * "key.deserializer" -> classOf[StringDeserializer],
      * "value.deserializer" -> classOf[StringDeserializer],
      * "group.id" -> "use_a_separate_group_id_for_each_stream",
      * "auto.offset.reset" -> "latest",
      * "enable.auto.commit" -> (false: java.lang.Boolean)
      * )
      * *
      * val topics = Array("topicA", "topicB")
      * val stream = KafkaUtils.createDirectStream[String, String](
      * streamingContext,
      * PreferConsistent,
      * Subscribe[String, String](topics, kafkaParams)
      * )
      * *
      *stream.map(record => (record.key, record.value))
      */

    //1.0版本的写法
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "bigdata-01:9092,bigdata-02:9092,bigdata-03:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "zy_test_direct",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val topics = Set("sparkDemo")
    val dstream = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))

    //--------------------------获取topic数据----------------------------

    //0.8版本可以这么写 1.0版本不能这么写了
    //val data: DStream[String] = dstream.map(_._2)

    //1.0版本 DStream.map
    val data: DStream[String] = dstream.map(_.value())
    //切分
    val words: DStream[String] = data.flatMap(_.split(","))
    //计数
    val wordsAndOne: DStream[(String, Int)] = words.map((_, 1))
    //聚合
    //val result: DStream[(String, Int)] = wordsAndOne.reduceByKey(_ + _)

    /**
      * 使用updateStateByKey 累计统计单词出现的次数
      * 需要传一个函数进去
      */
    val result: DStream[(String, Int)] = wordsAndOne.updateStateByKey(updateFunction)

    //打印
    result.print()

    //开启流计算
    ssc.start()
    ssc.awaitTermination()
  }


  /**
    *
    * @param newValues    表示当前批次汇总成的(word,1)中相同单词的所有的1
    * @param historyCount 历史的所有相同key的value总和
    * @return
    */
  def updateFunction(newValues: Seq[Int], historyCount: Option[Int]): Option[Int] = {
    //新的计数等于原来的计数加上这次数据的sum
    val newCount: Int = historyCount.getOrElse(0) + newValues.sum
    //将累加后的结果放到Option的子集Some中返回
    Some(newCount)
  }
}

 

kafka2streaming的java实现版本