工具类,包含有发布者方法和订阅者方法。 package ;
import .mqttv3.*;
import .;
import ;
/**
* mqtt的发布和订阅
*
* @author wzq
*/
public class PublishSubscribe {
private static String serviceURI = "tcp://172.16.22.160:1883";
private static String clientID = ().toString();
private static MqttClientPersistence persistence = new MemoryPersistence();
//如果mqtt服务配置了匿名访问,则不需要使用用户名和密码就可以实现消息的订阅和发布
// private static String username = "username";
// private static String password = "password";
private static String topic = "cebPark";
/*
消息服务质量,一共有三个:
0:尽力而为。消息可能会丢,但绝不会重复传输
1:消息绝不会丢,但可能会重复传输
2:恰好一次。每条消息肯定会被传输一次且仅传输一次
*/
private static int qos = 0;
/**
* 消息发布
*
* @author wzq
**/
public static void publish() {
try {
MqttClient client = new MqttClient(serviceURI, clientID, persistence);
MqttConnectOptions connectOptions = new MqttConnectOptions();
// (username);
// (());
(false);
//发布者连接服务
(connectOptions);
("发布者连接状态: " + ());
MqttTopic mqttTopic = (topic);
//MqttMessage mqttMessage = new MqttMessage(());
MqttMessage mqttMessage = new MqttMessage();
(qos);
int i = 1;
String message = "hello,智能公厕-";
while (true) {
String payLoad = message + i++;
(());
MqttDeliveryToken deliveryToken = (mqttMessage);
if (!()) {
("发布者发布消息: " + payLoad + " 失败");
();
} else {
("发布者发布消息: " + payLoad + " 成功");
}
}
} catch (Exception e) {
();
}
}
/**
* 消息订阅
*
* @author wzq
**/
public static void subscribe() {
try {
MqttClient client = new MqttClient(serviceURI, clientID, persistence);
(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
("订阅者连接丢失...");
(());
}
@Override
public void messageArrived(String topic, MqttMessage message) {
("订阅者接收到消息: " + ());
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
}
});
MqttConnectOptions connectOptions = new MqttConnectOptions();
// (username);
// (());
(false);
//订阅者连接订阅主题
(connectOptions);
(topic, qos);
("订阅者连接状态: " + ());
} catch (MqttException e) {
();
}
}
}
发布者 package ;
/**
* mqtt发布
* @author: wzq
* @time: 2018-07-27 16:43
*/
public class Publish {
public static void main(String[] args) {
();
}
}
订阅者 package ;
/**
* mqtt订阅
* @author: wzq
* @time: 2018-07-27 16:43
*/
public class Subscribe {
public static void main(String[] args) {
();
}
}