版本声明:
kafka:1.0.1
spark:2.1.0
注意:在使用过程中可能会出现servlet版本不兼容的问题,因此在导入maven的pom文件的时候,需要做适当的排除操作
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>kafkaDirect</groupId>
    <artifactId>kafkaDirect</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!-- Cloudera repository: needed for the "-cdh" HBase artifacts declared below. -->
    <repositories>
        <repository>
            <id>cloudera-releases</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>

    <!-- javax.servlet:servlet-api is excluded on several dependencies to avoid servlet version conflicts. -->
    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.1.0</version>
            <exclusions>
                <exclusion>
                    <groupId>javax.servlet</groupId>
                    <artifactId>servlet-api</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!--<dependency>-->
        <!--<groupId>org.apache.spark</groupId>-->
        <!--<artifactId>spark-streaming-kafka_2.11</artifactId>-->
        <!--<version>2.1.0</version>-->
        <!--<exclusions>-->
        <!--<exclusion>-->
        <!--<groupId>javax.servlet</groupId>-->
        <!--<artifactId>servlet-api</artifactId>-->
        <!--</exclusion>-->
        <!--</exclusions>-->
        <!--</dependency>-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.1.0</version>
            <exclusions>
                <exclusion>
                    <groupId>javax.servlet</groupId>
                    <artifactId>servlet-api</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.8</version>
            <exclusions>
                <exclusion>
                    <groupId>javax.servlet</groupId>
                    <artifactId>servlet-api</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
        <!-- NOTE(review): spark-streaming-kafka-0-10 pulls in its own Kafka artifacts transitively;
             confirm they are compatible with the kafka-clients version pinned here. -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>1.0.1</version>
            <exclusions>
                <exclusion>
                    <groupId>javax.servlet</groupId>
                    <artifactId>servlet-api</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-10 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>2.1.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-client -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.2.0-cdh5.14.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-common -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-common</artifactId>
            <version>1.2.0-cdh5.14.0</version>
        </dependency>

    </dependencies>
</project>
代码:
因为使用了zookeeper作为offset的存储,因此任何能够监控zookeeper的框架,都可以监控当前kafka消费状况
例如:kafkaOffsetMonitor
https://github.com/quantifind/KafkaOffsetMonitor/releases
其中注意的小点:
1:在zookeeper中offset存储路径:/consumers/[groupId]/offsets/[topic]/[partitionId]
2:读取offset操作,其实就是去zookeeper的路径下拿offset值,代码:
/**
 * Reads the stored consumer offsets of every partition of the given topics from ZooKeeper.
 *
 * The offset of each partition is read from the node
 * `<consumerOffsetDir of (groupId, topic)>/<partition>`, i.e. below
 * `/consumers/<groupId>/offsets/<topic>/`.
 *
 * @param topics  topics whose partitions are looked up through `zkUtils`
 * @param groupId consumer group id used to build the ZooKeeper offset path
 * @param zkUtils ZooKeeper helper used for the partition lookup and the reads
 * @return one entry per partition reported by `zkUtils.getPartitionsForTopics`; the value is
 *         the stored offset, or 0 when the node could not be read or parsed
 */
def readOffsets(
    topics: Seq[String],
    groupId: String,
    zkUtils: ZkUtils
): Map[TopicPartition, Long] = {
  val offsets = for {
    (topic, partitions) <- zkUtils.getPartitionsForTopics(topics).toSeq
    partition <- partitions
  } yield {
    val offsetPath = new ZKGroupTopicDirs(groupId, topic).consumerOffsetDir + "/" + partition
    val offset =
      try {
        // A node without data yields a null string; treat it like a missing offset.
        Option(zkUtils.readData(offsetPath)._1).fold(0L)(_.toLong)
      } catch {
        case scala.util.control.NonFatal(e) =>
          // Best effort: any read/parse failure (typically a missing node) falls back to 0.
          // println does not expand "{}" placeholders, so the message is interpolated, and it
          // reports the current topic/partition rather than the whole collections.
          println(s"message: ${e.getMessage} , topic: $topic, partition: $partition, node path: $offsetPath")
          0L
      }
    new TopicPartition(topic, partition) -> offset
  }
  offsets.toMap
}
3:提交offset代码,实际就是将offset存储到zookeeper中
/**
 * Stores one offset per offset range in ZooKeeper.
 *
 * For every range the node `<consumerOffsetDir of (groupId, topic)>/<partition>` is updated
 * through `zkUtils.updatePersistentPath`, and the written values are printed to stdout.
 *
 * @param offsets        offset ranges to persist
 * @param groupId        consumer group id used to build the ZooKeeper offset path
 * @param storeEndOffset true stores `untilOffset`, false stores `fromOffset`
 * @param zkUtils        ZooKeeper helper used for the writes
 */
def persistOffsets(
    offsets: Seq[OffsetRange],
    groupId: String,
    storeEndOffset: Boolean = true,
    zkUtils: ZkUtils
): Unit =
  for (range <- offsets) {
    val topicDirs = new ZKGroupTopicDirs(groupId, range.topic)
    val nodePath = topicDirs.consumerOffsetDir + "/" + range.partition
    val offsetToStore =
      if (storeEndOffset) range.untilOffset
      else range.fromOffset
    // Printed as a 4-tuple: (topic, partition, offset, path).
    println((range.topic.toString, range.partition.toString, offsetToStore, nodePath))
    zkUtils.updatePersistentPath(nodePath, offsetToStore.toString)
  }
完整代码
package offsetInZookeeper

import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
import org.I0Itec.zkclient.ZkClient
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.ConsumerStrategies.{Assign, Subscribe}
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.zookeeper.ZooDefs
import org.apache.zookeeper.data.ACL

import scala.collection.JavaConversions
import scala.collection.mutable.ListBuffer
import scala.util.control.NonFatal

/**
 * Created by angel
 *
 * Spark Streaming job that consumes Kafka through the direct (kafka010) API and keeps the
 * consumer offsets in ZooKeeper below `/consumers/<groupId>/offsets/<topic>/<partition>`.
 */
object KafkaOffsetInZookeeper {

  /**
   * Entry point.
   *
   * Expected arguments, in order: batch duration in seconds, Kafka bootstrap servers,
   * comma-separated topics, consumer group id, ZooKeeper quorum.
   * Example: `5 cdh1:9092,cdh2:9092,cdh3:9092 test2 zk cdh1:2181,cdh2:2181,cdh3:2181`
   */
  def main(args: Array[String]): Unit = {
    if (args.length < 5) {
      System.err.println("Usage: KafkaDirectStreamTest " +
        "<batch-duration-in-seconds> " +
        "<kafka-bootstrap-servers> " +
        "<kafka-topics> " +
        "<kafka-consumer-group-id> " +
        "<kafka-zookeeper-quorum>")
      System.exit(1)
    }

    val batchDuration = args(0)
    val bootstrapServers = args(1)
    // Going through a Set removes duplicate topic names.
    val topics = args(2).split(",").toSet.toArray
    val consumerGroupID = args(3)
    val zkQuorum = args(4)

    val sparkConf = new SparkConf().setAppName("Kafka-Offset-Management-Blog")
      .setMaster("local[4]") // local master for testing on a workstation
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(batchDuration.toLong))

    val zkSessionTimeOut = 10000
    val zkConnectionTimeOut = 10000
    // Connect with the plain quorum string. Appending "/consumers/<group>/offsets/<topic>" to the
    // connect string would be interpreted by ZooKeeper as a chroot, so the broker topic metadata
    // and the offset nodes would both be resolved under that prefix instead of at
    // /consumers/<groupId>/offsets/<topic>/<partition>.
    val zkClientAndConnection =
      ZkUtils.createZkClientAndConnection(zkQuorum, zkSessionTimeOut, zkConnectionTimeOut)
    val zkUtils = new ZkUtils(zkClientAndConnection._1, zkClientAndConnection._2, false)

    try {
      val kafkaParams = Map[String, Object](
        "bootstrap.servers" -> bootstrapServers,
        "key.deserializer" -> classOf[StringDeserializer],
        "value.deserializer" -> classOf[StringDeserializer],
        "group.id" -> consumerGroupID,
        "auto.offset.reset" -> "latest",
        "enable.auto.commit" -> (false: java.lang.Boolean)
      )

      // Fetch the starting offsets from ZooKeeper.
      val fromOffsets: Map[TopicPartition, Long] = readOffsets(topics, consumerGroupID, zkUtils)

      // Create the stream starting at those offsets.
      val inputDStream = KafkaUtils.createDirectStream(
        ssc,
        PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, fromOffsets)
      )

      // Offsets are written after the batch has been handled, so the end offset of the batch is
      // stored; storing the start offset would replay the last batch on every restart.
      val storeEndOffset = true
      inputDStream.foreachRDD { rdd =>
        val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        offsetRanges.foreach { offset =>
          println((offset.topic, offset.partition, offset.fromOffset, offset.untilOffset))
        }
        // NOTE(review): map is lazy and no action is invoked on newRDD, so processMessage is not
        // actually executed here; add an action (e.g. count/foreach) once real processing exists.
        val newRDD = rdd.map(message => processMessage(message))
        persistOffsets(offsetRanges, consumerGroupID, storeEndOffset, zkUtils)
      }

      ssc.start()
      ssc.awaitTermination()
    } finally {
      // Release the ZooKeeper connection when the job stops or fails to start.
      zkUtils.close()
    }
  }

  /**
   * Dummy processing step that simply returns the message as is.
   */
  def processMessage(message: ConsumerRecord[String, String]): ConsumerRecord[String, String] =
    message

  /**
   * Reads the stored consumer offsets of every partition of the given topics from ZooKeeper.
   *
   * @param topics  topics whose partitions are looked up through `zkUtils`
   * @param groupId consumer group id used to build the ZooKeeper offset path
   * @param zkUtils ZooKeeper helper used for the partition lookup and the reads
   * @return one entry per partition reported by `zkUtils.getPartitionsForTopics`; the value is
   *         the stored offset, or 0 when the node could not be read or parsed
   */
  def readOffsets(
      topics: Seq[String],
      groupId: String,
      zkUtils: ZkUtils
  ): Map[TopicPartition, Long] = {
    val offsets = for {
      (topic, partitions) <- zkUtils.getPartitionsForTopics(topics).toSeq
      partition <- partitions
    } yield {
      // /consumers/<groupId>/offsets/<topic>/<partition>
      val offsetPath = new ZKGroupTopicDirs(groupId, topic).consumerOffsetDir + "/" + partition
      val offset =
        try {
          // A node without data yields a null string; treat it like a missing offset.
          Option(zkUtils.readData(offsetPath)._1).fold(0L)(_.toLong)
        } catch {
          case NonFatal(e) =>
            // Best effort: any read/parse failure (typically a missing node) falls back to 0.
            // println does not expand "{}" placeholders, so the message is interpolated.
            println(s"message: ${e.getMessage} , topic: $topic, partition: $partition, node path: $offsetPath")
            0L
        }
      new TopicPartition(topic, partition) -> offset
    }
    offsets.toMap
  }

  /**
   * Stores one offset per offset range in ZooKeeper and prints what was written.
   *
   * @param offsets        offset ranges to persist
   * @param groupId        consumer group id used to build the ZooKeeper offset path
   * @param storeEndOffset true stores `untilOffset`, false stores `fromOffset`
   * @param zkUtils        ZooKeeper helper used for the writes
   */
  def persistOffsets(
      offsets: Seq[OffsetRange],
      groupId: String,
      storeEndOffset: Boolean = true,
      zkUtils: ZkUtils
  ): Unit =
    offsets.foreach { or =>
      val offsetPath = new ZKGroupTopicDirs(groupId, or.topic).consumerOffsetDir + "/" + or.partition
      val offsetVal = if (storeEndOffset) or.untilOffset else or.fromOffset
      println((or.topic.toString, or.partition.toString, offsetVal, offsetPath))
      zkUtils.updatePersistentPath(offsetPath, offsetVal.toString)
    }

}
第二种代码:
package offsetInZookeeper

/**
 * Created by angel
 */
import java.lang.Object

import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils}
import org.slf4j.LoggerFactory

import scala.collection.JavaConversions._
import scala.reflect.ClassTag
import scala.util.{Failure, Success, Try}

/**
 * Kafka connection and offset management helper that keeps consumer offsets in ZooKeeper.
 *
 * NOTE(review): the class is Serializable but holds ZooKeeper client fields that are not marked
 * transient; confirm instances are only used on the driver.
 *
 * @param zkHosts     ZooKeeper connect string
 * @param kafkaParams Kafka consumer parameters; must contain "group.id"
 */
class KafkaManager(zkHosts: String, kafkaParams: Map[String, Object]) extends Serializable {

  // slf4j logger; transient + lazy so it is re-created instead of serialized.
  @transient private lazy val log = LoggerFactory.getLogger(getClass)

  // Client and connection needed to build ZkUtils (10 s session and connection timeouts).
  val (zkClient, zkConnection) = ZkUtils.createZkClientAndConnection(zkHosts, 10000, 10000)

  // ZkUtils instance used for all ZooKeeper access.
  val zkUtils = new ZkUtils(zkClient, zkConnection, false)

  /**
   * Wraps `KafkaUtils.createDirectStream` so that the stream starts at the offsets stored in
   * ZooKeeper for the consumer group configured in `kafkaParams`.
   *
   * @param ssc    Spark Streaming context
   * @param topics Kafka topics to subscribe to
   * @tparam K Kafka message key type
   * @tparam V Kafka message value type
   * @return the Kafka input stream
   */
  def createDirectStream[K: ClassTag, V: ClassTag](ssc: StreamingContext, topics: Seq[String]): InputDStream[ConsumerRecord[K, V]] = {
    val groupId = kafkaParams("group.id").toString
    val storedOffsets = readOffsets(topics, groupId)
    log.info("Kafka消息偏移量汇总(格式:(话题,分区号,偏移量)):{}", storedOffsets.map(off => (off._1.topic, off._1.partition(), off._2)))
    KafkaUtils.createDirectStream[K, V](ssc, PreferConsistent, ConsumerStrategies.Subscribe[K, V](topics, kafkaParams, storedOffsets))
  }

  /**
   * Reads the stored offsets of every partition of the given topics from ZooKeeper.
   *
   * When the offset of a partition cannot be read or parsed (typically because the node does
   * not exist yet), the earliest offset reported by Kafka for that partition is used instead.
   *
   * @param topics  Kafka topics
   * @param groupId Kafka consumer group id
   * @return a map with one offset per partition reported by `zkUtils.getPartitionsForTopics`
   */
  def readOffsets(topics: Seq[String], groupId: String): Map[TopicPartition, Long] = {
    val offsets = for {
      (topic, partitions) <- zkUtils.getPartitionsForTopics(topics).toSeq
      partition <- partitions
    } yield {
      val topicPartition = new TopicPartition(topic, partition)
      // /consumers/<groupId>/offsets/<topic>/<partition>
      val offsetPath = new ZKGroupTopicDirs(groupId, topic).consumerOffsetDir + "/" + partition
      val offset = Try(zkUtils.readData(offsetPath)._1.toLong) match {
        case Success(stored) =>
          log.info("查询Kafka消息偏移量详情: 话题:{}, 分区:{}, 偏移量:{}, ZK节点路径:{}",
            Seq[AnyRef](topic, partition.toString, stored.toString, offsetPath): _*)
          stored
        case Failure(cause) =>
          val minAvailableOffset = earliestAvailableOffset(topicPartition)
          log.warn("查询Kafka消息偏移量详情: 没有上一次的ZK节点:{}, 话题:{}, 分区:{}, ZK节点路径:{}, 使用最小可用偏移量:{}",
            Seq[AnyRef](cause.getMessage, topic, partition.toString, offsetPath, minAvailableOffset.toString): _*)
          minAvailableOffset
      }
      topicPartition -> offset
    }
    offsets.toMap
  }

  /**
   * Asks Kafka for the beginning offset of a single partition using a short-lived consumer.
   * The consumer is closed even when the lookup fails.
   */
  private def earliestAvailableOffset(topicPartition: TopicPartition): Long = {
    // http://kafka.apache.org/0110/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
    val consumer = new KafkaConsumer[String, Object](kafkaParams)
    try {
      val partitionList = List(topicPartition)
      consumer.assign(partitionList)
      consumer.beginningOffsets(partitionList).values.head.longValue()
    } finally {
      consumer.close()
    }
  }

  /**
   * Stores the offsets consumed by a Kafka RDD in ZooKeeper.
   *
   * @param rdd            Kafka RDD produced by the direct stream; it is cast to
   *                       `HasOffsetRanges` to obtain the ranges
   * @param storeEndOffset true stores the end offset of each range, false the start offset
   */
  def persistOffsets[K, V](rdd: RDD[ConsumerRecord[K, V]], storeEndOffset: Boolean = true): Unit = {
    val groupId = kafkaParams("group.id").toString
    val offsetsList = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    offsetsList.foreach { or =>
      val offsetPath = new ZKGroupTopicDirs(groupId, or.topic).consumerOffsetDir + "/" + or.partition
      val offsetVal = if (storeEndOffset) or.untilOffset else or.fromOffset
      zkUtils.updatePersistentPath(offsetPath, offsetVal.toString)
      log.debug("保存Kafka消息偏移量详情: 话题:{}, 分区:{}, 偏移量:{}, ZK节点路径:{}",
        Seq[AnyRef](or.topic, or.partition.toString, offsetVal.toString, offsetPath): _*)
    }
  }

  /**
   * Closes the underlying ZooKeeper client. Call once the streaming job has stopped.
   */
  def close(): Unit = zkUtils.close()
}

object Manager {

  /**
   * Entry point.
   *
   * Expected arguments, in order: batch duration in seconds, Kafka bootstrap servers,
   * comma-separated topics, consumer group id, ZooKeeper quorum.
   * Example: `5 cdh1:9092,cdh2:9092,cdh3:9092 test2 zk cdh1:2181,cdh2:2181,cdh3:2181`
   */
  def main(args: Array[String]): Unit = {
    if (args.length < 5) {
      System.err.println("Usage: KafkaDirectStreamTest " +
        "<batch-duration-in-seconds> " +
        "<kafka-bootstrap-servers> " +
        "<kafka-topics> " +
        "<kafka-consumer-group-id> " +
        "<kafka-zookeeper-quorum>")
      System.exit(1)
    }

    val batchDuration = args(0)
    val bootstrapServers = args(1)
    // Going through a Set removes duplicate topic names.
    val topics = args(2).split(",").toSet.toArray
    val consumerGroupID = args(3)
    val zkQuorum = args(4)

    val sparkConf = new SparkConf().setAppName("Kafka-Offset-Management-Blog")
      .setMaster("local[4]")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(batchDuration.toLong))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> bootstrapServers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> consumerGroupID,
      "auto.offset.reset" -> "latest",
      // Auto commit is disabled; offsets are persisted explicitly after each batch.
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    lazy val kafkaManager = new KafkaManager(zkQuorum, kafkaParams)
    try {
      val inputDStream: InputDStream[ConsumerRecord[String, String]] =
        kafkaManager.createDirectStream(ssc, topics)

      inputDStream.foreachRDD { rdd =>
        val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        offsetRanges.foreach { offset =>
          println((offset.topic, offset.partition, offset.fromOffset, offset.untilOffset))
        }
        kafkaManager.persistOffsets(rdd)
      }

      ssc.start()
      ssc.awaitTermination()
    } finally {
      // Release the ZooKeeper connection when the job stops or fails to start.
      kafkaManager.close()
    }
  }
}