Kafka学习(三)-------- Kafka核心之Cosumer

时间:2022-07-13 19:07:45

了解了什么是kafka( https://www.cnblogs.com/tree1123/p/11226880.html)以后

学习Kafka核心之消费者,kafka的消费者经过几次版本变化,特别容易混乱,所以一定要搞清楚是哪个版本再研究。

一、旧版本consumer

只有旧版本(0.9以前)才有 high-level consumer 和 low-level consumer之分,很多的文章提到的就是这两个:低阶消费者和高阶消费者,低阶消费者更灵活但是需要自己维护很多东西,高阶就死板一点但是不需要维护太多东西。

high-level consumer就是消费者组。

low-level consumer是单独一个消费者,单个consumer没有什么消费者组的概念,与其他consumer相互之间不关联。

1、low-level consumer

low-level consumer底层实现是

SimpleConsumer 他可以自行管理消费者

Storm的Kafka插件 storm-kafka就是使用了SimpleConsumer

优点是灵活 , 可以从任意位置拿消息 。

如果需要: 重复读取数据 只消费部分分区数据 精确消费 就得用这个,

不过必须自己处理位移提交 寻找分区leader broker 处理leader变更。

接口中的方法:
fetch
send 发送请求
getOffsetBefore
commitOffsets
fetchOffsets
earliestOrlatestOffset
close

使用步骤:

参照官网,比较复杂需要好几步来拉取消息。

Find an active Broker and find out which Broker is the leader for your topic and partition

找到活跃的broker 找到哪个broker是你的topic和partition的leader

Determine who the replica Brokers are for your topic and partition

查出replica 的brokers

Build the request defining what data you are interested in

建立请求

Fetch the data

拿数据

Identify and recover from leader changes

leader变化时恢复

也可以查询一些offset等metadata信息,具体代码如下。

//根据指定的分区从主题元数据中找到主副本
SimpleConsumer consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024,
"leaderLookup");
List<String> topics = Collections.singletonList(a_topic);
TopicMetadataRequest req = new TopicMetadataRequest(topics); kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
List<TopicMetadata> metaData = resp.topicsMetadata(); String leader = metaData.leader().host(); //获取分区的offset等信息
//比如获取lastoffset
TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition); Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>(); requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1)); kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName); OffsetResponse response = consumer.getOffsetsBefore(request); long[] offsets = response.offsets(topic, partition);
long lastoffset = offsets[0];

这个api现在应用不多,除非你有特殊需求,比如要自己写监控,你可能需要更多的元数据信息。

2、high-level consumer

主要使用的类:ConsumerConnector

屏蔽了每个topic的每个Partition的offset的管理(自动读取zookeeper中该Consumer group的last offset)

Broker失败转移,增减Partition Consumer时的负载均衡(当Partiotion和Consumer增减时,Kafka自动负载均衡)

这些功能low-level consumer都需要自己实现的。

主要方法如下:
createMessageStreams
createMessageStreamsByFilter
commitOffsets
setconsumerReblanceListener
shutdown

group通过zookeeper完成核心功能,

zookeeper目录结构如下:

/consumers/groupId/ids/consumre.id

记录该consumer的订阅信息,还被用来监听consumer存活状态。这是一个临时节点,会话失效将会自动删除。

/consumers/groupId/owners/topic/partition

保存consumer各个消费线程的id,执行rebalance时保存。

/consumers/groupId/offsets/topic/partition

保存该group消费指定分区的位移信息。

这个consumer支持多线程设计,只创建一个consumer实例,但如果是多个分区,将会自动创建多个线程消费。

使用步骤:

   Properties properties = new Properties();
properties.put("zookeeper.connect", "ip1:2181,ip2:2181,ip3:2181");//声明zk
properties.put("group.id", "group03");
ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(properties)); Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, 1); // 一次从主题中获取一个数据
Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap);
KafkaStream<byte[], byte[]> stream = messageStreams.get(topic).get(0);// 获取每次接收到的这个数据 如果是多线程在这里处理多分区的情况
ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
while(iterator.hasNext()){
String message = new String(iterator.next().message());
System.out.println("接收到: " + message);
} //auto.offset.reset 默认值为largest
//从头消费 properties.put("auto.offset.reset", "smallest");

很简单,我们0.9版本之前使用的很多都是他,集成spring的方法等等。不过0.9版本以后新的consumer出现了。

二、新版本consumer

先说一下版本的问题:

Kafka 0.10.0.0之后 增加了 Kafka Streams 所以Kafka1.0开始Streams 就稳定了。

kafka security 0.9.0.0以后 0.10.0.1之后稳定

0.10.1.0之后 新版本consumer稳定

storm有两个连kafka的包:

storm-kafka 使用了旧版本的consumer

storm-kafka-client 使用了新版本consumer

kafka 0.9.0.0废弃了旧版producer和consumer 旧版时scala版 新版用java开发

版本 推荐producer 推荐consumer 原因
0.8.2.2 旧版 旧版 新producer尚不稳定
0.9.0.x 新版 旧版 新producer稳定
0.10.0.x 新版 旧版 新consumer不稳定
0.10.1.0 新版 新版 新consumer稳定
0.10.2.x 新版 新版 都稳定了

旧版本中offset管理依托zookeeper,新版本中不在依靠zookeeper。

语言 包名 主要使用类
旧版本 scala kafka.consumer.* ZookeeperConsumerConnector SimpleConsumer
新版本 java org.apache.kafka.clients.consumer.* KafkaConsumer

新版本的几个核心概念:

consumer group

消费者使用一个消费者组名(group.id)来标记自己,topic的每条消息都只会发送到每个订阅他的消费者组的一个消费者实例上。

1、一个消费者组有若干个消费者。

2、对于同一个group,topic的每条消息只能被发送到group下的一个consumer实例上。

3、topic消息可以被发送到多个group中。

consumer端offset

记录每一个consumer消费的分区的位置

kafka没有把这个放在服务器端,保存在了consumer group中,并定期持久化。

旧版本会把这个offset定期存在zookeeper中:路径是 /consumers/groupid/offsets/topic/partitionid

新版本将offset放在了一个内部topic中:__consumer_offsets(前面两个下划线) 里面有50个分区

所以新版本的consumer就不需要连zookeeper了。

旧版本设置offsets.storage=kafka设置位移提交到这,不常使用。

__consumer_offsets中的结构: key = group.id+topic+partition value=offset

consumer group reblance

单个consumer是没有rebalance的。

他规定了一个consumer group下的所有consumer如何去分配所有的分区。

单线程示例代码:
Properties props = new Properties();
props.put("bootstrap.servers", "kafka01:9092,kafka02:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("auto.offset.reset","earliest"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
try{
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}finally{
consumer.close();
}

很简单,1、只需要配置kafka的server groupid autocommit 序列化 autooffsetreset(其中 bootstrap.server group.id key.deserializer value.deserializer 必须指定);

2、用这些Properties构建consumer对象(KafkaConsumer还有其他构造,可以把序列化传进去);

3、subscribe订阅topic列表(可以用正则订阅Pattern.compile("kafka.*")

使用正则必须指定一个listener subscribe(Pattern pattern, ConsumerRebalanceListener listener)); 可以重写这个接口来实现 分区变更时的逻辑。如果设置了enable.auto.commit = true 就不用理会这个逻辑。

4、然后循环poll消息(这里的1000是超时设定,如果没有很多数据,也就等一秒);

5、处理消息(打印了offset key value 这里写处理逻辑)。

6、关闭KafkaConsumer(可以传一个timeout值 等待秒数 默认是30)。

Properties详解:

bootstrap.server(最好用主机名不用ip kafka内部用的主机名 除非自己配置了ip)

deserializer 反序列化consumer从broker端获取的是字节数组,还原回对象类型。

默认有十几种:StringDeserializer LongDeserializer DoubleDeserializer。。

也可以自定义:定义serializer格式 创建自定义deserializer类实现Deserializer 接口 重写逻辑

除了四个必传的 bootstrap.server group.id key.deserializer value.deserializer

还有session.timeout.ms "coordinator检测失败的时间"

是检测consumer挂掉的时间 为了可以及时的rebalance 默认是10秒 可以设置更小的值避免消息延迟。

max.poll.interval.ms "consumer处理逻辑最大时间"

处理逻辑比较复杂的时候 可以设置这个值 避免造成不必要的 rebalance ,因为两次poll时间超过了这个参数,kafka认为这个consumer已经跟不上了,会踢出组,而且不能提交offset,就会重复消费。默认是5分钟。

auto.offset.reset "无位移或者位移越界时kafka的应对策略"

所以如果启动了一个group从头消费 成功提交位移后 重启后还是接着消费 这个参数无效

所以3个值的解释是:

earliset 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从最早的位移消费

latest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据 none topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常

(注意kafka-0.10.1.X版本之前: auto.offset.reset 的值为smallest,和,largest.(offest保存在zk中) 、

我们这是说的是新版本:kafka-0.10.1.X版本之后: auto.offset.reset 的值更改为:earliest,latest,和none (offest保存在kafka的一个特殊的topic名为:__consumer_offsets里面))

enable.auto.commit 是否自动提交位移

true 自动提交 false需要用户手动提交 有只处理一次需要的 最近设置为false自己控制。

fetch.max.bytes consumer单次获取最大字节数

max.poll.records 单次poll返回的最大消息数

默认500条 如果消费很轻量 可以适当提高这个值 增加消费速度。

hearbeat.interval.ms consumer其他组员感知rabalance的时间

该值必须小于 session.timeout.ms 如果检测到 consumer挂掉 也就根本无法感知rabalance了

connections.max.idle.ms 定期关闭连接的时间

默认是9分钟 可以设置为-1 永不关闭

poll方法详解:

(旧版本:多分区多线程 新版本:一个线程管理多个socket连接)

但新版本KafkaConsumer是双线程的,主线程负责:消息获取,rebalance,coordinator,位移提交等等,

另一个是后台心跳线程。

根据上边的各种配置,poll方法会找到offset,当获取了足够多的可用数据,或者等待时间超过了指定的超时时间,就会返回。

java consumer不是线程安全的,同一个KafkaConsumer用在了多个线程中,将会报Kafka Consumer is not safe for multi-threaded assess异常。可以加一个同步锁进行保护。

poll的超时参数,已经说过1000的话是超时设定,如果没有很多数据,也就等一秒,就返回了,比如定时5秒的将消息写入,就可以将超时参数设置为5000,达到效率最大化。

如果没有定时任务呢,那就设置为 Long.MAX_VALUE 未获取足够多的数据就无限等待。这里要捕获一下WakeupException。

consumer offset详解:

consumer需要定期向kafka提交自己的offset信息。已经学过 新版本将他提交到了一个topic中 __consumer_offsets。

offset有一个更大的作用是实现交付语义:

最多一次 at most once 可能丢失 不会重复

最少一次 at least once 可能重复 不会丢失

精确一次 exactly once 不丢失 不重复 就一次

若consumer在消费之前提交位移 就实现了at most once

若是消费后提交 就实现了 at least once 默认是这个。

consumer的多个位置信息:

​ 上次提交的位置 当前位置 水位 日志最新位移

0 1 。。 5 。。 10 。。 15

上次提交位置:consumer最近一次提交的offset值;

当前位置:consumer上次poll 到了这个位置 但是还没提交;

水位:这是分区日志的管理 consumer无法读取水位以上的消息;

最新位移: 也是分区日志的管理 最大的位移值 一定不会比水位小。

新版本的consumer会在broker选一个broker作为consumergroup的coordinator,用于实现组成员管理,消费分配方案,提交位移。如果consumer崩溃,他负责的分区就分配给其他consumer,如果没有做好位移提交就可能重复消费。

多次提交的情况,kafka只关注最新一次的提交。

默认consumer自动提交位移 提交间隔为5秒 可以通过 auto.commit.interval.ms 设置这个间隔。

自动提交可以减少开发,但是可能重复消费,所以需要精准消费时还是要手动提交。设置手动提交 enable.auto.commit = false,然后调用 consumer.commitSync() 或者 consumer.commitAync() Sync为同步方式,阻塞 Aync为异步方式,不会阻塞。这两个方法可以传参,指定为哪个分区提交,这样更合理一些。

(旧版本的自动提交设置是 auto.commit.enable 默认间隔为60秒)

rebalance详解:

rebalance是consumer group如何分配topic的所有分区。

正常情况,比如有10个分区,5个consumer 那么consumer group将为每个consumer 平均分配两个分区。

每个分区只会分给一个consumer实例。有consumer出现问题,会重新执行这个过程,这个过程就是rebalance。

(旧版本通过zookeeper管理rebalance,新版本会选取某个broker为group coordinator来管理)

rebalance的触发条件:

1、有新的consumer加入,或者有consumer离开或者挂掉。

2、group订阅的topic发生变更,比如正则订阅。

3、group订阅的分区数发生变化。

第一个经常出现,不一定是挂掉,也可能是处理太慢,为了避免频繁rebalance,要调整好request.timeout.ms max.poll.records和ma.poll.interval.

rebalance分区策略:

partition.assignment.strategy 设置 自定义分区策略-创建分区器 assignor

range策略(默认),将分区划分为分区段,一次分配给每个consumer。

round-robin策略,轮询分配。

sticky策略(0.11.0.0出现,更优秀),range策略在订阅多个topic时会不均匀。

sticky有两个原则,当两者发生冲突时,第一个目标优先于第二个目标。

  1. 分区的分配要尽可能的均匀;
  2. 分区的分配尽可能的与上次分配的保持相同。

rebalance generation分代机制保证rabalance时重复提交的问题,延迟的offset提交时旧的generation信息会报异常ILLEGAL_GENERATION

rebalance过程:

1、确定coordinator所在的broker,建立socket连接。

确定算法: Math.abs(groupID.hashCode) % offsets.topic.num.partition 参数值(默认50)

寻找__consumer_offset分区50的leader副本所在的broker,该broker即为这个group的coordinator

2、加入组

所有consumer会向coordinator发送JoinGroup请求,收到所有请求后选一个consumer做leader(这个leader是consumer coordinator是broker),coordinator把成员和订阅信息发给coordinator。

3、同步分配方案

leader制定分配方案,通过SyncGroup请求发给coordinator,每个consumer也会发请求返回方案。

kafka也支持offset不提交到__consumer_offset,可以自定义,这时候就需要实现一个监听器ConsumerRebalanceListener,在这里重新处理Rebalance的逻辑。

多线程示例代码:
这里要根据自身需求开发,我这里只举一个简单的例子,就是几个分区就启动几个consumer,一一对应。
三个类:
Main:
public static void main(String[] args) { String bootstrapServers = "kafka01:9092,kafka02:9092";
String groupId = "test";
String topic = "testtopic";
int consumerNum = 3;
ConsumerGroup cg = new ConsumerGroup(consumerNum,bootstrapServers,groupId,topic);
cg.execute();
} import java.util.ArrayList;
import java.util.List; public class ConsumerGroup { private List<ConsumerRunnable> consumers; public ConsumerGroup(int consumerNum,String bootstrapServers,String groupId,String topic){ consumers = new ArrayList<>(consumerNum); for(int i=0;i < consumerNum;i++){
ConsumerRunnable ConsumerRunnable = new ConsumerRunnable(bootstrapServers,groupId,topic);
consumers.add(ConsumerRunnable);
}
} public void execute(){ for(ConsumerRunnable consumerRunnable:consumers){
new Thread(consumerRunnable).start();
}
}
} import java.util.Arrays;
import java.util.Properties; import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer; public class ConsumerRunnable implements Runnable{ private final KafkaConsumer<String,String> consumer; public ConsumerRunnable(String bootstrapServers,String groupId,String topic){ Properties props = new Properties();
props.put("bootstrap.servers", bootstrapServers);
props.put("group.id", groupId);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset","earliest");
this.consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));
} @Override
public void run() {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(10);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}
}
standalone consumer

有一些需求,需要指定一个消费者消费某一个分区。彼此之间不干扰,一个standalone consumer崩溃不会影响其他。

类似旧版本的低阶消费者。

示例代码如下:consumer.assign方法订阅分区

public static void main(String[] args) {

		Properties props = new Properties();
props.put("bootstrap.servers", "kafka01:9092,kafka02:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("auto.offset.reset","earliest"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
List<TopicPartition> partitions = new ArrayList<>();
List<PartitionInfo> allpartitions = consumer.partitionsFor("testtopic");
if(allpartitions!=null && !allpartitions.isEmpty()){
for(PartitionInfo partitionInfo:allpartitions){
partitions.add(new TopicPartition(partitionInfo.topic(),partitionInfo.partition()));
}
consumer.assign(partitions);
} while (true) {
ConsumerRecords<String, String> records = consumer.poll(10);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
} }

以上为kafka消费者的学习,不同的具体细节还需要通过官网文档仔细学习。

更多实时计算,Flink,Kafka等相关技术博文,欢迎关注实时流式计算

Kafka学习(三)-------- Kafka核心之Cosumer