MQTT--Java整合EMQX

时间:2024-10-07 16:28:56
package com.itxhj.emqxdemo.io;

import java.nio.charset.StandardCharsets;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

/**
 * Minimal Eclipse Paho demo: connects to an MQTT broker (EMQX), subscribes to
 * a wildcard topic, publishes one message, waits a few seconds and disconnects.
 */
public class App {

    /**
     * Runs the demo once and terminates the JVM.
     *
     * <p>The process exits with status 0 when the whole
     * connect / subscribe / publish / disconnect sequence succeeds and with
     * status 1 when an MQTT error occurs or the waiting thread is interrupted.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        String subTopic = "testtopic/#";
        String pubTopic = "testtopic/1";
        String content = "Hello World";
        int qos = 2;
        // Change this to the address of the host on which EMQX is running.
        String broker = "tcp://192.168.176.128:1883";
        String clientId = "emqx_test";

        int exitCode = 0;
        boolean interrupted = false;
        MqttClient client = null;
        try {
            client = new MqttClient(broker, clientId, new MemoryPersistence());

            // MQTT connection options.
            MqttConnectOptions connOpts = new MqttConnectOptions();
            // Credentials of the MQTT client, not of the EMQX account.
            // NOTE(review): hard-coded demo credentials; load real ones from configuration.
            connOpts.setUserName("admin");
            connOpts.setPassword("123456".toCharArray());
            // Request a clean session (true), i.e. do not resume earlier session state.
            connOpts.setCleanSession(true);

            // Register the callback before connecting.
            client.setCallback(new OnMessageCallback());

            // Establish the connection.
            System.out.println("Connecting to broker: " + broker);
            client.connect(connOpts);
            System.out.println("Connected");

            // Subscribe first; the wildcard filter also matches the topic published below.
            client.subscribe(subTopic);

            // Build and publish the message; the charset is explicit so the
            // payload bytes do not depend on the platform default encoding.
            System.out.println("Publishing message: " + content);
            MqttMessage message = new MqttMessage(content.getBytes(StandardCharsets.UTF_8));
            message.setQos(qos);
            client.publish(pubTopic, message);
            System.out.println("Message published");

            // Once disconnected, the connection is no longer visible on the
            // EMQX side, so keep it open for a while before shutting down.
            Thread.sleep(10000);
        } catch (MqttException me) {
            System.out.println("reason " + me.getReasonCode());
            System.out.println("msg " + me.getMessage());
            System.out.println("loc " + me.getLocalizedMessage());
            System.out.println("cause " + me.getCause());
            System.out.println("excep " + me);
            me.printStackTrace();
            exitCode = 1;
        } catch (InterruptedException e) {
            // The flag is restored only after cleanup so that the blocking
            // disconnect below is not aborted by a pending interrupt.
            interrupted = true;
            System.out.println("Interrupted while waiting before disconnect");
            exitCode = 1;
        } finally {
            releaseClient(client);
        }

        if (interrupted) {
            Thread.currentThread().interrupt();
        }
        // Explicit exit, as in the original demo, so the process always terminates here.
        System.exit(exitCode);
    }

    /**
     * Disconnects (if still connected) and closes the client, reporting but
     * not propagating failures so that cleanup never hides the primary error.
     *
     * @param client the client to release; may be {@code null} if construction failed
     */
    private static void releaseClient(MqttClient client) {
        if (client == null) {
            return;
        }
        try {
            if (client.isConnected()) {
                client.disconnect();
                System.out.println("Disconnected");
            }
        } catch (MqttException e) {
            System.out.println("disconnect failed: " + e);
        }
        try {
            client.close();
        } catch (MqttException e) {
            System.out.println("close failed: " + e);
        }
    }
}