Spark Streaming + Kafka: Reading Kafka Partition Data Directly with the Low-Level API and Manually Updating Offsets to the ZooKeeper Cluster

Date: 2021-06-08 20:57:53

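When Spark Streaming reads Kafka partition data directly through the low-level API, the offsets are normally stored in the checkpoint. Kafka monitoring tools cannot see offsets kept there, so they cannot monitor consumption. For that reason this post updates the offsets to the ZooKeeper cluster manually.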


A brief look at the relevant source code:

1: TopicAndPartition is a case class that wraps a topic and a partition id.

  case class TopicAndPartition(topic: String, partition: Int)

2: OffsetRange wraps the topic name, the partition id, fromOffset (the starting offset of the current batch) and untilOffset (the ending offset of the current batch). An OffsetRange therefore carries four pieces of information: topic name, partition id, starting offset, and ending offset.
  /**
   * @param topic Kafka topic name
   * @param partition Kafka partition id
   * @param fromOffset inclusive starting offset
   * @param untilOffset exclusive ending offset
   */
  final class OffsetRange private(
      val topic: String,
      val partition: Int,
      val fromOffset: Long,
      val untilOffset: Long) extends Serializable
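
To make the inclusive/exclusive convention concrete, here is a minimal sketch with no Spark dependency. `SimpleOffsetRange` and `OffsetRangeDemo` are hypothetical stand-ins written for this illustration (the real OffsetRange has a private constructor), so treat it as a model of the idea rather than Spark's code.

```scala
// Hypothetical stand-in for Spark's OffsetRange, used only for illustration.
final case class SimpleOffsetRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long) {
  // fromOffset is inclusive and untilOffset is exclusive,
  // so the number of messages in the range is a plain difference.
  def count: Long = untilOffset - fromOffset
}

object OffsetRangeDemo {
  // After a batch is processed, the offset worth committing is untilOffset:
  // the first offset that has NOT been consumed yet.
  def nextOffsetToRead(r: SimpleOffsetRange): Long = r.untilOffset

  def main(args: Array[String]): Unit = {
    val r = SimpleOffsetRange("iteblog", 0, fromOffset = 100L, untilOffset = 105L)
    println(s"${r.topic}-${r.partition}: offsets ${r.fromOffset} to ${r.untilOffset - 1}, ${r.count} messages")
    println(s"next offset to read: ${nextOffsetToRead(r)}")
  }
}
```

This is why the implementation in this post commits untilOffset: a consumer that resumes from the committed value starts exactly at the first unread message.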


3: Implementation:
import kafka.common.TopicAndPartition
import kafka.serializer.StringDecoder
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
// KafkaCluster here is the class listed later in this post
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaCluster, KafkaUtils, OffsetRange}

object Iteblog {

  // The broker list expects host:port, without a URL scheme
  val brokerAddress = "www.iteblog.com:9092"

  val groupID = "testGroup"

  val kafkaParams = Map[String, String](
    "metadata.broker.list" -> brokerAddress,
    "group.id" -> "iteblog")

  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setAppName("Test")
    sparkConf.set("spark.kryo.registrator", "utils.CpcKryoSerializer")
    val sc = new SparkContext(sparkConf)

    val ssc = new StreamingContext(sc, Seconds(2))
    val topicsSet = Set("iteblog")

    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
    messages.foreachRDD(rdd => {
      // Cast the RDD to HasOffsetRanges (KafkaRDD extends HasOffsetRanges)
      // OffsetRange: Represents a range of offsets from a single Kafka TopicAndPartition.
      // OffsetRange: Instances of this class can be created with `OffsetRange.create()`.
      val offsetsList: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // How offsetRanges is built (in KafkaRDD); tp: TopicAndPartition, fo: fromOffset
      // val offsetRanges = fromOffsets.map { case (tp, fo) =>
      //   val uo = untilOffsets(tp)
      //   OffsetRange(tp.topic, tp.partition, fo, uo.offset)
      // }.toArray

      val kc = new KafkaCluster(kafkaParams)
      for (offsets <- offsetsList) {
        // TopicAndPartition takes the topic first and the partition id second.
        // offsets.topic is "iteblog" here; offsets.partition is the Kafka partition id.
        val topicAndPartition = TopicAndPartition(offsets.topic, offsets.partition)
        // offsets.untilOffset is the ending offset of this batch
        val o = kc.setConsumerOffsets(groupID, Map((topicAndPartition, offsets.untilOffset)))
        if (o.isLeft) {
          println(s"Error updating the offset to Kafka cluster: ${o.left.get}")
        }
      }
    })

    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }
}

Walkthrough of the core code:
KafkaUtils.createDirectStream uses the low-level API to consume the data of each Kafka partition directly (Kafka partitions and RDD partitions correspond one to one). It returns a DStream, which is backed by RDDs.
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)

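messages.foreachRDD obtains the offset ranges of each RDD underlying messages.
Relationship between KafkaRDD and HasOffsetRanges (constructor parameters and type parameters omitted; see the source for details):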
KafkaRDD extends RDD[R](sc, Nil) with Logging with HasOffsetRanges

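rdd is the variable bound in messages.foreachRDD. Its runtime type is KafkaRDD, but because of polymorphism its static type is RDD rather than KafkaRDD, so it has to be downcast to HasOffsetRanges before offsetRanges can be called. (Recall what OffsetRange wraps: topic name, partition id, starting offset, and ending offset.)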
val offsetsList: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
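
The gap between static and runtime type can be reproduced without Spark. The sketch below uses hypothetical stand-ins (`BaseRdd`, `HasRanges`, `KafkaLikeRdd`) for RDD, HasOffsetRanges and KafkaRDD, and uses a pattern match as a safer alternative to asInstanceOf; it is an illustration, not Spark's code.

```scala
// Hypothetical stand-ins for RDD, HasOffsetRanges and KafkaRDD; no Spark dependency.
class BaseRdd
trait HasRanges { def ranges: Seq[(Long, Long)] }
class KafkaLikeRdd(val ranges: Seq[(Long, Long)]) extends BaseRdd with HasRanges

object DowncastDemo {
  // The parameter's static type is BaseRdd, so `ranges` is not visible without a cast.
  def rangesOf(rdd: BaseRdd): Option[Seq[(Long, Long)]] = rdd match {
    case r: HasRanges => Some(r.ranges) // runtime type carries the trait
    case _            => None           // asInstanceOf would throw ClassCastException here
  }

  def main(args: Array[String]): Unit = {
    println(rangesOf(new KafkaLikeRdd(Seq((0L, 10L)))))
    println(rangesOf(new BaseRdd))
  }
}
```

In real Spark code the same caution applies: the cast only works on the RDD handed to foreachRDD directly by the direct stream, since RDDs derived from it by transformations no longer implement HasOffsetRanges.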
How offsetRanges is built (in KafkaRDD); tp: TopicAndPartition, fo: fromOffset
val offsetRanges = fromOffsets.map { case (tp, fo) =>
  val uo = untilOffsets(tp)
  OffsetRange(tp.topic, tp.partition, fo, uo.offset)
}.toArray

KafkaCluster is the utility class that encapsulates updating offsets to the ZooKeeper cluster.
val kc = new KafkaCluster(kafkaParams)

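Iterating over the offsetRanges array; setConsumerOffsets is a method of KafkaCluster.
Once more: what does OffsetRange wrap? Topic name, partition id, starting offset, and ending offset.
for (offsets <- offsetsList) {
  val topicAndPartition = TopicAndPartition(offsets.topic, offsets.partition)
  // offsets.untilOffset is the ending offset
  val o = kc.setConsumerOffsets(groupID, Map((topicAndPartition, offsets.untilOffset)))
  if (o.isLeft) {
    println(s"Error updating the offset to Kafka cluster: ${o.left.get}")
  }
}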
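The code above updates the ZooKeeper cluster with the offsets of one RDD underlying the DStream created by createDirectStream; foreachRDD applies the same update to every RDD. That completes computing and committing the RDD offsets. The actual update logic lives in KafkaCluster, so let's look at that class next.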
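
Since setConsumerOffsets returns an Either, the result can also be handled with a pattern match instead of isLeft and left.get. The sketch below shows that style; `commit` is a hypothetical stand-in for the real call (it simply rejects negative offsets) so that the example runs without Kafka.

```scala
import scala.collection.mutable.ArrayBuffer

object EitherDemo {
  type Err = ArrayBuffer[Throwable]

  // Hypothetical stand-in for KafkaCluster.setConsumerOffsets:
  // succeeds with error code 0 per partition, fails for negative offsets.
  def commit(offsets: Map[String, Long]): Either[Err, Map[String, Short]] = {
    val bad = offsets.filter { case (_, offset) => offset < 0 }
    if (bad.isEmpty) {
      Right(offsets.map { case (k, _) => k -> 0.toShort })
    } else {
      val errs = new Err
      bad.keys.foreach(k => errs.append(new IllegalArgumentException(s"negative offset for $k")))
      Left(errs)
    }
  }

  // Both outcomes are handled in one expression; no unchecked .left.get is needed.
  def describe(result: Either[Err, Map[String, Short]]): String = result match {
    case Left(errs) => s"failed: ${errs.map(_.getMessage).mkString("; ")}"
    case Right(ok)  => s"committed ${ok.size} partition(s)"
  }

  def main(args: Array[String]): Unit = {
    println(describe(commit(Map("iteblog-0" -> 42L))))
    println(describe(commit(Map("iteblog-0" -> -1L))))
  }
}
```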
Here is the code:
package org.apache.spark.streaming.kafka
// The class is placed in org.apache.spark.streaming.kafka because
// `private[spark] object SimpleConsumerConfig` can only be used inside the spark package.

import kafka.api.OffsetCommitRequest
import kafka.common.{ErrorMapping, OffsetMetadataAndError, TopicAndPartition}
import kafka.consumer.SimpleConsumer
import org.apache.spark.SparkException
import org.apache.spark.streaming.kafka.KafkaCluster.SimpleConsumerConfig

import scala.collection.mutable.ArrayBuffer
import scala.util.Random
import scala.util.control.NonFatal

class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
  // Err is a type alias for ArrayBuffer[Throwable]
  type Err = ArrayBuffer[Throwable]

  @transient private var _config: SimpleConsumerConfig = null

  def config: SimpleConsumerConfig = this.synchronized {
    if (_config == null) {
      // Part of SimpleConsumerConfig.apply:
      // val brokers = kafkaParams.get("metadata.broker.list").orElse(kafkaParams.get("bootstrap.servers"))
      // so kafkaParams must contain a value for metadata.broker.list or bootstrap.servers
      _config = SimpleConsumerConfig(kafkaParams)
    }
    _config
  }

  /**
   * @param groupId: String
   * @param offsets: Map[TopicAndPartition, Long]
   * @return
   */
  def setConsumerOffsets(groupId: String,
                         offsets: Map[TopicAndPartition, Long]
                          ): Either[Err, Map[TopicAndPartition, Short]] = {
    setConsumerOffsetMetadata(groupId, offsets.map { kv =>
      kv._1 -> OffsetMetadataAndError(kv._2)
    })
  }

  def setConsumerOffsetMetadata(groupId: String,
                                metadata: Map[TopicAndPartition, OffsetMetadataAndError]
                                 ): Either[Err, Map[TopicAndPartition, Short]] = {
    var result = Map[TopicAndPartition, Short]()
    val req = OffsetCommitRequest(groupId, metadata)
    val errs = new Err
    val topicAndPartitions = metadata.keySet
    withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
      val resp = consumer.commitOffsets(req)
      val respMap = resp.requestInfo
      val needed = topicAndPartitions.diff(result.keySet)
      needed.foreach { tp: TopicAndPartition =>
        respMap.get(tp).foreach { err: Short =>
          if (err == ErrorMapping.NoError) {
            result += tp -> err
          } else {
            errs.append(ErrorMapping.exceptionFor(err))
          }
        }
      }
      if (result.keys.size == topicAndPartitions.size) {
        return Right(result)
      }
    }
    val missing = topicAndPartitions.diff(result.keySet)
    errs.append(new SparkException(s"Couldn't set offsets for ${missing}"))
    Left(errs)
  }

  private def withBrokers(brokers: Iterable[(String, Int)], errs: Err)
                         (fn: SimpleConsumer => Any): Unit = {
    brokers.foreach { hp =>
      var consumer: SimpleConsumer = null
      try {
        consumer = connect(hp._1, hp._2)
        fn(consumer)
      } catch {
        case NonFatal(e) =>
          errs.append(e)
      } finally {
        if (consumer != null) {
          consumer.close()
        }
      }
    }
  }

  def connect(host: String, port: Int): SimpleConsumer =
    new SimpleConsumer(host, port, config.socketTimeoutMs,
      config.socketReceiveBufferBytes, config.clientId)
}
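
The control flow in setConsumerOffsetMetadata and withBrokers boils down to "try the seed brokers one by one, collect the errors, stop at the first broker that succeeds". The sketch below isolates that pattern with no Kafka dependency. `firstSuccess` and the `attempt` callback are hypothetical names for this illustration, and the sketch uses an explicit loop instead of the `return` inside a closure that the listing above relies on.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

object BrokerRetryDemo {
  // Try `attempt` against each (host, port) in order.
  // Returns the first successful result, or every collected error if all brokers fail.
  def firstSuccess[A](brokers: Seq[(String, Int)])
                     (attempt: (String, Int) => A): Either[ArrayBuffer[Throwable], A] = {
    val errs = ArrayBuffer.empty[Throwable]
    val it = brokers.iterator
    var result: Option[A] = None
    while (result.isEmpty && it.hasNext) {
      val (host, port) = it.next()
      try {
        result = Some(attempt(host, port))
      } catch {
        case NonFatal(e) => errs.append(e) // remember the failure and move on to the next broker
      }
    }
    result.toRight(errs)
  }

  def main(args: Array[String]): Unit = {
    val brokers = Seq(("broker-a", 9092), ("broker-b", 9092))
    // Pretend broker-a is down and broker-b answers.
    val r = firstSuccess(brokers) { (host, port) =>
      if (host == "broker-a") throw new RuntimeException(s"$host unreachable") else s"$host:$port"
    }
    println(r)
  }
}
```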


A small aside:
Chained lookups on a Map:
object MapDemo extends App {
  val map = Map("1" -> "11", "2" -> "22", "3" -> "33")
  map.foreach {
    case (a, b) =>
      println(a + " " + b)
  }
  // Look up key "4" first and return its value if present; otherwise look up key "5";
  // if that is missing too, throw an exception.
  val result = map.get("4").orElse(map.get("5")).getOrElse(throw new Exception("exception"))
  println(result)
}


Walkthrough of the core code: the apply method of SimpleConsumerConfig:
    /**
     * Make a consumer config without requiring group.id or zookeeper.connect,
     * since communicating with brokers also needs common settings such as timeout
     */
    def apply(kafkaParams: Map[String, String]): SimpleConsumerConfig = {
      // These keys are from other pre-existing kafka configs for specifying brokers, accept either
      // (for the chained Map lookup, see the example above)
      val brokers = kafkaParams.get("metadata.broker.list")
        .orElse(kafkaParams.get("bootstrap.servers"))
        .getOrElse(throw new SparkException(
          "Must specify metadata.broker.list or bootstrap.servers"))

      val props = new Properties()
      kafkaParams.foreach { case (key, value) =>
        // prevent warnings on parameters ConsumerConfig doesn't know about
        if (key != "metadata.broker.list" && key != "bootstrap.servers") {
          props.put(key, value)
        }
      }

      // If zookeeper.connect or group.id is missing, set its value to the empty string.
      // containsKey is used because Properties.contains checks values, not keys.
      Seq("zookeeper.connect", "group.id").foreach { s =>
        if (!props.containsKey(s)) {
          props.setProperty(s, "")
        }
      }

      new SimpleConsumerConfig(brokers, props)
    }
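
One detail worth knowing when testing for a key in java.util.Properties: `contains` is inherited from Hashtable and tests values, while `containsKey` tests keys. The short sketch below shows the difference; `PropsContainsDemo` is just a name chosen for this illustration.

```scala
import java.util.Properties

object PropsContainsDemo {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.setProperty("group.id", "iteblog")

    println(props.containsKey("group.id")) // true: "group.id" is a key
    println(props.contains("group.id"))    // false: contains looks at values only
    println(props.contains("iteblog"))     // true: "iteblog" is a value
  }
}
```

So a "set a default if the key is missing" check should use containsKey; with contains, a user-supplied group.id would be overwritten with the empty string.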
The OffsetMetadataAndError case class:
case class OffsetMetadataAndError(offsetMetadata: OffsetMetadata, error: Short = Errors.NONE.code) {
  def offset = offsetMetadata.offset

  def metadata = offsetMetadata.metadata

  override def toString = "[%s,ErrorCode %d]".format(offsetMetadata, error)
}



TopicAndPartition is a case class that wraps a topic and a partition id.
  /**
   * @param groupId: String
   * @param offsets: Map[TopicAndPartition, Long]
   *                 Key: TopicAndPartition, which wraps the topic and partition id.
   *                 Value: Long, the ending offset of the consumed range.
   * @return
   */
  def setConsumerOffsets(groupId: String, offsets: Map[TopicAndPartition, Long]): Either[Err, Map[TopicAndPartition, Short]] = {
    setConsumerOffsetMetadata(groupId, offsets.map { kv =>
      kv._1 -> OffsetMetadataAndError(kv._2)
    })
  }



For more detail, see the comments in the code; they are thorough, so I won't repeat them in this post.
GitHub: https://github.com/Dax1n/spark-kafka-directstream-zookeeper