最近在使用MQTT来实现消息的传输,网上demo很多,这里就不在重复介绍了,直接上代码,百度就能出现一大堆
下面是MQTT实现订阅的主要代码部分
MqttClient client = new MqttClient(HOST, clientid, new MemoryPersistence()); MqttConnectOptions options = new MqttConnectOptions(); options.setCleanSession(true); options.setUserName(userName); options.setPassword(passWord.toCharArray()); options.setConnectionTimeout(10); options.setKeepAliveInterval(20); //设置断开后重新连接 options.setAutomaticReconnect(true); try { client.setCallback(new PushCallback());//设置各种情况的回调函数 client.connect(options); //订阅消息 int[] Qos = {0}; String[] topic1 = {TOPIC}; client.subscribe(topic1, Qos); } catch (Exception e) { e.printStackTrace(); }
回到方法实现代码如下
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.MqttTopic; public class PushCallback implements MqttCallback { public void connectionLost(Throwable cause) { // 连接丢失后,触发这个方法 } @Override public void deliveryComplete(IMqttDeliveryToken token) { // publish后会执行到这里 } @Override public void messageArrived(String arg0, MqttMessage arg1) throws Exception { System.out.println("接收消息主题:"+arg0); System.out.println("接收消息Qos:"+arg1.getQos()); System.out.println("接收消息内容:"+new String(arg1.getPayload())); } }
如果MQTT连接断开,会调用connectionLost 函数,我一开始觉得直接在这里使用client.connect(options),就可以直接实现重连了,结果报错mqtt的状态为
这个好解决,如果这样设置,可以实现断开自动重连
//设置断开后重新连接
options.setAutomaticReconnect(true);
但这样重连是实现了,但是之前订阅的主题却接收不到消息了,需要重新订阅主题才能正常接收消息,那我这个重新订阅的代码要怎么再放进去呢,反正不是再connectionLost里就是了,那是后连接还没有重连连上!
继续看MQTT的connec的源码发现了一段代码使我找到了解决方案
MqttReconnectCallback 是实现MqttCallbackExtended接口的
发现comms中有设置重连的回调对象,但是怎么把这个回调由我们来主动放进去呢?继续往下看源码可以发现
也就是如果我们在之前放入client的回调对象是实现的 MqttCallbackExtended 接口,则MQTT会将我们的回调对象放入 connectActionListener 中 然后由 connectActionListener实现具体的connect
接下来我们不callback 对象改为实现 MqttCallbackExtended这个接口,然后实现下面方法,
@Override public void connectComplete(boolean reconnect, String serverURI) { //连接成功后调用
client.subscribe(topics,Qos);//具体订阅代码
}
就可以解决MQTT重连后无法订阅的问题