软件神马的装好了,该写代码了,首先学习Producer。
Tutorial的代码只能用来玩,我们真正要做的是用Kafka嵌入到我们自己的程序中。因此用java客户端创建自己的consumer和producer才是正经事。下面我们来看正经事!
先来看看producer的Java客户端,这里介绍如何配置以及新建一个producer。并用自带的命令行console来测试新建的producer。
这里有代码,我们用这个代码来demo https://github.com/gwenshap/kafka-examples
代码在github上,因此我们要下载下来
git clone https://github.com/gwenshap/kafka-examples
我放在workspace 文件夹里,cd进入SimpleCounter文件夹,使用Maven
#mvn install
显示success
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
ls 会发现多出一个叫target的文件夹,OK.成功了
查看创建的topic first
bin/kafka-topics.sh --list --zookeeper localhost:2181
开consumer的时候发生了错误:
WARN Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081)
OMG!这是什么鬼?!
哦,原来是没开zookeeper,好,这就开
bin/zookeeper-server-start.sh config/zookeeper.properties &
先开自带的consumer console :
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test1 (test1是我以前创建的一个topic)
另外开一个窗口,再开producer:
./run_params.sh localhost:9092 test1 old sync 500 10 (sync是说同步的,producer写一个,consumer就出一个)
看consumer 窗口:
这几个数就一个一个的出现了。下面是运行总时间。代码里是0.5秒数一个数,加上一些overhead的delay,5392ms,正常
不错,挺好玩。我们再来玩一个不同步的 (producer写完,consumer一次性打出来,不是一个一个出现的)
./run_params.sh localhost:9092 test1 old async 500 10
可以看到,这个速度快了,因为是batch的,不用一个消息一个消息的等,而是producer发布10个消息之后一起传出来的。
counter类型参数改成new也是一个样的。
Procuder 的代码写完了,下一篇即将写consumer的。