[Kafka] - Kafka Java Consumer实现(二)

时间:2023-01-08 17:20:28

Kafka提供了两种Consumer API,分别是:High Level Consumer API 和 Lower Level Consumer API(Simple Consumer API)

High Level Consumer API:高度抽象的Kafka消费者API;将底层具体获取数据、更新offset、设置偏移量等操作屏蔽掉,直接将操作数据流的处理工作提供给编写程序的人员。优点是:操作简单;缺点:可操作性太差,无法按照自己的业务场景选择处理方式。(入口类:ConsumerConnector)

Lower Level Consumer API:通过直接操作底层API获取数据的方式获取Kafka中的数据,需要自行给定分区、偏移量等属性。优点:可操作性强;缺点:代码相对而言比较复杂。(入口类:SimpleConsumer)

这里主要将High Level Consumer API使用Java代码实现并测试:

Lower Level Consumer API详见博客:[Kafka] - Kafka Java Consumer实现(一)

========================================================================

一、JavaKafkaConsumerHighAPI:使用Kafka High Level Consumer API多线程读取数据的相关API实现,具体代码如下:

import kafka.consumer.*;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties; import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; /**
* 自定义简单Kafka消费者, 使用高级API
* Created by gerry on 12/21.
*/
public class JavaKafkaConsumerHighAPI implements Runnable {
/**
* Kafka数据消费对象
*/
private ConsumerConnector consumer; /**
* Kafka Topic名称
*/
private String topic; /**
* 线程数量,一般就是Topic的分区数量
*/
private int numThreads; /**
* 线程池
*/
private ExecutorService executorPool; /**
* 构造函数
*
* @param topic Kafka消息Topic主题
* @param numThreads 处理数据的线程数/可以理解为Topic的分区数
* @param zookeeper Kafka的Zookeeper连接字符串
* @param groupId 该消费者所属group ID的值
*/
public JavaKafkaConsumerHighAPI(String topic, int numThreads, String zookeeper, String groupId) {
// 1. 创建Kafka连接器
this.consumer = Consumer.createJavaConsumerConnector(createConsumerConfig(zookeeper, groupId));
// 2. 数据赋值
this.topic = topic;
this.numThreads = numThreads;
} @Override
public void run() {
// 1. 指定Topic
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(this.topic, this.numThreads); // 2. 指定数据的解码器
StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties()); // 3. 获取连接数据的迭代器对象集合
/**
* Key: Topic主题
* Value: 对应Topic的数据流读取器,大小是topicCountMap中指定的topic大小
*/
Map<String, List<KafkaStream<String, String>>> consumerMap = this.consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder); // 4. 从返回结果中获取对应topic的数据流处理器
List<KafkaStream<String, String>> streams = consumerMap.get(this.topic); // 5. 创建线程池
this.executorPool = Executors.newFixedThreadPool(this.numThreads); // 6. 构建数据输出对象
int threadNumber = 0;
for (final KafkaStream<String, String> stream : streams) {
this.executorPool.submit(new ConsumerKafkaStreamProcesser(stream, threadNumber));
threadNumber++;
}
} public void shutdown() {
// 1. 关闭和Kafka的连接,这样会导致stream.hashNext返回false
if (this.consumer != null) {
this.consumer.shutdown();
} // 2. 关闭线程池,会等待线程的执行完成
if (this.executorPool != null) {
// 2.1 关闭线程池
this.executorPool.shutdown(); // 2.2. 等待关闭完成, 等待五秒
try {
if (!this.executorPool.awaitTermination(5, TimeUnit.SECONDS)) {
System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly!!");
}
} catch (InterruptedException e) {
System.out.println("Interrupted during shutdown, exiting uncleanly!!");
}
} } /**
* 根据传入的zk的连接信息和groupID的值创建对应的ConsumerConfig对象
*
* @param zookeeper zk的连接信息,类似于:<br/>
* hadoop-senior01.ibeifeng.com:2181,hadoop-senior02.ibeifeng.com:2181/kafka
* @param groupId 该kafka consumer所属的group id的值, group id值一样的kafka consumer会进行负载均衡
* @return Kafka连接信息
*/
private ConsumerConfig createConsumerConfig(String zookeeper, String groupId) {
// 1. 构建属性对象
Properties prop = new Properties();
// 2. 添加相关属性
prop.put("group.id", groupId); // 指定分组id
prop.put("zookeeper.connect", zookeeper); // 指定zk的连接url
prop.put("zookeeper.session.timeout.ms", "400"); //
prop.put("zookeeper.sync.time.ms", "200");
prop.put("auto.commit.interval.ms", "1000");
// 3. 构建ConsumerConfig对象
return new ConsumerConfig(prop);
} /**
* Kafka消费者数据处理线程
*/
public static class ConsumerKafkaStreamProcesser implements Runnable {
// Kafka数据流
private KafkaStream<String, String> stream;
// 线程ID编号
private int threadNumber; public ConsumerKafkaStreamProcesser(KafkaStream<String, String> stream, int threadNumber) {
this.stream = stream;
this.threadNumber = threadNumber;
} @Override
public void run() {
// 1. 获取数据迭代器
ConsumerIterator<String, String> iter = this.stream.iterator();
// 2. 迭代输出数据
while (iter.hasNext()) {
// 2.1 获取数据值
MessageAndMetadata value = iter.next(); // 2.2 输出
System.out.println(this.threadNumber + ":" + ":" + value.offset() + value.key() + ":" + value.message());
}
// 3. 表示当前线程执行完成
System.out.println("Shutdown Thread:" + this.threadNumber);
}
}
}

二、JavaKafkaConsumerHighAPITest:测试类

/**
* Created by ibf on 12/21.
*/
public class JavaKafkaConsumerHighAPITest {
public static void main(String[] args) {
String zookeeper = "192.168.187.146:2181";
String groupId = "group1";
String topic = "test2";
int threads = 1; JavaKafkaConsumerHighAPI example = new JavaKafkaConsumerHighAPI(topic, threads, zookeeper, groupId);
new Thread(example).start(); // 执行10秒后结束
int sleepMillis = 600000;
try {
Thread.sleep(sleepMillis);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 关闭
example.shutdown();
}
}

三、运行测试截图

Kafka相关命令可以参考博客[Kafka] - Kafka基本操作命令, 测试截图如下:

[Kafka] - Kafka Java Consumer实现(二)

[Kafka] - Kafka Java Consumer实现(二)

至此,开发基本完成

========================================================

四、Kafka Pom文件依赖

<properties>
<kafka.version>0.8.2.1</kafka.version>
</properties> <dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>${kafka.version}</version>
</dependency>
</dependencies>

[Kafka] - Kafka Java Consumer实现(二)的更多相关文章

  1. &lbrack;Kafka&rsqb; - Kafka Java Consumer实现&lpar;一&rpar;

    Kafka提供了两种Consumer API,分别是:High Level Consumer API 和 Lower Level Consumer API(Simple Consumer API) H ...

  2. 关于Kafka java consumer管理TCP连接的讨论

    本篇是<关于Kafka producer管理TCP连接的讨论>的续篇,主要讨论Kafka java consumer是如何管理TCP连接.实际上,这两篇大部分的内容是相同的,即consum ...

  3. Java进阶专题&lpar;二十一&rpar; 消息中间件架构体系(3)-- Kafka研究

    前言 Kafka 是一款分布式消息发布和订阅系统,具有高性能.高吞吐量的特点而被广泛应用与大数据传输场景.它是由 LinkedIn 公司开发,使用 Scala 语言编写,之后成为 Apache 基金会 ...

  4. Kafka Java consumer动态修改topic订阅

    前段时间在Kafka QQ群中有人问及此事——关于Java consumer如何动态修改topic订阅的问题.仔细一想才发现这的确是个好问题,因为如果简单地在另一个线程中直接持有consumer实例然 ...

  5. kafka集群和zookeeper集群的部署,kafka的java代码示例

    来自:http://doc.okbase.net/QING____/archive/19447.html 也可参考: http://blog.csdn.net/21aspnet/article/det ...

  6. kafka原理和实践(二)spring-kafka简单实践

    系列目录 kafka原理和实践(一)原理:10分钟入门 kafka原理和实践(二)spring-kafka简单实践 kafka原理和实践(三)spring-kafka生产者源码 kafka原理和实践( ...

  7. zookeeper&plus;kafka集群安装之二

    zookeeper+kafka集群安装之二 此为上一篇文章的续篇, kafka安装需要依赖zookeeper, 本文与上一篇文章都是真正分布式安装配置, 可以直接用于生产环境. zookeeper安装 ...

  8. Kafka设计解析(十三)Kafka消费组&lpar;consumer group&rpar;

    转载自 huxihx,原文链接 Kafka消费组(consumer group) 一直以来都想写一点关于kafka consumer的东西,特别是关于新版consumer的中文资料很少.最近Kafka ...

  9. 4 kafka集群部署及kafka生产者java客户端编程 &plus; kafka消费者java客户端编程

    本博文的主要内容有   kafka的单机模式部署 kafka的分布式模式部署 生产者java客户端编程 消费者java客户端编程 运行kafka ,需要依赖 zookeeper,你可以使用已有的 zo ...

随机推荐

  1. Unity5 新功能解析--物理渲染与standard shader

    Unity5 新功能解析--物理渲染与standard shader http://blog.csdn.net/leonwei/article/details/48395061 物理渲染是UNITY5 ...

  2. 关于 ES6箭头函数

    转自  http://simplyy.space/article/577c5b0dcbe0a3e656c87c24 多个连续的箭头函数与柯里化 高阶函数 高阶函数定义:将函数作为参数或者返回值是函 ...

  3. BigDecimal工具类处理精度计算

    /** * Created by My_coder on 2017-07-27. * 加减乘除计算工具类 */ public class BigDecimalUtil { private BigDec ...

  4. ES踩坑笔记

    现在开始在业务上使用ES,记录一些踩坑经历,做点笔记. 2018-11-13 source不返回问题 使用了角色校验,客户端插入成功之后获取数据没有source,和查询参数无关. 检查mapping, ...

  5. JavaFile I&sol;O

    Java流类图结构: 流的概念和作用: 流是一组有顺序的,有起点和终点的字节集合,是对数据传输的总称或抽象.及数据在两设备间的传输称为流,流的本质是数据传输,根据数据传输特性将抽象为各种类,方便更直观 ...

  6. 关于requestAnimationFrame与setInterval的一点差异

    requestAnimationFrame与setInterval都可以实现循环触发事件,但是setInterval是基于时间的,而requestAnimationFrame是基于帧数的,在我的一次开 ...

  7. mysql存储引擎选择&lpar;转&rpar;

    MySQL 的存储引擎可能是所有关系型数据库产品中最具有特色的了,不仅可以同时使用多种存储引擎,而且每种存储引擎和MySQL之间使用插件方式这种非常松的耦合关系.由于各存储引擎功能特性差异较大,这篇文 ...

  8. python下的socket常用方法举例

    python下的socket 1.简单的server和client端的socket代码 server.py: #!/usr/bin/env python #_*_ coding:utf-8 _*_ i ...

  9. 爬虫--scrapy&plus;redis分布式爬取58同城北京全站租房数据

    作业需求: 1.基于Spider或者CrawlSpider进行租房信息的爬取 2.本机搭建分布式环境对租房信息进行爬取 3.搭建多台机器的分布式环境,多台机器同时进行租房数据爬取 建议:用Pychar ...

  10. Linux命令&lowbar;2

    P42 远程管理 命令 目标 关机/重启 shutdown 查看或配置网卡信息 ifconfig ping 远程登录和复制文件 ssh scp 01.关机/重启 命令: shutdown  选项  时 ...