在Spring Boot中集成RocketMQ后,通过初始化代码来自动创建主题和消费分组

时间:2025-02-21 08:29:36
@Component public class RocketMQInitializer implements CommandLineRunner { @Autowired private DefaultMQProducer producer; @Override public void run(String... args) throws Exception { createTopic("yourTopic", 8, 1); createConsumerGroup("yourConsumerGroup"); } private void createTopic(String topic, int queueNum, int topicReplica) throws MQClientException { MQAdminExt admin = new DefaultMQAdminExt(); admin.setNamesrvAddr(producer.getNamesrvAddr()); try { admin.start(); ClusterInfo clusterInfo = admin.examineBrokerClusterInfo(); TopicRouteData topicRouteData = admin.examineTopicRouteInfo(topic); if (topicRouteData == null || topicRouteData.getBrokerDatas().isEmpty()) { throw new MQClientException("Topic route data is empty"); } BrokerData brokerData = topicRouteData.getBrokerDatas().get(0); String brokerName = brokerData.getBrokerName(); String brokerAddr = clusterInfo.getBrokerAddrTable().get(brokerName).getBrokerAddrs().get(0); admin.createTopic(brokerAddr, topic, queueNum, topicReplica); System.out.println("Topic created successfully"); } finally { admin.shutdown(); } } private void createConsumerGroup(String consumerGroup) throws MQClientException { MQAdminExt admin = new DefaultMQAdminExt(); admin.setNamesrvAddr(producer.getNamesrvAddr()); try { admin.start(); TopicList topicList = admin.fetchAllTopicList(); TopicStatsTable topicStatsTable = admin.examineTopicStats(topicList.getTopicList()); if (topicStatsTable.getOffsetTable().isEmpty()) { throw new MQClientException("Topic stats table is empty"); } for (String topic : topicStatsTable.getOffsetTable().keySet()) { admin.createOrUpdateConsumerOffset(consumerGroup, topic, 0L, false); } System.out.println("Consumer group created successfully"); } finally { admin.shutdown(); } } }