1、mqtt服务器使用emqx
EMQX: The World's #1 Open Source Distributed MQTT Broker
2、下载安装
下载地址:
Download EMQX
选择系统,版本,安装方法
3、springboot连接mqtt服务方法:
引包:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
自定义配置信息:
spring:
#MQTT配置信息
mqtt:
enable: true
#MQTT服务地址,端口号默认1883,如果有多个,用逗号隔开
url: tcp://localhost:1883
#用户名
username: admin
#密码
password: public
#客户端id(不能重复)
provider-id: server-provider
#MQTT默认的消息推送主题,实际可在调用接口是指定
default-topic: topic
配置参数对应的自定义配置类:
package com.gnetek.monitor.api.bean;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
* @Description mqtt客户端配置信息
* @Author Darren Huang
* @Date 2024-04-28 13:16
*/
@Data
@Component
@ConfigurationProperties(prefix = "spring.mqtt")
public class GneMqtt {
/**
* 启用
*/
private boolean enable;
/**
* url
*/
private String url;
/**
* 用户名
*/
private String username;
/**
* 密码
*/
private String password;
/**
* 提供者id
*/
private String providerId;
/**
* 消费者id
*/
private String consumerId;
/**
* 默认主题
*/
private String defaultTopic;
}
启动连接mqtt服务器配置类
注意,发送消息retained=true表示保留消息,发送后,才有客户端订阅也能收到,如果收到retained的消息后要删除消息,需要再发一个空的消息(payload= new byte[0])到此主题上
package com.gnetek.monitor.api.config;
import cn.hutool.json.JSONUtil;
import com.gnetek.monitor.api.bean.GneMqtt;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
/**
* @Description 消息生产者客户端配置
* @Author Darren Huang
* @Date 2024-04-28 13:19
*/
@Slf4j
@Configuration
@ConditionalOnProperty(name = "spring.mqtt.enable", havingValue = "true")
public class MqttProviderClient {
@Autowired
private GneMqtt gneMqtt;
private MqttClient mqttClient;
/**
* 在bean初始化后连接到服务器
*/
@PostConstruct
public void init(){
connect();
}
/**
* 客户端连接服务端
*/
public void connect(){
try{
//创建MQTT客户端对象
mqttClient = new MqttClient(gneMqtt.getUrl(), gneMqtt.getProviderId(), new MemoryPersistence());
//连接设置
MqttConnectOptions options = new MqttConnectOptions();
//是否清空session,设置false表示服务器会保留客户端的连接记录(订阅主题,qos),客户端重连之后能获取到服务器在客户端断开连接期间推送的消息
//设置为true表示每次连接服务器都是以新的身份
options.setCleanSession(true);
//设置连接用户名
options.setUserName(gneMqtt.getUsername());
//设置连接密码
options.setPassword(gneMqtt.getPassword().toCharArray());
//设置超时时间,单位为秒
options.setConnectionTimeout(100);
//设置心跳时间 单位为秒,表示服务器每隔 1.5*30秒的时间向客户端发送心跳判断客户端是否在线
options.setKeepAliveInterval(30);
// 自动重连 setCallback需要实现 MqttCallbackExtended
options.setAutomaticReconnect(true);
//设置遗嘱消息的话题,若客户端和服务器之间的连接意外断开,服务器将发布客户端的遗嘱信息
// options.setWill("willTopic", (mqttClient + "与服务器断开连接").getBytes(),0,false);
//设置回调
mqttClient.setCallback(new MqttProviderCallBack());
mqttClient.connect(options);
} catch(MqttException e){
e.printStackTrace();
}
}
/**
* 发布
*
* @param topic 主题
* @param message 消息
*/
public void publish(String topic, Object message) {
publish(topic, message, true, 0);
}
/**
* 发布
*
* @param topic 主题
* @param message 消息
* @param retained 保留
* @param qos qos 最多一次 (QoS0) 至少一次 (QoS1) 有且仅有一次 (QoS2)
*/
public void publish(String topic, Object message, boolean retained, int qos){
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setQos(qos);
mqttMessage.setRetained(retained);
byte[] payload = JSONUtil.toJsonStr(message).getBytes();
mqttMessage.setPayload(payload);
//主题的目的地,用于发布/订阅信息
MqttTopic mqttTopic = mqttClient.getTopic(topic);
//提供一种机制来跟踪消息的传递进度
//用于在以非阻塞方式(在后台运行)执行发布是跟踪消息的传递进度
MqttDeliveryToken token;
try {
//将指定消息发布到主题,但不等待消息传递完成,返回的token可用于跟踪消息的传递状态
//一旦此方法干净地返回,消息就已被客户端接受发布,当连接可用,将在后台完成消息传递。
token = mqttTopic.publish(mqttMessage);
token.waitForCompletion();
} catch (MqttException e) {
e.printStackTrace();
}
}
@PreDestroy
public void destroy() {
try {
if (mqttClient.isConnected()) {
mqttClient.disconnect();
}
} catch (MqttException e) {
e.printStackTrace();
}
}
}
mqtt客户端监听:
注意如果你需要与服务器断开连接后重新连接,需要实现MqttCallbackExtended方法,
// 自动重连 setCallback需要实现 MqttCallbackExtended
options.setAutomaticReconnect(true);
package com.gnetek.monitor.api.config;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttMessage;
/**
* @Description 生产者客户端消息回调
* @Author Darren Huang
* @Date 2024-04-28 13:28
*/
@Slf4j
public class MqttProviderCallBack implements MqttCallbackExtended {
/**
* 连接丢失
*
* @param throwable throwable
*/
@Override
public void connectionLost(Throwable throwable) {
log.debug("与服务器断开连接,可重连");
throwable.printStackTrace();
}
/**
* 消息到达
*
* @param topic 主题
* @param mqttMessage mqtt消息
* @throws Exception
*/
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
log.debug("接收消息的主题 : {}", topic);
log.debug("接收消息的内容 : {}", new String(mqttMessage.getPayload()));
log.debug("接收消息的Qos : {}", mqttMessage.getQos());
log.debug("接收消息的retained : {}", mqttMessage.isRetained());
}
/**
* 交付完成
*
* @param iMqttDeliveryToken MQTT交付令牌
*/
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
log.debug("发送消息成功");
}
/**
* 连接完成
*
* @param reconnect 重新连接
* @param serverURI 服务器uri
*/
@Override
public void connectComplete(boolean reconnect, String serverURI) {
// 可以做订阅主题 if(reconnect) 如果是重连 订阅主题
log.debug("与服务器断连接完成 是否是重连={}, serverURI={}", reconnect, serverURI);
}
}
4、android连接mqtt服务
参考:Android 使用 Kotlin 连接 MQTT | EMQ (emqx.com)
里面用到了
implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
高版本的android需要修改一些代码,它的源码地址是:eclipse/paho.mqtt.android: MQTT Android (github.com)