eclipse paho java实现mqtt消息的发送与订阅
- 定义
- 代码实现
- 2.1发送端
- 2.2客户端
定义
mqtt是什么这里就不做解释了,自行百度。这里直接上代码。
代码实现
2.1发送端
发送端工具类:
public class SendMQTT {
private static Logger logger = ();
// 配置类,自动读取properties中的配置信息,例如:url,用户民,密码等
private static MqttConfig mqttConfig;
static{
ApplicationContext a = ();
mqttConfig = ();
}
// 客户端
private static MqttClient client;
private static MqttTopic topic11;
// 创建链接
public static void connect(MqttConfig mqttConfig){
try {
// MemoryPersistence设置clientid的保存形式,默认为以内存保存
if(client == null){
synchronized () {
if(client == null){
// 建立链接,()获得服务器地址,()获取clientId,客户端唯一标示,new MemoryPersistence()设置clientId的保存形式,默认为以内存保存
client = new MqttClient((), (), new MemoryPersistence());
// 设置回调类,后面会有详细说明
(new PushCallback());
}
}
}
MqttConnectOptions options = getOption();
(options);
("成功链接服务器。。。");
topic11 = (());
} catch (MqttException e) {
// TODO Auto-generated catch block
();
}
}
/**
* 推送消息
* @param topic
* @param message
* @throws MqttPersistenceException
* @throws MqttException
*/
public static void publish(String body)
throws MqttPersistenceException, MqttException {
MqttMessage message = new MqttMessage();
(1);// 消息质量
(true);// 断开链接是否保存消息,true保存
(());
MqttDeliveryToken token = (message);
();
("message is published completely! "
+ ());
}
// 重连
public static void reconnect(){
try {
();
} catch (MqttException e) {
// TODO Auto-generated catch block
();
}
}
private static MqttConnectOptions getOption(){
MqttConnectOptions options = new MqttConnectOptions();
(false);
(());
(().toCharArray());
// 设置超时时间
(10);
// 设置会话心跳时间
(20);
return options;
}
}
回调函数:
public class PushCallback implements MqttCallbackExtended{
Logger logger = ();
@Override
public void connectionLost(Throwable cause) {
("=======================连接断开,进行重连");
();
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
// ("======================推送成功触发该方法");
}
@Override
public void connectComplete(boolean reconnect, String serverURI) {
// ("======================链接成功后触发该方法");
("断开重连成功");
}
@Override
public void messageArrived(String topic, MqttMessage message)
throws Exception {
// ("======================这里是客户端接受消息的地方");
}
}
测试:
可以写一个线程池循环发送消息进行测试。
public void testMqtt() {
String topic = "topic/gj/respirator_data";
(topic);
(mqttConfig);
ScheduledExecutorService timer = ();
Task timerTask = new Task(topic); // 任务需要 2000 ms 才能执行完毕
// 延时 1 秒后,按 3 秒的周期执行任务
(timerTask, 1000, 5000, );
}
public class Task implements Runnable{
@Override
public void run() {
(“消息内容。。。。。。”);
}
}
2.2客户端
客户端订阅入口:
ClientMQTT .connect(); ,调用完改法会触发回调函数进行订阅,详情看()类的实现。
客户端工具类:
public class ClientMQTT {
private static Logger logger = ();
// 订阅主题
private static String[] topic = {"topic/gj/#"};
// 消息质量
private static int[] Qos = {1};
// 配置类,读取properties文件配置,包括:url,用户名,密码等信息
private static MqttConfig mqttConfig;
static{
ApplicationContext a = ();
mqttConfig = ();
}
private static MqttClient client = null;
// 连接mqtt服务器
public static void connect(){
try {
if(client == null){
synchronized () {
if(client == null){
// host为主机名,clientid即连接MQTT的客户端ID,一般以唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
client = new MqttClient((), (), new MemoryPersistence());
// 设置回调
(new PushCallback());
}
}
}
MqttConnectOptions options = getOption();
if (!()) {
(options);
("客户端成功建立连接。。。");
}else {//这里的逻辑是如果连接成功就重新连接
();
(options);
("客户端成功建立连接。。。");
}
} catch (Exception e) {
();
}
}
// 订阅
public static void subscribe() {
try {
(topic, Qos);
("客户端成功订阅。。。");
} catch (MqttException e) {
();
}
}
// 重新链接
public static void reconnect(){
try {
();
} catch (MqttException e) {
();
}
}
private static MqttConnectOptions getOption(){
// MQTT的连接设置
MqttConnectOptions options = new MqttConnectOptions();
// 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
(true);
(());
(().toCharArray());
// 设置超时时间 单位为秒
(10);
// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
(20);
//设置断开后重新连接
(true);
return options;
}
}
详细的过程都在注释里说明,大致就是先创建链接,然后发起订阅。我的订阅放在回调方法()中调用的,每当连接建立,都会触发该方法。并且回调类中做了重连的处理connectionLost(),如果重连成功,同样会调用connectComplete()方法,重新订阅。
注意,订阅多个主题,则是通过通配符#(表示n层)或+(表示一层)来实现,例如:"topic/#"可以订阅:“topic” ,“topic/data_1”,“topic/data_2”,“topic/secod/data_3”
回调类:
public class PushCallback implements MqttCallbackExtended{
Logger logger = ();
@Override
public void connectionLost(Throwable cause) {
("=======================连接断开,进行重连");
();
}
@Override
public void messageArrived(String topic, MqttMessage message)
throws Exception {
// ("接收消息主题 : " + topic);
// ("接收消息Qos : " + ());
("接收消息内容 : " + new String(()));
Message mess = (new String(()), );
(());
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
}
@Override
public void connectComplete(boolean reconnect, String serverURI) {
();//具体订阅代码
}
}
配置类:
@Configuration
@PropertySource(value={"classpath:"})
public class MqttConfig {
@Value("${}")
private String host;
@Value("${}")
private String userName;
@Value("${}")
private String passWord;
@Value("${}")
private String clientId;
@Value("${}")
private String receiveTopic;
@Bean
public static PropertySourcesPlaceholderConfigurer propertyConfigInDev() {
return new PropertySourcesPlaceholderConfigurer();
}
public String getHost() {
return host;
}
public void setHost(String host) {
= host;
}
public String getUserName() {
return userName;
}
public void setUserName(String userName) {
= userName;
}
public String getPassWord() {
return passWord;
}
public void setPassWord(String passWord) {
= passWord;
}
public String getClientId() {
return clientId;
}
public void setClientId(String clientId) {
= clientId;
}
public String getReceiveTopic() {
return receiveTopic;
}
public void setReceiveTopic(String receiveTopic) {
= receiveTopic;
}
}
其实发送端和客户端的回调类和配置类是可以共用的,我这里是分开写的,起了两个服务来分别作为发送端和客户端。