Java 使用 MQTT:重连机制、订阅者 + 服务端回复

时间:2024-02-24 10:41:54

package com.jeecg.tab.mymqtt;

import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;

import com.jeecg.tab.mqtt.ClientMQTT;

public class MyClient {
//设备属性读取 ----请求
String qone="PG01001/001/properties/get";
//设备修改 ----请求
String qupdate ="PG01001/001/properties/set";
//设备功能请求
String qinvoke="PG01001/001/function/invoke";
//设备消息透传
String qtransparent="PG01001/001//transparent/request";

//监控状态设定

//4.1.6.设备状态读取
//String qtransparent="PG01001/001/properties/get";

//3.3设备属性修改 回复
String hone="PG01001/001/properties/get/reply";
//设备属性 回复
String hupdate ="PG01001/001/properties/set/reply";
//设备属性上报
String uploadtext="PG01001/001/properties/report";
//设备功能上报
String hinvoke="PG01001/001/function/invoke/reply";
//设备事件上报
String uevent="PG01001/001/event/report";
//设备消息透传
String htransparent="PG01001/001//transparent/reply";



private MqttClient client;
private MqttConnectOptions options;
private static String[] myTopics = { "rpm/sfloc", "geng" ,"wang"};
private static int[] myQos = { 1, 1,1 };
private static CopyOnWriteArrayList<Map<String,String>> slistKZ = new CopyOnWriteArrayList<>();



public static CopyOnWriteArrayList<Map<String, String>> getSlistKZ() {
return slistKZ;
}

 

 

public static void setSlistKZ(CopyOnWriteArrayList<Map<String, String>> slistKZ) {
MyClient.slistKZ = slistKZ;
}

 

 

public static void main(String[] args) {
System.out.println("client start...");
MyMqtt myMqtt = new MyMqtt("12345678");
myMqtt.subscribe(myTopics, myQos);
}
}

 

 

package com.jeecg.tab.mymqtt;

import java.util.HashMap;
import java.util.Map;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

/**
 * Small wrapper around a Paho {@link MqttClient} that connects to a fixed broker, publishes
 * string messages and subscribes to topic filters.
 *
 * <p>When no callback is supplied, a default callback is installed. It prints every received
 * message, appends it to {@code MyClient.getSlistKZ()} and keeps trying to reconnect after the
 * connection is lost.
 *
 * <p>Connection errors raised while constructing an instance are printed, not thrown. If the
 * underlying client could not be created, {@link #sendMessage(String, String)} and
 * {@link #subscribe(String[], int[])} will fail with a {@link NullPointerException}.
 */
public class MyMqtt {
    /**
     * Charset used to convert payloads to and from {@code String}. Fixed to UTF-8 so the result
     * does not depend on the platform default charset of the JVM.
     */
    private static final java.nio.charset.Charset PAYLOAD_CHARSET =
            java.nio.charset.StandardCharsets.UTF_8;

    /** Pause before each reconnect attempt, in milliseconds. */
    private static final long RECONNECT_DELAY_MS = 1000L;

    // NOTE(review): broker address and credentials are hard-coded; consider moving them to
    // configuration.
    private String host = "tcp://192.168.124.250:1883";
    private String userName = "admin";
    private String passWord = "admin";

    private MqttClient client;
    /** Client id this instance was created with. */
    private String id;
    private static MyMqtt instance; // never assigned in this class
    /** Topic used by {@link #sendMessage(String)}. */
    private String myTopic = "wang";

    // Not referenced inside this class.
    private static String[] myTopics = { "/PG01001/+/properties/set", "/PG01001/+/function/invoke" ,"/PG01001/+/properties/get"};
    private static int[] myQos = { 0, 0, 0 };

    /**
     * Connects with the default callback and {@code cleanSession = false}.
     *
     * @param id MQTT client id; should be unique per client
     */
    public MyMqtt(String id) {
        this(id, null, false);
    }

    /**
     * Creates the client, installs the callback and connects to the broker.
     *
     * <p>Any {@link MqttException} raised here is printed and swallowed, so a constructed
     * instance is not guaranteed to be connected.
     *
     * @param id           MQTT client id; should be unique per client
     * @param callback     callback to install, or {@code null} to use the default callback
     * @param cleanSession value passed to {@link MqttConnectOptions#setCleanSession(boolean)}.
     *                     NOTE(review): the reconnect loop of the default callback does not
     *                     re-subscribe, so with {@code true} subscriptions presumably have to be
     *                     re-issued by the caller after a reconnect - confirm against the broker.
     */
    public MyMqtt(String id, MqttCallback callback, boolean cleanSession) {
        this.id = id;
        try {
            client = new MqttClient(host, id, new MemoryPersistence());

            MqttConnectOptions options = new MqttConnectOptions();
            // Honour the caller's choice; this used to be hard-coded to false.
            options.setCleanSession(cleanSession);
            options.setUserName(userName);
            options.setPassword(passWord.toCharArray());
            options.setConnectionTimeout(10);
            options.setKeepAliveInterval(20);

            client.setCallback(callback != null ? callback : createDefaultCallback());
            // No last-will message is configured.
            client.connect(options);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    /** Builds the callback used when the caller does not supply one. */
    private MqttCallback createDefaultCallback() {
        return new MqttCallback() {

            @Override
            public void messageArrived(String topic, MqttMessage message) throws Exception {
                String payload = new String(message.getPayload(), PAYLOAD_CHARSET);

                Map<String, String> received = new HashMap<String, String>();
                received.put("topic", topic);
                received.put("qos", String.valueOf(message.getQos()));
                received.put("message", payload);

                System.out.println("接收消息主题 : " + topic);
                System.out.println("接收消息Qos : " + message.getQos());
                System.out.println("接收消息内容 : " + payload);

                MyClient.getSlistKZ().add(received);
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken token) {
                // Nothing to do once a delivery completes.
            }

            @Override
            public void connectionLost(Throwable cause) {
                System.out.println("连接断开正在重连");
                reconnectUntilConnected();
            }
        };
    }

    /**
     * Retries {@code reconnect()} once per {@link #RECONNECT_DELAY_MS} until the client reports
     * that it is connected, or until the current thread is interrupted.
     */
    private void reconnectUntilConnected() {
        // A reconnect() call that returns without throwing is not treated as proof of success;
        // the loop only ends once isConnected() confirms the connection.
        while (!client.isConnected()) {
            try {
                Thread.sleep(RECONNECT_DELAY_MS);
                if (!client.isConnected()) {
                    client.reconnect();
                }
            } catch (InterruptedException e) {
                // Restore the flag and stop retrying so the thread can shut down.
                Thread.currentThread().interrupt();
                return;
            } catch (MqttException ignored) {
                // Attempt failed (or one is already in progress); try again after the delay.
            }
        }
    }

    /**
     * Publishes {@code msg} to the default topic.
     *
     * @param msg message text
     */
    public void sendMessage(String msg) {
        sendMessage(myTopic, msg);
    }

    /**
     * Publishes {@code msg} to {@code topic} as a retained QoS 0 message and waits for the
     * delivery token to complete. Failures are printed, not thrown.
     *
     * @param topic topic to publish to
     * @param msg   message text; encoded as UTF-8
     */
    public void sendMessage(String topic, String msg) {
        try {
            MqttMessage message = new MqttMessage();
            message.setQos(0);
            message.setRetained(true);
            message.setPayload(msg.getBytes(PAYLOAD_CHARSET));

            MqttDeliveryToken token = client.getTopic(topic).publish(message);
            token.waitForCompletion();
        } catch (MqttException e) {
            // Also covers MqttPersistenceException, which is a subclass of MqttException.
            e.printStackTrace();
        }
    }

    /**
     * Subscribes to the given topic filters. Failures are printed, not thrown.
     *
     * @param topicFilters topic filters to subscribe to
     * @param qos          QoS level for each filter, position by position
     */
    public void subscribe(String[] topicFilters, int[] qos) {
        try {
            client.subscribe(topicFilters, qos);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

}