MQTT发布订阅程序完整代码

时间:2025-03-02 08:06:08
  • 工具类,包含有发布者方法和订阅者方法。
    package ;
    
    import .mqttv3.*;
    import .;
    
    import ;
    
    /**
     * mqtt的发布和订阅
     *
     * @author wzq
     */
    public class PublishSubscribe {
    
    
        private static String serviceURI = "tcp://172.16.22.160:1883";
        private static String clientID = ().toString();
        private static MqttClientPersistence persistence = new MemoryPersistence();
        //如果mqtt服务配置了匿名访问,则不需要使用用户名和密码就可以实现消息的订阅和发布
    //    private static String username = "username";
    //    private static String password = "password";
        private static String topic = "cebPark";
        /*
            消息服务质量,一共有三个:
            0:尽力而为。消息可能会丢,但绝不会重复传输
            1:消息绝不会丢,但可能会重复传输
            2:恰好一次。每条消息肯定会被传输一次且仅传输一次
         */
        private static int qos = 0;
    
        /**
         * 消息发布
         *
         * @author wzq
         **/
        public static void publish() {
            try {
                MqttClient client = new MqttClient(serviceURI, clientID, persistence);
                MqttConnectOptions connectOptions = new MqttConnectOptions();
    //            (username);
    //            (());
                (false);
                //发布者连接服务
                (connectOptions);
                ("发布者连接状态: " + ());
                MqttTopic mqttTopic = (topic);
                //MqttMessage mqttMessage = new MqttMessage(());
                MqttMessage mqttMessage = new MqttMessage();
                (qos);
                int i = 1;
                String message = "hello,智能公厕-";
                while (true) {
                    String payLoad = message + i++;
                    (());
                    MqttDeliveryToken deliveryToken = (mqttMessage);
                    if (!()) {
                        ("发布者发布消息: " + payLoad + " 失败");
                        ();
                    } else {
                        ("发布者发布消息: " + payLoad + " 成功");
                    }
                }
            } catch (Exception e) {
                ();
            }
        }
    
        /**
         * 消息订阅
         *
         * @author wzq
         **/
        public static void subscribe() {
            try {
                MqttClient client = new MqttClient(serviceURI, clientID, persistence);
                (new MqttCallback() {
                    @Override
                    public void connectionLost(Throwable cause) {
                        ("订阅者连接丢失...");
                        (());
                    }
    
                    @Override
                    public void messageArrived(String topic, MqttMessage message) {
                        ("订阅者接收到消息: " + ());
                    }
    
                    @Override
                    public void deliveryComplete(IMqttDeliveryToken token) {
                    }
                });
                MqttConnectOptions connectOptions = new MqttConnectOptions();
    //            (username);
    //            (());
                (false);
                //订阅者连接订阅主题
                (connectOptions);
                (topic, qos);
                ("订阅者连接状态: " + ());
            } catch (MqttException e) {
                ();
            }
    
        }
    
    
    }
    
  • 发布者
    package ;
    
    /**
     * mqtt发布
     * @author: wzq
     * @time: 2018-07-27 16:43
     */
    public class Publish {
        public static void main(String[] args) {
            ();
        }
    }
    
  • 订阅者
    package ;
    
    /**
     * mqtt订阅
     * @author: wzq
     * @time: 2018-07-27 16:43
     */
    public class Subscribe {
        public static void main(String[] args) {
            ();
        }
    }