文章目录
1、消费异常处理
1.1、application.yml配置
1.2、注册异常处理器
1.3、消费者使用异常处理器
1.4、创建生产者发送消息
1.5、创建SpringBoot启动类
1.6、屏蔽 kafka debug 日志 logback.xml
1.7、引入spring-kafka依赖
1.8、消费者控制台:
1.8.1、第一次启动SpringKafkaConsumerApplication
1.8.n、第n次启动SpringKafkaConsumerApplication
1、消费异常处理
1.1、application.yml配置
# HTTP port of the consumer application.
server:
  port: 8120
spring:
  kafka:
    # Comma-separated broker list (host:port, no spaces).
    bootstrap-servers: 192.168.74.148:9095,192.168.74.148:9096,192.168.74.148:9097
    consumer:
      # Only read messages from committed transactions.
      isolation-level: read-committed
      # Only relevant when auto-commit is enabled; offsets are acknowledged manually below.
      auto-commit-interval: 1000
      # Start from the earliest offset when the group has no committed offset.
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      # The listener commits the offset itself, immediately on Acknowledgment.acknowledge().
      ack-mode: manual_immediate
1.2、注册异常处理器
package com. atguigu. spring. kafka. consumer. config ;
import org. apache. kafka. clients. admin. NewTopic ;
import org. apache. kafka. clients. consumer. Consumer ;
import org. springframework. context. annotation. Bean ;
import org. springframework. context. annotation. Configuration ;
import org. springframework. kafka. config. TopicBuilder ;
import org. springframework. kafka. listener. ConsumerAwareListenerErrorHandler ;
import org. springframework. kafka. listener. ListenerExecutionFailedException ;
import org. springframework. messaging. Message ;
/**
 * Kafka beans for the consumer demo: the demo topic and a listener-level error handler.
 */
@Configuration
public class MyKafkaConfig {

    /**
     * Declares the topic {@code my_topic1} with 3 partitions and 3 replicas.
     *
     * @return the topic definition
     */
    @Bean
    public NewTopic springTestPartitionTopic() {
        NewTopic demoTopic = TopicBuilder.name("my_topic1")
                .partitions(3)
                .replicas(3)
                .build();
        return demoTopic;
    }

    /**
     * Error handler referenced by listeners through {@code errorHandler = "myErrorHandler"}.
     * It prints the failed message's payload, its headers and the exception message to
     * standard output, then returns {@code null}.
     *
     * @return the error handler bean
     */
    @Bean
    public ConsumerAwareListenerErrorHandler myErrorHandler() {
        return (failedMessage, failure, kafkaConsumer) -> {
            System.out.println("出现异常,消息内容:value = " + failedMessage.getPayload());
            System.out.println("header = " + failedMessage.getHeaders());
            System.out.println("异常信息:" + failure.getMessage());
            System.out.println("=================");
            return null;
        };
    }
}
1.3、消费者使用异常处理器
package com. atguigu. spring. kafka. consumer. listener ;
import org. apache. kafka. clients. consumer. ConsumerRecord ;
import org. springframework. kafka. annotation. KafkaListener ;
import org. springframework. kafka. annotation. TopicPartition ;
import org. springframework. kafka. support. Acknowledgment ;
import org. springframework. stereotype. Component ;
/**
 * Demo listener that reads partition 0 of {@code my_topic1} as group {@code my_group1}
 * and always fails before acknowledging, so that {@code myErrorHandler} is invoked.
 */
@Component
public class MyKafkaListenerAck {

    /**
     * Prints the received record, then throws before the offset is acknowledged.
     *
     * @param record         the consumed record
     * @param acknowledgment handle for manually committing the offset
     */
    @KafkaListener(
            groupId = "my_group1",
            errorHandler = "myErrorHandler",
            topicPartitions = @TopicPartition(topic = "my_topic1", partitions = "0"))
    public void onMessage1(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
        StringBuilder line = new StringBuilder("my_group1消费者获取分区0的消息:topic = ");
        line.append(record.topic());
        line.append(",partition:").append(record.partition());
        line.append(",offset = ").append(record.offset());
        line.append(",key = ").append(record.key());
        line.append(",value = ").append(record.value());
        System.out.println(line.toString());

        // Division by zero raises ArithmeticException here, simulating a processing failure.
        int ignored = 1 / 0;

        // Not reached while the line above throws, so the offset is never acknowledged.
        acknowledgment.acknowledge();
    }
}
1.4、创建生产者发送消息
package com. atguigu. spring. kafka. consumer ;
import jakarta. annotation. Resource ;
import org. junit. jupiter. api. Test ;
import org. springframework. boot. test. context. SpringBootTest ;
import org. springframework. kafka. core. KafkaTemplate ;
/**
 * Producer side of the demo: publishes test messages to {@code my_topic1}.
 */
@SpringBootTest
class SpringKafkaConsumerApplicationTests {

    /** Typed template (String key, String value) instead of the raw type. */
    @Resource
    KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Sends 10 messages with an empty key, spreading them over partitions 0, 1 and 2
     * via {@code i % 3}.
     */
    @Test
    void contextLoads() {
        for (int i = 0; i < 10; i++) {
            kafkaTemplate.send("my_topic1", i % 3, "", "指定ack-mode: manual_immediate消费" + i);
        }
        // send() is asynchronous; flush so the records are sent before the test context closes.
        kafkaTemplate.flush();
    }
}
1.5、创建SpringBoot启动类
package com. atguigu. spring. kafka. consumer ;
import org. springframework. boot. SpringApplication ;
import org. springframework. boot. autoconfigure. SpringBootApplication ;
/**
 * Spring Boot entry point of the Kafka consumer demo.
 */
@SpringBootApplication
public class SpringKafkaConsumerApplication {

    /**
     * Starts the Spring application context.
     *
     * @param args command-line arguments, passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(SpringKafkaConsumerApplication.class, args);
    }
}
1.6、屏蔽 kafka debug 日志 logback.xml
<configuration>
    <!-- Raise the Kafka client loggers to INFO so their DEBUG output is filtered out. -->
    <logger name="org.apache.kafka.clients" level="info"/>
</configuration>
1.7、引入spring-kafka依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <!-- Spring Boot parent manages the versions of the dependencies below. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.0.5</version>
        <relativePath/>
    </parent>
    <groupId>com.atguigu</groupId>
    <artifactId>spring-kafka-consumer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-kafka-consumer</name>
    <description>spring-kafka-consumer</description>
    <properties>
        <java.version>17</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- Spring for Apache Kafka: @KafkaListener, KafkaTemplate, TopicBuilder. -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
1.8、消费者控制台:
1.8.1、第一次启动SpringKafkaConsumerApplication
  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v3.0.5)
my_group1消费者获取分区0的消息:topic = my_topic1,partition:0,offset = 0,key = ,value = 指定ack-mode: manual_immediate消费0
出现异常,消息内容:value = ConsumerRecord(topic = my_topic1, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1717672170845, serialized key size = 0, serialized value size = 39, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = 指定ack-mode: manual_immediate消费0)
header = {id=91c96206-2381-5fa1-b391-52627056762f, timestamp=1717672488753}
异常信息:Listener method 'public void com.atguigu.spring.kafka.consumer.listener.MyKafkaListenerAck.onMessage1(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, java.lang.String>,org.springframework.kafka.support.Acknowledgment)' threw exception
=================
my_group1消费者获取分区0的消息:topic = my_topic1,partition:0,offset = 1,key = ,value = 指定ack-mode: manual_immediate消费3
出现异常,消息内容:value = ConsumerRecord(topic = my_topic1, partition = 0, leaderEpoch = 0, offset = 1, CreateTime = 1717672170855, serialized key size = 0, serialized value size = 39, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = 指定ack-mode: manual_immediate消费3)
header = {id=8755d954-615c-37ff-67f0-85521e090b03, timestamp=1717672488753}
异常信息:Listener method 'public void com.atguigu.spring.kafka.consumer.listener.MyKafkaListenerAck.onMessage1(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, java.lang.String>,org.springframework.kafka.support.Acknowledgment)' threw exception
=================
my_group1消费者获取分区0的消息:topic = my_topic1,partition:0,offset = 2,key = ,value = 指定ack-mode: manual_immediate消费6
出现异常,消息内容:value = ConsumerRecord(topic = my_topic1, partition = 0, leaderEpoch = 0, offset = 2, CreateTime = 1717672170855, serialized key size = 0, serialized value size = 39, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = 指定ack-mode: manual_immediate消费6)
header = {id=68082673-07d2-6f69-3935-c7034a7caf81, timestamp=1717672488753}
异常信息:Listener method 'public void com.atguigu.spring.kafka.consumer.listener.MyKafkaListenerAck.onMessage1(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, java.lang.String>,org.springframework.kafka.support.Acknowledgment)' threw exception
=================
my_group1消费者获取分区0的消息:topic = my_topic1,partition:0,offset = 3,key = ,value = 指定ack-mode: manual_immediate消费9
出现异常,消息内容:value = ConsumerRecord(topic = my_topic1, partition = 0, leaderEpoch = 0, offset = 3, CreateTime = 1717672170855, serialized key size = 0, serialized value size = 39, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = 指定ack-mode: manual_immediate消费9)
header = {id=bcbda053-9536-df91-d937-2688b5d4c6ea, timestamp=1717672488754}
异常信息:Listener method 'public void com.atguigu.spring.kafka.consumer.listener.MyKafkaListenerAck.onMessage1(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, java.lang.String>,org.springframework.kafka.support.Acknowledgment)' threw exception
=================
1.8.n、第n次启动SpringKafkaConsumerApplication
  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v3.0.5)
my_group1消费者获取分区0的消息:topic = my_topic1,partition:0,offset = 0,key = ,value = 指定ack-mode: manual_immediate消费0
出现异常,消息内容:value = ConsumerRecord(topic = my_topic1, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1717672170845, serialized key size = 0, serialized value size = 39, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = 指定ack-mode: manual_immediate消费0)
header = {id=6bc9edb9-ad1c-e00e-9b27-c0c540248091, timestamp=1717672854508}
异常信息:Listener method 'public void com.atguigu.spring.kafka.consumer.listener.MyKafkaListenerAck.onMessage1(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, java.lang.String>,org.springframework.kafka.support.Acknowledgment)' threw exception
=================
my_group1消费者获取分区0的消息:topic = my_topic1,partition:0,offset = 1,key = ,value = 指定ack-mode: manual_immediate消费3
出现异常,消息内容:value = ConsumerRecord(topic = my_topic1, partition = 0, leaderEpoch = 0, offset = 1, CreateTime = 1717672170855, serialized key size = 0, serialized value size = 39, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = 指定ack-mode: manual_immediate消费3)
header = {id=14e6951b-b25f-10ca-702c-a1699d25645b, timestamp=1717672854508}
异常信息:Listener method 'public void com.atguigu.spring.kafka.consumer.listener.MyKafkaListenerAck.onMessage1(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, java.lang.String>,org.springframework.kafka.support.Acknowledgment)' threw exception
=================
my_group1消费者获取分区0的消息:topic = my_topic1,partition:0,offset = 2,key = ,value = 指定ack-mode: manual_immediate消费6
出现异常,消息内容:value = ConsumerRecord(topic = my_topic1, partition = 0, leaderEpoch = 0, offset = 2, CreateTime = 1717672170855, serialized key size = 0, serialized value size = 39, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = 指定ack-mode: manual_immediate消费6)
header = {id=26e85ef8-2502-fd29-8d5b-091ec0362900, timestamp=1717672854509}
异常信息:Listener method 'public void com.atguigu.spring.kafka.consumer.listener.MyKafkaListenerAck.onMessage1(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, java.lang.String>,org.springframework.kafka.support.Acknowledgment)' threw exception
=================
my_group1消费者获取分区0的消息:topic = my_topic1,partition:0,offset = 3,key = ,value = 指定ack-mode: manual_immediate消费9
出现异常,消息内容:value = ConsumerRecord(topic = my_topic1, partition = 0, leaderEpoch = 0, offset = 3, CreateTime = 1717672170855, serialized key size = 0, serialized value size = 39, headers = RecordHeaders(headers = [], isReadOnly = false), key = , value = 指定ack-mode: manual_immediate消费9)
header = {id=25c6b6c8-d02d-2091-7c6f-ebaea6c52d1d, timestamp=1717672854509}
异常信息:Listener method 'public void com.atguigu.spring.kafka.consumer.listener.MyKafkaListenerAck.onMessage1(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, java.lang.String>,org.springframework.kafka.support.Acknowledgment)' threw exception
=================
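此时如果不关闭SpringKafkaConsumerApplication,生产者继续发送消息,消费者只会继续向后消费新消息,不会从头再次消费。原因是监听方法在调用acknowledgment.acknowledge()之前就抛出了异常,offset始终没有提交;因此每次重启应用后,由于auto-offset-reset配置为earliest,消费者都会从offset 0开始重新消费分区0中的全部消息。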