springboot + kafka 入门实例 入门demo
版本说明
- springboot版本:2.3.3.RELEASE
- kakfa服务端版本:kafka_2.12-2.6.0.tgz
- zookeeper服务端版本:apache-zookeeper-3.6.1-bin.tar.gz
实例搭建前提条件
1,搭建好zookeeper服务,本实例zookeeper使用单机伪集群模式,
192.168.1.126:2181, 192.168.1.126:2182, 192.168.1.126:2183
2,搭建好kafka服务,本实例kafka使用单机伪集群模式,
192.168.1.126:9092, 192.168.1.126:9093, 192.168.1.126:9094
1. 导入相关依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.3.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>springboot-kafka-demo</artifactId>
<version>1.0-SNAPSHOT</version>
<name>springboot-kafka-demo</name>
<description>springboot-kafka-demo</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.54</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
2. yml配置
server:
port: 8080
servlet:
context-path: /
tomcat:
uri-encoding: UTF-8
spring:
kafka:
#本地虚拟机kafka伪集群
bootstrap-servers: 192.168.1.126:9092,192.168.1.126:9093,192.168.1.126:9094
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
batch-size: 65536
buffer-memory: 524288
#自定义的topic
myTopic1: testTopic1
myTopic2: testTopic2
consumer:
group-id: default-group #默认组id 后面会配置多个消费者组
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
auto-offset-reset: latest
enable-auto-commit: false #关闭自动提交 改由spring-kafka提交
auto-commit-interval: 100
max-poll-records: 20 #批量消费 一次接收的最大数量
3. 部分代码
消息实体类
package com.example.demo.entity;
import java.util.Date;
import lombok.Data;
import lombok.ToString;
@Data
@ToString
public class Message {
private Long id;
private String msg;
private Date sendTime;
}
kafka配置类
package com.example.demo.config;
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
/**
* kafka配置类
*/
@Data
@Configuration
public class KafkaConfiguration {
/**
* kafaka集群列表
*/
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
/**
* kafaka消费group列表
*/
@Value("${spring.kafka.consumer.group-id}")
private String defaultGroupId;
/**
* 消费开始位置
*/
@Value("${spring.kafka.consumer.auto-offset-reset}")
private String autoOffsetReset;
/**
* 是否自动提交
*/
@Value("${spring.kafka.consumer.enable-auto-commit}")
private String enableAutoCommit;
/**
* #如果'enable.auto.commit'为true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000。
*/
@Value("${spring.kafka.consumer.auto-commit-interval}")
private String autoCommitInterval;
/**
* 一次调用poll()操作时返回的最大记录数,默认值为500
*/
@Value("${spring.kafka.consumer.max-poll-records}")
private String maxPollRecords;
/**
* 自定义的topic1
*/
@Value("${spring.kafka.producer.myTopic1}")
private String myTopic1;
/**
* 自定义的topic2
*/
@Value("${spring.kafka.producer.myTopic2}")
private String myTopic2;
}
消费者监听类
package com.example.demo.consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
/**
* 消费者1(监听topic1队列)
*/
@Component
public class ConsumerListener1 {
@KafkaListener(topics = "${spring.kafka.producer.myTopic1}")
public void listen(ConsumerRecord<?,String> record) {
System.out.println(record);
String value = record.value();
System.out.println("消费者1接收到消息:" + value);
}
}
测试类
package com.example.demo.controller;
import com.alibaba.fastjson.JSON;
import com.example.demo.config.KafkaConfiguration;
import com.example.demo.entity.Message;
import com.example.demo.service.KafkaService;
import com.example.demo.util.UUID;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Date;
@Slf4j
@RestController
@RequestMapping("/kafka")
public class KafkaController {
@Autowired
private KafkaService kafkaService;
@Autowired
private KafkaConfiguration kafkaConfiguration;
/**
* 发送文本消息
* @param msg
* @return
*/
@GetMapping("/send/{msg}")
public String send(@PathVariable String msg) {
kafkaService.send(kafkaConfiguration.getMyTopic1(), msg);
return "生产者发送消息给topic1:"+msg;
}
/**
* 发送JSON数据
* @return
*/
@GetMapping("/send2")
public String send2() {
Message message = new Message();
message.setId(System.currentTimeMillis());
message.setMsg("生产者发送消息到topic1: " + UUID.getUUID32());
message.setSendTime(new Date());
String value = JSON.toJSONString(message);
log.info("生产者发送消息到topic1 message = {}", value);
kafkaService.send(kafkaConfiguration.getMyTopic1(),value);
return value;
}
/**
* 发送JSON数据
* @return
*/
@GetMapping("/send3")
public String send3() {
Message message = new Message();
message.setId(System.currentTimeMillis());
message.setMsg("生产者发送消息到topic2: " + UUID.getUUID32());
message.setSendTime(new Date());
String value = JSON.toJSONString(message);
log.info("生产者发送消息到topic2 message = {}", value);
kafkaService.send(kafkaConfiguration.getMyTopic2(),value);
return value;
}
}
4. 实例运行结果
5. 写在最后
本实例源代码:https://gitee.com/jelly_oy/springboot-kafka-demo
本实例采用springboot2.3.3 + zookeeper3.6.1 + kafka2.6.0 进行搭建
如果本项目对你有帮助,欢迎留言评论,欢迎git clone源代码。
springboot + kafka 入门实例 入门demo的更多相关文章
-
springboot + mybatisPlus 入门实例 入门demo
springboot + mybatisPlus 入门实例 入门demo 使用mybatisPlus的优势 集成mybatisplus后,简单的CRUD就不用写了,如果没有特别的sql,就可以不用ma ...
-
基于springboot构建dubbo的入门demo
之前记录了构建dubbo入门demo所需的环境以及基于普通maven项目构建dubbo的入门案例,今天记录在这些的基础上基于springboot来构建dubbo的入门demo:众所周知,springb ...
-
vue入门 0 小demo (挂载点、模板、实例)
vue入门 0 小demo (挂载点.模板) 用直接的引用vue.js 首先 讲几个基本的概念 1.挂载点即el:vue 实例化时 元素挂靠的地方. 2.模板 即template:vue 实例化时挂 ...
-
.net core kafka 入门实例 一篇看懂
kafka 相信都有听说过,不管有没有用过,在江湖上可以说是大名鼎鼎,就像天龙八部里的乔峰.国际惯例,先介绍生平事迹 简介 Kafka 是由 Apache软件基金会 开发的一个开源流处理平台, ...
-
wxPython中文教程入门实例
这篇文章主要为大家分享下python编程中有关wxPython的中文教程,分享一些wxPython入门实例,有需要的朋友参考下 wxPython中文教程入门实例 wx.Window 是一个基类 ...
-
Omnet++ 4.0 入门实例教程
http://blog.sina.com.cn/s/blog_8a2bb17d01018npf.html 在网上找到的一个讲解omnet++的实例, 是4.0下面实现的. 我在4.2上试了试,可以用. ...
-
SpringBoot系列: RestTemplate 快速入门
====================================相关的文章====================================SpringBoot系列: 与Spring R ...
-
【React】入门实例
React 可以灵活的应用在各种各样的项目中.你可以用它来创建新的应用程序,你也可以逐步引用而不改变现有的代码库. React 起源于 Facebook 的内部项目,因为该公司对市场上所有 JavaS ...
-
React 入门实例
React 入门实例教程 一.安装 React 的安装包,可以到官网下载. $ git clone git@github.com:ruanyf/react-demos.git 如果你没安装 git, ...
随机推荐
-
android 定时器AlarmManager
1.android中通常是使用AlarmManager来定时启动一个单次或重复多次操作的.具体的说就是我们通过AlarmManager设定一个时间和注册一个intent到系统中,然后在该时间到来时,系 ...
-
HDU 5607 graph 矩阵快速幂 + 快速幂
这道题得到了学长的助攻,其实就是一个马尔科夫链,算出一步转移矩阵进行矩阵快速幂就行了,无奈手残 这是我第一回写矩阵快速幂,写的各种毛病,等到调完了已经8点44了,交了一发,返回PE,(发现是少了换行) ...
-
linux学习之八---Linux进程基础知识
一.linux进程 linux是一个多用户多任务的操作系统. 多用户是指多个用户能够在同一时间使用计算机. 多任务是指linux能够同一时候运行几个任务. 进程简单来说就是执行中的程序,Linux系统 ...
-
C++ tree(1)
建立与基本操作 .有关二叉树的相关概念,这里不再赘述,假设不了解二叉树相关概念,建议先学习数据结构中的二叉树的知识点 准备数据 定义二叉树结构操作中须要用到的变量及数据等. #define MAXLE ...
-
cmake 递归依赖
现在有3个模块:main.service.base,main依赖service的service.h.service依赖base的base.h,怎么写CMakeList.txt避免main直接耦合bas ...
-
解决微信小程序video属性controls失效问题
<view class="VideoBox"> <video class='myVideo' id="myVideo01" src=" ...
-
使用Apache JMeter对SQL Server、Mysql、Oracle压力测试(二)
接着第一篇的写: 第三步,测试SQL Server数据库的性能: a.加载JDBC SQL Server驱动.添加线程组和设置线程属性和第二步一样,就不再赘述了: b.设置JDBC Connectio ...
-
HDU 2062:Subset sequence(思维)
Subset sequence Time Limit: 1000/1000 MS (Java/Others) Memory Limit: 32768/32768 K (Java/Others) Tot ...
-
(其他)最常用的15大Eclipse开发快捷键技巧
转自CSDNJava我人生(陈磊兴) 原文出处 引言 做java开发的,经常会用Eclipse或者MyEclise集成开发环境,一些实用的Eclipse快捷键和使用技巧,可以在平常开发中节约出很多 ...
-
18位身份证验证(Java)加入身份证输入验证是否满足18位代码(修订稿)
package day20181016; /** * 身份证的验证 34052419800101001X * */ import java.util.Scanner; public class Zuo ...