Kafka权威指南 读书笔记之(四)Kafka 消费者一一从 Kafka读取数据

时间:2021-03-18 16:41:50

KafkaConsumer概念
消费者和消费者群组
  Kafka 消费者从属于消费者群组。一个群组里的消费者订阅的是同一个主题,每个消费者接收主题一部分分区的消息。 往群组里增加消费者是横向伸缩消费能力的主要方式。
我们有必要为主题创建大量的分区,在负载增长时可以加入更多的消费者。不要让消费者的数量超过主题分区的数量,多余的消费者只会被闲置。  
除了通过增加消费者来横向伸缩单个应用程序外,还经常出现多个应用程序从同一个主题读取数据的情况。 Kafka 设计的主要目标之一 ,就是要让 Kafka 主题里的数据能
够满足企业各种应用场景的需求。在这些场景里,每个应用程序可以获取到所有的消息,而不只是其中的一部分。只要保证每个应用程序有自己的消费者群组,就可以让它们获取
到主题所有的消息。不同于传统的消息系统,横向伸缩 Kafka 消费者和消费者群组并不会对性能造成负面影响。

为每一个需要获取一个或多个主题全部消息的应用程序创建一个消费者群组,然后往群组里添加消费者来伸缩读取能力和处理能力,群组里的每个消费者只处理一部分消息。

Kafka权威指南 读书笔记之(四)Kafka 消费者一一从 Kafka读取数据

消费者群组和分区再均衡
  分区的所有权从一个消费者转移到另一个消费者,这样的行为被称为再均衡 。再均衡为消费者群组带来了高可用性和伸缩性(我们可以放心地添加或移除消费者),
不过在正常情况下,我们并不希望发生这样的行为。在再均衡期间,消费者无法读取消息,造成整个群组一小段时间的不可用。另外,当分区被重新分配给另一个消费者时,消
费者当前的读取状态会丢失,它有可能还需要去刷新缓存,在它重新恢复状态之前会拖慢应用程序。
  消费者通过向被指派为群组协调器的 broker (不同的群组可以有不同的协调器)发送心跳来维持它们和群组的从属关系以及它们对分区的所有权关系。只要消费者以正常的时间
间隔发送心跳,就被认为是活跃的,说明它还在读取分区里的消息。消费者会在轮询消息(为了获取消息)或提交偏移量时发送心跳。如果消费者停止发送心跳的时间足够长,会
话就会过期,群组协调器认为它已经死亡,就会触发一次再均衡。

  如果一个消费者发生崩溃,并停止读取消息,群组协调器会等待几秒钟,确认它死亡了才会触发再均衡。在这几秒钟时间里,死掉的消费者不会读取分区里的消息。在清理消费者
时,消费者会通知协调器它将要离开群组,协调器会立即触发一次再均衡,尽量降低处理停顿。

创建 Kafka消费者

bootstrap.servers 指定了 Kafka 集群的连接字符串。key.deserializer和 value .deserializer使用指定的类把字节数组转成 Java 对象。group.id不是必需的。它指定了KafkaConsumer属于哪一个消费者群组。
创建不属于任何一个群组的消费者也是可以的,只是这样做不太常见.

Kafka权威指南 读书笔记之(四)Kafka 消费者一一从 Kafka读取数据

订阅主题
Kafka权威指南 读书笔记之(四)Kafka 消费者一一从 Kafka读取数据

可以在调用 subscribe () 方法时传入一个正则表达式匹配多个主题, 如果有人创建了新的主题,并且主题的名字与正则表达式匹配 ,那么会立即触发一次
再均衡,消费者就可以读取新添加的主题。如果应用程序需要读取多个主题,并且可以处理不同类型的数据,那么这种订阅方式就很管用。在Kafka 和其他系统之间复制数据时,
使用正则表达式的方式订阅多个主题是很常见的做发。
要订阅所有与 test 相关的主题,可以这样做 :consumer.subscribe (“test.*");

轮询
  消息轮询是消费者 API 的核心,通过一个简单的轮询向服务器请求数据。一旦消费者订阅了主题,轮询就会处理所有的细节,包括群组协调、分区再均衡、发送心跳和获取数据,
开发者只需要使用一组简单的 API 来处理从分区返回的数据。

Kafka权威指南 读书笔记之(四)Kafka 消费者一一从 Kafka读取数据

  1. 消费者是一个长期运行的应用程序,它通过持续轮询向Kafka 请求数据。
  2. 就像鲨鱼停止移动就会死掉一样,消费者必须持续对 Kafka 进行轮询,否则会被认为己经死亡 ,它的分区会被移交给群组里的其他消费者。传给poll () 方法的参数是一个超时时间,
  3. 用于控制 poll () 方法的阻塞时间(在消费者的缓冲区里没有可用数据时会发生阻塞)。如果该参数被设为 0, poll () 会立即返回 ,否则
    它会在指定的毫秒数内一直等待 broker 返回数据。
  4. poll () 方能返回一个记录列表。每条记录都包含了记录所属主题的信息、记录所在分区的信息 、 记录在分区里的偏移量 ,以及记录的键值对。我们一般会遍历这个列表 ,逐条处理这些记录。

     poll () 方法有一个超时参数 , 它指定了方法在多久之后可以返回,不管有没有可用的数据都要返回。 超时时间的设置取决于应用程序对响应速度的要求,比如要在多长时间内把控制权归还给执行轮询的线程。

5.把结果保存起来或者对已有的记录进行更新。

6.在退出应用程序之前使用 close () 方法关闭消费者。网络连接和 socket 也会随之关闭,并立即触发一次再均衡,而不是等待群组协调器发现它不再发送心跳并认定它已死亡,
            因为那样需要更长的时间,导致整个群组在一段时间内无法读取消息

轮询不只是获取数据那么简单。在第一次调用新消费者的 poll () 方法时,它会负责查找GroupCoordinator , 然后加入群组,接受分配的分区。 如果发生了再均衡,整个过程也是
在轮询期间进行的。心跳也是从轮询里发出去的。所以,我们要确保在轮询期间所做的任何处理工作都应该尽快完成。

消费者的配置

与消费者的性能和可用性有很大关系属性。
1. fetch.min.bytes
  该属性指定了消费者从服务器获取记录的最小字节数。 broker在收到消费者的数据请求时,如果可用的数据量小于 fetch.min.bytes 指定的大小,那么它会等到有足够的可用数据时
才把它返回给消费者。这样可以降低消费者和 broker 的工作负载,因为它们在主题不是很活跃的时候(或者一天里的低谷时段)就不需要来来回回地处理消息。如果没有很多可用
数据,但消费者的 CPU 使用率却很高,那么就需要把该属性的值设得比默认值大。如果消费者的数量比较多,把该属性的值设置得大一点可以降低 broker 的工作负载。

2. fetch.max.wait.ms
   fetch.Max.wait.ms 则用于指定 broker 的等待时间,默认是 500ms 。如果没有足够的数据流入Kafka ,消费者获取最小数据量的要求就得不到满足,最终导致 500ms 的延迟。

如果要降低潜在的延迟(为了满足 SLA),可以把该参数值设置得小一些。如果 fetch.max.wait.ms 被设为 100ms,并且fetch.min.bytes 被设为1MB ,那么 Kafka 在收到消费者的请求后,

要么返回 1MB 数据,要么在 100ms 后返回所有可用的数据 , 就看哪个条件先得到满足。

3. max.patition.fetch.bytes
  该属性指定了服务器从每个分区里返回给消费者的最大字节数。它的默认值是 1MB ,KafkaConsumer. poll () 方法从每个分区里返回的记录最多不超过 Max.partition.fecth.bytes
指定的字节。如果一个主题有 20 个分区和 5 个消费者,那么每个消费者需要至少 4MB 的可用内存来接收记录。在为消费者分配内存时,可以给它们多分配一些,因
为如果群组里有消费者发生崩溃,剩下的消费者需要处理更多的分区。 max.partition.fetch.bytes 的值必须比 broker 能够接收的最大消息的字节数大, 否则消费者可能无法读取这些消息,导致消费者一直挂起重试。 
在设置该属性时,另一个需要考虑的因素是消费者处理数据的时间。消费者需要频繁调用 poll () 方法来避免会话过期和发生分区再均衡,如果单次调用 poll () 返回的数据太多,消费者需要更
多的时间来处理,可能无法及时进行下一个轮询来避免会话过期。如果出现这种情况, 可以把max.patition.fetch.bytes值改小,或者延长会话过期时间。

4 . session.timeout.ms
  该属性指定了消费者在被认为死亡之前可以与服务器断开连接的时间,默认是 3s。如果消费者没有在session.timeout.ms指定的时间内发送心跳给群组协调器,就被认为
已经死亡,协调器就会触发再均衡,把它的分区分配给群组里的其他消费者。该属性与heartbeat.interval.ms 紧密相关 。heartbeat.interval.ms 指定了 poll () 方法向协调器
发送心跳的频率,session.timeout.ms则指定了消费者可以多久不发送心跳。所以, 一般需要同时修改这两个属性, heartbeat.interval.ms必须比 session.timeout.ms 小, 一
般是 session.timeout.ms的三分之一。如果 session.timeout.ms是 3s ,那么 heartbeat.interval.ms 应该是 1s 。 把 session.timeout.ms值设置比默认值小,可以更快地检测和恢
复崩溃的节点,不过长时间的轮询或垃圾收集可能导致非预期的再均衡。把该属性的值设置得大一些,可以减少意外的再均衡,不过检测节点崩溃需要更长的时间。

5. auto.offset.reset
  该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下(因消费者长时间失效,包含偏移量的记录已经过时并被删除)该作何处理。它的默认值是 latest , 意
思是说,在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)。另一个值是 earliest ,意思是说,在偏移量无效的情况下,消费者将从
起始位置读取分区的记录。

6. enable.auto.commit
  该属性指定了消费者是否自动提交偏移量,默认值是 true。为了尽量避免出现重复数据和数据丢失,可以把它设为 false ,由自
己控制何时提交偏移量。如果把它设为 true ,还可以通过配置 auto.commit.interval.ms属性来控制提交的频率。

7. partition.assignment.strategy
  分区会被分配给群组里的消费者。partition.assignment.strategy根据给定的消费者和主题,决定哪些分区应该被分配给哪个消费者。 Kafka 有两个默认的分配策略 。
  Range
    该策略会把主题的若干个连续的分区分配给消费者。假设悄费者C1和消费者C2同时订阅了主题 T1和主题 T2 ,并且每个主题有 3 个分区。那么消费者 C1有可能分配到过
    两个主题的分区0和分区1,而消费者 C2 分配到这两个主题的分区2。因为每个主题拥有奇数个分区,而分配是在主题内独立完成的,第一个消费者最后分配到比第二个消
    费者更多的分区。只要使用了 Range 策略,而且分区数量无法被消费者数量整除,就会出现这种情况。

  RoundRobin
    该策略把主题的所有分区逐个分配给消费者。如果使用 RoundRobin 策略来给消费者 C1和消费者 C2 分配分区,那么消费者 C1将分到主题 Tl 的分区 0 和分区 2 以及主题 T2
    的分区 1 ,消费者 C2 将分配到主题T1的分区1以及主题T1的分区0和分区2。一般来说 ,如果所有消费者都订阅相同的主题(这种情况很常见) , RoundRobin 策略会给所
    有消费者分配相同数量的分区(或最多就差一个分区)。

可以通过设置 partition.assignment.strategy 来选择分区策略。默认使用的是org.apache.kafka.clients.consumer.RangeAssignor,
这个类实现了 Range 策略,不过也可以把它改成 。org.apache .kafka . clients.consumer.RoundRobinAssignor。我们还可以使用自定
义策略, partition.assignment.strategy 属性的值就是自定义类的名字。

8. client.id
  该属性可以是任意字符串 , broker 用它来标识从客户端发送过来的消息,通常被用在日志、度量指标和配额里。

9. max.poll.records
  该属性用于控制单次调用 call () 方法能够返回的记录数量,可以帮你控制在轮询里需要处理的数据量。

10. receive.buffer.bytes 和 send.buffer.bytes
  socket 在读写数据时用到的 TCP 缓冲区也可以设置大小。如果它们被设为-1 ,就使用操作系统的默认值。如果生产者或消费者与 broker 处于不同的数据中心内,可以适当增大这
些值,因为跨数据中心的网络一般都有比较高的延迟和比较低的带宽 。

提交和偏移量
  每次调用 poll () 方法,它总是返回由生产者写入 Kafka 但还没有被消费者读取过的记录 ,我们因此可以追踪到哪些记录是被群组里的哪个消费者读取的。Kafka
不会像其他 JMS 队列那样需要得到消费者的确认,这是 Kafka 的一个独特之处。消费者可以使用 Kafka 来追踪消息在分区里的位置(偏移量)。

我们把更新分区当前位置的操作叫作提交

那么消费者是如何提交偏移量的呢?消费者往一个叫作_consumer_offset 的特殊主题发送消息,消息里包含每个分区的偏移量。 如果消费者一直处于运行状态,那么偏移量就没有
什么用处。如果消费者发生崩溃或者有新的消费者加入群组,就会触发再均衡,完成再均衡之后,每个消费者可能分配到新的分区,而不是之前处理的那个。为了能够继续
之前的工作,消费者需要读取每个分区最后一次提交的偏移量,然后从偏移量指定的地方继续处理。
如果提交的偏移量小于客户端处理的最后一个消息的偏移量 ,那么处于两个偏移量之间的消息就会被重复处理 。
如果提交的偏移量大于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息将会丢失。

自动提交
  最简单的提交方式是让消费者自动提交偏移量。如果 enable.auto.commit 被设为 true ,那么每过5s钟,消费者会自动把从 poll () 方告接收到的最大偏移量提交上去。提交时间间隔
由 auto.commit.interval.ms 控制,默认值是 5s。自动提交是在轮询里进行的。消费者每次在进行轮询时会检查是否该提交偏移量了,如果是就会提交从上一次轮询返回的偏移量。
在使用这种简便的方式之前,需要知道它将会带来怎样的结果。假设使用5s 提交时间间隔,在最近一次提交之后的 3s 发生了再均衡,再
均衡之后,消费者从最后一次提交的偏移量位置开始读取消息。这个时候偏移量已经落后了 3s ,所以在这 3s 内到达的消息会被重复处理。可以通过修改提交时间间隔来更频繁地
提交偏移量,减小可能出现重复消息的时间窗 。

  在使用自动提交时,每次调用轮询方法都会把上一次调用返回的偏移量提交上去,它并不知道具体哪些消息已经被处理了,所以再次调用之前最好确保所有当前调用返回的消息
都已经处理完毕(在调用 close () 方位之前也会进行自动提交)。 一般情况下不会有什么问题,在处理异常或提前退出轮询时要格外小心 。
自动提交虽然方便 , 但并没有为开发者留有余地来避免重复处理消息。

提交当前偏移量
  大部分开发者通过控制偏移量提交时间来消除丢失消息的可能性,并在发生再均衡时减少重复消息的数量。消费者 API 提供了另一种提交偏移量的方式 , 开发者可以在必要的时候
提交当前偏移量,而不是基于时间间隔。把 auto.commit.offset 设为 false ,让应用程序决定何时提交偏移量。使用commitSync()提交偏移量最简单也最可靠。
这个 API 会提交由 poll () 方能返回的最新偏移量,提交成功后马上返回,如果提交失败就抛出异常。在处理完所有记录后要确保调用了 commitSync(),否则还是会有丢失消息的风险。
Kafka权威指南 读书笔记之(四)Kafka 消费者一一从 Kafka读取数据

异步提交
  手动提交有一个不足之处,在 broker 对提交请求作出回应之前,应用程序会一直阻塞,这样会限制应用程序的吞吐量。可以通过降低提交频率来提升吞吐率,但如果发生了再
均衡, 会增加重复消息的数量。这个时候可以使用异步提交 API。只管发送提交请求,无需等待 broker 的响应。

  在成功提交或碰到无怯恢复的错误之前, commitSync () 会一直重试,但是 commitAsync()不会,这也是 commitAsync() 不好的一个地方。它之所以不进行重试,是因为在它收到
服务器响应的时候,可能有一个更大的偏移量已经提交成功。假设我们发出一个请求用于提交偏移量 2000,这个时候发生了短暂的通信问题 ,服务器收不到请求,自然也不会
作出任何响应。与此同时,我们处理了另外一批消息,并成功提交了偏移量 3000。如果commitAsync()() 重新尝试提交偏移量 2000 ,它有可能在偏移量 3000 之后提交成功。这个时
候如果发生再均衡,就会出现重复消息。

commitAsync ()也支持回调,在 broker 作出响应时会执行回调。回调经常被用于记录提交错误或生成度量指标, 不过如果你要用它来进行重试, 一定要注意提交的顺序。

Kafka权威指南 读书笔记之(四)Kafka 消费者一一从 Kafka读取数据

发送提交请求然后继续做其他事情,如果提交失败,错误信息和偏移量会被记录下来 。

同步和异步组合提交
  组合使用 commitAsync ()和 commitSync  () 。如下

Kafka权威指南 读书笔记之(四)Kafka 消费者一一从 Kafka读取数据

提交特定的偏移量
  调用 commitSync ()或 commitAsync () 它们只会提交最后一个偏移量,而此时该批次里的消息还没有处理完。

消费者 API 允许在调用 commitSync ()和 commitAsync ()方法时传进去希望提交的分区和偏移量的 map。假设你处理了半个批次的消息 , 最后一个来自主题“customers”
分区 3 的消息的偏移量是 5000 , 你可以调用 commitSync () 方法来提交它。因为消费者可能不只读取一个分区, 你需要跟踪所有分区的偏移量,所以在这个层面上控制偏移
量的提交会让代码变复杂。

Kafka权威指南 读书笔记之(四)Kafka 消费者一一从 Kafka读取数据

这里调用的是 commitAsync () ,调用 commitSync()也是完全可以的。当然,在提交特定偏移量时,仍然要处理可能发生的错误。

再均衡监听器

  消费者在退出和进行分区再均衡之前,会做一些清理工作 。在为消费者分配新分区或移除旧分区时,可以通过消费者 API 执行一些应用程序代码,
在调用 subscribe () 方法时传进去一 个 ConsumerRebalancelistener实例就可以了。ConsumerRebalancelistener有两个需要实现的方法。

(1) public void onPartitionsRevoked(Collection<TopicPartition> partitions )再均衡开始之前和消费者停止读取消息之后被调用。如果在这里提交偏移量,下一个接管分区的消费者就知道该从哪里开始读取了。

(2) public  void onPartitionsAssigned(Collection<TopicPartition> partitions )方法会在重新分配分区之后和消费者开始读取消息之前被调用。

从特定偏移量处开始处理记录

  从分区的起始位置开始读取消息,或者直接跳到分区的末尾开始读取消息, 用 seekToBeginning(Collection<TopicPartition> tp ) 和 seekToEnd ( Collection<TopicPartition> tp ) 这两个方法。
Kafka 也为我们提供了用于查找特定偏移量的 API 。
应用程序从 Kafka 读取事件(可能是网站的用户点击事件流 ),对它们进行处理(可能是使用自动程序清理点击操作井添加会话信息),然后把结果保存到数据库。不想丢失任何数据,也不想在数据库
里多次保存相同的结果。 可以每处理一条记录就提交一次偏移量。尽管如此, 在记录被保存到数据库之后以及偏移量被提交之前 ,应用程序仍然有可能发生崩溃,导致重复处理数据,数据库里
就会出现重复记录。

可以在同一个事务里把记录和偏移量都写到数据库 ,在消费者启动或分配到新分区时 ,可以使用 seek ()方告查找保存在数据库里的偏移量。  
下面的例子大致说明了如何使用 ConsumerRebalancelistener和 seek () 方法确保我们是从数据库里保存的偏移量所指定的位置开始处理消息的。

Kafka权威指南 读书笔记之(四)Kafka 消费者一一从 Kafka读取数据

  1. 使用一个虚构的方能来提交数据库事务。大致想怯是这样的 : 在处理完记录之后,将记录和偏移量插入数据库,然后在即将失去分区所有权之前提交事务,确保成功保存了这些信息 。
  2. 使用另一个虚构的方怯来从数据库获取偏移量 , 在分配到新分区的时候,使用 seek()方住定位到那些记录。
  3. 订阅主题之后 , 开始启动消费者 , 我们调用一次 poll () 方法,让消费者加入到消费者群组里,并获取分配到的分区 , 然后马上调用 seek () 方也定位分区的偏移量。

    seek () 方能只更新我们正在使用的位置,在下一次调用 poll () 时就可以获得正确的消息 。 如果 seek () 发生错误( 比如偏移量不存在) , poll () 就会抛出异常。

4.另一个虚构的方法 , 这次要更新的是数据库里用于保存偏移量的表。

通过把偏移量和记录保存到同一个外部系统来实现单次语义可以有很多种方式,不过它们都需要结合使用 ConsulmerRebalancelistener 和 seek () 方法来确保能够及时保存偏移量,
并保证消费者总是能够从正确的位置开始读取消息。

如何退出
  轮询时如果要退出循环,要通过另一个线程调用 consumer.wakeup ( )方法。如果循环运行在主线程里,可以在 ShutdownHook 里调用该方法。consumer.wakeup () 是消费者
唯一 一个可以从其他线程里安全调用的方法。调用 consumer.wakeup () 可以退出 poll() ,并抛出 wakeupException 异常,或者如果调用 consumer.wakeup () 时线程没有等待轮询, 那
么异常将在下一轮调用 poll ()时抛出。我们不需要处理 WakeupException ,因为它只是用于跳出循环的一种方式。 在退出线程之前调用 consumer.close () 是很有必要的, 它
会提交任何还没有提交的东西 , 并向群组协调器发送消息,告知自己要离开群组,接下来就会触发再均衡 ,而不需要等待会话超时。

独立消费者一一为什么以及怎样使用没有群组的消费者

  如果只需要一个消费者从一个主题的所有分区或者某个特定的分区读取数据。这个时候就不需要消费者群组和再均衡了, 只需要把主题或者分区分配给消费者,然后开始读取消息并提交偏移量。
如果是这样的话,就不需要订阅主题, 取而代之的是为自己分配分区。一个消费者可以订阅主题(井加入消费者群组),或者为自己分配分区 , 但不能同时做这两件事情。

下面的例子演示了一个消费者是如何为自己分配分区并从分区里读取消息的:
Kafka权威指南 读书笔记之(四)Kafka 消费者一一从 Kafka读取数据

除了不会发生再均衡,也不需要手动查找分区 , 其他的看起来一切正常。不过要记住 ,如果主题增加了新的分区,消费者并不会收到通知。要么周期性地调用 consume.partitionsFor()
方法来检查是否有新分区加入, 要么在添加新分区后重启应用程序。