参考:
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
http://kafka.apache.org/protocol.html
- 网络
- 分区和自举
- 分区策略
- 配料
- 版本控制和兼容性
- 议定书
- API
- 元数据API
- 生产API
- 生产请求
- 产生回应
- 可能的错误代码:(TODO)
- 获取API
- 组协调员请求
- 小组协调员回应
- 可能的错误代码
- 偏移提交请求
- 偏移提交响应
- 可能的错误代码
- 偏移提取请求
- 偏移提取响应
- 可能的错误代码
- 加入组请求
- 加入小组回应
- 可能的错误代码:
- 同步组请求
- 同步组响应
- 可能的错误代码:
- 心跳请求
- 心跳反应
- 可能的错误代码:
- 离开组请求
- 离开组响应
- 可能的错误代码:
- 管理API
- ListGroups请求
- ListGroups响应
- 可能的错误代码:
- 描述组请求
- 描述组响应
- 可能的错误代码:
- 常量
- Api密钥和当前版本
- 错误代码
介绍
本文件涵盖了在Kafka 0.8及更高版本中实施的协议。它旨在给出一个可用的指南,涵盖可用请求,二进制格式以及使用它们来实现客户端的正确方法。本文档假设您了解此处描述的基本设计和术语。
在0.7和更早版本中使用的协议与此类似,但是我们选择做了一次(我们希望)在兼容性上打破了清理和概括的事情。
概观
Kafka协议相当简单,只有六个核心客户端请求API。
- 元数据 - 描述当前可用的代理,其主机和端口信息,并提供有关哪些代理主机哪些分区的信息。
- 发送 - 发送消息到代理
- 抓取 - 从代理获取消息,获取数据的消息,获取集群元数据的消息,以及获取有关主题的偏移量信息的消息。
- 偏移 - 获取有关给定主题分区的可用偏移量的信息。
- 偏移提交 - 为消费者组提交一组偏移量
- 偏移提取 - 为消费者组提取一组偏移量
这些将在下面详细描述。另外,从0.9开始,Kafka就支持消费者和Kafka Connect的一般群体管理。客户端API由五个请求组成:
- GroupCoordinator - 找到一个组的当前协调器。
- JoinGroup - 成为组的成员,如果没有活动成员,则创建它。
- SyncGroup - 同步组的所有成员的状态(例如,将分区分配分配给消费者)。
- 心跳 - 保持一个成员在组中活着。
- 离开组 - 直接离开一个小组。
最后,有几个管理API可用于监视/管理Kafka集群(当KIP-4完成时,该列表将增长)。
- DescribeGroups - 用于检查一组组的当前状态(例如查看消费者分区分配)。
- ListGroups - 列出由代理管理的当前组。
预赛
网络
Kafka通过TCP使用二进制协议。该协议将所有apis定义为请求响应消息对。所有邮件的大小分隔,并由以下原始类型组成。
客户端启动套接字连接,然后写入一系列请求消息并读回相应的响应消息。连接或断开连接时不需要握手。如果您维护用于许多请求的持久连接以便分摊TCP握手的成本,TCP将更加快乐,但超出这种惩罚连接是相当便宜的。
客户端可能需要维护与多个经纪人的连接,因为数据被分区,并且客户端需要与具有其数据的服务器通信。然而,通常不需要从单个客户端实例(即连接池)维护到单个代理的多个连接。
服务器保证在单个TCP连接上,请求将按照发送的顺序进行处理,响应也按照该顺序返回。经纪人的请求处理仅允许每个连接的单个飞行中请求,以保证此次订购。请注意,客户端可以(并且理想地应该)使用非阻塞IO来实现请求流水线并实现更高的吞吐量。即即使在等待对先前请求的响应时,客户端也可以发送请求,因为未完成的请求将被缓存在底层的OS套接字缓冲区中。所有请求都由客户端启动,并且除了注明之外,还会导致来自服务器的相应响应消息。
服务器具有可配置的请求大小的最大限制,超出此限制的任何请求将导致套接字断开连接。
分区和自举
Kafka是一个分区系统,所以并不是所有的服务器都有完整的数据集。而是回想一下,主题被分割成一个预先定义的分区数P,每个分区被复制一些复制因子N.主题分区本身只是编号为0,1,...,P的“提交日志”。
这种性质的所有系统都有一个特定数据分配给特定分区的问题。卡夫卡的客户直接控制这个任务,经纪人本身执行没有特别的语义哪些邮件应被发布到一个特定的分区。相反,为了发布消息,客户端直接将消息寻址到特定分区,并且在获取消息时,从特定分区获取。如果两个客户端希望使用相同的分区方案,则必须使用相同的方法来计算密钥到分区的映射。
这些发布或获取数据的请求必须发送到当前作为给定分区的领导者的代理。此条件由代理执行,因此对错误代理的特定分区的请求将导致NotLeaderForPartition错误代码(如下所述)。
客户端如何找出哪些主题存在,哪些分区,以及哪些经纪人当前托管这些分区,以便它可以将请求引导到正确的主机?此信息是动态的,因此您不能仅使用一些静态映射文件配置每个客户端。相反,所有Kafka经纪人都可以回答描述集群当前状态的元数据请求:哪些主题有哪些分区,哪些分区是哪些分区,哪些代理是这些分区的领导者,以及这些代理的主机和端口信息。
换句话说,客户端需要以某种方式找到一个代理,并且该代理将告诉客户所有其他存在的代理和他们托管的分区。这个第一个经纪人本身可能会下降,所以客户端实现的最佳做法是从两个或三个url列表中进行引导。然后,用户可以选择使用负载平衡器,或者只是在客户端中静态配置其两个或三个kafka主机。
客户端不需要继续轮询查看集群是否已更改; 当它被实例化缓存元数据时,它可以获取元数据,直到它收到指示元数据过期的错误。此错误可能有两种形式:(1)指示客户端无法与特定代理进行通信的套接字错误,(2)对请求的响应中的错误代码,指示此代理不再承载请求数据的分区。
- 循环浏览“引导”卡夫卡URL列表,直到找到我们可以连接到的。获取集群元数据。
- 处理提取或产生请求,根据他们发送或从中获取的主题/分区将它们引导到相应的代理。
- 如果我们收到适当的错误,请刷新元数据,然后重试。
分区策略
如上所述,将消息分配给分区是生产客户端控制的。也就是说,这个功能应该如何暴露给最终用户?
分区真的在卡夫卡有两个目的:
- 它平衡数据并请求负担经纪人
- 它可以作为在消费者进程之间分配处理的一种方法,同时允许本地状态和分区内的保留顺序。我们称之为语义分割。
对于给定的用例,您可能只关心这些或两者之一。
为了完成简单的负载平衡,一个简单的方法将是客户端只需轮询所有经纪人的请求。另一个替代方案是,在一个生产者比经纪人多的环境中,每个客户端都会随机选择一个分区并发布。这个后来的策略将导致远远少于TCP连接。
语义分割意味着使用消息中的一些键将消息分配给分区。例如,如果您正在处理点击消息流,则可能需要按照用户ID对流进行分区,以便特定用户的所有数据都将转到单个消费者。为了完成此操作,客户端可以获取与该消息相关联的密钥,并使用该密钥的一些散列来选择要向其发送消息的分区。
配料
我们的apis鼓励将小事情分批处理,以提高效率。我们发现这是一个非常重要的表现胜利。我们的API发送消息和我们的API来获取消息始终使用一系列消息,而不是单个消息来鼓励这一点。一个聪明的客户端可以利用这一点,并支持一个“异步”模式,其中它将每个单独发送的消息分批在一起,并将其发送到更大的分组。我们进一步了解并允许跨多个主题和分区进行批处理,因此产品请求可能包含附加到多个分区的数据,并且提取请求可能一次从多个分区中提取数据。
客户端实现者可以选择忽略它,如果他们喜欢,一次发送一个。
版本控制和兼容性
该协议旨在以向后兼容的方式实现增量演进。我们的版本是基于每个API,每个版本包含请求和响应对。每个请求都包含一个API密钥,用于标识被调用的API,以及一个版本号,指示请求的格式和响应的预期格式。
意图是客户端将实现协议的特定版本,并在其请求中指示此版本。我们的目标主要是允许在不允许停机的环境中进行API演进,客户端和服务器不能一次性更改。
服务器将拒绝具有不支持的版本的请求,并且将始终以其期望的协议格式根据其请求中包含的版本来响应客户端。预期的升级路径是首先在服务器上推出新功能(老客户端不会使用它们),然后在部署较新的客户端时,将逐渐利用这些新功能。
目前,所有版本都基于0,因为我们演变了这些API,我们将分别指出每个版本的格式。
议定书
协议原始类型
该协议是由以下原始类型构建的。
固定宽度原语
int8,int16,int32,int64 - 具有以大排序顺序存储的给定精度(以位为单位)的带符号整数。
可变长度基元
字节,字符串 - 这些类型由一个有符号整数组成,其长度为N,后跟N个字节的内容。长度-1表示无效。字符串使用int16作为其大小,字节使用int32。
数组
这是处理重复结构的符号。这些将始终编码为包含长度N的int32大小,随后是N个重复的结构,其本身可以由其他原始类型组成。在下面的BNF语法中,我们将显示一个结构foo的数组作为[foo]。
阅读请求格式语法的注意事项
下面的 BNF给出了请求和响应二进制格式的确切的上下文无关语法。对于每个API,我将一起提出请求和响应,然后是所有的子定义。为了给人可读的名字,BNF有意地不紧凑(例如我为ErrorCode定义了一个生产,即使它只是一个int16来给它一个符号名)。与BNF一样,一系列生产表示连接,因此下面给出的MetadataRequest将是一系列字节,其中包含一个VersionId,然后是ClientId,然后是一个TopicNames数组(每个都有自己的定义)。制作总是以骆驼案例和原始类型小写。当有多个可能的生产时,这些与 并且可以用括号括起来进行分组。首先定义顶层定义,缩进后续子部分。
常见请求和响应结构
所有请求和响应源自以下语法,将通过本文档的其余部分逐步描述:
领域 |
描述 |
MESSAGESIZE |
MessageSize字段以字节为单位给出后续请求或响应消息的大小。客户端可以通过首先读取4字节大小作为整数N来读取请求,然后读取和解析请求的后续N个字节。 |
要求
请求都具有以下格式:
领域 |
描述 |
ApiKey |
这是调用API的数字标识(即是元数据请求,生成请求,提取请求等)。 |
ApiVersion |
这是这个api的数字版本号。我们对每个API进行版本,并且此版本号允许服务器在协议演进时正确解释请求。响应将始终以与请求版本相对应的格式。 |
的correlationID |
这是一个用户提供的整数。它会在服务器的响应中被传回,未经修改。在客户端和服务器之间匹配请求和响应是非常有用的。 |
客户端Id |
这是客户端应用程序的用户提供的标识符。用户可以使用他们喜欢的任何标识符,并且将在记录错误,监视聚合等时使用它。例如,可能希望不仅监控整体的每秒请求,而且可以监控来自每个客户端应用程序可以驻留在多个服务器上)。该ID充当来自特定客户端的所有请求的逻辑分组。 |
下面将描述各种请求和响应消息。
回应
领域 |
描述 |
的correlationID |
服务器返回客户端提供的任何整数作为请求中的相关性。 |
响应将始终与配对请求匹配(例如,我们将发送MetadataResponse以返回到MetadataRequest)。
消息集
产品和提取请求共同的一种结构是消息集格式。kafka中的消息是具有少量关联元数据的键值对。消息集只是具有偏移量和大小信息的消息序列。这种格式恰好用于代理上的磁盘存储和线上格式。
消息集也是Kafka中的压缩单位,我们允许消息递归地包含压缩消息集以允许批量压缩。
NB,MessageSets不在协议中像其他数组元素一样的int32。
留言格式
|
|
领域 |
描述 |
抵消 |
这是在kafka中用作对数序列号的偏移量。当生产者发送非压缩消息时,它可以将偏移设置为任何值。当生产者发送压缩消息时,为了避免服务器端重新压缩,每个压缩消息应具有从0开始的偏移量,并在压缩消息中为每个内部消息增加一个消息。(有关Kafka下的压缩消息的更多详细信息) |
CRC |
CRC是消息字节的其余部分的CRC32。这用于检查代理和消费者上的消息的完整性。 |
MagicByte |
这是一个用于允许向后兼容的消息二进制格式的版本号。当前值为1。 |
属性 |
该字节保存关于消息的元数据属性。 最低的3位包含用于消息的压缩编解码器。 第四个最低位表示时间戳类型。0代表CreateTime,1代表LogAppendTime。生产者应始终将此位设置为0.(自0.10.0起) 所有其他位应设置为0。 |
时间戳 |
这是消息的时间戳。时间戳类型在属性中指示。单位是从时代开始以来的毫秒(1970年1月1日(UTC))。 |
键 |
密钥是用于分区分配的可选消息密钥。键可以为空。 |
值 |
该值是作为不透明字节数组的实际消息内容。Kafka支持递归消息,在这种情况下,这可能本身包含消息集。消息可以为null。 |
在Kafka 0.11中,“MessageSet”和“Message”的结构发生了显着变化。不仅添加了新的字段来支持新功能,如完全一次语义和记录头,而且先前版本的消息格式的递归性质被消除,有利于平面结构。“MessageSet”现在被称为“RecordBatch”,它包含一个或多个“记录”(而不是“消息”)。当启用压缩时,RecordBatch头仍保持未压缩,但记录被压缩在一起。此外,“记录”中的多个字段都是varint编码的,这为更大的批次带来了显着的空间节省。
新消息格式的Magic值为2.其结构如下:
新添加的字段的语义如下:
|
|
领域 |
描述 |
FirstOffset |
表示RecordBatch中的第一个偏移量。批次中每个记录的“offsetDelta”将相对于此FirstOffset进行计算。特别地,批次中每个记录的偏移量是其“OffsetDelta”+“FirstOffset”。 |
LastOffsetDelta |
RecordBatch中最后一条消息的偏移量。即使批处理中的记录被压缩,代理也可以使用它来确保正确的行为。 |
PartitionLeaderEpoch |
由KIP-101引入,这是经纪人在收到产品请求后设定的,用于确保在日志截断时有领导更改时不会丢失数据。客户开发人员无需担心设置此值。 |
FirstTimeStamp |
批次中第一个记录的时间戳。RecordBatch中每个记录的时间戳记是它的“TimestampDelta”+“FirstTimestamp”。 |
RecordBatch属性 |
该字节保存关于消息的元数据属性。 最低的3位包含用于消息的压缩编解码器。 第四个最低位表示时间戳类型。0代表CreateTime,1代表LogAppendTime。生产者应始终将此位设置为0.(自0.10.0起) 第五个最低位表示RecordBatch是否是事务的一部分。0表示RecordBatch不是事务性的,而1表示它是事务。(从0.11.0.0开始)。 第六个最低位表示RecordBatch是否包含控制消息。1表示RecordBatch包含控制消息,0表示没有。控制消息用于启用Kafka中的事务并由代理生成。客户端不应将控制批次(即具有该位的那些)返回给应用程序。(自0.11.0.0起) |
记录属性 |
记录级属性目前未被使用。 |
MaxTimestamp |
批次中最后一个记录的时间戳。即使批次中的记录被压缩,代理也使用这种方式来确保正确的行为。 |
ProducerId |
在0.11.0.0中介绍了KIP-98,这是由“InitProducerId”请求接收的代理指派的producerId。要支持幂等消息传递和事务的客户端必须设置此字段。 |
ProducerEpoch |
在0.11.0.0中引入KIP-98,这是由“InitProducerId”请求接收的代理分配的producerEpoch。要支持幂等消息传递和事务的客户端必须设置此字段。 |
FirstSequence |
在0.11.0.0中介绍了KIP-98,这是经纪人用于重复消息消息的生产者分配的序列号。要支持幂等消息传递和事务的客户端必须设置此字段。RecordBatch中每个Record的序列号为OffsetDelta + FirstSequence。 |
头 |
Kif -82在0.11.0.0中引入了Kafka,现在支持应用级记录级标题。相应地,生产者和消费者APIS更新了写入和读取这些标题。 |
压缩
Kafka支持压缩消息以获得更高的效率,但这比仅仅压缩原始消息要复杂得多。因为单个消息可能没有足够的冗余来实现良好的压缩比,压缩消息必须以特殊批次发送(尽管如果您真的希望自己压缩消息,则可以使用一批消息)。要发送的消息被封装(未压缩)到MessageSet结构中,然后将其压缩并存储在具有适当压缩编解码器集的单个“消息”的值字段中。接收系统从解压缩值中解析实际的MessageSet。外部消息集应该只包含一个压缩的“消息”(详见KAFKA-1718)。
Kafka目前支持两种压缩编解码器,具有以下编解码器编号:
压缩 |
编解码器 |
没有 |
0 |
GZIP |
1 |
瞬间 |
2 |
API
本节详细介绍了每个单独的API,它们的用法,二进制格式及其字段的含义。
元数据API
此API回答以下问题:
- 有哪些话题存在?
- 每个主题有多少个分区?
- 哪个经纪人目前是每个分区的领导者?
- 这些经纪人的主机和端口是什么?
这是可以解决群集中任何代理的唯一请求。
由于可能有许多主题,客户端可以提供可选的主题名称列表,以便仅返回主题子集的元数据。
返回的元数据是分区级别的,但是为了方便起见,将主题分组在一起,以避免冗余。对于每个分区,元数据包含领导者的信息,以及所有副本以及当前同步的副本列表。
注意:如果在代理配置中设置了 “ auto.create.topics.enable”,则主题元数据请求将创建具有默认复制因子和分区数的主题。
主题元数据请求
领域 |
描述 |
TopicName |
制作元数据的主题。如果空,则请求将产生所有主题的元数据。 |
元数据响应
响应包含每个分区的元数据,分区按主题分组。这个元数据是指经纪人的经纪人身份。经纪人每个都有一个主机和端口。
领域 |
描述 |
领导 |
目前担任该分区的领导者的kafka代理的节点ID。如果没有领导者存在,因为我们在领导选举中,这个id将是-1。 |
副本 |
当前作为这个分区的领导者的奴隶的一组活着节点。 |
ISR |
复制品的集合,被“追赶”给领导者 |
经纪人 |
kafka代理的节点ID,主机名和端口信息 |
可能的错误代码
生产API
产品API用于将消息集发送到服务器。为了效率,它允许在单个请求中发送旨在用于许多主题分区的消息集。
产品API使用通用消息集格式,但是由于在发送时没有将偏移量分配给消息,所以生产者可以以任何方式*填充该字段。
生产请求
在v1或以后生成请求表示客户端可以解析“生产响应”中的配额节流时间。
在v2之后产生请求表示客户端可以解析产品响应中的时间戳字段。
领域 |
描述 |
RequiredAcks |
此字段指示服务器在响应请求之前应该接收到多少确认。如果为0,服务器将不会发送任何响应(这是服务器不会回复请求的唯一情况)。如果为1,则在发送响应之前,服务器将等待数据写入本地日志。如果为-1,则在发送响应之前,服务器将阻止所有同步副本中的消息提交。 |
时间到 |
这提供了服务器可以等待接收RequiredAcks中确认数量的最大时间(以毫秒为单位)。由于以下几个原因,超时不是请求时间的精确限制:(1)它不包括网络延迟,(2)定时器在处理此请求的开始处开始,因此如果许多请求由于服务器排队过载等待时间不会被包含,(3)我们不会终止本地写入,所以如果本地写入时间超过了这个超时,那么它将不被尊重。要获得此类型的硬超时,客户端应使用套接字超时。 |
TopicName |
正在发布数据的主题。 |
划分 |
正在发布数据的分区。 |
MessageSetSize |
以下消息集的大小(以字节为单位)。 |
MessageSet |
以上述标准格式的一组消息。 |
产生回应
领域 |
描述 |
话题 |
该响应条目对应的主题。 |
划分 |
该响应条目对应的分区。 |
错误代码 |
这个分区的错误,如果有的话。由于给定的分区可能不可用或在不同的主机上维护,所以以每分区为基础给出错误,而其他主机可能已经成功地接受了生成请求。 |
抵消 |
分配给附加到该分区的消息集中的第一个消息的偏移量。 |
时间戳 |
如果LogAppendTime用于该主题,则这是代理向消息集分配的时间戳。消息集中的所有消息具有相同的时间戳。 如果使用CreateTime,则此字段始终为-1。如果没有返回错误代码,生产者可以假设生产请求中的消息的时间戳已被代理接受。 单位是从时代开始以来的毫秒(1970年1月1日(UTC))。 |
ThrottleTime |
由于配额违规,请求被限制的持续时间(以毫秒为单位)。(如果请求没有违反任何配额,则为零)。 |
可能的错误代码:(TODO)
获取API
抓取API用于获取某些主题分区的一个或多个日志块。逻辑上一个指定开始获取的主题,分区和起始偏移量,并返回一大堆消息。一般来说,返回消息将具有大于或等于起始偏移量的偏移量。然而,使用压缩消息,返回的消息可能具有小于起始偏移量的偏移量。这样的消息的数量通常很小,并且呼叫者负责过滤这些消息。
提取请求遵循一个长的轮询模型,以便如果没有立即可用的数据,它们可以被阻止一段时间。
作为优化,服务器被允许在消息集的末尾返回部分消息。客户应该处理这种情况。
需要注意的是,fetch API需要指定要使用的分区。问题是消费者应该知道哪些分区要消费?特别地,如何平衡作为组的一组消费者的分区,以便每个消费者获得分区的子集。我们使用zookeeper动态地为scala和java客户端完成了此任务。这种方法的缺点是它需要一个相当胖的客户端和动物园管理员连接。我们还没有创建一个Kafka API,以便将此功能移动到服务器端,并且访问更方便。可以通过简单地要求在配置中指定分区来实现一个简单的消费者客户端,尽管如果消费者失败,这将不允许分区的动态重新分配。
提取请求
领域 |
描述 |
ReplicaId |
副本id表示启动此请求的副本的节点ID。普通客户端消费者应始终将其指定为-1,因为它们没有节点ID。其他经纪人将此设置为自己的节点ID。允许值-2被允许非代理发出抓取请求,就像它是用于调试目的的副本代理一样。 |
MaxWaitTime |
最大等待时间是在发出请求时数据不足的情况下阻止等待的最大时间(以毫秒为单位)。 |
MinBytes |
这是提供响应必须可用的消息的最小字节数。如果客户端将其设置为0,则服务器将始终立即响应,但是如果自上次请求以来没有新数据,那么它们将返回空消息集。如果设置为1,服务器将在至少一个分区至少有1个字节的数据或指定的超时发生时立即做出响应。通过设置更高的值结合超时,消费者可以调整吞吐量并交易一点额外的延迟,只读取大块数据(例如将MaxWaitTime设置为100 ms,将MinBytes设置为64k将允许服务器等待100ms到尝试在响应之前积累64k的数据)。 |
TopicName |
主题的名称。 |
划分 |
获取的分区的id。 |
FetchOffset |
从此开始获取的偏移量。 |
MaxBytes |
要包含在此分区的消息集中的最大字节数。这有助于约束响应的大小。 |
提取响应
领域 |
描述 |
ThrottleTime |
由于配额违规,请求被限制的持续时间(以毫秒为单位)。(如果请求没有违反任何配额,则为零)。 |
TopicName |
此响应条目的主题名称。 |
划分 |
该响应的分区的id。 |
HighwaterMarkOffset |
该分区日志结束处的偏移量。客户端可以使用它来确定日志结束后面有多少个消息。 |
MessageSetSize |
为该分区设置的消息的大小(以字节为单位) |
MessageSet |
以上述格式从该分区中取出的消息数据。 |
Fetch Response v1仅包含消息格式v0。
Fetch Response v2可能包含消息格式v0或消息格式v1。
可能的错误代码
偏移API(AKA ListOffset)
该API描述了一组主题分区可用的有效偏移范围。与产品和提取API一样,请求必须定向到当前有关分区的领导者的代理。这可以使用元数据API来确定。
对于版本0,响应包含所请求分区的每个段的起始偏移量以及“日志结束偏移量”,即将附加到给定分区的下一个消息的偏移量。在0.10.1.0中最初支持的版本1中,Kafka支持使用时间索引来搜索消息中使用的时间戳的偏移量,并对此API进行了更改以支持此功能。请注意,此API仅支持已启用0.10消息格式的主题,否则将返回UNSUPPORTED_FOR_MESSAGE_FORMAT。
偏移请求
领域 |
能解密 |
时间 |
用于在一定时间(ms)之前询问所有消息。有两个特殊的值。指定-1以接收最新的偏移量(即下一个来信息的偏移量)和-2接收最早的可用偏移量。这适用于所有版本的API。请注意,由于偏移量按降序拉,要求最早的偏移将始终返回单个元素。 |
偏移响应
可能的错误代码
偏移提交/提取API
这些API允许对偏移进行集中管理。阅读更多偏移管理。根据KAFKA-993的评论,这些API调用在Kafka 0.8.1.1之前的版本中并不完全正常。它将在0.8.2版本中提供。
组协调员请求
给定消费者组的偏移由称为组协调器的特定代理维护。即,消费者需要向该特定代理发出其偏移提交和提取请求。它可以通过发出组协调器请求来发现当前协调器。
小组协调员回应
可能的错误代码
偏移提交请求
在v0和v1中,每个分区的时间戳定义为提交时间戳,偏移协调器将保留提交的偏移,直到代理配置中指定的提交时间戳+偏移保留时间为止; 如果时间戳字段未设置,经纪人将提交提交时间设置为提交偏移量之前的接收时间,如果用户希望在代理上保留承诺偏移量比配置的偏移保留时间更长,则用户可以显式设置提交时间戳记。
在v2中,我们删除了时间戳字段,但添加了全局保留时间字段(详见KAFKA-1634); 经纪人将始终将提交时间戳设置为接收时间,但是可以保留提交的偏移量,直到其提交时间戳+提交请求中的用户指定的保留时间。如果未设置保留时间(-1),则默认使用代理偏移保留时间。
请注意,当该API用于不是消费者组的一部分的“简单消费者”时,则该代数必须设置为-1,并且memberId必须为空(不为空)。此外,如果存在具有相同groupId的活动消费者组,则提交将被拒绝(通常使用UNKNOWN_MEMBER_ID或ILLEGAL_GENERATION错误)。
偏移提交响应
可能的错误代码
偏移提取请求
根据对KAFKA-1841的评论 -OffsetCommitRequest API - 时间戳字段不是版本化解决方案 v0和v1 在线路上是相同的,但是v0(0.8.1或更高版本支持)读取zookeeper的偏移量,而v1(在0.8中支持)。 2或更高版本)读取卡夫卡的偏移量。
偏移提取响应
请注意,如果没有与该用户组下的主题分区相关联的偏移量,代理不会设置错误代码(因为它不是真正的错误),而是返回空元数据并将偏移量字段设置为-1。
Offset Fetch Request v0和v1之间没有格式差异。功能明智的是,Offset Fetch Request v0将从zookeeper中获取偏移量,Offset Fetch Request v1将从Kafka获取偏移量。
可能的错误代码
团体会员API
客户使用这些请求来参与卡夫卡管理的客户群。从高层次来看,集群中的每个组都分配一个经纪人(其组织协调员),以促进集团管理。一旦找到协调器(使用上面的组协调器请求),组成员可以加入组并同步状态,然后使用心跳保持在组中。当客户端关闭时,它使用离开组请求从组中注销。Kafka客户端分配方案概述了协议语义的更多细节 。
会员API的主要用例是消费者组,但是请求是有意通用的,以支持其他情况(例如Kafka Connect组)。这种通用性的代价是将特定的组语义推送到客户端。例如,下面定义的JoinGroup / SyncGroup请求没有支持消费者组的分区分配的明确字段。相反,它们包含通用字节数组,其中可以由消费者客户端实现嵌入分配。
但是,尽管这允许每个客户端实现定义自己的嵌入式架构,但与Kafka工具的兼容性要求客户端使用由Kafka提供的客户端使用的标准嵌入式架构。例如,consumer-groups.sh实用程序假定此格式显示分区分配。因此,我们建议客户端遵循相同的模式,以便这些工具可用于所有客户端实现。
加入组请求
连接组请求由客户端用来成为组的成员。当新成员加入现有组时,所有以前的成员都必须通过发送新的连接组请求重新加入。当成员首次加入组时,memberId将为空(即“”),但重新连接的成员应使用与上一代相同的memberId。
该SessionTimeout
字段用于表示客户端活动。如果协调者在会话超时到期之前没有收到至少一个心跳(见下文),那么该组件将被删除。在版本0.10.1之前,会话超时也被用作完成所需重新平衡的超时。一旦协调器开始重新平衡,组中的每个成员都具有会话超时,以便发送新的JoinGroup请求。如果他们没有这样做,他们将被从组中删除。在0.10.1中,使用单独的RebalanceTimeout
字段创建了一个新版本的JoinGroup请求。一旦重新平衡开始,每个客户端都有一段时间才能重新加入,但请注意,如果会话超时低于重新平衡超时,则客户端仍必须继续发送心跳。
该ProtocolType
字段定义了组实现的嵌入式协议。组协调器确保组中的所有成员支持相同的协议类型。该GroupProtocols
字段中包含的协议名称和元数据的含义取决于协议类型。请注意,连接组请求允许多个协议/元数据对。这样可以进行滚动升级,而不会停机。协调员选择所有成员支持的单个协议。升级的成员包括新版本和旧版本的协议。一旦所有成员升级,协调器将选择GroupProtocols
阵列中首先列出的协议。
消费者群体:下面我们定义消费者群体使用的嵌入式协议。 我们建议所有消费者实施遵循这种格式,以便工具能够在所有客户端上正常工作。
该UserData
字段可以通过自定义分区分配策略使用。例如,在粘性分区实现中,此字段可以包含上一代的分配。在基于资源的分配策略中,它可以包括托管每个消费者实例的机器上的CPU数量。
Kafka Connect使用“连接”协议类型,其协议详细信息是Connect实现的内部。
加入小组回应
在接收组中所有成员的加入组请求后,协调器将选择一个成员作为组长和所有成员支持的协议。领导者将收到成员的完整列表以及所选协议的相关元数据。其他成员,追随者,将收到一大堆成员。 领导者有责任检查每个成员的元数据,并使用下面的SyncGroup请求分配状态。
每个完成加入组阶段后,协调器会 为组增加一个 GenerationId
。它作为对每个成员的响应中的字段返回,并以心跳和偏移提交请求发送。当协调器重新平衡组时,协调器将发送一个错误代码,指示成员需要重新加入。如果成员在重新平衡完成之前没有重新加入,那么它将具有一个旧的generationId,当包含在新请求中时会导致ILLEGAL_GENERATION错误。
消费者组: 协调员负责选择所有成员之间兼容的协议(即分区分配策略)。然后领导者是实际执行选定任务的成员。连接组请求中可以包含多个分配策略,以支持将现有版本升级或更改为不同的分配策略。
可能的错误代码:
同步组请求
同步组请求由组长使用,为当前一代的所有成员分配状态(例如分区分配)。所有成员在加入组后立即发送SyncGroup,但只有领导提供组的作业。
消费者群体:MemberAssignment
消费者群体的字段格式 如下:
使用“消费者”协议类型的所有客户端实现都应该支持此模式。
同步组响应
组中的每个成员将在同步组响应中从领导者接收分配。
可能的错误代码:
心跳请求
一旦成员加入并同步,它将开始发送定期的心跳来保持自己在组中。如果协调器没有收到配置的会话超时的心跳,则该成员将被踢出该组。
心跳反应
可能的错误代码:
离开组请求
要明确离开组,客户端可以发送离开组请求。这是优先于让会话超时过期,因为它允许组更快地重新平衡,消费者意味着在分区可以重新分配给活动成员之前,时间将减少。
离开组响应
可能的错误代码:
管理API
ListGroups请求
该API可用于查找由代理管理的当前组。要获取集群中所有组的列表,您必须将ListGroup发送给所有代理。
ListGroups响应
可能的错误代码:
描述组请求
描述组响应
可能的错误代码:
常量
Api密钥和当前版本
以下是请求中的ApiKey可以为上述每个请求类型采用的数字代码。
API名称 |
ApiKey值 |
ProduceRequest |
0 |
FetchRequest |
1 |
OffsetRequest |
2 |
MetadataRequest |
3 |
非用户面对的控制API |
4-7 |
OffsetCommitRequest |
8 |
OffsetFetchRequest |
9 |
GroupCoordinatorRequest |
10 |
JoinGroupRequest |
11 |
HeartbeatRequest |
12 |
LeaveGroupRequest |
13 |
SyncGroupRequest |
14 |
DescribeGroupsRequest |
15 |
ListGroupsRequest |
16 |
错误代码
我们使用数字代码来指出在服务器上发生了什么问题。这些可以由客户端转换为异常或客户端语言中适当的错误处理机制。以下是当前使用的错误代码表:
错误 |
码 |
Retriable |
描述 |
错误 |
码 |
Retriable |
描述 |
NOERROR |
0 |
|
没有错误 - 它的工作! |
未知 |
-1 |
|
意外的服务器错误 |
OffsetOutOfRange |
1 |
|
请求的偏移超出服务器为给定主题/分区维护的偏移范围。 |
InvalidMessage / CorruptMessage |
2 |
是 |
这表示消息内容与其CRC不匹配 |
UnknownTopicOrPartition |
3 |
是 |
此请求是针对此代理不存在的主题或分区。 |
InvalidMessageSize |
4 |
|
消息的大小不正确 |
LeaderNotAvailable |
五 |
是 |
如果我们正在领导选举的中间,并且目前没有这个分区的领导者,因此它不可用于写入,那么这个错误就会被抛出。 |
NotLeaderForPartition |
6 |
是 |
如果客户端尝试向不是某个分区的前导的副本发送消息,则会抛出此错误。它表示客户端元数据已过期。 |
请求超时 |
7 |
是 |
如果请求超出了请求中用户指定的时间限制,则会抛出此错误。 |
BrokerNotAvailable |
8 |
|
这不是客户端面临的错误,并且当代理不存在时,主要由工具使用。 |
ReplicaNotAvailable |
9 |
|
如果预期在代理上复制副本,但不是(这可以安全地忽略)。 |
MessageSizeTooLarge |
10 |
|
服务器具有可配置的最大消息大小,以避免无限制的内存分配。如果客户端尝试生成大于此最大值的消息,则会抛出此错误。 |
StaleControllerEpochCode |
11 |
|
经纪人到代理人通信的内部错误代码。 |
OffsetMetadataTooLargeCode |
12 |
|
如果指定的字符串大于配置的偏移量元数据的最大值 |
GroupLoadInProgressCode |
14 |
是 |
如果它仍然加载偏移量(在该偏移主题分区的领导者更改之后)或响应组成员资格请求(例如心跳线),则组织元数据由协调器加载时,代理返回此偏移提取请求的错误代码。 |
GroupCoordinatorNotAvailableCode |
15 |
是 |
如果尚未创建偏移主题,或组协调器未处于活动状态,代理将返回组协调器请求,偏移提交和大多数组管理请求的错误代码。 |
NotCoordinatorForGroupCode |
16 |
是 |
如果代理接收到不是协调器的组的偏移获取或提交请求,则代理返回此错误代码。 |
InvalidTopicCode |
17 |
|
对于尝试访问无效主题(例如具有非法名称的主题)的请求,或者尝试写入内部主题(例如消费者偏移主题)的请求。 |
RecordListTooLargeCode |
18 |
|
如果生产请求中的消息批次超过了最大配置的段大小。 |
NotEnoughReplicasCode |
19 |
是 |
同步复制品数量低于配置的最小值时,从生产请求返回,而requiredAcks为-1。 |
NotEnoughReplicasAfterAppendCode |
20 |
是 |
当消息写入日志时,从生成请求返回,但是比所需的更少的同步副本。 |
InvalidRequiredAcksCode |
21 |
|
如果请求的requiredAcks无效(除了-1,1或0以外的任何内容),则从生成请求返回。 |
IllegalGenerationCode |
22 |
|
当请求中提供的生成ID不是当前代的时候,从组成员请求返回(如心跳线)。 |
InconsistentGroupProtocolCode |
23 |
|
当成员提供与当前组不兼容的协议类型或一组协议时,在连接组中返回。 |
InvalidGroupIdCode |
24 |
|
groupId为空或为null时,在连接组中返回。 |
UnknownMemberIdCode |
25 |
|
当memberId不在当前世代时,从组请求返回(偏移提交/提取,心跳等)。 |
InvalidSessionTimeoutCode |
26 |
|
当请求的会话超时超出了代理允许的范围时,返回连接组 |
RebalanceInProgressCode |
27 |
|
当协调员开始重新平衡组时,心跳请求返回。这向客户端指示它应该重新加入组。 |
InvalidCommitOffsetSizeCode |
28 |
|
此错误表示由于超大型元数据而导致偏移提交被拒绝。 |
TopicAuthorizationFailedCode |
29 |
|
当客户端无权访问请求的主题时由代理返回。 |
GroupAuthorizationFailedCode |
三十 |
|
客户端无权访问特定groupId时,由代理返回。 |
ClusterAuthorizationFailedCode |
31 |
|
当客户端没有授权使用中间人或管理API时由代理返回。 |
一些共同的哲学问题
有些人问我们为什么不使用HTTP。有很多原因,最好的是客户端实现者可以利用一些更先进的TCP特性 - 复用请求的能力,同时轮询许多连接的能力等。我们还发现许多HTTP库语言是令人惊讶的破旧。
有人问我们是否不应该支持许多不同的协议。以前的经验是,如果必须在许多协议实现中移植,那么它很难添加和测试新功能。我们的感觉是,大多数用户并没有真正看到多个协议作为一个功能,他们只是想要一个可靠的客户端,他们选择的语言。
另一个问题是为什么我们不采用XMPP,STOMP,AMQP或现有协议。这样做的答案因协议而异,但一般来说,问题是协议确实确定了实施的大部分,如果我们无法控制协议,我们无法做我们正在做的事情。我们的信念是,可以比现有的消息传递系统更好地提供真正的分布式消息传递系统,为此,我们需要构建一些不同的工作方式。
最后一个问题是为什么我们不使用像Protocol Buffers或Thrift这样的系统来定义我们的请求消息。这些软件包非常适合帮助您管理大量和大量的序列化消息。但是我们只有几条消息。跨语言支持有点多(取决于包)。最后,二进制日志格式和有线协议之间的映射是我们稍微仔细管理的,这对于这些系统是不可能的。最后,我们更喜欢显式版本API的样式,并检查这个来推断新值为null,因为它允许更细微的控制兼容性。