spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: my-group
4. 创建Kafka Producer
创建一个用于发送消息的Kafka Producer:
java
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class KafkaProducerService {
private final KafkaTemplate<String, String> kafkaTemplate;
public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendMessage(String topic, String message) {
kafkaTemplate.send(topic, message);
}
}
5. 创建Kafka Consumer
创建一个用于接收消息的Kafka Consumer:
java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class KafkaConsumerService {
@KafkaListener(topics = "your-topic", groupId = "my-group")
public void listen(String message) {
System.out.println("Received message: " + message);
}
}
请确保your-topic是您想要监听的Kafka主题名称,并且与Producer中发送消息的主题相匹配。
- 运行和测试
运行Spring Boot应用程序,并尝试发送和接收消息。您可以在Controller、Service或任何其他地方注入KafkaProducerService并调用sendMessage方法。
java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class TestController {
private final KafkaProducerService kafkaProducerService;
@Autowired
public TestController(KafkaProducerService kafkaProducerService) {
this.kafkaProducerService = kafkaProducerService;
}
@GetMapping("/send")
public String sendTestMessage() {
kafkaProducerService.sendMessage("your-topic", "Hello, Kafka!");
return "Message sent!";
}
}
访问/send端点,您应该能够在Consumer的控制台上看到接收到的消息。
这个简化版的教程应该能帮助您快速上手Spring Boot与Kafka的集成。如果您需要进行更高级的配置或定制,建议查阅Spring Kafka和Spring Boot的官方文档。