Java编程操作Kafka

时间:2024-11-17 07:57:10

同步生产消息到Kafka

接下来,我们将编写Java程序,将1-100的数字消息写入到Kafka中。

导入Maven Kafka POM依赖

  1. <!-- kafka客户端工具 -->
  2. <dependency>
  3. <groupId></groupId>
  4. <artifactId>kafka-clients</artifactId>
  5. <version>2.4.1</version>
  6. </dependency>

创建KafkaProducerTest类。

可以参考以下方式来编写第一个Kafka示例程序

1.创建用于连接Kafka的Properties配置

2.创建一个生产者对象KafkaProducer

3.调用send发送1-100消息到指定Topic test,并获取返回值Future,该对象封装了返回值

4.再调用一个()方法等待响应

5.关闭生产者

参考代码:

  1. import ;
  2. import ;
  3. import ;
  4. import ;
  5. import ;
  6. import ;
  7. public class KafkaProducerTest {
  8. public static void main(String[] args) {
  9. // 1. 创建用于连接Kafka的Properties配置
  10. Properties props=new Properties();
  11. ("","192.168.2.3:9092");
  12. ("acks","all");
  13. ("","");
  14. ("","");
  15. // 2. 创建一个生产者对象KafkaProducer
  16. KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
  17. // 3. 调用send发送1-100消息到指定Topic test
  18. for(int i = 0; i < 100; ++i) {
  19. try {
  20. // 获取返回值Future,该对象封装了返回值
  21. Future<RecordMetadata> future = (new ProducerRecord<String, String>("test", null, i + ""));
  22. // 调用一个()方法等待响应
  23. ();
  24. } catch (InterruptedException e) {
  25. ();
  26. } catch (ExecutionException e) {
  27. ();
  28. }
  29. }
  30. // 5. 关闭生产者
  31. ();
  32. }
  33. }

从Kafka的topic中消费消息

从 test topic中,将消息都消费,并将记录的offset、key、value打印出来

创建KafkaConsumerTest类

1.创建Kafka消费者配置

  1. Properties props = new Properties();
  2. ("", "192.168.2.3:9092");
  3. ("", "test");
  4. ("", "true");
  5. ("", "1000");
  6. ("", "");
  7. ("", "");

2.创建Kafka消费者

3.订阅要消费的主题

4.使用一个while循环,不断从Kafka的topic中拉取消息

5.将将记录(record)的offset、key、value都打印出来

参考代码

  1. import ;
  2. import ;
  3. import ;
  4. import ;
  5. import ;
  6. import ;
  7. public class KafkaConsumerTest {
  8. public static void main(String[] args) {
  9. // 1. 创建用于连接Kafka的Properties配置
  10. Properties props = new Properties();
  11. ("", "192.168.2.3:9092");
  12. ("", "test-group");
  13. ("", ()); // 设置键的反序列化器
  14. ("", ()); // 设置值的反序列化器
  15. // 2. 创建一个消费者对象 KafkaConsumer
  16. KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
  17. // 3. 订阅主题
  18. (("test"));
  19. // 4. 拉取消息
  20. try {
  21. while (true) {
  22. ConsumerRecords<String, String> records = ((100));
  23. (record -> {
  24. ("offset = %d, key = %s, value = %s%n", (), (), ());
  25. });
  26. }
  27. } catch (Exception e) {
  28. ();
  29. } finally {
  30. // 5. 关闭消费者
  31. ();
  32. }
  33. }
  34. }

异步使用带有回调函数方法生产消息

如果我们想获取生产者消息是否成功,或者成功生产消息到Kafka中后,执行一些其他动作。此时,可以很方便地使用带有回调函数来发送消息。

需求:

  1. 在发送消息出现异常时,能够及时打印出异常信息
  2. 在发送消息成功时,打印Kafka的topic名字、分区id、offset
  1. import ;
  2. import ;
  3. import ;
  4. import ;
  5. import ;
  6. import ;
  7. import ;
  8. public class KafkaProducerTest {
  9. public static void main(String[] args) {
  10. // 1. 创建用于连接Kafka的Properties配置
  11. Properties props = new Properties();
  12. ("", "192.168.2.3:9092");
  13. ("acks", "all");
  14. ("", "");
  15. ("", "");
  16. // 2. 创建一个生产者对象KafkaProducer
  17. KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
  18. // 3. 调用send发送1-100消息到指定Topic test
  19. for (int i = 0; i < 100; ++i) {
  20. // 一、同步方式
  21. // 获取返回值Future,该对象封装了返回值
  22. // Future<RecordMetadata> future = (new ProducerRecord<String, String>("test", null, i + ""));
  23. // 调用一个()方法等待响应
  24. // ();
  25. // 二、带回调函数异步方式
  26. (new ProducerRecord<String, String>("test", null, i + ""), new Callback() {
  27. @Override
  28. public void onCompletion(RecordMetadata metadata, Exception exception) {
  29. if (exception != null) {
  30. ("发送消息出现异常");
  31. } else {
  32. String topic = ();
  33. int partition = ();
  34. long offset = ();
  35. ("发送消息到Kafka中的名字为" + topic + "的主题,第" + partition + "分区,第" + offset + "条数据成功!");
  36. }
  37. }
  38. });
  39. }
  40. // 5. 关闭生产者
  41. ();
  42. }
  43. }