在Spring Boot 中使用Kafka

时间:2024-07-18 11:12:04

在Spring Boot 中使用Kafka :

1. 添加 Kafka 相关依赖

首先,在 pom.xml 文件中添加 Kafka 相关的依赖。通常使用 spring-kafka 提供的依赖来集成 Kafka。

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>${spring-kafka.version}</version>
</dependency>

请确保 ${spring-kafka.version} 是你所需的 Spring Kafka 版本。

2. 配置 Kafka 连接信息

application.propertiesapplication.yml 中配置 Kafka 的连接信息:

spring.kafka.bootstrap-servers=your-kafka-server:9092
spring.kafka.consumer.group-id=my-group-id

可以根据你的需要配置更多的 Kafka 相关属性,如序列化器、反序列化器等。

3. 创建 Kafka 生产者

使用 Spring Boot 创建 Kafka 生产者的步骤如下:

3.1. 创建 Kafka 生产者配置
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "your-kafka-server:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); // 如果发送 JSON 对象

        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

在上述配置中,producerFactory 方法设置了 Kafka 生产者的配置,包括序列化器。kafkaTemplate 方法创建了一个 KafkaTemplate 用于发送消息。

3.2. 发送消息到 Kafka
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducerService {

    private static final String TOPIC = "your-topic-name";

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    public void sendMessage(Object message) {
        kafkaTemplate.send(TOPIC, message);
    }
}

KafkaProducerService 中,通过 KafkaTemplate 发送消息到指定的 TOPIC

4. 创建 Kafka 消费者

使用 Spring Boot 创建 Kafka 消费者的步骤如下:

4.1. 创建 Kafka 消费者配置
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.config.ContainerProperties;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "your-kafka-server:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group-id");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); // 如果消费 JSON 对象

        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(),
                new ErrorHandlingDeserializer<>(new JsonDeserializer<>(Object.class)));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        return factory;
    }
}

在上述配置中,consumerFactory 方法设置了 Kafka 消费者的配置,包括反序列化器。kafkaListenerContainerFactory 方法创建了一个 ConcurrentKafkaListenerContainerFactory 用于监听 Kafka 消息。

4.2. 创建 Kafka 消费者监听器
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerService {

    @KafkaListener(topics = "your-topic-name", groupId = "my-group-id")
    public void listen(Object message) {
        System.out.println("Received Message: " + message.toString());
        // 在这里处理接收到的消息逻辑
    }
}

KafkaConsumerService 中,通过 @KafkaListener 注解监听指定的 TOPIC,并处理接收到的消息。

5. 使用 Kafka

现在,你可以在你的 Spring Boot 应用程序中通过 KafkaProducerService 发送消息,通过 KafkaConsumerService 接收和处理消息了。确保在启动应用程序时 Kafka 已经正常运行,并且配置了正确的连接信息和主题名称。