spark-streaming-kafka包源码分析

时间:2022-04-28 20:52:55

转载请注明原创地址 http://www.cnblogs.com/dongxiao-yang/p/5443789.html   

 

    最近由于使用sparkstreaming的同学需要对接到部门内部的的kafka集群,由于官方的spark-streaming-kafka包和现有公司的kafka集群权限系统无法对接,需要研究下spark-streaming-kafka包原有代码以便改造,本文研究的代码版本为spark在github的tag的v1.6.1版本。

     官方给出的JavaKafkaWordCount以及KafkaWordCount代码里产生kafka-streaming消费流数据的调用代码分别如下

 JavaPairReceiverInputDStream<String, String> messages =
            KafkaUtils.createStream(jssc, args[0], args[1], topicMap);


 val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)

  

可以看到无论是java还是scala调用的都是KafkaUtils内重载实现的createStream方法。

object KafkaUtils {
  /**
   * Create an input stream that pulls messages from Kafka Brokers.
   * @param ssc       StreamingContext object
   * @param zkQuorum  Zookeeper quorum (hostname:port,hostname:port,..)
   * @param groupId   The group id for this consumer
   * @param topics    Map of (topic_name -> numPartitions) to consume. Each partition is consumed
   *                  in its own thread
   * @param storageLevel  Storage level to use for storing the received objects
   *                      (default: StorageLevel.MEMORY_AND_DISK_SER_2)
   * @return DStream of (Kafka message key, Kafka message value)
   */
  def createStream(
      ssc: StreamingContext,
      zkQuorum: String,
      groupId: String,
      topics: Map[String, Int],
      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
    ): ReceiverInputDStream[(String, String)] = {
    val kafkaParams = Map[String, String](
      "zookeeper.connect" -> zkQuorum, "group.id" -> groupId,
      "zookeeper.connection.timeout.ms" -> "10000")
    createStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics, storageLevel)
  }

  /**
   * Create an input stream that pulls messages from Kafka Brokers.
   * @param ssc         StreamingContext object
   * @param kafkaParams Map of kafka configuration parameters,
   *                    see http://kafka.apache.org/08/configuration.html
   * @param topics      Map of (topic_name -> numPartitions) to consume. Each partition is consumed
   *                    in its own thread.
   * @param storageLevel Storage level to use for storing the received objects
   * @tparam K type of Kafka message key
   * @tparam V type of Kafka message value
   * @tparam U type of Kafka message key decoder
   * @tparam T type of Kafka message value decoder
   * @return DStream of (Kafka message key, Kafka message value)
   */
  def createStream[K: ClassTag, V: ClassTag, U <: Decoder[_]: ClassTag, T <: Decoder[_]: ClassTag](
      ssc: StreamingContext,
      kafkaParams: Map[String, String],
      topics: Map[String, Int],
      storageLevel: StorageLevel
    ): ReceiverInputDStream[(K, V)] = {
    val walEnabled = WriteAheadLogUtils.enableReceiverLog(ssc.conf)
    new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, walEnabled, storageLevel)
  }

  /**
   * Create an input stream that pulls messages from Kafka Brokers.
   * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
   * @param jssc      JavaStreamingContext object
   * @param zkQuorum  Zookeeper quorum (hostname:port,hostname:port,..)
   * @param groupId   The group id for this consumer
   * @param topics    Map of (topic_name -> numPartitions) to consume. Each partition is consumed
   *                  in its own thread
   * @return DStream of (Kafka message key, Kafka message value)
   */
  def createStream(
      jssc: JavaStreamingContext,
      zkQuorum: String,
      groupId: String,
      topics: JMap[String, JInt]
    ): JavaPairReceiverInputDStream[String, String] = {
    createStream(jssc.ssc, zkQuorum, groupId, Map(topics.asScala.mapValues(_.intValue()).toSeq: _*))
  }

  /**
   * Create an input stream that pulls messages from Kafka Brokers.
   * @param jssc      JavaStreamingContext object
   * @param zkQuorum  Zookeeper quorum (hostname:port,hostname:port,..).
   * @param groupId   The group id for this consumer.
   * @param topics    Map of (topic_name -> numPartitions) to consume. Each partition is consumed
   *                  in its own thread.
   * @param storageLevel RDD storage level.
   * @return DStream of (Kafka message key, Kafka message value)
   */
  def createStream(
      jssc: JavaStreamingContext,
      zkQuorum: String,
      groupId: String,
      topics: JMap[String, JInt],
      storageLevel: StorageLevel
    ): JavaPairReceiverInputDStream[String, String] = {
    createStream(jssc.ssc, zkQuorum, groupId, Map(topics.asScala.mapValues(_.intValue()).toSeq: _*),
      storageLevel)
  }

  /**
   * Create an input stream that pulls messages from Kafka Brokers.
   * @param jssc      JavaStreamingContext object
   * @param keyTypeClass Key type of DStream
   * @param valueTypeClass value type of Dstream
   * @param keyDecoderClass Type of kafka key decoder
   * @param valueDecoderClass Type of kafka value decoder
   * @param kafkaParams Map of kafka configuration parameters,
   *                    see http://kafka.apache.org/08/configuration.html
   * @param topics  Map of (topic_name -> numPartitions) to consume. Each partition is consumed
   *                in its own thread
   * @param storageLevel RDD storage level.
   * @tparam K type of Kafka message key
   * @tparam V type of Kafka message value
   * @tparam U type of Kafka message key decoder
   * @tparam T type of Kafka message value decoder
   * @return DStream of (Kafka message key, Kafka message value)
   */
  def createStream[K, V, U <: Decoder[_], T <: Decoder[_]](
      jssc: JavaStreamingContext,
      keyTypeClass: Class[K],
      valueTypeClass: Class[V],
      keyDecoderClass: Class[U],
      valueDecoderClass: Class[T],
      kafkaParams: JMap[String, String],
      topics: JMap[String, JInt],
      storageLevel: StorageLevel
    ): JavaPairReceiverInputDStream[K, V] = {
    implicit val keyCmt: ClassTag[K] = ClassTag(keyTypeClass)
    implicit val valueCmt: ClassTag[V] = ClassTag(valueTypeClass)

    implicit val keyCmd: ClassTag[U] = ClassTag(keyDecoderClass)
    implicit val valueCmd: ClassTag[T] = ClassTag(valueDecoderClass)

    createStream[K, V, U, T](
      jssc.ssc,
      kafkaParams.asScala.toMap,
      Map(topics.asScala.mapValues(_.intValue()).toSeq: _*),
      storageLevel)
  }

 

其中java相关的第三个和第四个createStream调用了第一个createStream,而第一个createStream最后调用的是第二个createStream,所以所有的rdd数据流都是从下面这句代码产生的:

new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, walEnabled, storageLevel)

查看KafkaInputDStream类定义,发现获取receiver有两种类型:KafkaReceiver和ReliableKafkaReceiver。

  def getReceiver(): Receiver[(K, V)] = {
    if (!useReliableReceiver) {
      new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
    } else {
      new ReliableKafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
    }
  }

其中,KafkaReceiver实现比较简单,调用的是kafka的high level api产生数据流,产生的每个线程的数据流都被放到一个线程池由单独的线程来消费

val topicMessageStreams = consumerConnector.createMessageStreams(
topics, keyDecoder, valueDecoder)

 ReliableKafkaReceiver是结合了spark的预写日志(Write Ahead Logs)功能,开启这个功能需要设置sparkconf属性 spark.streaming.receiver.writeAheadLog.enable为真(默认值是假)

这个receiver会把收到的kafka数据首先存储到日志上,然后才会向kafka提交offset,这样保证了在driver程序出现问题的时候不会丢失kafka数据。

 

 

参考文章 Spark Streaming容错的改进和零数据丢失