今天,我们给大家带来一篇如何利用kafka的api进行客户端编程的文章,这篇文章很简单,就是利用kafka的api创建一个生产者和消费者,生产者不断向kafka写入消息,消费者则不断消费kafka的消息。下面是具体的实例代码。
一、创建配置类config
这个类很简单,只是存放了两个常量,一个是话题topic,一个是线程数threads
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
package com.lya.kafka;
/**
* 配置项
* @author liuyazhuang
*
*/
public class config {
/**
* 话题
*/
public static final string topic = "wordcount" ;
/**
* 线程数
*/
public static final integer threads = 1 ;
}
|
二、编程生产者类producerdemo
这个类的主要作用就是向kafka写入相应的消息,并且将消息写入wordcount话题。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
package com.lya.kafka;
import java.util.properties;
import kafka.javaapi.producer.producer;
import kafka.producer.keyedmessage;
import kafka.producer.producerconfig;
/**
* 生产者实例
* @author liuyazhuang
*
*/
public class producerdemo {
public static void main(string[] args) throws exception {
properties props = new properties();
props.put( "zk.connect" , "192.168.209.121:2181" );
props.put( "metadata.broker.list" , "192.168.209.121:9092" );
props.put( "serializer.class" , "kafka.serializer.stringencoder" );
props.put( "zk.connectiontimeout.ms" , "15000" );
producerconfig config = new producerconfig(props);
producer<string, string> producer = new producer<string, string>(config);
// 发送业务消息
// 读取文件 读取内存数据库 读socket端口
for ( int i = 1 ; i <= 100 ; i++) {
thread.sleep( 500 );
producer.send( new keyedmessage<string, string>(config.topic,
"this number ===>>> " + i));
}
}
}
|
三、编写消息者类consumerdemo
这个类的主要作用就是消费kafka中wordcount话题的消息。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
|
package com.lya.kafka;
import java.util.hashmap;
import java.util.list;
import java.util.map;
import java.util.properties;
import kafka.consumer.consumer;
import kafka.consumer.consumerconfig;
import kafka.consumer.kafkastream;
import kafka.javaapi.consumer.consumerconnector;
import kafka.message.messageandmetadata;
/**
* 消费者实例
* @author liuyazhuang
*
*/
public class consumerdemo {
public static void main(string[] args) {
properties props = new properties();
props.put( "zookeeper.connect" , "192.168.209.121:2181" );
props.put( "group.id" , "1111" );
props.put( "auto.offset.reset" , "smallest" );
props.put( "zk.connectiontimeout.ms" , "15000" );
consumerconfig config = new consumerconfig(props);
consumerconnector consumer =consumer.createjavaconsumerconnector(config);
map<string, integer> topiccountmap = new hashmap<string, integer>();
topiccountmap.put(config.topic, config.threads);
map<string, list<kafkastream< byte [], byte []>>> consumermap = consumer.createmessagestreams(topiccountmap);
list<kafkastream< byte [], byte []>> streams = consumermap.get(config.topic);
for ( final kafkastream< byte [], byte []> kafkastream : streams){
new thread( new runnable() {
@override
public void run() {
for (messageandmetadata< byte [], byte []> mm : kafkastream){
string msg = new string(mm.message());
system.out.println(msg);
}
}
}).start();
}
}
}
|
四、运行实例
首先,运行消费者类consumerdemo
运行结果如下:
没有打印任何信息。
此时,我们运行生产者类producerdemo
我们再次打开消费者的控制台查看如下:
打印出了生产者生产的消息。
至此,kafka简单客户端编程实例结束。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。
原文链接:http://blog.csdn.net/l1028386804/article/details/78383783