1.HighLevelApi
High Level Api是多线程的应用程序,以Topic的Partition数量为中心。消费的规则如下:
- 一个partition只能被同一个ConsumersGroup的一个线程所消费.
- 线程数小于partition数,某些线程会消费多个partition.
- 线程数等于partition数,一个线程正好消费一个线程.
- 当添加消费者线程时,会触发rebalance,partition的分配发送变化.
- 同一个partition的offset保证消费有序,不同的partition消费不保证顺序.
关于与ZK的几个参数意思解释
- zookeeper.connect: ZK连接。
- group.id: Consumer消费ID。
- zookeeper.session.timeout.ms: kafka节点与ZK会话的超时时间。
- zookeeper.sync.time.ms: zk的follower与leader的同步时间间隔。
- auto.commit.interval.ms: Consumer offset自动提交给Zookeeper的时间。
Note that since the commit frequency is time based instead of # of messages consumed, if an error occurs between updates to ZooKeeper on restart you will get replayed messages.(由于记录Offset是基于时间的,所以当Consumer发生错误的时候,有可能会收到重复的消息。)
消费者的代码
ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(threads));
Map<String, List<KafkaStream<byte[], byte[]>>> topicMessageStreams = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = topicMessageStreams.get(topic);
// now launch all the threads
executor = Executors.newFixedThreadPool(threads);
for (KafkaStream<byte[], byte[]> stream : streams) {
executor.submit(new KafkaConsumerWorker(stream));
}
2.ZookeeperConsumerConnector
一个Consumer会创建一个ZookeeperConsumerConnector,代表一个消费者进程.
- fetcher: 消费者获取数据, 使用ConsumerFetcherManager fetcher线程抓取数据
- zkUtils: 消费者要和ZK通信, 除了注册自己,还有其他信息也会写到ZK中
- topicThreadIdAndQueues: 消费者会指定自己消费哪些topic,并指定线程数, 所以topicThreadId都对应一个队列
- messageStreamCreated: 消费者会创建消息流, 每个队列都对应一个消息流
- offsetsChannel: offset可以存储在ZK或者kafka中,如果存在kafka里,像其他请求一样,需要和Broker通信
- 还有其他几个Listener监听器,分别用于topicPartition的更新,负载均衡,消费者重新负载等
当Broker挂掉的时候,在这个Broker上的所有Partition都丢失了,而Partition是给消费者服务的.
所以Broker挂掉后在做迁移的时候,会将其上的Partition转移到其他Broker上,因此消费者要消费的Partition也跟着变化.
2.1 init
在创建ZookeeperConsumerConnector时,有几个初始化方法需要事先执行.
- 消费者要和ZK通信,所以connectZk会确保连接上ZooKeeper
- 消费者要消费数据,需要有抓取线程,所有的抓取线程交给ConsumerFetcherManager统一管理
- 由消费者客户端自己保存offset,而消费者会消费多个topic的多个partition.
- 多个partition的offset管理类OffsetManager是一个GroupCoordinator
- 定时提交线程会使用OffsetManager建立的通道定时提交offset到zk或者kafka.
2.2 createMessageStreams
ConsumerConnector创建消息流,需要指定解码器,因为要将日志反序列化(生产者写消息时对消息序列化到日志文件).
在kafka的运行过程中,会有其他的线程将数据放入partition对应的queue中. 而queue是用于KafkaStream的.
一旦数据添加到queue后,KafkaStream的阻塞队列就有数据了,消费者就可以从队列中消费消息.
-
createMessageStreams
: 返回KafkaStream, 每个Topic都对应了多个KafkaStream. 数量和topicCount中的count一样.
例子解释
假设消费者C1声明了topic1:2, topic2:3. topicThreadIds=consumerThreadIdsPerTopicMap.
topicThreadIds.values = [ (C1_1,C1_2), (C1_1,C1_2,C1_3)]一共有5个线程,queuesAndStreams也有5个元素.
consumerThreadIdsPerTopicMap = {
topic1: [C1_1, C1_2],
topic2: [C1_1, C1_2, C1_3]
}
topicThreadIds.values = [
[C1_1, C1_2],
[C1_1, C1_2, C1_3]
]
threadIdSet循环[C1_1, C1_2]时, 生成两个queue->stream pair.
threadIdSet循环[C1_1, C1_2, C1_3]时, 生成三个queue->stream pair.
queuesAndStreams = [
(LinkedBlockingQueue_1,KafkaStream_1), //topic1:C1_1
(LinkedBlockingQueue_2,KafkaStream_2), //topic1:C1_2
(LinkedBlockingQueue_3,KafkaStream_3), //topic2:C1_1
(LinkedBlockingQueue_4,KafkaStream_4), //topic2:C1_2
(LinkedBlockingQueue_5,KafkaStream_5), //topic2:C1_3
]
- 客户端关注的是我的每个线程都对应了一个队列,每个队列都是一个消息流就可以了.
- 客户端的每个线程实际上是针对Partition级别的,一个线程对应一个或多个partition。
2.3 registerConsumerInZK
消费者需要向ZK注册一个临时节点,路径为:/consumers/[group_id]/ids/[consumer_id],内容为订阅的topic.
问题:什么时候这个节点会被删除掉呢? Consumer进程挂掉时,或者Session失效时删除临时节点. 重连时会重新创建.
由于是临时节点,一旦创建节点的这个进程挂掉了,临时节点就会自动被删除掉. 这是由zk机制决定的,不是由消费者完成的.
2.4 reinitializeConsumer listener
当前Consumer在ZK注册之后,需要重新初始化Consumer.对于全新的消费者,注册多个监听器,在zk的对应节点的注册事件发生时,会回调监听器的方法.
- 将topic对应的消费者线程id及对应的LinkedBlockingQueue放入topicThreadIdAndQueues中,LinkedBlockingQueue是真正存放数据的queue
- 注册sessionExpirationListener,监听状态变化事件.在session失效重新创建session时调用
- 向/consumers/[group_id]/ids注册Child变更事件的loadBalancerListener,当消费组下的消费者发生变化时调用
- 向/brokers/topics/[topic]注册Data变更事件的topicPartitionChangeListener,在topic数据发生变化时调用
- 显式调用loadBalancerListener.syncedRebalance(), 会调用reblance方法进行consumer的初始化工作
private def reinitializeConsumer[K,V](topicCount: TopicCount,
queuesAndStreams: List[(LinkedBlockingQueue[FetchedDataChunk],KafkaStream[K,V])]) {
val dirs = new ZKGroupDirs(config.groupId)
// ② listener to consumer and partition changes
if (loadBalancerListener == null) {
val topicStreamsMap = new mutable.HashMap[String,List[KafkaStream[K,V]]]
loadBalancerListener = new ZKRebalancerListener(config.groupId, consumerIdString,
topicStreamsMap.asInstanceOf[scala.collection.mutable.Map[String, List[KafkaStream[_,_]]]])
}
// ① create listener for session expired event if not exist yet
if (sessionExpirationListener == null) sessionExpirationListener =
new ZKSessionExpireListener(dirs, consumerIdString, topicCount, loadBalancerListener)
// ③ create listener for topic partition change event if not exist yet
if (topicPartitionChangeListener == null)
topicPartitionChangeListener = new ZKTopicPartitionChangeListener(loadBalancerListener)
// listener to consumer and partition changes
zkUtils.zkClient.subscribeStateChanges(sessionExpirationListener)
zkUtils.zkClient.subscribeChildChanges(dirs.consumerRegistryDir, loadBalancerListener)
// register on broker partition path changes.
topicStreamsMap.foreach { topicAndStreams =>
zkUtils.zkClient.subscribeDataChanges(BrokerTopicsPath+"/"+topicAndStreams._1, topicPartitionChangeListener)
}
// explicitly trigger load balancing for this consumer
loadBalancerListener.syncedRebalance()
}
ZKRebalancerListener传入ZKSessionExpireListener和ZKTopicPartitionChangeListener.它们都会使用ZKRebalancerListener完成自己的工作.
2.5 ZKSessionExpireListener
当Session失效时,新的会话建立时,立即进行rebalance操作.
2.6 ZKTopicPartitionChangeListener
当topic的数据变化时,通过触发的方式启动rebalance操作.
2.7 ZKRebalancerListener watcher
class ZKRebalancerListener(val group: String, val consumerIdString: String,
val kafkaMessageAndMetadataStreams: mutable.Map[String,List[KafkaStream[_,_]]])
extends IZkChildListener {
private var isWatcherTriggered = false
private val lock = new ReentrantLock
private val cond = lock.newCondition()
private val watcherExecutorThread = new Thread(consumerIdString + "_watcher_executor") {
override def run() {
var doRebalance = false
while (!isShuttingDown.get) {
lock.lock()
try {
// 如果isWatcherTriggered=false,则不会触发syncedRebalance. 等待1秒后,继续判断
if (!isWatcherTriggered)
cond.await(1000, TimeUnit.MILLISECONDS) // wake up periodically so that it can check the shutdown flag
} finally {
// 不管isWatcherTriggered值是多少,在每次循环时,都会执行. 如果isWatcherTriggered=true,则会执行syncedRebalance
doRebalance = isWatcherTriggered
// 重新设置isWatcherTriggered=false, 因为其他线程触发一次后就失效了,想要再次触发,必须再次设置isWatcherTriggered=true
isWatcherTriggered = false
lock.unlock()
}
if (doRebalance) syncedRebalance // 只有每次rebalanceEventTriggered时,才会调用一次syncedRebalance
}
}
}
watcherExecutorThread.start()
// 触发rebalance开始进行, 修改isWatcherTriggered标志位,触发cond条件运行
def rebalanceEventTriggered() {
inLock(lock) {
isWatcherTriggered = true
cond.signalAll()
}
}
3.ZKRebalancerListener rebalance
因为消费者加入/退出时,消费组的成员会发生变化,而消费组中的所有存活消费者负责消费可用的partitions.
可用的partitions或者消费组中的消费者成员一旦发生变化,都要重新分配partition给存活的消费者.下面是一个示例.
当然分配partition的工作绝不仅仅是这么简单的,还要处理与之相关的线程,并重建必要的数据:
- 关闭数据抓取线程,获取之前为topic设置的存放数据的queue并清空该queue
- 释放partition的ownership,删除partition和consumer的对应关系
- 为各个partition重新分配threadid
获取partition最新的offset并重新初始化新的PartitionTopicInfo(queue存放数据,两个offset为partition最新的offset) - 重新将partition对应的新的consumer信息写入zookeeper
- 重新创建partition的fetcher线程
private def rebalance(cluster: Cluster): Boolean = {
val myTopicThreadIdsMap = TopicCount.constructTopicCount(group, consumerIdString,
zkUtils, config.excludeInternalTopics).getConsumerThreadIdsPerTopic
val brokers = zkUtils.getAllBrokersInCluster()
if (brokers.size == 0) {
zkUtils.zkClient.subscribeChildChanges(BrokerIdsPath, loadBalancerListener)
true
} else {
// ① 停止fetcher线程防止数据重复.如果当前调整失败了,被释放的partitions可能被其他消费者拥有.
// 而没有先停止fetcher的话,原先的消费者仍然会和新的拥有者共同消费同一份数据.
closeFetchers(cluster, kafkaMessageAndMetadataStreams, myTopicThreadIdsMap)
// ② 释放topicRegistry中topic-partition的owner
releasePartitionOwnership(topicRegistry)
// ③ 为partition重新分配消费者....
// ④ 为partition添加consumer owner
if(reflectPartitionOwnershipDecision(partitionAssignment)) {
allTopicsOwnedPartitionsCount = partitionAssignment.size
topicRegistry = currentTopicRegistry
// ⑤ 创建拉取线程
updateFetcher(cluster)
true
}
}
}
rebalance操作涉及了以下内容:
- PartitionOwnership: Partition的所有者(ownership)的删除和重建
- AssignmentContext: 分配信息上下文
- PartitionAssignor: 为Partition分配Consumer的算法
- PartitionAssignment: Partition分配之后的上下文
- PartitionTopicInfo: Partition的最终信息
- Fetcher: 完成了rebalance,消费者就可以重新开始抓取数据
3.1 核心:PartitionAssignor
将可用的partitions以及消费者线程排序, 将partitions处于线程数,表示每个线程(不是消费者数量)平均可以分到几个partition.
如果除不尽,剩余的会分给前面几个消费者线程. 比如有两个消费者,每个都是两个线程,一共有5个可用的partitions: (p0-p4).
每个消费者线程(一共四个线程)可以获取到至少一共partition(5/4=1),剩余一个(5%4=1)partition分给第一个线程.
最后的分配结果为: p0 -> C1-0, p1 -> C1-0, p2 -> C1-1, p3 -> C2-0, p4 -> C2-1
关闭Fetcher时要注意:
- 先提交offset,然后才停止消费者. 因为在停止消费者的时候当前的数据块中还会有点残留数据.
- 因为这时候还没有释放partiton的ownership(即partition还归当前consumer所有),强制提交offset,
这样拥有这个partition的下一个消费者线程(rebalance后),就可以使用已经提交的offset了,确保不中断. - 因为fetcher线程已经关闭了(stopConnections),这是消费者能得到的最后一个数据块,以后不会有了,直到平衡结束,fetcher重新开始
topicThreadIdAndQueues来自于topicThreadIds,所以它的topic应该都在relevantTopicThreadIdsMap的topics中.
为什么还要过滤呢? 注释中说到在本次平衡之后,只需要清理可能不再属于这个消费者的队列(部分的topicPartition抓取队列).问题:新创建的ZKRebalancerListener中kafkaMessageAndMetadataStreams(即这里的messageStreams)为空的Map.
如何清空里面的数据? 实际上KafkaStream只是一个迭代器,在运行过程中会有数据放入到这个流中,这样流就有数据了.
4.ConsumerFetcherManager
Fetcher线程要抓取数据关心的是PartitionTopicInfo,首先要找出Partition Leader(因为只向Leader Partition发起抓取请求).
初始时假设所有topicInfos(PartitionTopicInfo)都找不到Leader,即同时加入partitionMap和noLeaderPartitionSet.
在LeaderFinderThread线程中如果找到Leader,则从noLeaderPartitionSet中移除.
ConsumerFetcherManager管理了当前Consumer的所有Fetcher线程.
5.小结
high level的Consumer Rebalance的控制策略是由每一个Consumer通过在Zookeeper上注册Watch完成的。
每个Consumer被创建时会触发Consumer Group的Rebalance,具体的启动流程是:
- (High Level)Consumer启动时将其ID注册到其Consumer Group下 (registerConsumerInZK)
- 在/consumers/[group_id]/ids上和/brokers/ids上分别注册Watch (reinitializeConsumer->Listener)
- 强制自己在其Consumer Group内启动Rebalance流程 (ZKRebalancerListener.rebalance)
在这种策略下,每一个Consumer或者Broker的增加或者减少都会触发Consumer Rebalance。
因为每个Consumer只负责调整自己所消费的Partition,为了保证整个Consumer Group的一致性,
当一个Consumer触发了Rebalance时,该Consumer Group内的其它所有其它Consumer也应该同时触发Rebalance。
该方式有如下缺陷:
- Herd effect(羊群效应): 任何Broker或者Consumer的增减都会触发所有的Consumer的Rebalance
- Split Brain(脑裂): 每个Consumer分别单独通过Zookeeper判断哪些Broker和Consumer 宕机了,
那么不同Consumer在同一时刻从Zookeeper“看”到的View就可能不一样,这是由Zookeeper的特性决定的,这就会造成不正确的Reblance尝试。 - 调整结果不可控: 所有的Consumer都并不知道其它Consumer的Rebalance是否成功,这可能会导致Kafka工作在一个不正确的状态。
根据Kafka官方文档,Kafka作者正在考虑在还未发布的0.9.x版本中使用中心协调器(coordinator)。大体思想是选举出一个broker作为coordinator,由它watch Zookeeper,从而判断是否有partition或者consumer的增减,然后生成rebalance命令,并检查是否这些rebalance在所有相关的consumer中被执行成功,如果不成功则重试,若成功则认为此次rebalance成功.