在Spring Boot中集成RocketMQ后,通过初始化代码来自动创建主题和消费分组
@Component
public class RocketMQInitializer implements CommandLineRunner {
@Autowired
private DefaultMQProducer producer;
@Override
public void run(String... args) throws Exception {
createTopic("yourTopic", 8, 1);
createConsumerGroup("yourConsumerGroup");
}
private void createTopic(String topic, int queueNum, int topicReplica) throws MQClientException {
MQAdminExt admin = new DefaultMQAdminExt();
admin.setNamesrvAddr(producer.getNamesrvAddr());
try {
admin.start();
ClusterInfo clusterInfo = admin.examineBrokerClusterInfo();
TopicRouteData topicRouteData = admin.examineTopicRouteInfo(topic);
if (topicRouteData == null || topicRouteData.getBrokerDatas().isEmpty()) {
throw new MQClientException("Topic route data is empty");
}
BrokerData brokerData = topicRouteData.getBrokerDatas().get(0);
String brokerName = brokerData.getBrokerName();
String brokerAddr = clusterInfo.getBrokerAddrTable().get(brokerName).getBrokerAddrs().get(0);
admin.createTopic(brokerAddr, topic, queueNum, topicReplica);
System.out.println("Topic created successfully");
} finally {
admin.shutdown();
}
}
private void createConsumerGroup(String consumerGroup) throws MQClientException {
MQAdminExt admin = new DefaultMQAdminExt();
admin.setNamesrvAddr(producer.getNamesrvAddr());
try {
admin.start();
TopicList topicList = admin.fetchAllTopicList();
TopicStatsTable topicStatsTable = admin.examineTopicStats(topicList.getTopicList());
if (topicStatsTable.getOffsetTable().isEmpty()) {
throw new MQClientException("Topic stats table is empty");
}
for (String topic : topicStatsTable.getOffsetTable().keySet()) {
admin.createOrUpdateConsumerOffset(consumerGroup, topic, 0L, false);
}
System.out.println("Consumer group created successfully");
} finally {
admin.shutdown();
}
}
}