Spring Boot 整合 MQTT 实现消息订阅和推送

时间:2025-03-10 12:38:43
import com.alibaba.fastjson.JSONObject; import net.dreamlu.iot.mqtt.spring.client.MqttClientSubscribe; import org.springframework.messaging.handler.annotation.Header; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Component; import org.tio.utils.buffer.ByteBufferUtil; /** * @author tarzan */ @Component @Slf4j public class SimulationSubscriber { @MqttClientSubscribe("tuoyuan/publish/zj/#") public void zjOne(String topic, byte[] payload){ String[] strs=topic.split("/"); String ID=strs[strs.length-1]; log.info("topic:{} payload:{} ID:{}", topic, new String(payload, StandardCharsets.UTF_8),ID); } @MqttClientSubscribe("/sys/${deviceName}/thing/sub/register") public void thingSubRegister(String topic, byte[] payload) { // 1.3.8 开始支持,@MqttClientSubscribe 注解支持 ${} 变量替换,会默认替换成 + // 注意:mica-mqtt 会先从 Spring boot 配置中替换参数 ${},如果存在配置会优先被替换。 logger.info("topic:{} payload:{}", topic, new String(payload, StandardCharsets.UTF_8)); } @MqttClientSubscribe("/tianma/publish/cmj") public void cmj(@Header("topic") String topic,@Payload byte[] payload) { System.out.println("*****************gc**************************************"+topic); JSONObject jsonObject = JSONObject.parseObject(ByteBufferUtil.toString(payload)); //业务的处理 System.out.println("*****************test**************************************"+jsonObject); } @MqttClientSubscribe("/tianma/publish/zj") public void zj(@Header("topic") String topic,@Payload byte[] payload) { System.out.println("*****************gc**************************************"+topic); JSONObject jsonObject = JSONObject.parseObject(ByteBufferUtil.toString(payload)); //业务的处理 System.out.println("*****************test**************************************"+jsonObject); } @MqttClientSubscribe("/tianma/publish/gbj") public void gbj(@Header("topic") String topic,@Payload byte[] payload) { System.out.println("*****************gc**************************************"+topic); JSONObject jsonObject = 
JSONObject.parseObject(ByteBufferUtil.toString(payload)); //业务的处理 System.out.println("*****************test**************************************"+jsonObject); } @MqttClientSubscribe("/tianma/publish/ltl") public void ltl(@Header("topic") String topic,@Payload byte[] payload) { System.out.println("*****************gc**************************************"+topic); JSONObject jsonObject = JSONObject.parseObject(ByteBufferUtil.toString(payload)); //业务的处理 System.out.println("*****************test**************************************"+jsonObject); } @MqttClientSubscribe("/tianma/publish/ntl") public void ntl(@Header("topic") String topic,@Payload byte[] payload) { System.out.println("*****************gc**************************************"+topic); JSONObject jsonObject = JSONObject.parseObject(ByteBufferUtil.toString(payload)); //业务的处理 System.out.println("*****************test**************************************"+jsonObject); } @MqttClientSubscribe("/tianma/publish/ccl") public void ccl(@Header("topic") String topic,@Payload byte[] payload) { System.out.println("*****************gc**************************************"+topic); JSONObject jsonObject = JSONObject.parseObject(ByteBufferUtil.toString(payload)); //业务的处理 System.out.println("*****************test**************************************"+jsonObject); } }