Spark Streaming consuming Kafka 1.0.1: the direct approach (storing offsets in ZooKeeper)

Date: 2020-11-28 20:48:17

Versions:

Kafka: 1.0.1

Spark: 2.1.0

Note: a servlet version conflict can show up at runtime, so when importing the Maven dependencies the POM below excludes javax.servlet:servlet-api where needed.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>kafkaDirect</groupId>
    <artifactId>kafkaDirect</artifactId>
    <version>1.0-SNAPSHOT</version>
    <repositories>
        <repository>
            <id>cloudera-releases</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.1.0</version>
            <exclusions>
                <exclusion>
                    <groupId>javax.servlet</groupId>
                    <artifactId>servlet-api</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!--<dependency>-->
            <!--<groupId>org.apache.spark</groupId>-->
            <!--<artifactId>spark-streaming-kafka_2.11</artifactId>-->
            <!--<version>2.1.0</version>-->
            <!--<exclusions>-->
                <!--<exclusion>-->
                    <!--<groupId>javax.servlet</groupId>-->
                    <!--<artifactId>servlet-api</artifactId>-->
                <!--</exclusion>-->
            <!--</exclusions>-->
        <!--</dependency>-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.1.0</version>
            <exclusions>
                <exclusion>
                    <groupId>javax.servlet</groupId>
                    <artifactId>servlet-api</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.8</version>
            <exclusions>
                <exclusion>
                    <groupId>javax.servlet</groupId>
                    <artifactId>servlet-api</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>1.0.1</version>
            <exclusions>
                <exclusion>
                    <groupId>javax.servlet</groupId>
                    <artifactId>servlet-api</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-10 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>2.1.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-client -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.2.0-cdh5.14.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-common -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-common</artifactId>
            <version>1.2.0-cdh5.14.0</version>
        </dependency>

    </dependencies>
</project>

Code:

Because ZooKeeper is used as the offset store, any tool that can inspect ZooKeeper can monitor the current Kafka consumption progress.

For example, KafkaOffsetMonitor:

https://github.com/quantifind/KafkaOffsetMonitor/releases
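
Since the committed offsets are ordinary ZooKeeper nodes, they can also be inspected directly from code. A minimal sketch (the quorum address is a made-up example; ZkUtils is the same Kafka helper class used in the code below):

import kafka.utils.ZkUtils

// hypothetical ZooKeeper quorum; replace with your own
val (zkClient, zkConnection) = ZkUtils.createZkClientAndConnection("cdh1:2181,cdh2:2181,cdh3:2181", 10000, 10000)
val zkUtils = new ZkUtils(zkClient, zkConnection, false)
// one child node per consumer group; offsets live under /consumers/<group>/offsets/<topic>/<partition>
zkUtils.getChildren("/consumers").foreach(println)
zkUtils.close()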

A few points to note:

1: Offset storage path in ZooKeeper: /consumers/[groupId]/offsets/[topic]/[partitionId] (the small sketch below shows how that path is built)
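
A minimal sketch of how that path is assembled with Kafka's ZKGroupTopicDirs helper (the group id and topic name are made-up examples):

import kafka.utils.ZKGroupTopicDirs

val dirs = new ZKGroupTopicDirs("test2", "myTopic")   // hypothetical group id and topic
println(dirs.consumerOffsetDir)                       // prints /consumers/test2/offsets/myTopic
// the node for one partition is consumerOffsetDir + "/" + partitionId, e.g. .../myTopic/0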

2: Reading offsets simply means fetching the stored value from that ZooKeeper path. Code:

def readOffsets(
                 topics: Seq[String],
                 groupId: String,
                 zkUtils: ZkUtils
               ): Map[TopicPartition, Long] = {

  val topicPartOffsetMap = collection.mutable.HashMap.empty[TopicPartition, Long]

  val partitionMap = zkUtils.getPartitionsForTopics(topics)

  // /consumers/<groupId>/offsets/<topic>/
  partitionMap.foreach(topicPartitions => {

    val zkGroupTopicDirs = new ZKGroupTopicDirs(groupId, topicPartitions._1)
    // iterate over every partition of the topic
    topicPartitions._2.foreach(partition => {

      val offsetPath = zkGroupTopicDirs.consumerOffsetDir + "/" + partition
      try {
        val offsetStatTuple = zkUtils.readData(offsetPath)
        if (offsetStatTuple != null) {
          topicPartOffsetMap.put(new TopicPartition(topicPartitions._1, Integer.valueOf(partition)), offsetStatTuple._1.toLong)
        }
      } catch {
        case e: Exception =>
          // no previous offset node exists for this partition yet, start from 0
          println(s"message: ${e.getMessage} , topic: ${topicPartitions._1}, partition: $partition, node path: $offsetPath")
          topicPartOffsetMap.put(new TopicPartition(topicPartitions._1, Integer.valueOf(partition)), 0L)
      }
    })
  })

  topicPartOffsetMap.toMap
}

3: Committing offsets simply means writing the offset value back to that ZooKeeper node. Code:

def persistOffsets(
                    offsets: Seq[OffsetRange],
                    groupId: String,
                    storeEndOffset: Boolean = true,
                    zkUtils: ZkUtils
                  ): Unit = {

  offsets.foreach(or => {
    val zkGroupTopicDirs = new ZKGroupTopicDirs(groupId, or.topic)
    val offsetPath = zkGroupTopicDirs.consumerOffsetDir + "/" + or.partition
    val offsetVal = if (storeEndOffset) or.untilOffset else or.fromOffset
    println(or.topic, or.partition, offsetVal, offsetPath)
    zkUtils.updatePersistentPath(offsetPath, offsetVal.toString)
  })
}
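
After a batch has run, the stored value can be spot-checked by reading the node back. A minimal sketch (the group id, topic and partition are made-up examples; zkUtils is the same instance used by persistOffsets):

val dirs = new ZKGroupTopicDirs("test2", "myTopic")                      // hypothetical group id and topic
val (offsetValue, _) = zkUtils.readData(dirs.consumerOffsetDir + "/0")   // partition 0
println(s"committed offset for partition 0: $offsetValue")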

 

Full code:

package offsetInZookeeper

import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Created by angel
  */
object KafkaOffsetInZookeeper {
  def main(args: Array[String]): Unit = {
    //5 cdh1:9092,cdh2:9092,cdh3:9092 test2 zk cdh1:2181,cdh2:2181,cdh3:2181
    if (args.length < 5) {
      System.err.println("Usage: KafkaDirectStreamTest " +
        "<batch-duration-in-seconds> " +
        "<kafka-bootstrap-servers> " +
        "<kafka-topics> " +
        "<kafka-consumer-group-id> " +
        "<kafka-zookeeper-quorum>")
      System.exit(1)
    }

    val batchDuration = args(0)
    val bootstrapServers = args(1).toString
    val topicsSet = args(2).toString.split(",").toSet
    val consumerGroupID = args(3)
    val zkQuorum = args(4)
    val sparkConf = new SparkConf().setAppName("Kafka-Offset-Management-Blog")
      .setMaster("local[4]") // remove this line when submitting to a real cluster
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(batchDuration.toLong))
    val topics = topicsSet.toArray

    // offsets live under /consumers/[groupId]/offsets/[topic]/[partitionId];
    // ZKGroupTopicDirs builds that path, so we connect to the quorum root here
    val zkSessionTimeOut = 10000
    val zkConnectionTimeOut = 10000
    val zkClientAndConnection = ZkUtils.createZkClientAndConnection(zkQuorum, zkSessionTimeOut, zkConnectionTimeOut)
    val zkUtils = new ZkUtils(zkClientAndConnection._1, zkClientAndConnection._2, false)

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> bootstrapServers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> consumerGroupID,
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    // read the previously committed offsets from ZooKeeper
    val fromOffsets: Map[TopicPartition, Long] = readOffsets(topics, consumerGroupID, zkUtils)
    // create the direct stream starting from those offsets
    val inputDStream = KafkaUtils.createDirectStream(ssc, PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, fromOffsets))

    // process each batch, then write the offsets back to ZooKeeper
    val storeEndOffset: Boolean = true
    inputDStream.foreachRDD((rdd, batchTime) => {
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      offsetRanges.foreach(
        offset =>
          println(offset.topic, offset.partition, offset.fromOffset, offset.untilOffset)
      )
      val newRDD = rdd.map(message => processMessage(message))
      newRDD.count() // force evaluation of the batch before committing offsets
      persistOffsets(offsetRanges, consumerGroupID, storeEndOffset, zkUtils)
    })

    ssc.start()
    ssc.awaitTermination()
  }

  /*
    A dummy processing step that simply returns the message as is.
   */
  def processMessage(message: ConsumerRecord[String, String]): ConsumerRecord[String, String] = {
    message
  }

  def readOffsets(
                   topics: Seq[String],
                   groupId: String,
                   zkUtils: ZkUtils
                 ): Map[TopicPartition, Long] = {

    val topicPartOffsetMap = collection.mutable.HashMap.empty[TopicPartition, Long]

    val partitionMap = zkUtils.getPartitionsForTopics(topics)

    // /consumers/<groupId>/offsets/<topic>/
    partitionMap.foreach(topicPartitions => {

      val zkGroupTopicDirs = new ZKGroupTopicDirs(groupId, topicPartitions._1)
      // iterate over every partition of the topic
      topicPartitions._2.foreach(partition => {

        val offsetPath = zkGroupTopicDirs.consumerOffsetDir + "/" + partition
        try {
          val offsetStatTuple = zkUtils.readData(offsetPath)
          if (offsetStatTuple != null) {
            topicPartOffsetMap.put(new TopicPartition(topicPartitions._1, Integer.valueOf(partition)), offsetStatTuple._1.toLong)
          }
        } catch {
          case e: Exception =>
            // no previous offset node exists for this partition yet, start from 0
            println(s"message: ${e.getMessage} , topic: ${topicPartitions._1}, partition: $partition, node path: $offsetPath")
            topicPartOffsetMap.put(new TopicPartition(topicPartitions._1, Integer.valueOf(partition)), 0L)
        }
      })
    })

    topicPartOffsetMap.toMap
  }

  def persistOffsets(
                      offsets: Seq[OffsetRange],
                      groupId: String,
                      storeEndOffset: Boolean = true,
                      zkUtils: ZkUtils
                    ): Unit = {

    offsets.foreach(or => {
      val zkGroupTopicDirs = new ZKGroupTopicDirs(groupId, or.topic)
      val offsetPath = zkGroupTopicDirs.consumerOffsetDir + "/" + or.partition
      val offsetVal = if (storeEndOffset) or.untilOffset else or.fromOffset
      println(or.topic, or.partition, offsetVal, offsetPath)
      zkUtils.updatePersistentPath(offsetPath, offsetVal.toString)
    })
  }
}

Second version of the code:

package offsetInZookeeper

/**
  * Created by angel
  */

import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils}
import org.slf4j.LoggerFactory

import scala.collection.JavaConversions._
import scala.reflect.ClassTag
import scala.util.Try
/**
  * Utility class for the Kafka connection and offset management
  *
  * @param zkHosts     ZooKeeper quorum address
  * @param kafkaParams Kafka consumer parameters
  */
class KafkaManager(zkHosts: String, kafkaParams: Map[String, Object]) extends Serializable {
  //Logback logger, accessed through the slf4j facade
  @transient private lazy val log = LoggerFactory.getLogger(getClass)
  //client and connection needed to build the ZkUtils instance
  val (zkClient, zkConnection) = ZkUtils.createZkClientAndConnection(zkHosts, 10000, 10000)
  //ZkUtils object used to access ZooKeeper
  val zkUtils = new ZkUtils(zkClient, zkConnection, false)
  /**
    * Wraps createDirectStream and adds offset handling; used to create the Kafka streaming DStream
    *
    * @param ssc    Spark Streaming Context
    * @param topics Kafka topics
    * @tparam K Kafka message key type
    * @tparam V Kafka message value type
    * @return Kafka streaming DStream
    */
  def createDirectStream[K: ClassTag, V: ClassTag](ssc: StreamingContext, topics: Seq[String]): InputDStream[ConsumerRecord[K, V]] = {
    val groupId = kafkaParams("group.id").toString
    val storedOffsets = readOffsets(topics, groupId)
    log.info("Kafka offset summary (format: (topic, partition, offset)): {}", storedOffsets.map(off => (off._1.topic, off._1.partition(), off._2)))
    val kafkaStream = KafkaUtils.createDirectStream[K, V](ssc, PreferConsistent, ConsumerStrategies.Subscribe[K, V](topics, kafkaParams, storedOffsets))
    kafkaStream
  }
  /**
    * Reads the Kafka offsets from ZooKeeper
    *
    * @param topics  Kafka topics
    * @param groupId Kafka group ID
    * @return a Map[TopicPartition, Long] with the offset of every partition of every topic;
    *         if a partition has never been consumed, the smallest available offset is used
    */
  def readOffsets(topics: Seq[String], groupId: String): Map[TopicPartition, Long] = {
    val topicPartOffsetMap = collection.mutable.HashMap.empty[TopicPartition, Long]
    val partitionMap = zkUtils.getPartitionsForTopics(topics)
    // /consumers/<groupId>/offsets/<topic>/
    partitionMap.foreach(topicPartitions => {
      val zkGroupTopicDirs = new ZKGroupTopicDirs(groupId, topicPartitions._1)
      topicPartitions._2.foreach(partition => {
        val offsetPath = zkGroupTopicDirs.consumerOffsetDir + "/" + partition
        val tryGetKafkaOffset = Try {
          val offsetStatTuple = zkUtils.readData(offsetPath)
          if (offsetStatTuple != null) {
            log.info("Kafka offset lookup: topic: {}, partition: {}, offset: {}, ZK node path: {}", Seq[AnyRef](topicPartitions._1, partition.toString, offsetStatTuple._1, offsetPath): _*)
            topicPartOffsetMap.put(new TopicPartition(topicPartitions._1, Integer.valueOf(partition)), offsetStatTuple._1.toLong)
          }
        }
        if(tryGetKafkaOffset.isFailure){
          //http://kafka.apache.org/0110/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
          val consumer = new KafkaConsumer[String, Object](kafkaParams)
          val partitionList = List(new TopicPartition(topicPartitions._1, partition))
          consumer.assign(partitionList)
          val minAvailableOffset = consumer.beginningOffsets(partitionList).values.head
          consumer.close()
          log.warn("Kafka offset lookup: no previous ZK node ({}), topic: {}, partition: {}, ZK node path: {}, using the smallest available offset: {}", Seq[AnyRef](tryGetKafkaOffset.failed.get.getMessage, topicPartitions._1, partition.toString, offsetPath, minAvailableOffset): _*)
          topicPartOffsetMap.put(new TopicPartition(topicPartitions._1, Integer.valueOf(partition)), minAvailableOffset)
        }
      })
    })
    topicPartOffsetMap.toMap
  }
  /**
    * Saves the consumed Kafka offsets
    *
    * @param rdd            the Kafka RDD from Spark Streaming, RDD[ConsumerRecord[K, V]]
    * @param storeEndOffset true = save the end offset of each range, false = save the start offset
    */
  def persistOffsets[K, V](rdd: RDD[ConsumerRecord[K, V]], storeEndOffset: Boolean = true): Unit = {
    val groupId = kafkaParams("group.id").toString
    val offsetsList = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    offsetsList.foreach(or => {
      val zkGroupTopicDirs = new ZKGroupTopicDirs(groupId, or.topic)
      val offsetPath = zkGroupTopicDirs.consumerOffsetDir + "/" + or.partition
      val offsetVal = if (storeEndOffset) or.untilOffset else or.fromOffset
      zkUtils.updatePersistentPath(zkGroupTopicDirs.consumerOffsetDir + "/" + or.partition, offsetVal + "" /*, JavaConversions.bufferAsJavaList(acls)*/)
      log.debug("Kafka offset saved: topic: {}, partition: {}, offset: {}, ZK node path: {}", Seq[AnyRef](or.topic, or.partition.toString, offsetVal.toString, offsetPath): _*)
    })
  }


}

object Manager{
  def main(args: Array[String]): Unit = {
    //5 cdh1:9092,cdh2:9092,cdh3:9092 test2 zk cdh1:2181,cdh2:2181,cdh3:2181
    if (args.length < 5) {
      System.err.println("Usage: KafkaDirectStreamTest " +
        "<batch-duration-in-seconds> " +
        "<kafka-bootstrap-servers> " +
        "<kafka-topics> " +
        "<kafka-consumer-group-id> " +
        "<kafka-zookeeper-quorum>")
      System.exit(1)
    }

    val batchDuration = args(0)
    val bootstrapServers = args(1).toString
    val topicsSet = args(2).toString.split(",").toSet
    val consumerGroupID = args(3)
    val zkQuorum = args(4)
    val sparkConf = new SparkConf().setAppName("Kafka-Offset-Management-Blog")
      .setMaster("local[4]")


    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(batchDuration.toLong))

    val topics = topicsSet.toArray

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> bootstrapServers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> consumerGroupID,
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean) // disable auto commit; otherwise offsets could be committed before the batch is fully processed, corrupting the consumption position
    )

    lazy val kafkaManager = new KafkaManager(zkQuorum , kafkaParams)
    val inputDStream: InputDStream[ConsumerRecord[String, String]] = kafkaManager.createDirectStream(ssc , topics)
    inputDStream.foreachRDD(rdd => {
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      offsetRanges.foreach(
        offset =>
          println(offset.topic, offset.partition, offset.fromOffset,offset.untilOffset)
      )
      kafkaManager.persistOffsets(rdd)
    })
    ssc.start()
    ssc.awaitTermination()




  }



}