springboot整合mqtt实现消息订阅和推送
import com.alibaba.fastjson.JSONObject;
import net.dreamlu.iot.mqtt.spring.client.MqttClientSubscribe;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import org.tio.utils.buffer.ByteBufferUtil;
/**
* @author tarzan
*/
@Component
@Slf4j
public class SimulationSubscriber {
@MqttClientSubscribe("tuoyuan/publish/zj/#")
public void zjOne(String topic, byte[] payload){
String[] strs=topic.split("/");
String ID=strs[strs.length-1];
log.info("topic:{} payload:{} ID:{}", topic, new String(payload, StandardCharsets.UTF_8),ID);
}
@MqttClientSubscribe("/sys/${deviceName}/thing/sub/register")
public void thingSubRegister(String topic, byte[] payload) {
// 1.3.8 开始支持,@MqttClientSubscribe 注解支持 ${} 变量替换,会默认替换成 +
// 注意:mica-mqtt 会先从 Spring boot 配置中替换参数 ${},如果存在配置会优先被替换。
logger.info("topic:{} payload:{}", topic, new String(payload, StandardCharsets.UTF_8));
}
@MqttClientSubscribe("/tianma/publish/cmj")
public void cmj(@Header("topic") String topic,@Payload byte[] payload) {
System.out.println("*****************gc**************************************"+topic);
JSONObject jsonObject = JSONObject.parseObject(ByteBufferUtil.toString(payload));
//业务的处理
System.out.println("*****************test**************************************"+jsonObject);
}
@MqttClientSubscribe("/tianma/publish/zj")
public void zj(@Header("topic") String topic,@Payload byte[] payload) {
System.out.println("*****************gc**************************************"+topic);
JSONObject jsonObject = JSONObject.parseObject(ByteBufferUtil.toString(payload));
//业务的处理
System.out.println("*****************test**************************************"+jsonObject);
}
@MqttClientSubscribe("/tianma/publish/gbj")
public void gbj(@Header("topic") String topic,@Payload byte[] payload) {
System.out.println("*****************gc**************************************"+topic);
JSONObject jsonObject = JSONObject.parseObject(ByteBufferUtil.toString(payload));
//业务的处理
System.out.println("*****************test**************************************"+jsonObject);
}
@MqttClientSubscribe("/tianma/publish/ltl")
public void ltl(@Header("topic") String topic,@Payload byte[] payload) {
System.out.println("*****************gc**************************************"+topic);
JSONObject jsonObject = JSONObject.parseObject(ByteBufferUtil.toString(payload));
//业务的处理
System.out.println("*****************test**************************************"+jsonObject);
}
@MqttClientSubscribe("/tianma/publish/ntl")
public void ntl(@Header("topic") String topic,@Payload byte[] payload) {
System.out.println("*****************gc**************************************"+topic);
JSONObject jsonObject = JSONObject.parseObject(ByteBufferUtil.toString(payload));
//业务的处理
System.out.println("*****************test**************************************"+jsonObject);
}
@MqttClientSubscribe("/tianma/publish/ccl")
public void ccl(@Header("topic") String topic,@Payload byte[] payload) {
System.out.println("*****************gc**************************************"+topic);
JSONObject jsonObject = JSONObject.parseObject(ByteBufferUtil.toString(payload));
//业务的处理
System.out.println("*****************test**************************************"+jsonObject);
}
}