kafka 学习笔记(二)之Java Producer客户端

时间:2023-01-12 08:30:22

软件神马的装好了,该写代码了,首先学习Producer。

Tutorial的代码只能用来玩,我们真正要做的是用Kafka嵌入到我们自己的程序中。因此用java客户端创建自己的consumer和producer才是正经事。下面我们来看正经事!

先来看看producer的Java客户端,这里介绍如何配置以及新建一个producer。并用自带的命令行console来测试新建的producer。

这里有代码,我们用这个代码来demo https://github.com/gwenshap/kafka-examples


代码在github上,因此我们要下载下来

git clone https://github.com/gwenshap/kafka-examples

我放在workspace 文件夹里,cd进入SimpleCounter文件夹,使用Maven

#mvn install

显示success

[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------

ls 会发现多出一个叫target的文件夹,OK.成功了


查看创建的topic first

bin/kafka-topics.sh --list --zookeeper localhost:2181


开consumer的时候发生了错误:

WARN Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
java.net.ConnectException: Connection refused
        at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
        at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
        at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
        at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081)

OMG!这是什么鬼?!

哦,原来是没开zookeeper,好,这就开

bin/zookeeper-server-start.sh config/zookeeper.properties &


先开自带的consumer console :

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test1 (test1是我以前创建的一个topic)

另外开一个窗口,再开producer:

./run_params.sh localhost:9092 test1 old sync 500 10  (sync是说同步的,producer写一个,consumer就出一个)

看consumer 窗口:

kafka 学习笔记(二)之Java Producer客户端

这几个数就一个一个的出现了。下面是运行总时间。代码里是0.5秒数一个数,加上一些overhead的delay,5392ms,正常


不错,挺好玩。我们再来玩一个不同步的 (producer写完,consumer一次性打出来,不是一个一个出现的)

 ./run_params.sh localhost:9092 test1 old async 500 10

kafka 学习笔记(二)之Java Producer客户端

可以看到,这个速度快了,因为是batch的,不用一个消息一个消息的等,而是producer发布10个消息之后一起传出来的。


counter类型参数改成new也是一个样的。

Procuder 的代码写完了,下一篇即将写consumer的。