kafka生产者和消费者的javaAPI的示例代码

时间:2022-09-22 21:49:05

写了一个 Kafka 的 Java API demo,顺便记录一下,仅供参考。

1.创建maven项目

目录如下:

(原文此处为项目目录结构截图,图片未能保留。)

2.pom文件:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
<!--
  Maven build for the Kafka producer/consumer demo.
  Element names and namespace URIs are case-sensitive in Maven, so they are
  written in their canonical camelCase / mixed-case form.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>kafka-maven</groupId>
  <artifactId>kafka-maven</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <dependencies>
    <!-- Kafka core (Scala 2.11 build); the demo uses both the new producer and the old consumer API. -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
      <version>0.10.1.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>2.2.0</version>
    </dependency>
    <!-- hadoop-hdfs was declared twice in the original listing; one declaration is enough. -->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-hdfs</artifactId>
      <version>2.2.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>2.2.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-client</artifactId>
      <version>1.0.3</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-server</artifactId>
      <version>1.0.3</version>
    </dependency>
    <!-- tools.jar from the local JDK; the path is read from the JAVA_HOME environment variable. -->
    <dependency>
      <groupId>jdk.tools</groupId>
      <artifactId>jdk.tools</artifactId>
      <version>1.7</version>
      <scope>system</scope>
      <systemPath>${env.JAVA_HOME}/lib/tools.jar</systemPath>
    </dependency>
    <dependency>
      <groupId>org.apache.httpcomponents</groupId>
      <artifactId>httpclient</artifactId>
      <version>4.3.6</version>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>1.7</source>
          <target>1.7</target>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>

3.kafka生产者kafkaproduce:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
package com.lijie.producer;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Minimal Kafka producer demo.
 *
 * <p>Configuration is read once, at class-load time, from {@code kafka.properties}
 * at the root of the classpath.
 */
public class KafkaProduce {
  private static Properties properties;

  static {
    properties = new Properties();
    // Read the file as a classpath resource: this works from a jar and with
    // paths containing spaces, and try-with-resources closes the stream.
    try (InputStream in = KafkaProduce.class.getResourceAsStream("/kafka.properties")) {
      if (in == null) {
        System.err.println("kafka.properties not found on the classpath");
      } else {
        properties.load(in);
      }
    } catch (IOException e) {
      e.printStackTrace();
    }
  }

  /**
   * Sends a single message and waits for it to be flushed.
   *
   * <p>A new producer is created for every call and closed before returning.
   *
   * @param topic topic to send to
   * @param key   record key bytes
   * @param value record value bytes
   */
  public void sendMsg(String topic, byte[] key, byte[] value) {

    // try-with-resources closes the producer even if send() throws.
    try (KafkaProducer<byte[], byte[]> kp = new KafkaProducer<byte[], byte[]>(properties)) {

      // Wrap the payload in a record.
      ProducerRecord<byte[], byte[]> pr = new ProducerRecord<byte[], byte[]>(topic, key, value);

      // Send asynchronously; the callback reports the outcome.
      kp.send(pr, new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
          if (null != exception) {
            // On failure the metadata must not be dereferenced.
            System.out.println(exception.getMessage() + exception);
          } else {
            System.out.println("记录的offset在:" + metadata.offset());
          }
        }
      });
    }
  }
}

4.kafka消费者kafkaconsume:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package com.lijie.consumer;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.htrace.fasterxml.jackson.databind.ObjectMapper;

import com.lijie.pojo.User;
import com.lijie.utils.JsonUtils;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;

/**
 * Minimal Kafka consumer demo built on the ZooKeeper-based high-level consumer API.
 *
 * <p>Configuration is read once, at class-load time, from {@code kafka.properties}
 * at the root of the classpath.
 */
public class KafkaConsume {

  private final static String topic = "lijietest";

  private static Properties properties;

  static {
    properties = new Properties();
    // Read the file as a classpath resource: this works from a jar and with
    // paths containing spaces, and try-with-resources closes the stream.
    try (InputStream in = KafkaConsume.class.getResourceAsStream("/kafka.properties")) {
      if (in == null) {
        System.err.println("kafka.properties not found on the classpath");
      } else {
        properties.load(in);
      }
    } catch (IOException e) {
      e.printStackTrace();
    }
  }

  /**
   * Consumes messages from the demo topic with a single stream, converts each
   * JSON payload to a {@code User}, and prints it.
   *
   * @throws Exception if consuming or converting a message fails
   */
  public void getMsg() throws Exception {
    ConsumerConfig config = new ConsumerConfig(properties);

    ConsumerConnector consumer = kafka.consumer.Consumer
        .createJavaConsumerConnector(config);

    try {
      // Request one stream for the topic.
      Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
      topicCountMap.put(topic, Integer.valueOf(1));

      StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
      StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());

      Map<String, List<KafkaStream<String, String>>> consumerMap = consumer
          .createMessageStreams(topicCountMap, keyDecoder, valueDecoder);

      // Exactly one stream was requested above, so index 0 is the only one.
      KafkaStream<String, String> stream = consumerMap.get(topic).get(0);

      ConsumerIterator<String, String> it = stream.iterator();

      while (it.hasNext()) {
        String json = it.next().message();
        // NOTE(review): the helper's exact name/casing lives in the project's
        // JsonUtils class, which is not shown here; confirm it is jsonToObj.
        User user = (User) JsonUtils.jsonToObj(json, User.class);
        System.out.println(user);
      }
    } finally {
      // Release the connector when the loop ends or an exception escapes.
      consumer.shutdown();
    }
  }
}

5.kafka.properties文件

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
## Shared configuration loaded by both KafkaProduce and KafkaConsume.
## Class names below are loaded reflectively and are case-sensitive.

##produce
bootstrap.servers=192.168.80.123:9092
producer.type=sync
request.required.acks=1
serializer.class=kafka.serializer.DefaultEncoder
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
## Keys prefixed with "bak." are kept as backups; no shown code reads them.
bak.partitioner.class=kafka.producer.DefaultPartitioner
bak.key.serializer=org.apache.kafka.common.serialization.StringSerializer
bak.value.serializer=org.apache.kafka.common.serialization.StringSerializer

##consume
zookeeper.connect=192.168.80.123:2181
group.id=lijiegroup
zookeeper.session.timeout.ms=4000
zookeeper.sync.time.ms=200
auto.commit.interval.ms=1000
auto.offset.reset=smallest
## NOTE(review): serializer.class is also set in the produce section above;
## java.util.Properties keeps the last value, so this one wins.
serializer.class=kafka.serializer.StringEncoder

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。

原文链接:https://blog.csdn.net/qq_20641565/article/details/56277537