We have put significant effort into efficiency. One of our primary use cases is handling web activity data, which is very high volume: each page view may generate dozens of writes. Furthermore we assume each message published is read by at least one consumer (often many), hence we strive to make consumption as cheap as possible.
We have also found, from experience building and running a number of similar systems, that efficiency is a key to effective multi-tenant operations. If the downstream infrastructure service can easily become a bottleneck due to a small bump in usage by the application, such small changes will often create problems. By being very fast we help ensure that the application will tip-over under load before the infrastructure. This is particularly important when trying to run a centralized service that supports dozens or hundreds of applications on a centralized cluster as changes in usage patterns are a near-daily occurrence.
We discussed disk efficiency in the previous section. Once poor disk access patterns have been eliminated, there are two common causes of inefficiency in this type of system: too many small I/O operations, and excessive byte copying.
The small I/O problem happens both between the client and the server and in the server’s own persistent operations.
To avoid this, our protocol is built around a “message set” abstraction that naturally groups messages together. This allows network requests to group messages together and amortize the overhead of the network roundtrip rather than sending a single message at a time. The server in turn appends chunks of messages to its log in one go, and the consumer fetches large linear chunks at a time.
This simple optimization produces orders of magnitude speed up. Batching leads to larger network packets, larger sequential disk operations, contiguous memory blocks, and so on, all of which allows Kafka to turn a bursty stream of random message writes into linear writes that flow to the consumers.
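As a concrete illustration, this batching behaviour is exposed as configuration in the standard Java producer client. The sketch below is a minimal example, assuming a local broker; the topic name and the particular batch.size and linger.ms values are chosen purely for illustration.

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class BatchingProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // Accumulate up to 64 KB of records per partition before sending a request...
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 64 * 1024);
        // ...or wait up to 10 ms for more records to arrive, whichever happens first.
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 1000; i++) {
                // send() is asynchronous: records accumulate in an in-memory batch and are
                // shipped to the broker as a single message set rather than one at a time.
                producer.send(new ProducerRecord<>("page-views", "user-" + i, "view")); // hypothetical topic
            }
        } // close() flushes any records still sitting in a batch
    }
}
```

On the consumer side, settings such as fetch.min.bytes and fetch.max.wait.ms play the analogous role, letting the broker accumulate a larger linear chunk of log before answering a fetch request.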
The other inefficiency is in byte copying. At low message rates this is not an issue, but under load the impact is significant. To avoid this we employ a standardized binary message format that is shared by the producer, the broker, and the consumer (so data chunks can be transferred without modification between them).
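To make this concrete, the sketch below shows a deliberately simplified, hypothetical length-prefixed record layout; it is not Kafka's actual wire format, only an illustration of why a single binary form shared by producer, broker, and consumer lets data chunks be handed off without re-encoding.

```java
import java.nio.ByteBuffer;

// A simplified, hypothetical record layout (NOT Kafka's real wire format) illustrating
// how one shared binary form lets the broker append and serve bytes without decoding them.
public class SimpleRecordFormat {

    // Layout: [ 8-byte offset ][ 4-byte payload length ][ payload bytes ]
    public static ByteBuffer encode(long offset, byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(8 + 4 + payload.length);
        buf.putLong(offset);
        buf.putInt(payload.length);
        buf.put(payload);
        buf.flip();
        return buf;
    }

    // The consumer decodes exactly the layout the producer wrote; the broker in between
    // only ever appends and copies the raw bytes.
    public static byte[] decodePayload(ByteBuffer buf) {
        buf.getLong();               // offset, not needed for this example
        int length = buf.getInt();   // payload length
        byte[] payload = new byte[length];
        buf.get(payload);
        return payload;
    }
}
```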
The message log maintained by the broker is itself just a directory of files, each populated by a sequence of message sets that have been written to disk in the same format used by the producer and consumer. Maintaining this common format allows optimization of the most important operation: network transfer of persistent log chunks. Modern unix operating systems offer a highly optimized code path for transferring data out of pagecache to a socket; in Linux this is done with the sendfile system call. Java provides access to this code path through the FileChannel.transferTo API.
To understand the impact of sendfile, it is important to understand the common data path for transfer of data from file to socket:
1. The operating system reads data from the disk into pagecache in kernel space
2. The application reads the data from kernel space into a user-space buffer
3. The application writes the data back into kernel space into a socket buffer
4. The operating system copies the data from the socket buffer to the NIC buffer where it is sent over the network
This is clearly inefficient: there are four copies and two system calls. Using sendfile, this re-copying is avoided by allowing the OS to send the data from pagecache to the network directly. So in this optimized path, only the final copy to the NIC buffer is needed.
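The sketch below contrasts the two paths in Java, assuming a log segment file and an already-connected SocketChannel (both hypothetical here). The first method performs the buffered read/write cycle described above; the second uses FileChannel.transferTo, which can map onto sendfile on platforms that support it, so the data never enters user space.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class ZeroCopyTransfer {

    // Traditional path: disk -> pagecache -> user-space buffer -> socket buffer -> NIC,
    // i.e. the four copies and two system calls described above.
    static void copyViaUserSpace(Path segment, SocketChannel socket) throws IOException {
        try (FileChannel file = FileChannel.open(segment, StandardOpenOption.READ)) {
            ByteBuffer buffer = ByteBuffer.allocate(64 * 1024);
            while (file.read(buffer) != -1) {        // copy into a user-space buffer
                buffer.flip();
                while (buffer.hasRemaining()) {
                    socket.write(buffer);            // copy back into a kernel socket buffer
                }
                buffer.clear();
            }
        }
    }

    // Zero-copy path: transferTo() can use sendfile, so data moves from pagecache
    // to the socket without ever being copied into user space.
    static void copyWithSendfile(Path segment, SocketChannel socket) throws IOException {
        try (FileChannel file = FileChannel.open(segment, StandardOpenOption.READ)) {
            long position = 0;
            long remaining = file.size();
            while (remaining > 0) {
                long transferred = file.transferTo(position, remaining, socket);
                position += transferred;
                remaining -= transferred;
            }
        }
    }
}
```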
We expect a common use case to be multiple consumers on a topic. Using the zero-copy optimization above, data is copied into pagecache exactly once and reused on each consumption instead of being stored in memory and copied out to kernel space every time it is read. This allows messages to be consumed at a rate that approaches the limit of the network connection.
This combination of pagecache and sendfile means that on a Kafka cluster where the consumers are mostly caught up you will see no read activity on the disks whatsoever as they will be serving data entirely from cache.
For more background on the sendfile and zero-copy support in Java, see this article.
End-to-end Batch Compression
In some cases the bottleneck is actually not CPU or disk but network bandwidth. This is particularly true for a data pipeline that needs to send messages between data centers over a wide-area network. Of course, the user can always compress their messages one at a time without any support needed from Kafka, but this can lead to very poor compression ratios, as much of the redundancy is due to repetition between messages of the same type (e.g. field names in JSON, user agents in web logs, or common string values). Efficient compression requires compressing multiple messages together rather than compressing each message individually.
Kafka supports this by allowing recursive message sets. A batch of messages can be clumped together, compressed, and sent to the server in this form. This batch of messages will be written in compressed form, will remain compressed in the log, and will only be decompressed by the consumer.
Kafka supports GZIP and Snappy compression protocols. More details on compression can be found here.
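As a minimal sketch with the standard Java producer (broker address, topic, and tuning values are illustrative), batch compression is enabled by setting compression.type on the producer: each batch is compressed as a unit, stored compressed in the log, and decompressed only by the consumer.

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class CompressedProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // Compress whole batches rather than individual messages; repeated field names
        // and values across the batch give the codec far more redundancy to exploit.
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip"); // or "snappy"
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 128 * 1024);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 20);

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // A hypothetical web-log record; many such records in one batch share structure.
            producer.send(new ProducerRecord<>("web-logs",
                    "{\"userAgent\":\"Mozilla/5.0\",\"path\":\"/index.html\"}"));
        }
    }
}
```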