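Purgatory is an important data structure that the Kafka server uses when handling requests. I came across this article while studying the ReplicaManager source code, so I translated it along the way. Many things in this proposal only become clear after reading the source code, and there is quite a lot of code, so I first outline some of the concepts and principles here to make the rest of the article easier to read.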
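1. The purgatory caches delayed requests. These requests cannot be completed yet because some condition is not satisfied, so they are put into the purgatory and taken out again once the condition is met.
2. A request can be satisfied in two ways: (1) a business condition is met, such as the minimum number of bytes for a fetch; (2) the timeout expires. The two conditions call for two different caches. The first is implemented as a hash map whose key is the condition and whose value is the list of all requests waiting on that condition (the watcher list in the article; each request waiting on the key is a watcher). The second is a timer that actively completes a request once it has timed out.
3. Keys and requests in the hash map are in a many-to-many relationship. When a request is found through one key and completed, it can be removed from that key's watcher list. That does not mean the request has left the first cache, because it may still sit in the watcher lists of other keys. Traversing all watcher lists is expensive, so the hash map cannot be scanned every time one element is removed. Instead, the hash map has to be cleaned up periodically, which is the purge operation described in the article below. The 0.8.x implementation decides when to purge from the current total size of the watcher lists, but that size reflects neither the number of requests in the first cache nor the number of completed requests, and it is the completed requests that actually need purging. The old scheme handles this poorly, so it burns a lot of CPU and limits the purgatory's throughput. The new scheme partly solves the problem and is at least much better than 0.8.x.
4. Even when an element is removed from the second cache (the timeout queue), the corresponding entries in the first cache cannot be located and removed directly. Expired requests therefore also linger in the first cache, which is another reason the first cache needs purging.
5. The 0.8.x timer uses a java.util.concurrent.DelayQueue, with each request wrapped as a DelayItem. Java's DelayQueue is backed by a priority queue, whose offer and poll operations cost O(log n). So if the DelayQueue is large, every enqueue and dequeue is relatively expensive. The new implementation uses a timing wheel with buckets based on doubly linked lists, which brings the cost of inserting a request into the timer and removing it down to O(1) and also lowers CPU usage.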
Purgatory Redesign Proposal
Kafka implements several request types that cannot immediately be answered with a response. Examples:
- A produce request with acks=all cannot be considered complete until all replicas have acknowledged the write and we can guarantee it will not be lost if the leader fails.
- A fetch request with min.bytes=1 won't be answered until there is at least one new byte of data for the consumer to consume. This allows a "long poll" so that the consumer need not busy wait checking for new data to arrive.
These requests are considered complete when either (a) the criteria they requested are met or (b) some timeout occurs.
We intend to expand the use of this delayed request facility for various additional purposes including partition assignment and potentially quota enforcement.
The number of these asynchronous operations in flight at any time scales with the number of connections, which for Kafka is often tens of thousands.
A naive implementation of these would simply block the thread on the criteria, but this would not scale to the high number of in flight requests Kafka has.
The current approach uses a data structure called the "request purgatory". The purgatory holds any request that hasn't yet met its criteria to succeed but also hasn't yet resulted in an error. This structure holds onto these uncompleted requests and allows non-blocking event-based generation of the responses. This approach is obviously better than having a thread per in-flight request but our implementation of the data structure that accomplishes this has a number of deficiencies. The goal of this proposal is to improve the efficiency of this data structure.
Current Design
The request purgatory consists of a timeout timer and a hash map of watcher lists for event-driven processing. A request is put into the purgatory when it cannot be satisfied immediately because of unmet conditions. A request in the purgatory is completed later when the conditions are met, or is forced to complete (timeout) when it passes the time specified in the timeout parameter of the request. Currently (0.8.x) it uses Java DelayQueue to implement the timer.
When a request is completed, the request is not deleted from the timer or watcher lists immediately. Instead, completed requests are deleted as they are found during condition checking. When the deletion does not keep up, the server may exhaust the JVM heap and cause an OutOfMemoryError. To alleviate the situation, the reaper thread purges completed requests from the purgatory when the number of requests in the purgatory (including both pending and completed requests) exceeds the configured number. The purge operation scans the timer queue and all watcher lists to find completed requests and deletes them.
By setting this configuration parameter low, the server can virtually avoid the memory problem. However, the server must pay a significant performance penalty if it scans all lists too frequently.
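The trade-off described above can be illustrated with a small model. This is only a sketch under stated assumptions, not Kafka's code: `SimplePurgatory`, `DelayedRequest`, `purge_threshold` and the partition-style keys are hypothetical names, the timer is left out, and every request waiting on a key is treated as satisfied when that key is checked.

```python
from collections import defaultdict


class DelayedRequest:
    """A request waiting in the purgatory (hypothetical minimal model)."""

    def __init__(self, name, keys):
        self.name = name
        self.keys = keys          # every key this request is watching
        self.completed = False


class SimplePurgatory:
    """Watcher lists with lazy deletion and a size-triggered purge."""

    def __init__(self, purge_threshold):
        self.watchers = defaultdict(list)   # key -> list of watching requests
        self.purge_threshold = purge_threshold
        self.purge_count = 0

    def watched_entries(self):
        # Counts list entries: a request watching 3 keys is counted 3 times,
        # and completed-but-not-yet-removed requests are counted as well.
        return sum(len(lst) for lst in self.watchers.values())

    def watch(self, request):
        for key in request.keys:
            self.watchers[key].append(request)
        if self.watched_entries() > self.purge_threshold:
            self.purge()

    def check_and_complete(self, key):
        """Complete the requests waiting on `key`; only this key's list is cleaned."""
        waiting = self.watchers.pop(key, [])
        newly_completed = [r for r in waiting if not r.completed]
        for request in newly_completed:
            request.completed = True
        return newly_completed

    def purge(self):
        """Scan every watcher list and drop completed requests (the expensive part)."""
        self.purge_count += 1
        for key in list(self.watchers):
            live = [r for r in self.watchers[key] if not r.completed]
            if live:
                self.watchers[key] = live
            else:
                del self.watchers[key]
```

In this model a completed request stays in the watcher lists of its other keys until the next purge, and a low `purge_threshold` trades memory for more frequent full scans.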
New Design
The goal of the new design is to allow immediate deletion of a completed request and to significantly reduce the load of the expensive purge process. This requires cross-referencing between the entries in the timer and the requests. It is also strongly desired to have O(1) insert/delete cost, since an insert/delete operation happens for each request/completion.
To satisfy these requirements, we propose a new purgatory implementation based on Hierarchical Timing Wheels.
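The O(1) insert/delete requirement and the cross-referencing between timer entries and requests can be sketched with a doubly linked bucket. This is a minimal illustration under assumptions, not the proposal's actual classes: `TimerEntry` and `Bucket` are hypothetical names, and locking is omitted.

```python
class TimerEntry:
    """Node of a doubly linked bucket; a request can hold a reference to its entry."""

    def __init__(self, request):
        self.request = request
        self.prev = None
        self.next = None
        self.bucket = None        # back-reference used for O(1) removal


class Bucket:
    """Circular doubly linked list with a sentinel root node."""

    def __init__(self):
        self.root = TimerEntry(None)
        self.root.prev = self.root.next = self.root
        self.size = 0

    def add(self, entry):
        # Append before the sentinel: constant time, no traversal.
        tail = self.root.prev
        entry.prev, entry.next = tail, self.root
        tail.next = self.root.prev = entry
        entry.bucket = self
        self.size += 1

    def remove(self, entry):
        # Unlink using the entry's own pointers: constant time, no search.
        if entry.bucket is not self:
            return False
        entry.prev.next = entry.next
        entry.next.prev = entry.prev
        entry.prev = entry.next = entry.bucket = None
        self.size -= 1
        return True

    def __iter__(self):
        node = self.root.next
        while node is not self.root:
            yield node
            node = node.next
```

Because the completing thread already holds the entry, it can unlink the entry without scanning the bucket, which is what makes immediate deletion cheap in this sketch.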
Hierarchical Timing Wheel
Doubly Linked List for Buckets in Timing Wheels
(Translator's note: the buckets in the timing wheels are implemented as doubly linked lists.)
Driving Clock using DelayQueue
A simple implementation may use a thread that wakes up every unit of time and does the ticking, which checks whether there is any task in the bucket. This can be wasteful if requests are sparse. We want the thread to wake up only when there is a non-empty bucket to expire. We will do so by using java.util.concurrent.DelayQueue, similarly to the current implementation, but we will enqueue task buckets instead of individual tasks. This design has a performance advantage. The number of items in the DelayQueue is capped by the number of buckets, which is usually much smaller than the number of tasks, so the number of offer/poll operations on the priority queue inside the DelayQueue will be significantly smaller.
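The effect of enqueueing buckets instead of tasks can be seen in a small simulation. This is a sketch under assumptions: a `heapq` min-heap stands in for java.util.concurrent.DelayQueue, the clock is passed in explicitly instead of being driven by a thread, and `BucketedTimer` and its fields are hypothetical names.

```python
import heapq


class BucketedTimer:
    """Groups tasks into tick-sized buckets and queues each bucket only once."""

    def __init__(self, tick_ms):
        self.tick_ms = tick_ms
        self.buckets = {}     # bucket start time -> list of tasks
        self.queue = []       # min-heap of bucket start times (DelayQueue stand-in)
        self.offers = 0       # number of pushes onto the priority queue

    def add(self, task, expiration_ms):
        start = expiration_ms - expiration_ms % self.tick_ms
        if start not in self.buckets:
            self.buckets[start] = []
            heapq.heappush(self.queue, start)   # O(log #buckets), once per bucket
            self.offers += 1
        self.buckets[start].append(task)        # O(1) per task

    def advance(self, now_ms):
        """Pop every bucket whose start time has been reached and return its tasks."""
        expired = []
        while self.queue and self.queue[0] <= now_ms:
            start = heapq.heappop(self.queue)
            expired.extend(self.buckets.pop(start))
        return expired
```

Adding 100 tasks that fall into three 10ms buckets results in three pushes onto the priority queue rather than 100, which is the saving the paragraph above describes.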
Purge of Watcher Lists
In the current implementation, the purge operation of watcher lists is triggered by the total size of the watcher lists. The problem is that the watcher lists may exceed the threshold even when there aren't many requests to purge. When this happens it increases the CPU load a lot. Ideally, the purge operation should be triggered by the number of completed requests in the watcher lists.
In the new design, a completed request is removed from the timer queue immediately with O(1) cost. It means that the number of requests in the timer queue is exactly the number of pending requests at any time. So, if we know the total number of distinct requests in the purgatory, which is the sum of the number of pending requests and the number of completed but still watched requests, we can avoid unnecessary purge operations. It is not trivial to keep track of the exact number of distinct requests in the purgatory because a request may or may not be watched. In the new design, we estimate the total number of requests in the purgatory rather than trying to maintain the exact number.
The estimated number of requests is maintained as follows. The estimated total number of requests, E, is incremented whenever a new request is watched. Before starting the purge operation, we reset the estimated total number of requests to the size of the timer queue. If no requests are added to the purgatory during purge, E is the correct number of requests after purge. If some requests are added to the purgatory during purge, E is incremented to E + the number of newly watched requests. This may be an overestimation because it is possible that some of the new requests are completed and removed from the watcher lists during the purge operation. We expect both the chance and the amount of overestimation to be small.
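The bookkeeping for E can be written down in a few lines. This is a sketch under assumptions: `PurgeEstimator` and `purge_interval` are hypothetical names, and the trigger rule (purge when E minus the timer queue size exceeds a threshold) is one plausible way to use the estimate, not something this section spells out.

```python
class PurgeEstimator:
    """Tracks E, the estimated number of distinct requests in the purgatory."""

    def __init__(self, purge_interval):
        self.purge_interval = purge_interval   # assumed threshold on completed requests
        self.estimated_total = 0               # E in the text

    def on_watch(self):
        # E is incremented whenever a new request is watched.
        self.estimated_total += 1

    def should_purge(self, timer_size):
        # The timer queue holds exactly the pending requests, so E - timer_size
        # approximates the completed-but-still-watched requests.
        return self.estimated_total - timer_size > self.purge_interval

    def before_purge(self, timer_size):
        # Reset E to the number of pending requests just before purging.
        self.estimated_total = timer_size
```

Requests watched while the purge is running keep incrementing E through `on_watch`, which is where the possible overestimation mentioned above comes from.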
- the tick size (the minimum time unit)
- the wheel size (the number of buckets per wheel)
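How these two parameters determine the time range of a hierarchy of wheels can be worked out with a short calculation. This is a sketch under an assumption this section does not state explicitly: each overflow wheel uses the whole span of the wheel below it as its tick, as in the usual hierarchical timing wheel scheme. The function name `wheel_levels` is hypothetical.

```python
def wheel_levels(tick_ms, wheel_size, max_delay_ms):
    """Return (tick, span) for each wheel level needed to hold max_delay_ms.

    Assumes wheel_size > 1, and that a delay fits a wheel only if it is
    strictly smaller than the wheel's span.
    """
    levels = []
    tick = tick_ms
    while True:
        span = tick * wheel_size        # time range covered by this wheel
        levels.append((tick, span))
        if span > max_delay_ms:
            return levels
        tick = span                     # the next wheel ticks once per lower-wheel span


# With a 1ms tick and 20 buckets per wheel, a 200ms timeout needs two levels:
# a 1ms x 20 wheel covering 20ms and a 20ms x 20 wheel covering 400ms.
levels = wheel_levels(tick_ms=1, wheel_size=20, max_delay_ms=200)
```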
We compared the enqueue performance of two purgatory implementations, the current implementation and the proposed new implementation. This is a micro benchmark. It measures the purgatory enqueue performance. The purgatory was separated from the rest of the system and also uses a fake request which does nothing useful. So, the throughput of the purgatory in a real system may be much lower than the number shown by the test.
In the test, the intervals of the requests are assumed to follow the exponential distribution. Each request takes a time drawn from a log-normal distribution. By adjusting the shape of the log-normal distribution, we can test different timeout rate.
(Translator's note: the time each request takes here should be the time needed to complete it, that is, the time from entering the purgatory until it is completed.)
The tick size is 1ms and the wheel size is 20. The timeout was set to 200ms. The data size of a request was 100 bytes. For a low timeout rate case, we chose 75th percentile = 60ms and 50th percentile = 20ms. And for a high timeout rate case, we chose 75th percentile = 400ms and 50th percentile = 200ms. A total of 1 million requests are enqueued in each run.
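The two percentiles are enough to pin down a log-normal distribution, and from that the expected timeout rate for the 200ms timeout. The following is a sketch of that arithmetic, assuming the percentiles are in milliseconds and that a request times out whenever its completion time exceeds the timeout; the function names are hypothetical.

```python
import math
from statistics import NormalDist


def lognormal_params(p50_ms, p75_ms):
    """Derive (mu, sigma) of the underlying normal from the 50th and 75th percentiles."""
    mu = math.log(p50_ms)                                   # median = exp(mu)
    sigma = (math.log(p75_ms) - mu) / NormalDist().inv_cdf(0.75)
    return mu, sigma


def timeout_rate(p50_ms, p75_ms, timeout_ms):
    """Probability that a log-normally distributed completion time exceeds the timeout."""
    mu, sigma = lognormal_params(p50_ms, p75_ms)
    return 1.0 - NormalDist(mu, sigma).cdf(math.log(timeout_ms))


low = timeout_rate(20, 60, 200)      # low timeout rate case
high = timeout_rate(200, 400, 200)   # high timeout rate case
```

Under these assumptions the low timeout rate case works out to roughly 8% of requests timing out, and the high timeout rate case to 50%, because the timeout equals the median.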
Requests are actively completed by a separate thread. Requests that are supposed to be completed before timeout are enqueued to another DelayQueue. And a separate thread keeps polling and completes them. There is no guarantee of accuracy in terms of actual completion time.
(Translator's note: this paragraph describes how the benchmark completes requests: requests that will not time out are put into a DelayQueue, and a thread keeps pulling requests from it and completing them. But as noted earlier, polling a DelayQueue costs O(log n), so could this approach itself add CPU load? Especially since, when requests are completed in a real system, they are looked up in the hash map, which is much cheaper.)
The JVM heap size is set to 200m to reproduce a memory tight situation.
The result shows a dramatic difference in the high enqueue rate region. As the target rate increases, both implementations keep up with the requests initially. However, in the low timeout scenario the old implementation saturated at around 40000 RPS (requests per second), whereas the proposed implementation didn't show any significant performance degradation. In the high timeout scenario the old implementation saturated at around 25000 RPS, whereas the proposed implementation saturated at 105000 RPS in this benchmark.
CPU usage is significantly better in the new implementation.
Finally, we measured total GC time (milliseconds) for ParNew collection and CMS collection. There isn't much difference between the old implementation and the new implementation in the region of enqueue rate that the old implementation can sustain.
(Translator's note: ParNew is used for the young generation and CMS for the old generation.)
In the new design, we use Hierarchical Timing Wheels for the timeout timer and a DelayQueue of timer buckets to advance the clock on demand. Completed requests are removed from the timer queue immediately with O(1) cost. The buckets remain in the delay queue; however, the number of buckets is bounded. And, in a healthy system, most of the requests are satisfied before timeout, and many of the buckets become empty before being pulled out of the delay queue. Thus, the timer should rarely have the buckets of the lower interval. The advantage of this design is that the number of requests in the timer queue is exactly the number of pending requests at any time. This allows us to estimate the number of requests that need to be purged, so we can avoid unnecessary purge operations on the watcher lists. As a result we achieve higher scalability in terms of request rate with much better CPU usage.