1.创建maven项目
目录如下:
2. pom.xml 文件:
<!--
  Maven build descriptor for the Kafka producer/consumer example.
  Element names, attribute names and the POM namespace are case-sensitive,
  so they are written exactly as the Maven 4.0.0 model defines them.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>kafka-maven</groupId>
  <artifactId>kafka-maven</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <dependencies>
    <!-- Kafka broker/client library built for Scala 2.11 -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
      <version>0.10.1.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>2.2.0</version>
    </dependency>
    <!-- hadoop-hdfs was listed twice in the original; one declaration is enough -->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-hdfs</artifactId>
      <version>2.2.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>2.2.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-client</artifactId>
      <version>1.0.3</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-server</artifactId>
      <version>1.0.3</version>
    </dependency>
    <!-- tools.jar is taken from the local JDK, hence the system scope -->
    <dependency>
      <groupId>jdk.tools</groupId>
      <artifactId>jdk.tools</artifactId>
      <version>1.7</version>
      <scope>system</scope>
      <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
    </dependency>
    <dependency>
      <groupId>org.apache.httpcomponents</groupId>
      <artifactId>httpclient</artifactId>
      <version>4.3.6</version>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <!-- Compile for Java 7 source and bytecode level -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>1.7</source>
          <target>1.7</target>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>
3. Kafka 生产者 KafkaProduce:
package com.lijie.producer;
import java.io.file;
import java.io.fileinputstream;
import java.util.properties;
import org.apache.kafka.clients.producer.callback;
import org.apache.kafka.clients.producer.kafkaproducer;
import org.apache.kafka.clients.producer.producerrecord;
import org.apache.kafka.clients.producer.recordmetadata;
import org.slf4j.logger;
import org.slf4j.loggerfactory;
public class kafkaproduce {
private static properties properties;
static {
properties = new properties();
string path = kafkaproducer. class .getresource( "/" ).getfile().tostring()
+ "kafka.properties" ;
try {
fileinputstream fis = new fileinputstream( new file(path));
properties.load(fis);
} catch (exception e) {
e.printstacktrace();
}
}
/**
* 发送消息
*
* @param topic
* @param key
* @param value
*/
public void sendmsg(string topic, byte [] key, byte [] value) {
// 实例化produce
kafkaproducer< byte [], byte []> kp = new kafkaproducer< byte [], byte []>(
properties);
// 消息封装
producerrecord< byte [], byte []> pr = new producerrecord< byte [], byte []>(
topic, key, value);
// 发送数据
kp.send(pr, new callback() {
// 回调函数
@override
public void oncompletion(recordmetadata metadata,
exception exception) {
if ( null != exception) {
system.out.println( "记录的offset在:" + metadata.offset());
system.out.println(exception.getmessage() + exception);
}
}
});
// 关闭produce
kp.close();
}
}
4. Kafka 消费者 KafkaConsume:
package com.lijie.consumer;

import java.io.File;
import java.io.FileInputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.htrace.fasterxml.jackson.databind.ObjectMapper;

import com.lijie.pojo.User;
import com.lijie.utils.JsonUtils;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;

/**
 * Reads JSON messages from the {@code lijietest} topic with the
 * ZooKeeper-based high-level consumer and prints each one as a
 * {@code User}. Settings come from {@code kafka.properties}, read once
 * from the classpath root when this class is initialized.
 */
public class KafkaConsume {

    /** Topic this consumer subscribes to. */
    private final static String topic = "lijietest";

    /** Consumer configuration loaded from {@code kafka.properties}. */
    private static Properties properties;

    static {
        properties = new Properties();
        String path = KafkaConsume.class.getResource("/").getFile().toString()
                + "kafka.properties";
        // try-with-resources closes the stream even when load() fails.
        try (FileInputStream fis = new FileInputStream(new File(path))) {
            properties.load(fis);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Consumes messages from the topic and prints each decoded {@code User}.
     *
     * <p>The loop runs for as long as the stream iterator reports more
     * messages. The connector is shut down when the loop ends or an
     * exception escapes.
     *
     * @throws Exception if creating the consumer, reading a message or
     *                   converting its JSON fails
     */
    public void getMsg() throws Exception {
        ConsumerConfig config = new ConsumerConfig(properties);
        ConsumerConnector consumer = kafka.consumer.Consumer
                .createJavaConsumerConnector(config);
        try {
            // Request a single stream for the topic.
            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            topicCountMap.put(topic, Integer.valueOf(1));

            StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
            StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());

            Map<String, List<KafkaStream<String, String>>> consumerMap = consumer
                    .createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
            KafkaStream<String, String> stream = consumerMap.get(topic).get(0);

            ConsumerIterator<String, String> it = stream.iterator();
            while (it.hasNext()) {
                String json = it.next().message();
                User user = (User) JsonUtils.jsonToObj(json, User.class);
                System.out.println(user);
            }
        } finally {
            // Release the connector's resources on every exit path.
            consumer.shutdown();
        }
    }
}
5. kafka.properties 文件:
## Producer settings
# Keys and values must not contain stray spaces: the key ends at the first
# whitespace, and class names are case-sensitive.
bootstrap.servers=192.168.80.123:9092
producer.type=sync
request.required.acks=1
serializer.class=kafka.serializer.DefaultEncoder
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
# NOTE(review): the "bak." prefixed keys look like parked alternatives rather
# than active settings - confirm before relying on them.
bak.partitioner.class=kafka.producer.DefaultPartitioner
bak.key.serializer=org.apache.kafka.common.serialization.StringSerializer
bak.value.serializer=org.apache.kafka.common.serialization.StringSerializer
## Consumer settings
zookeeper.connect=192.168.80.123:2181
group.id=lijiegroup
zookeeper.session.timeout.ms=4000
zookeeper.sync.time.ms=200
auto.commit.interval.ms=1000
auto.offset.reset=smallest
# serializer.class is also set in the producer section above; when the file is
# loaded with java.util.Properties this later value replaces the earlier one.
serializer.class=kafka.serializer.StringEncoder
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。
原文链接:https://blog.csdn.net/qq_20641565/article/details/56277537