java实现mqtt_JAVA 实现MQTT发送消息

时间:2025-03-23 09:57:32

本示例为Paho Java客户端,主体实现行情数据推送

Paho Java客户端提供了两个API:MqttAsyncClient提供了一个完全异步的API,通过已注册的回调通知完成活动。 MqttClient是MqttAsyncClient的一个同步包装,其中函数与应用程序同步。

项目网站:/paho

Paho Java:/paho/clients/java/

GitHub:/eclipse/

发送消息

import .slf4j.Slf4j;

import .;

import .;

import .;

import .;

import .;

import .;

import .;

@Slf4j

public class MqttClient {

public static . mqttClient = null;

private static MemoryPersistence memoryPersistence = null;

private static MqttConnectOptions mqttConnectOptions = null;

private static MqttClient instance = null;

public static MqttClient getInstance() throws Exception {

if (instance == null) {

synchronized () {

if (instance == null) {

instance = new MqttClient();

}

}

}

return instance;

}

public MqttClient(){

init("admin");

}

public void init(String clientId) {

//初始化连接设置对象

mqttConnectOptions = new MqttConnectOptions();

//初始化MqttClient

if(null != mqttConnectOptions) {

//true可以安全地使用内存持久性作为客户端断开连接时清除的所有状态

(true);

//设置连接超时

(30);

//设置持久化方式

memoryPersistence = new MemoryPersistence();

if(null != memoryPersistence && null != clientId) {

try {

mqttClient = new .("tcp://47.111.102.176:1883", clientId,memoryPersistence);

} catch (MqttException e) {

// TODO Auto-generated catch block

();

}

}else {

}

}else {

("mqttConnectOptions对象为空");

}

//设置连接和回调

if(null != mqttClient) {

if(!()) {

try {

("创建连接:" + ());

(mqttConnectOptions);

} catch (MqttException e) {

// TODO Auto-generated catch block

();

}

}

}else {

("mqttClient为空");

}

}

//关闭连接

public void closeConnect() {

//关闭存储方式

if(null != memoryPersistence) {

try {

();

} catch (MqttPersistenceException e) {

// TODO Auto-generated catch block

();

}

}else {

("memoryPersistence is null");

}

//关闭连接

if(null != mqttClient) {

if(()) {

try {

();

();

} catch (MqttException e) {

// TODO Auto-generated catch block

();

}

}else {

("mqttClient is not connect");

}

}else {

("mqttClient is null");

}

}

//发布消息

public void publishMessage(String pubTopic,String message,int qos) {

if(null != mqttClient&& ()) {

MqttMessage mqttMessage = new MqttMessage();

(qos);

(());

MqttTopic topic = (pubTopic);

if(null != topic) {

try {

MqttDeliveryToken publish = (mqttMessage);

if(!()) {

//("消息发布成功");

}

} catch (MqttException e) {

// TODO Auto-generated catch block

();

}

}

}else {

reConnect();

}

}

//重新连接

public void reConnect() {

if(null != mqttClient) {

if(!()) {

if(null != mqttConnectOptions) {

try {

(mqttConnectOptions);

} catch (MqttException e) {

// TODO Auto-generated catch block

();

}

}else {

("mqttConnectOptions is null");

}

}else {

("mqttClient is null or connect");

}

}else {

init("admin");

}

}

//订阅主题

public void subTopic(String topic) {

if(null != mqttClient&& ()) {

try {

(topic, 1);

} catch (MqttException e) {

// TODO Auto-generated catch block

();

}

}else {

("mqttClient is error");

}

}

//清空主题

public void cleanTopic(String topic) {

if(null != mqttClient&& !()) {

try {

(topic);

} catch (MqttException e) {

// TODO Auto-generated catch block

();

}

}else {

("mqttClient is error");

}

}

public static void main(String [] args){

MqttClient mqttClient = new MqttClient();

("marketAll", "12312312312", 1);

}

}