springboot整合mqtt实现android推送功能

时间:2024-05-01 20:59:49

1、mqtt服务器使用emqx

EMQX: The World's #1 Open Source Distributed MQTT Broker

2、下载安装

下载地址:

Download EMQX

选择系统,版本,安装方法

3、springboot连接mqtt服务方法:

引包:

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>

自定义配置信息:

spring:
  #MQTT配置信息
  mqtt:
    enable: true
    #MQTT服务地址,端口号默认1883,如果有多个,用逗号隔开
    url: tcp://localhost:1883
    #用户名
    username: admin
    #密码
    password: public
    #客户端id(不能重复)
    provider-id: server-provider
    #MQTT默认的消息推送主题,实际可在调用接口是指定
    default-topic: topic

 配置参数对应的自定义配置类:

package com.gnetek.monitor.api.bean;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

/**
 * @Description mqtt客户端配置信息
 * @Author Darren Huang
 * @Date 2024-04-28 13:16
 */
@Data
@Component
@ConfigurationProperties(prefix = "spring.mqtt")
public class GneMqtt {

    /**
     * 启用
     */
    private boolean enable;

    /**
     * url
     */
    private String url;

    /**
     * 用户名
     */
    private String username;

    /**
     * 密码
     */
    private String password;

    /**
     * 提供者id
     */
    private String providerId;

    /**
     * 消费者id
     */
    private String consumerId;

    /**
     * 默认主题
     */
    private String defaultTopic;
}

启动连接mqtt服务器配置类 

注意,发送消息retained=true表示保留消息,发送后,才有客户端订阅也能收到,如果收到retained的消息后要删除消息,需要再发一个空的消息(payload= new byte[0])到此主题上

package com.gnetek.monitor.api.config;

import cn.hutool.json.JSONUtil;
import com.gnetek.monitor.api.bean.GneMqtt;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

/**
 * @Description 消息生产者客户端配置
 * @Author Darren Huang
 * @Date 2024-04-28 13:19
 */
@Slf4j
@Configuration
@ConditionalOnProperty(name = "spring.mqtt.enable", havingValue = "true")
public class MqttProviderClient {

    @Autowired
    private GneMqtt gneMqtt;


    private MqttClient mqttClient;

    /**
     * 在bean初始化后连接到服务器
     */
    @PostConstruct
    public void init(){
        connect();
    }

    /**
     * 客户端连接服务端
     */
    public void connect(){
        try{
            //创建MQTT客户端对象
            mqttClient = new MqttClient(gneMqtt.getUrl(), gneMqtt.getProviderId(), new MemoryPersistence());
            //连接设置
            MqttConnectOptions options = new MqttConnectOptions();
            //是否清空session,设置false表示服务器会保留客户端的连接记录(订阅主题,qos),客户端重连之后能获取到服务器在客户端断开连接期间推送的消息
            //设置为true表示每次连接服务器都是以新的身份
            options.setCleanSession(true);
            //设置连接用户名
            options.setUserName(gneMqtt.getUsername());
            //设置连接密码
            options.setPassword(gneMqtt.getPassword().toCharArray());
            //设置超时时间,单位为秒
            options.setConnectionTimeout(100);
            //设置心跳时间 单位为秒,表示服务器每隔 1.5*30秒的时间向客户端发送心跳判断客户端是否在线
            options.setKeepAliveInterval(30);
            // 自动重连  setCallback需要实现 MqttCallbackExtended
            options.setAutomaticReconnect(true);
            //设置遗嘱消息的话题,若客户端和服务器之间的连接意外断开,服务器将发布客户端的遗嘱信息
//            options.setWill("willTopic", (mqttClient + "与服务器断开连接").getBytes(),0,false);
            //设置回调
            mqttClient.setCallback(new MqttProviderCallBack());
            mqttClient.connect(options);
        } catch(MqttException e){
            e.printStackTrace();
        }
    }

    /**
     * 发布
     *
     * @param topic   主题
     * @param message 消息
     */
    public void publish(String topic, Object message) {
        publish(topic, message, true, 0);
    }


    /**
     * 发布
     *
     * @param topic    主题
     * @param message  消息
     * @param retained 保留
     * @param qos      qos 最多一次 (QoS0) 至少一次 (QoS1) 有且仅有一次 (QoS2)
     */
    public void publish(String topic, Object message, boolean retained, int qos){
        MqttMessage mqttMessage = new MqttMessage();
        mqttMessage.setQos(qos);
        mqttMessage.setRetained(retained);
        byte[] payload = JSONUtil.toJsonStr(message).getBytes();
        mqttMessage.setPayload(payload);
        //主题的目的地,用于发布/订阅信息
        MqttTopic mqttTopic = mqttClient.getTopic(topic);
        //提供一种机制来跟踪消息的传递进度
        //用于在以非阻塞方式(在后台运行)执行发布是跟踪消息的传递进度
        MqttDeliveryToken token;
        try {
            //将指定消息发布到主题,但不等待消息传递完成,返回的token可用于跟踪消息的传递状态
            //一旦此方法干净地返回,消息就已被客户端接受发布,当连接可用,将在后台完成消息传递。
            token = mqttTopic.publish(mqttMessage);
            token.waitForCompletion();
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    @PreDestroy
    public void destroy() {
        try {
            if (mqttClient.isConnected()) {
                mqttClient.disconnect();
            }
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
}

 mqtt客户端监听:

注意如果你需要与服务器断开连接后重新连接,需要实现MqttCallbackExtended方法,

// 自动重连  setCallback需要实现 MqttCallbackExtended
 options.setAutomaticReconnect(true);

package com.gnetek.monitor.api.config;

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttMessage;

/**
 * @Description 生产者客户端消息回调
 * @Author Darren Huang
 * @Date 2024-04-28 13:28
 */
@Slf4j
public class MqttProviderCallBack implements MqttCallbackExtended {

    /**
     * 连接丢失
     *
     * @param throwable throwable
     */
    @Override
    public void connectionLost(Throwable throwable) {
        log.debug("与服务器断开连接,可重连");
        throwable.printStackTrace();
    }

    /**
     * 消息到达
     *
     * @param topic       主题
     * @param mqttMessage mqtt消息
     * @throws Exception
     */
    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
        log.debug("接收消息的主题 : {}", topic);
        log.debug("接收消息的内容 : {}", new String(mqttMessage.getPayload()));
        log.debug("接收消息的Qos : {}", mqttMessage.getQos());
        log.debug("接收消息的retained : {}", mqttMessage.isRetained());
    }

    /**
     * 交付完成
     *
     * @param iMqttDeliveryToken MQTT交付令牌
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        log.debug("发送消息成功");
    }

    /**
     * 连接完成
     *
     * @param reconnect 重新连接
     * @param serverURI 服务器uri
     */
    @Override
    public void connectComplete(boolean reconnect, String serverURI) {
        // 可以做订阅主题 if(reconnect) 如果是重连 订阅主题
        log.debug("与服务器断连接完成 是否是重连={}, serverURI={}", reconnect, serverURI);
    }
}

4、android连接mqtt服务

参考:Android 使用 Kotlin 连接 MQTT | EMQ (emqx.com)

里面用到了

implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1' 

高版本的android需要修改一些代码,它的源码地址是:eclipse/paho.mqtt.android: MQTT Android (github.com)