sparkstreaming直连方式跟receiver方式整合kafka

时间:2020-11-28 20:48:23

sparkstreaming整合kafka又两种方式(kafka0.8有支持两种方式,kafka0.10仅支持直连方式,舍弃了receive方式)

1,有receive方式。相当有接收者接收一个批次产生的数据。然后再进行计算。缺点:使用高级的消费API连接ZK,自动更新偏移量,但是效率低。

2,直连方式。相当与Task直接连到kafka对应topic分区上,以迭代器的方式一条一条读取数据。边读边计算,计算一个批次的时间,生成一个批次的小结果。该消费方式,不需要连接ZK,直接连接到broker上,但是需要自己维护偏移量。偏移量可以记录到Mysql,Redis,ZK中。下面的代码就是将记录的偏移量保存到ZK中。


消费端不依赖zookeeper来维护偏移量,需要自己写代码记录偏移量,这种方式优缺点:优点:不会造成数据丢失,因为直接连接kafka的分区。缺点代码复杂。

package cn.edu360.streaming


import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
import org.I0Itec.zkclient.ZkClient
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Duration, StreamingContext}
//kafka0.8版本
object KafkaDirectWordCount {

  def main(args: Array[String]): Unit = {

    //指定组名
    val group = "g001"
    //创建SparkConf
    val conf = new SparkConf().setAppName("KafkaDirectWordCount").setMaster("local[2]")
    //创建SparkStreaming,并设置间隔时间
    val ssc = new StreamingContext(conf, Duration(5000))
    //指定消费的 topic 名字
    val topic = "wordcount"
    //指定kafka的broker地址(sparkStream的Task直连到kafka的分区上,用更加底层的API消费,效率更高)
    val brokerList = "node-1.xiaoniu.com:9092,node-2.xiaoniu.com:9092,node-3.xiaoniu.com:9092"

    //指定zk的地址,后期更新消费的偏移量时使用
    val zkQuorum = "node-1.xiaoniu.com:2181,node-2.xiaoniu.com:2181,node-3.xiaoniu.com:2181"
    //创建 stream 时使用的 topic 名字集合
    val topics: Set[String] = Set(topic)

    //创建一个 ZKGroupTopicDirs 对象,其实是指定往zk中写入数据的目录,用于保存偏移量
    val topicDirs = new ZKGroupTopicDirs(group, topic)
    //获取 zookeeper 中的路径 "/g001/offsets/wordcount/"
    val zkTopicPath = s"${topicDirs.consumerOffsetDir}"

    //准备kafka的参数
    val kafkaParams = Map(
      "metadata.broker.list" -> brokerList,
      "group.id" -> group,
      "auto.offset.reset" -> kafka.api.OffsetRequest.SmallestTimeString
    )

    //zookeeper 的host 和 ip,创建一个 client,用于跟新偏移量量的
    val zkClient = new ZkClient(zkQuorum)

    //查询该路径下是否字节点(默认有字节点为我们自己保存不同 partition 时生成的)
    // /g001/offsets/wordcount/0/10001"
    // /g001/offsets/wordcount/1/30001"
    // /g001/offsets/wordcount/2/10001"
    //zkTopicPath  -> /g001/offsets/wordcount/
    val children = zkClient.countChildren(zkTopicPath)

    var kafkaStream: InputDStream[(String, String)] = null

    //如果 zookeeper 中有保存 offset,我们会利用这个 offset 作为 kafkaStream 的起始位置
    var fromOffsets: Map[TopicAndPartition, Long] = Map()

    //如果保存过 offset
    if (children > 0) {
      for (i <- 0 until children) {
        // /g001/offsets/wordcount/0/10001
        val partitionOffset = zkClient.readData[String](s"$zkTopicPath/${i}")
        // wordcount/0
        val tp = TopicAndPartition(topic, i)
        //将不同 partition 对应的 offset 增加到 fromOffsets 中
        // wordcount/0 -> 10001
        fromOffsets += (tp -> partitionOffset.toLong)
      }
      //Key: wordcount   values: "hello tom hello jerry"
      //这个会将 kafka 的消息进行 transform,最终 kafak 的数据都会变成 (topic_name, message) 这样的 tuple
      val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.topic, mmd.message())
      
      //通过KafkaUtils创建直连的DStream(fromOffsets参数的作用是:按照前面计算好了的偏移量继续消费数据)
      //[String, String, StringDecoder, StringDecoder,     (String, String)]
      //  key    value    key的解码方式   value的解码方式
      kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)
    } else {
      //如果未保存,根据 kafkaParam 的配置使用最新(largest)或者最旧的(smallest) offset
      kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
    }

    //偏移量的范围
    var offsetRanges = Array[OffsetRange]()

    //从kafka读取的消息,DStream的Transform方法可以将当前批次的RDD获取出来
    //该transform方法计算获取到当前批次RDD的偏移量
    val transform: DStream[(String, String)] = kafkaStream.transform { rdd =>
      //得到该 rdd 对应 kafka 的消息的 offset
      //该RDD是一个KafkaRDD,可以获得偏移量的范围
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
    }
    val messages: DStream[String] = transform.map(_._2)

    //依次迭代DStream中的RDD
    messages.foreachRDD { rdd =>
      //对RDD进行操作,触发Action
      rdd.foreachPartition(partition =>
        partition.foreach(x => {
          println(x)
        })
      )

      for (o <- offsetRanges) {
        val zkPath = s"${topicDirs.consumerOffsetDir}/${o.partition}"
        //将该 partition 的 offset 保存到 zookeeper
        ZkUtils.updatePersistentPath(zkClient, zkPath, o.fromOffset.toString)
      }
    }
    ssc.start()
    ssc.awaitTermination()
  }
}

kafka0.10版本

package cn.edu360.streaming.kafka10

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
  * Created by zx on 2017/7/6.
  */
object DirectStream {

  def main(args: Array[String]): Unit = {

    //创建SparkConf,如果将任务提交到集群中,那么要去掉.setMaster("local[2]")
    val conf = new SparkConf().setAppName("DirectStream").setMaster("local[2]")
    //创建一个StreamingContext,其里面包含了一个SparkContext
    val streamingContext = new StreamingContext(conf, Seconds(5))

    //配置kafka的参数
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "node-1.xiaoniu.com:9092,node-2.xiaoniu.com:9092,node-3.xiaoniu.com:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "test123",
      "auto.offset.reset" -> "earliest", // lastest
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val topics = Array("xiaoniu")
    //在Kafka中记录读取偏移量
    val stream = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      //位置策略(可用的Executor上均匀分配分区)
      LocationStrategies.PreferConsistent,
      //消费策略(订阅固定的主题集合)
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    )

    //迭代DStream中的RDD(KafkaRDD),将每一个时间点对于的RDD拿出来
    stream.foreachRDD { rdd =>
      //获取该RDD对于的偏移量
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      //拿出对应的数据
      rdd.foreach{ line =>
        println(line.key() + " " + line.value())
      }
      //异步更新偏移量到kafka中
      // some time later, after outputs have completed
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}

以下两种方式在实际生产环境中不使用。利用zookeeper来维护偏移量,不用自己再去维护,但是这种方式运行速度慢,这种方式不是直接从kafka的分区中获取数据,而是相当于拿一个容器,每隔一段时间取出,当写入大数据量时内存无法承受,容易造成内存溢出,数据丢失。

package cn.edu360.streaming

import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StateFulKafkaWordCount {

  /**
    * 迭代器元组中的三个参数:
    * 第一个参数:代表分组的KEY
    * 第二个参数:当前批次Key对应的Value,由于有多个Value,那么会将这个批次的Value放到一个SEQ中
    * 第三个参数:代表初始值或累加的中间结果
    */
  val updateFunc = (it: Iterator[(String, Seq[Int], Option[Int])]) => {
    //it.map(tp => (tp._1, tp._2.sum + tp._3.getOrElse(0)))
    it.map{ case (x, y, z) => (x, y.sum + z.getOrElse(0))}
  }


  def main(args: Array[String]): Unit = {

    val sc = new SparkConf().setAppName("StateFulKafkaWordCount").setMaster("local[*]")

    val ssc = new StreamingContext(sc, Seconds(5))

    //如果要累加历史结果,一定要指定CheckPiont
    ssc.checkpoint("./ck")

    //传统低效的API,需要连ZK
    //通过kafkaUtils创建kafkaStream
    val zkQuorum = "node-1.xiaoniu.com:2181,node-2.xiaoniu.com:2181,node-3.xiaoniu.com:2181"
    val groupId = "g1"
    //要消费的topic的名字,消费者的线程数量
    val topic = Map[String, Int]("wordcount" -> 1)
    //从kafka中拉取数据
    //通过kafka创建DStream
    val data: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, zkQuorum, groupId, topic)

    //获取kafka的value即为真正存储的数据
    val lines: DStream[String] = data.map(_._2)

    val result: DStream[(String, Int)] = lines.flatMap(_.split(" ")).map((_, 1)).updateStateByKey(updateFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism), true)

    //触发Action
    result.print()

    //启动
    ssc.start()

    //等待优雅退出
    ssc.awaitTermination()

  }
}

方式二

package cn.edu360.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
object KafkaWordCount {

  def main(args: Array[String]): Unit = {

    val sc = new SparkConf().setAppName("KafkaWordCount").setMaster("local[*]")

    val ssc = new StreamingContext(sc, Seconds(5))

    //传统低效的API,需要连ZK
    //通过kafkaUtils创建kafkaStream
    val zkQuorum = "node-1.xiaoniu.com:2181,node-2.xiaoniu.com:2181,node-3.xiaoniu.com:2181"
    val groupId = "g1"
    //要消费的topic的名字,消费者的线程数量
    val topic = Map[String, Int]("wordcount" -> 1)
    //从kafka中拉取数据
    //通过kafka创建DStream
    val data: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, zkQuorum, groupId, topic)

    //获取kafka的value即为真正存储的数据
    val lines: DStream[String] = data.map(_._2)

    val result: DStream[(String, Int)] = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_)

    //触发Action
    result.print()

    //启动
    ssc.start()

    //等待优雅退出
    ssc.awaitTermination()
  }
}