不管是把 Kafka 作为消息队列、消息总线还是数据存储平台来使用 ,总是需要有一个可以往 Kafka 写入数据的生产者和一个从 Kafka 读取数据的消费者,或者一个兼具两种角
色的应用程序。
开发者们可以使用 Kafka 内置的客户端 API 开发 Kafka 应用程序。
我们将从 Kafra 生产者的设计和组件讲起,学习如何使用 Kafka 生产者。内容包括:
- 如何创建 KafkaProducer 和 ProducerRecords 对象、如何将记录发送给 Kafka;
- 如何处理从 Kafka 返回的错误;
- 介绍用于控制生产者行为的重要配置选项;
- 深入探讨如何使用不同的分区方棒和序列化器,以及如何自定义序列化器和分区器 。
生产者概览
一个应用程序在很多情况下需要往 Kafka 写入消息 。多样的使用场景意味着多样的需求,尽管生产者 API 使用起来很简单 , 但消息的发送过程还是有点复杂的。
Kafka 发送消息的主要步骤 :
我们从创建一个 ProducerRecord 对象开始, ProducerRecord 对象需要包含目标主题和要发送的内容。我们还可以指定键或分区。在发送 ProducerRecord 对象时,生产者要先把键和
值对象序列化成字节数组,这样它们才能够在网络上传输。
接下来,数据被传给分区器。如果之前在 ProducerRecord 对象里指定了分区,那么分区器就不会再做任何事情,直接把指定的分区返回。如果没有指定分区 ,那么分区器会根据ProducerRecord对象的键来选择一个分区 。
紧接着,这条记录被添加到一个记录批次里,这个批次里的所有消息会被发送到相同的主题和分区上。有一个独立的线程负责把这些记录批次发送到相应的broker 上。
如果消息成功写入 Kafka,就返回 一 个RecordMetaData 对象,它包含了主题和分区信息,以及记录在分区里的偏移量。如果写入失败, 则会返回 一个错误。生产者在收到错误之后会尝试重新发送消息,
几次之后如果还是失败,就返回错误信息。
创建 Kafka生产者
要往 Kafka 写入消息,首先要创建一个生产者对象,井设置一些属性 。 Kafka 生产者有 3个必选的属性。
bootstrap.servers
该属性指定 broker 的地址清单,地址的格式为 host:port。清单里不需要包含所有的broker 地址,生产者会从给定的 broker 里查找到其他 broker 的信息。不过建议至少要
提供两个 broker 的信息, 一旦其中一个若宕机,生产者仍然能够连接到集群上。
key.seiralizer
broker 希望接收到的消息的键和值都是字节数组。生产者接口允许使用参数化类型,因可以把 Java 对象作为键和值发送给 broker。生产者需要知道如何把这些 Java 对象转换成字节数组。
key.seiralizer 必须被设置为一个实现了org.apache.kafka.common.serialization.Serializer接口的类,生产者会使用这个类把键对象序列化成字节数组。
Kafka 客户端默认提供了 ByteArraySerializer(这个只做很少的事情)、 StringSerializer和 IntegerSerizalizer,因此,如果你只
使用常见的几种 Java 对象类型,那么就没必要实现自己的序列化器。要注意, key.seiralizer 是必须设置的,就算你打算只发送值内容。
value. seiralizer
与key.seiralizer 一样,value. seiralizer 指定的类会将值序列化。如果键和值都是字符串,可以使用与 key. seiralizer 一样的序列化器。如果键是整数类型而值是字符型 ,那么需要使用不同的序列化器。
下面的代码片段演示了如何创建一个新的生产者,这里只指定了必要的属性,其他使用默认设置。
- 新建一个Properties对象。
- 因为我们打算把键和值定义成字符串类型,所以使用内置的StringSerializer对象。
- 在这里我们创建了一个新的生产者对象,并为键和值设置了恰当的类型,然后把Properties对象传给它。
实例化生产者对象后,接下来就可以开始发送消息了。发送消息主要有以下 3 种方式。
发送并忘记( fire-and-forget )
我们把消息发送给服务器,但井不关心它是否正常到达。大多数情况下,消息会正常到达,因为 Kafka 是高可用的,而且生产者会自动尝试重发。不过,使用这种方式有时候
也会丢失一些消息。
同步发送
我们使用 send () 方怯发送消息 , 它会返回…个 Future 对象,调用 get () 方法进行等待 ,就可以知道消息是否发送成功。
异步发送
我们调用 send () 方怯,并指定一个回调函数, 服务器在返回响应时调用该函数。
发送消息到Kafka
- 生产者的 send () 方住将 ProducerRecord 对象作为参数,所以我们 要先创建 一 个ProducerRecord 对象。 它需要目标主题的名字和要发送的键和值对象,它们都是字符串。键和值对象的类型必须与序列化器和生产者对象相匹配。
- 我们使用生产者的 send () 方法发送 ProducerRecord 对象。从生产者的架构图里可以看到,消息先是被放进缓冲区,然后使用单独的线程发送到服务器端。send () 方法会返回 一个包含 RecordMetadata 的 Future 对象,
不过因为我们会忽略返回值,所以无法知道消息是否发送成功。如果不关心发送结果,那么可以使用这种发送方式。
我们可以忽略发送消息时可能发生的错误或在服务器端可能发生的错误,但在发送消息之前,生产者还是有可能发生其他的异常。这些异常有可能是 SerializationException(说明序列化消息失败)、 BufferExhaustedException 或 TimeoutException (说明缓冲区已
满),又或者是 InterruptException (说明发送线程被中断)。
同步发送消息
KafkaProducer一般会发生两类错误。其中一类是可重试错误 ,这类错误可以通过重发消息来解决。比如对于连接错误,可以通过再次建立连接来解决,“无主( no leader )” 错误则可
以通过重新为分区选举首领来解决。 KafkaProducer可以被配置成自动重试,如果在多次重试后仍无能解决问题,应用程序会收到一个重试异常。另一类错误无出通过重试解决,比如
“消息太大”异常。对于这类错误, KafkaProducer不会进行任何重试,直接抛出异常。
异步发送消息
为了在异步发送消息的同时能够对异常情况进行处理,生产者提供了回调支持。下面是使用回调的一个例子。
为了使用回调,需要一个实现了org.apache. kafka . clients. producer.Callback 接口的类,这个接口只有一个 onCompletion 方法。
生产者的配置
有几个参数在内存使用、性能和可靠性方面对生产者影响比较大.
1. acks
acks 参数指定了 必须要有多少个分区副本收到消息,生产者才会认为消息写入是成功的 。这个参数对消息丢失的可能性有重要影响。 有如下选项:
• acks=0 , 生产者在成功写入消息之前不会等待任何来自服务器的响应。也就是说,如果当中出现了问题 , 导致服务器没有收到消息,那么生产者就无从得知,消息也就丢
失了。不过,因为生产者不需要等待服务器的响应,所以它可以以网络能够支持的最大速度发送消息,从而达到很高的吞吐量。
• acks=1 ,只要集群的首领节点收到消息,生产者就会收到一个来自服务器的成功响应。如果消息无法到达首领节点(比如首领节点崩溃,新的首领还没有被选举出来),
生产者会收到一个错误响应,为了避免数据丢失,生产者会重发消息。不过,如果一个没有收到消息的节点成为新首领,消息还是会丢失。这个时候的吞吐量取决于使用的是
同步发送还是异步发送。如果让发送客户端等待服务器的响应(通过调用 Future 对象的 get ()方法),显然会增加延迟(在网络上传输一个来回的延迟)。如果客户端使用回
调,延迟问题就可以得到缓解,不过吞吐量还是会受发送消息数量的限制(比如,生产者在收到服务器响应之前可以发送多少个消息)。
• acks=all ,只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。这种模式是最安全的,它可以保证不止一个服务器收到消息,就算
有服务器发生崩溃,整个集群仍然可以运行。它的延迟比 acks=1 时更高,因为我们要等待不只一个服务器节点接收消息。
2. buffer.memory
该参数用来设置生产者内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息。如果应用程序发送消息的速度超过发送到服务器的速度,会导致生产者空间不足。这个时候,
send ()方法调用要么被阻塞,要么抛出异常,取决于如何设置 block.on.buffe 。
3. compression .type
默认情况下,消息发送时不会被压缩。该参数可以设置为 snappy 、 gzip 或 lz4 ,它指定了消息被发送给 broker 之前使用哪一种压缩算也进行压缩。 snappy 压缩算怯由 Google发明,
它占用较少的 CPU ,却能提供较好的性能和相当可观的压缩比,如果比较关注性能和网络带宽,可以使用这种算法 。 gzip 压缩算法一般会占用较多的 CPU,但会提供更高的压缩
比,所以如果网络带宽比较有限,可以使用这种算法。使用压缩可以降低网络传输开销和存储开销,而这往往是向 Kafka 发送消息的瓶颈所在。
4. retries
生产者从服务器收到的错误有可能是临时性的错误(比如分区找不到首领)。在这种情况下, retries 参数的值决定了生产者可以重发消息的次数,如果达到这个次数,生产者会
放弃重试并返回错民。默认情况下,生产者会在每次重试之间等待100ms ,不过可以通过retry.backoff.ms 参数来改变这个时间间隔。建议在设置重试次数和重试时间间隔之前,
先测试一下恢复一个崩溃节点需要多少时间(比如所有分区选举出首领需要多长时间),让总的重试时间比 Kafka 集群从崩溃中恢复的时间长,否则生产者会过早地放弃重试。不
过有些错误不是临时性错误,没办法通过重试来解决(比如“消息太大”错误)。一般情况下,因为生产者会自动进行重试,所以就没必要在代码逻辑里处理那些可重试的错误。
你只需要处理那些不可重试的错误或重试次数超出上限的情况。
5. batch.size
当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算(而不是消息个数)。当批次被填满,
批次里的所有消息会被发送出去。不过生产者并不一定都会等到批次被填满才发送,半满的批次,甚至只包含一个消息的批次也有可能被发送。所以就算把批次大小设置得很大,
也不会造成延迟,只是会占用更多的内存而已。但如果设置得太小,因为生产者需要更频繁地发送消息,会增加一些额外的开销。
6. linger.ms
该参数指定了生产者在发送批次之前等待更多消息加入批次的时间 。 KafkaProducer会在批次填满或 linger. ms 达到上限时把批次发送出去。默认情况下,只要有可用的线程, 生
产者就会把消息发送出去,就算批次里只有一个消息。把 linger. ms设置成比 0 大的数,让生产者在发送批次之前等待一会儿,使更多的消息加入到这个批次。虽然这样会增加延
迟,但也会提升吞吐量(因为一次性发送更多的消息,每个消息的开销就变小了) 。
7. client.id
该参数可以是任意的字符串,服务器会用它来识别消息的来源,还可以用在日志和配额指标里。
8. max.in.flight.requests.per.connection
该参数指定了生产者在收到服务器晌应之前可以发送多少个消息。它的值越高,就会占用越多的内存,不过也会提升吞吐量。 把它设为 1 可以保证消息是按照发送的顺序写入服务
器的,即使发生了重试。
9. timeout.ms 、 request.timeout.ms 和 metadata.fetch.timeout.ms
request.timeout.ms指定了生产者在发送数据时等待服务器返回响应的时间, metadata.fetch.timeout.ms 指定了生产者在获取元数据(比如目标分区的首领是谁)时等待服务器
返回响应的时间。如果等待响应超时,那么生产者要么重试发送数据,要么返回一个错误(抛出异常或执行回调)。timeout.ms 指定了 broker 等待同步副本返回消息确认的时间,与
asks 的配置相匹配一一如果在指定时间内没有收到同步副本的确认,那么 broker 就会返回一个错误 。
10. max.block.ms
该参数指定了在调用 send () 方法或使用 partitionsFor() 方能获取元数据时生产者的阻塞时间。当生产者的发送缓冲区已满,或者没有可用的元数据时,这些方屈就会阻塞。在阻
塞时间达到 max.block.ms时,生产者会抛出超时异常。
11 . max.request.size
该参数用于控制生产者发送的请求大小。它可以指能发送的单个消息的最大值,也可以指单个请求里所有消息总的大小。例如,假设这个值为 1MB ,那么可以发送的单个最大消
息为 1MB ,或者生产者可以在单个请求里发送一个批次,该批次包含了 1000 个消息,每个消息大小为 1KB。另外, broker 对可接收的消息最大值也有自己的限制( message. max.
bytes ),所以两边的配置最好可以匹配,避免生产者发送的消息被 broker 拒绝 。
12. receive.buffer. bytes 和 send.buffer.bytes
这两个参数分别指定了 TCP socket 接收和发送数据包的缓冲区大小。 如果它们被设为 -1 ,就使用操作系统的默认值。如果生产者或消费者与 broker 处于不同的数据中心,那么可以
适当增大这些值,因为跨数据中心的网络一般都有比较高的延迟和比较低的带宽
序列化器
创建一个生产者对象必须指定序列化器。我们已经知道如何使用默认的字符串序列化器, Kafka 还提供了整型和字节数组序列化器,不过它们还不
足以满足大部分场景的需求。接下来演示如何开发自己的序列化器,并介绍 Avro 序列化器作为推荐的备选方案。
自定义序列化器
如果发送到 Kafka 的对象不是简单的字符串或整型,那么可以使用序列化框架来创建消息记录,如 Avro 、 Thrift 或 Protobuf,或者使用自定义序列化器。
只要使用这个 CustomerSerializer, 就可以把消息记录定义成 ProducerRecord<String,Customer>,并且可以直接把 Custolmer 对象传给生产者。这个例子很简单,在不同版
本的序列化器和反序列化器之间调试兼容性问题着实是个挑战 你需要比较原始的字节数组。不建议使用自定义序列化器,而是使用已有的序列化器和反序列化器, 比如 JSON 、 Avro 、 Thrift或 Protobuf。
使用 Avro序列化
Apache Avro (以下简称 Avro )是一种与编程语言无关的序列化格式 。 目的是提供一种共享数据文件的方式。
Avro 数据通过与语言无关的schema 来定义 。 schema 通过 JSON 来描述,数据被序列化成二进制文件或 JSON 文件,不过一般会使用二进制文件。在读写文件时需要用到
schema, schema 一般会被内嵌在数据文件里。Avro 有一个很有意思的特性是,当负责写消息的应用程序使用了新的 schema ,负责读消息的应用程序可以继续处理消息而无需做任何改动,这个特性使得它特别适合用在像
Kafka 这样的消息系统上。
做一些修改。不需要 faxNuMber字段改用elmail
使用 Avro 的好处:我们修改了消息的 schema,但并没有更新所有负责读取数据的应用程序,而这样仍然不会出现异常或阻断性错误,也不需要对现有数据进行大
幅更新。不过这里有以下两个需要注意的地方。
• 用于写入数据和读取数据的 schema 必须是相互兼容的。 Avro 文档提到了一些兼容性原则。
• 反序列化器需要用到用于写入数据的 schema,即使它可能与用于读取数据的 schema 不一样。
- 使用 Avro 的 KafkaAvroSerializer来序列化对象。注意 , AvroSerializer 也可以处理原语,这就是我们以后可以使用字符串作为记录键、使用客户对象作为值的原因 。
- schema. registry.url是一个新的参数,指向 schema 的存储位置。
- Customer是生成的对象。我们会告诉生产者 Customer对象就是记录的值 。
- 实例化一个 ProducerRecord 对象, 并指定 Customer为值的类型,然后再传给它一个Customer对象。
- 把 Customer对象作为记录发送出去, KafkaAvroSerializer会处理剩下的事情。
分区
ProducerRecord 对象包含了目标主题、键和值。 Kafka 的消息是一个个键值对, ProducerRecord 对象可以只包含目标主题和值,键可以设置为默认的 null ,不
过大多数应用程序会用到键。键有两个用途 :可以作为消息的附加信息,也可以用来决定消息该被写到主题的哪个分区。拥有相同键的消息将被写到同一个分区。 也就是
说,如果一个进程只从一个主题的分区读取数据,那么具有相同键的所有记录都会被该进程读取。要创建一个包含键值的记录,只需像下面这样创建
ProducerRecord 对象:
如果键值为 null , 井且使用了默认的分区器,那么记录将被随机地发送到主题内各个可用的分区上。分区器使用轮询( Round Robin)算法将消息均衡地分布到各个分区上。
如果键不为空,并且使用了默认的分区器,那么 Kafka 会对键进行散列(使用 Kafka 自己的散列算法,即使升级 Java 版本,散列值也不会发生变化),然后根据散列值把消息映射
到特定的分区上。这里的关键之处在于,同一个键总是被映射到同一个分区上,所以在进行映射时,我们会使用主题所有的分区,而不仅仅是可用的分区。这也意味着,如果写人
数据的分区是不可用的,那么就会发生错误。但这种情况很少发生。
只有在不改变主题分区数量的情况下,键与分区之间的映射才能保持不变。举个例子,在分区数量保持不变的情况下,可以保证用户 045189 的记录总是被写到分区 34。在从分
区读取数据肘,可以进行各种优化。不过,一旦主题增加了新的分区,这些就无法保证了,一旦数据仍然留在分区 34 ,但新的记录可能被写到其他分区上 。 如果要使用键来映射
分区,那么最好在创建主题的时候就把分区规划好,而且永远不要增加新分区。
实现自定义分区策略
默认分区器的特点,它是使用次数最多的分区器。除了散列分区之外,有时候也需要对数据进行不一样的分区。假设你是一个 B2B 供应商,你有一个大客
户,它是孚持设备 Banana 的制造商。 Banana 占据了你整体业务 10% 的份额。如果使用默认的散列分区算怯, Banana 的账号记录将和其他账号记录一起被分配给相同的分区,导致
这个分区比其他分区要大一些。服务器可能因此出现存储空间不足、处理缓慢等问题。我们需要给 Banana 分配单独的分区,然后使用散列分区算住处理其他账号。
下面是一个自 定义分区器的例子 :
- Partitioner接口包含了 conflgure 、 partition 和 close 这 3 个方楼 。 这里我们只实现partition 方法,不过我们真不应该在 partition 方住里硬编码客户的名字 ,而应该通过 configure 方位传进来 。
- 我们只接受字符串作为键,如果不是字符串,就抛出异常 。