springboot+netty+mqtt服务端实现

时间:2025-04-02 21:51:46
/** * @author: zhouwenjie * @description: 对接收到的消息进行业务处理 * @create: 2023-04-07 16:29 * CONNECT 1 C->S 客户端请求与服务端建立连接 (服务端接收) * CONNACK 2 S->C 服务端确认连接建立(客户端接收) * PUBLISH 3 CóS 发布消息 (服务端接收【QoS(服务质量等级) 0级别,最多分发一次】)-->生产者只会发送一次消息,不关心消息是否被代理服务端或消费者收到 * PUBACK 4 CóS 收到发布消息确认(客户端接收【QoS 1级别,至少分发一次】) -->保证消息发送到服务端(也就是代理服务器broker),如果没收到或一定时间没收到服务端的ack,就会重发消息 * PUBREC 5 CóS 收到发布消息(客户端接收【QoS 2级别】)| * PUBREL 6 CóS 释放发布消息(服务端接收【QoS 2级别】)|只分发一次消息,且保证到达 -->这三步保证消息有且仅有一次传递给消费者 * PUBCOMP 7 CóS 完成发布消息(客户端接收【QoS 2级别】)| * SUBSCRIBE 8 C->S 订阅请求(服务端接收) * SUBACK 9 S->C 订阅确认(客户端接收) * UNSUBSCRIBE 10 C->S 取消订阅(服务端接收) * UNSUBACK 11 S->C 取消订阅确认(客户端接收) * PINGREQ 12 C->S 客户端发送PING(连接保活)命令(服务端接收) * PINGRESP 13 S->C PING命令回复(客户端接收) * DISCONNECT 14 C->S 断开连接 (服务端接收) * <p> * 注意:在我们发送消息的时候,一定要确认好等级,回复确认的消息统一设置为qos=0; * 比如,我们需要发送最高等级的消息就将qos设置为2,当我们接收方收到这个等级的消息的时候判断一下等级,设置好消息类型,然后qos因为是回复消息, * 所以全部设置成0,而当发送端收到消息之后,比如PUBREC,那么我们还将发送(注意不是回复)PUBREL给接收端,希望对方回复确认一下,所以qos设置为1。 * 综上可知:发送端没有回复确认消息之说,只有发送消息,接收端没有发送消息之说,只有回复确认消息,搞清楚这个概念,在设置参数的时候就明了。 **/ @Slf4j @Component public class MqttMsgBack { @Value("${.user_name}") private String userName; @Value("${}") private String password; @Value("${.wait_time}") private long waitTime; /** * 功能描述:存放主题和其订阅的客户端集合 */ public static final ConcurrentHashMap<String, HashSet<String>> subMap = new ConcurrentHashMap<String, HashSet<String>>(); /** * 功能描述:存放订阅是的服务质量等级,只有发送小于或等于这个服务质量的消息给订阅者 */ public static final ConcurrentHashMap<String, MqttQoS> qoSMap = new ConcurrentHashMap<String, MqttQoS>(); /** * 功能描述:存放客户端和其所订阅的主题集合,用来在客户端断开的时候删除订阅中的客户端 */ public static final ConcurrentHashMap<String, Set<String>> ctMap = new ConcurrentHashMap<String, Set<String>>(); /** * 功能描述:存放需要缓存的消息,一边发送给新订阅的客户端 */ public static final ConcurrentHashMap<String, MqttPublishMessage> cacheRetainedMessages = new ConcurrentHashMap<String, MqttPublishMessage>(); /** * 功能描述:缓存需要重复发送的消息,以便在收到ack的时候将消息内存释放掉 */ public static final ConcurrentHashMap<String, ByteBuf> cacheRepeatMessages = new ConcurrentHashMap<String, ByteBuf>(); /** * 确认连接请求 * * @param ctx * @param mqttMessage */ public void connectionAck(ChannelHandlerContext ctx, MqttMessage mqttMessage) { MqttConnectMessage mqttConnectMessage; try { mqttConnectMessage = (MqttConnectMessage) mqttMessage; //获取连接者的ClientId String clientIdentifier = mqttConnectMessage.payload().clientIdentifier(); //查询用户名密码是否正确 String userNameNow = mqttConnectMessage.payload().userName(); String passwordNow = mqttConnectMessage.payload().password(); if (userName.equals(userNameNow) && password.equals(passwordNow)) { MqttFixedHeader mqttFixedHeaderInfo = mqttConnectMessage.fixedHeader(); MqttConnectVariableHeader mqttConnectVariableHeaderInfo = mqttConnectMessage.variableHeader(); //构建返回报文, 可变报头 MqttConnAckVariableHeader mqttConnAckVariableHeaderBack = new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED, mqttConnectVariableHeaderInfo.isCleanSession()); //构建返回报文, 固定报头 至多一次(至少—次,只有一次) MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.CONNACK, mqttFixedHeaderInfo.isDup(), AT_MOST_ONCE, mqttFixedHeaderInfo.isRetain(), 0x02); //构建连接回复消息体 MqttConnAckMessage connAck = new MqttConnAckMessage(mqttFixedHeaderBack, mqttConnAckVariableHeaderBack); ctx.writeAndFlush(connAck); //设置节点名 InetSocketAddress address = (InetSocketAddress) ctx.channel().remoteAddress(); log.info("终端登录成功,ID号:{},IP信息:{},终端号:{}", clientIdentifier, address.getHostString(), address.getPort()); } else { //如果用户名密码错误则提示对方 MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0x02); MqttConnAckVariableHeader variableHeader = new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD, false); MqttConnAckMessage mqttConnAckMessage = new MqttConnAckMessage(fixedHeader, variableHeader); ctx.writeAndFlush(mqttConnAckMessage); ctx.close(); log.error("连接失败:" + MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD.name()); } } catch (ClassCastException e) { //转换失败,对方发送的协议版本不兼容 MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0x02); MqttConnAckVariableHeader variableHeader = new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION, false); MqttConnAckMessage mqttConnAckMessage = new MqttConnAckMessage(fixedHeader, variableHeader); ctx.writeAndFlush(mqttConnAckMessage); ctx.close(); e.printStackTrace(); log.error("连接失败:" + MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION.name()); } } /** * 根据qos发布确认 * 1:表示发送的消息需要一直持久保存(不受服务器重启影响),不但要发送给当前的订阅者,并且以后新来的订阅了此Topic name的订阅者会马上得到推送。 * 0:仅仅为当前订阅者推送此消息。 * 假如服务器收到一个空消息体(zero-length payload)、RETAIN = 1、那么就代表需要将这个缓存消息删除掉,不再继续推送给新订阅者,前提是一定保证空消息体。 * 两个条件需要对应上才能删除,消息体为空、主题名称对应. * 注意:对于每个主题,只能保留一条消息。当发布一个带有RETAIN标志的新消息时,它将替换上一条保留的消息,因此在同一主题下只会存在一条保留消息。 * * @param ctx * @param mqttMessage 拷贝的数据需要释放,其他的消息,比如连接消息,不用转发,会在handler中自动释放 */ public void publishAck(ChannelHandlerContext ctx, MqttMessage mqttMessage) { MqttPublishMessage mqttPublishMessage = (MqttPublishMessage) mqttMessage; MqttFixedHeader mqttFixedHeaderInfo = mqttPublishMessage.fixedHeader(); MqttQoS qos = mqttFixedHeaderInfo.qosLevel(); //得到主题 MqttPublishVariableHeader variableHeader = mqttPublishMessage.variableHeader(); String topicName = variableHeader.topicName(); //将消息发送给订阅的客户端 ByteBuf byteBuf = mqttPublishMessage.payload(); HashSet<String> set = subMap.get(topicName); if (set != null) { for (String channelId : set) { ChannelHandlerContext context = ServerMqttHandler.clientMap.get(channelId); if (context != null && context.channel().isActive()) { MqttQoS cacheQos = qoSMap.get(topicName + "-" + channelId); if (cacheQos != null && qos.value() <= cacheQos.value()) { // retainedDuplicate()增加引用计数器,不至于后续操作byteBuf出现错误,引用计数器为0的情况,这里会清除retainedDuplicate的操作有: // SimpleChannelInboundHandler处理器、编码器 MqttEncoder、和最后确认的时候释放,所以每次操作消息之前,先进行一次retainedDuplicate byteBuf.retainedDuplicate(); context.writeAndFlush(mqttPublishMessage); if (qos == AT_LEAST_ONCE || qos == EXACTLY_ONCE) { //只发送服务质量等级小于等于订阅时客户端指定的服务质量等级 // 创建一个新的缓冲区 byteBuf.retainedDuplicate(); //防止内存溢出,最后在消息被ack或者客户端断开掉线的时候,拿到并进行释放 cacheRepeatMessages.put(channelId, byteBuf); cachePublishMsg(qos, byteBuf, variableHeader, mqttFixedHeaderInfo, context); } } } else { if (context != null) { InetSocketAddress address = (InetSocketAddress) ctx.channel().remoteAddress(); log.error(address.getHostString() + "转发订阅消息提醒:客户端连接异常~"); } //防止客户端频繁上下线导致id变化,带来不必要的空指针 ServerMqttHandler.clientMap.remove(channelId); //删除订阅主题 Set<String> topicSet = ctMap.get(channelId); if (CollUtil.isNotEmpty(topicSet)) { for (String topic : topicSet) { if (subMap != null) { HashSet<String> ids = subMap.get(topic); if (CollUtil.isNotEmpty(ids)) { ids.remove(channelId); if (CollUtil.isEmpty(ids)) { subMap.remove(topic); } } } if (qoSMap != null) { qoSMap.remove(topic + "-" + channelId); } } } ctMap.remove(channelId); } } } // 缓存消息给后订阅的客户端 boolean retain = mqttFixedHeaderInfo.isRetain(); if (retain) { if (byteBuf.readableBytes() > 0) { byteBuf.retainedDuplicate(); cacheRetainedMessages.put(topicName, mqttPublishMessage); } else { MqttPublishMessage message = cacheRetainedMessages.get(topicName); if (message != null) { cacheRetainedMessages.remove(topicName); // 这里需要手动删除ByteBuf缓存,因为一直会有一份缓存在内存中备用 boolean release = ReferenceCountUtil.release(message); log.info("缓存消息给后订阅的客户端释放成功失败:{}", release); } } } //返回消息给发送端 switch (qos) { //至多一次 case AT_MOST_ONCE: break; //至少一次 case AT_LEAST_ONCE: //构建返回报文, 可变报头 MqttMessageIdVariableHeader mqttMessageIdVariableHeaderBack = MqttMessageIdVariableHeader.from(mqttPublishMessage.variableHeader().packetId()); //构建返回报文, 固定报头 MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(PUBACK, mqttFixedHeaderInfo.isDup(), AT_MOST_ONCE, mqttFixedHeaderInfo.isRetain(), 0x02); //构建PUBACK消息体 MqttPubAckMessage pubAck = new MqttPubAckMessage(mqttFixedHeaderBack, mqttMessageIdVariableHeaderBack); ctx.writeAndFlush(pubAck); break; //刚好一次 case EXACTLY_ONCE: //构建返回报文,固定报头 MqttFixedHeader mqttFixedHeaderBack2 = new MqttFixedHeader(MqttMessageType.PUBREC, false, AT_MOST_ONCE, false, 0x02); //构建返回报文,可变报头 MqttPubReplyMessageVariableHeader mqttPubReplyMessageVariableHeader = new MqttPubReplyMessageVariableHeader(mqttPublishMessage.variableHeader().packetId(), MqttPubReplyMessageVariableHeader.REASON_CODE_OK, MqttProperties.NO_PROPERTIES); MqttMessage mqttMessageBack = new MqttMessage(mqttFixedHeaderBack2, mqttPubReplyMessageVariableHeader); ctx.writeAndFlush(mqttMessageBack); break; default: break; } } private void cachePublishMsg(MqttQoS qos, ByteBuf byteBuf, MqttPublishVariableHeader variableHeader, MqttFixedHeader mqttFixedHeaderInfo, ChannelHandlerContext context) { //缓存一份消息,规定时间内没有收到ack,用作重发,重发时将isDup设置为true,代表重复消息 MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, true, qos, false, mqttFixedHeaderInfo.remainingLength()); MqttPublishMessage cachePubMessage = new MqttPublishMessage(fixedHeader, variableHeader, byteBuf); ScheduledFuture<?> scheduledFuture = TimerData.scheduledThreadPoolExecutor.scheduleAtFixedRate(new MonitorMsgTime(variableHeader.packetId(), cachePubMessage, context), waitTime, waitTime, TimeUnit.MILLISECONDS); TimerData.scheduledFutureMap.put(variableHeader.packetId(), scheduledFuture); } /** * 发布完成 qos2 * * @param ctx * @param mqttMessage */ public void publishComp(ChannelHandlerContext ctx, MqttMessage mqttMessage) { MqttMessageIdVariableHeader messageIdVariableHeader = (MqttMessageIdVariableHeader) mqttMessage.variableHeader(); //构建返回报文, 固定报头 MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.PUBCOMP, false, MqttQoS.AT_MOST_ONCE, false, 0x02); //构建返回报文, 可变报头 MqttPubReplyMessageVariableHeader mqttPubReplyMessageVariableHeader = new MqttPubReplyMessageVariableHeader(messageIdVariableHeader.messageId(), MqttPubReplyMessageVariableHeader.REASON_CODE_OK, MqttProperties.NO_PROPERTIES); MqttMessage mqttMessageBack = new MqttMessage(mqttFixedHeaderBack, mqttPubReplyMessageVariableHeader); ctx.writeAndFlush(mqttMessageBack); } /** * 订阅确认 * 订阅和取消订阅没有qos2级别,默认就是1级别 * 需要存储订阅主题和客户端、客户端和订阅主题、验证防止重复订阅、发送缓存消息 * * @param ctx * @param mqttMessage */ public void subscribeAck(ChannelHandlerContext ctx, MqttMessage mqttMessage) { MqttSubscribeMessage mqttSubscribeMessage = (MqttSubscribeMessage) mqttMessage; MqttMessageIdVariableHeader messageIdVariableHeader = mqttSubscribeMessage.variableHeader(); //构建返回报文, 可变报头 MqttMessageIdVariableHeader variableHeaderBack = MqttMessageIdVariableHeader.from(messageIdVariableHeader.messageId()); MqttSubscribePayload subscribePayload = mqttSubscribeMessage.payload(); List<MqttTopicSubscription> mqttTopicSubscriptions = subscribePayload.topicSubscriptions(); List<Integer> grantedQoSLevels = new ArrayList<Integer>(); String id = ctx.channel().id().toString(); //存储客户端订阅的主题集合,断开或者异常连接时,删除订阅ctMap和subMap里的值 Set<String> topicSet = ctMap.get(id); if (topicSet == null) { topicSet = new HashSet<String>(); } for (MqttTopicSubscription subscription : mqttTopicSubscriptions) { String topicName = subscription.topicName(); HashSet<String> contexts = subMap.get(topicName); if (contexts == null) { contexts = new HashSet<String>(); } //先判断主题是否已经订阅过了,防止重复订阅 boolean isSub = contexts.contains(topicName); if (!isSub) { MqttQoS qos = subscription.option().qos(); //存储主题被订阅的客户端集合 contexts.add(id); qoSMap.put(topicName + "-" + id, qos); subMap.put(topicName, contexts); //存储客户端订阅的主题集合 topicSet.add(topicName); } //存储客户端订阅的主题集合 int value = subscription.qualityOfService().value(); grantedQoSLevels.add(value); } // 构建返回报文 有效负载 MqttSubAckPayload payloadBack = new MqttSubAckPayload(grantedQoSLevels); // 构建返回报文 固定报头 MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0); // 构建返回报文 订阅确认 MqttSubAckMessage subAck = new MqttSubAckMessage(mqttFixedHeaderBack, variableHeaderBack, payloadBack); ctx.writeAndFlush(subAck); for (String topic : topicSet) { MqttQoS cacheQos = qoSMap.get(topic + "-" + id); //查看订阅的主题是否需要需要发送消息 MqttPublishMessage mqttMsg = cacheRetainedMessages.get(topic); if (mqttMsg != null) { MqttFixedHeader mqttFixedHeaderInfo = mqttMsg.fixedHeader(); MqttQoS qos = mqttFixedHeaderInfo.qosLevel(); if (cacheQos != null && qos.value() <= cacheQos.value()) { if (mqttMsg != null) { MqttPublishVariableHeader variableHeader = mqttMsg.variableHeader(); ByteBuf payload = mqttMsg.payload(); //引用计数器增加 payload.retainedDuplicate(); ctx.writeAndFlush(mqttMsg); // 开启消息重发机制 if (qos == AT_LEAST_ONCE || qos == EXACTLY_ONCE) { //引用计数器增加 payload.retainedDuplicate(); cacheRepeatMessages.put(id, payload); cachePublishMsg(qos, payload, variableHeader, mqttFixedHeaderInfo, ctx); } } } } } } /** * 取消订阅确认 * * @param ctx * @param mqttMessage */ public void unsubscribeAck(ChannelHandlerContext ctx, MqttMessage mqttMessage) { MqttUnsubscribeMessage mqttUnsubscribeMessage = (MqttUnsubscribeMessage) mqttMessage; MqttMessageIdVariableHeader messageIdVariableHeader = mqttUnsubscribeMessage.variableHeader(); // 构建返回报文 可变报头 MqttMessageIdVariableHeader variableHeaderBack = MqttMessageIdVariableHeader.from(messageIdVariableHeader.messageId()); // 构建返回报文 固定报头 MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0x02); // 构建返回报文 取消订阅确认 MqttUnsubAckMessage unSubAck = new MqttUnsubAckMessage(mqttFixedHeaderBack, variableHeaderBack); log.info("取消订阅回复:{}", unSubAck); //删除本地订阅客户端 String id = ctx.channel().id().toString(); List<String> topics = mqttUnsubscribeMessage.payload().topics(); Set<String> topicSet = ctMap.get(id); for (String topic : topics) { if (subMap != null) { HashSet<String> ids = subMap.get(topic); if (CollUtil.isNotEmpty(ids)) { ids.remove(id); if (CollUtil.isEmpty(ids)) { subMap.remove(topic); } } } if (qoSMap != null) { qoSMap.remove(topic + "-" + id); } if (CollUtil.isNotEmpty(topicSet)) { topicSet.remove(topic); } } ctx.writeAndFlush(unSubAck); } /** * 心跳响应 * * @param ctx * @param mqttMessage */ public void pingResp(ChannelHandlerContext ctx, MqttMessage mqttMessage) { MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PINGRESP, false, MqttQoS.AT_MOST_ONCE, false, 0); MqttMessage mqttMessageBack = new MqttMessage(fixedHeader); log.info("心跳回复:{}", mqttMessageBack.toString()); ctx.writeAndFlush(mqttMessageBack); } /** * ------------------------------------------------------服务端作为发送消息端可能会接收的事件---------------------------------------------------------------- * <p> * 收到接收方消息确认,qos>1的情况,应该删除消息缓存(缓存消息保存到线程定时中了,循环发送取消,消息缓存也没有了),取消消息重发机制 * * @param ctx * @param mqttMessage */ public void receivePubAck(ChannelHandlerContext ctx, MqttMessage mqttMessage) { MqttFixedHeader fixedHeader = mqttMessage.fixedHeader(); MqttMessageType messageType = fixedHeader.messageType(); if (messageType == PUBACK) { MqttMessageIdVariableHeader variableHeader = (MqttMessageIdVariableHeader) mqttMessage.variableHeader(); int messageId = variableHeader.messageId(); //等级为1的情况,直接删除原始消息,取消消息重发机制 ScheduledFuture<?> scheduledFuture = TimerData.scheduledFutureMap.remove(messageId); if (scheduledFuture != null) { scheduledFuture.cancel(true); } //移除消息记录 ByteBuf byteBuf = cacheRepeatMessages.remove(ctx.channel().id().toString()); if (byteBuf != null) { // 释放内存 byteBuf.release(); } } if (messageType == PUBREC) { //等级为2的情况,收到PUBREC报文消息,先停止消息重发机制,再响应一个PUBREL报文并且构建消息重发机制 MqttPubReplyMessageVariableHeader variableHeader = (MqttPubReplyMessageVariableHeader) mqttMessage.variableHeader(); int messageId = variableHeader.messageId(); //构建返回报文,固定报头 MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.PUBREL, false, AT_LEAST_ONCE, false, 0); //构建返回报文,可变报头 MqttPubReplyMessageVariableHeader mqttPubReplyMessageVariableHeader = new MqttPubReplyMessageVariableHeader(messageId, MqttPubReplyMessageVariableHeader.REASON_CODE_OK, MqttProperties.NO_PROPERTIES); MqttMessage mqttMessageBack = new MqttMessage(mqttFixedHeaderBack, mqttPubReplyMessageVariableHeader); //删除初始消息重发机制 ScheduledFuture<?> scheduledFuture = TimerData.scheduledFutureMap.remove(messageId); if (scheduledFuture != null) { scheduledFuture.cancel(true); } //释放消息缓存 ByteBuf byteBuf = cacheRepeatMessages.remove(ctx.channel().id().toString()); if (byteBuf != null) { byteBuf.release(); } ctx.writeAndFlush(mqttMessageBack); //重发机制要放在最下方,否则,一旦出错,会多次出发此机制 cachePubrelMsg(messageId, ctx); } } private void cachePubrelMsg(int messageId, ChannelHandlerContext context) { //缓存一份消息,规定时间内没有收到ack,用作重发,重发时将isDup设置为true,代表重复消息 //构建返回报文,固定报头 MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.PUBREL, true, AT_LEAST_ONCE, false, 0); //构建返回报文,可变报头 MqttMessageIdVariableHeader mqttMessageIdVariableHeaderBack = MqttMessageIdVariableHeader.from(messageId); MqttMessage mqttMessageBack = new MqttMessage(mqttFixedHeaderBack, mqttMessageIdVariableHeaderBack); ScheduledFuture<?> scheduledFuture = TimerData.scheduledThreadPoolExecutor.scheduleAtFixedRate(new MonitorMsgTime(messageId, mqttMessageBack, context), waitTime, waitTime, TimeUnit.MILLISECONDS); TimerData.scheduledFutureMap.put(messageId, scheduledFuture); } /** * 功能描述: 接收到最后一次确认,取消上次PUBREL的消息重发机制 * * @param ctx * @param mqttMessage * @return void * @author zhouwenjie * @date 2023/6/9 16:00 */ public void receivePubcomp(ChannelHandlerContext ctx, MqttMessage mqttMessage) { MqttPubReplyMessageVariableHeader variableHeader = (MqttPubReplyMessageVariableHeader) mqttMessage.variableHeader(); int messageId = variableHeader.messageId(); ScheduledFuture<?> scheduledFuture = TimerData.scheduledFutureMap.remove(messageId); if (scheduledFuture != null) { scheduledFuture.cancel(true); } ByteBuf byteBuf = cacheRepeatMessages.remove(ctx.channel().id().toString()); if (byteBuf != null) { byteBuf.release(); } } }