Java:实现MQTT发送消息

时间:2025-03-20 10:30:50

本示例基于Paho Java客户端,主要实现行情数据的推送。
Paho Java客户端提供了两个API:MqttAsyncClient提供完全异步的API,通过已注册的回调通知操作完成;MqttClient是对MqttAsyncClient的同步封装,其方法对应用程序表现为同步调用。

  • 项目网站:Eclipse Paho | The Eclipse Foundation
  • Paho Java:https://www.eclipse.org/paho/clients/java/
  • GitHub:GitHub - eclipse/paho.mqtt.java: Eclipse Paho Java MQTT client library. Paho is an Eclipse IoT project.

发送消息

import java.nio.charset.StandardCharsets;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

@Slf4j
public class MqttClient {

	public static . mqttClient = null;
	private static MemoryPersistence memoryPersistence = null;
	private static MqttConnectOptions mqttConnectOptions = null;

	private static MqttClient instance = null;

	public static MqttClient getInstance() throws Exception {
		if (instance == null) {
			synchronized () {
				if (instance == null) {
					instance = new MqttClient();
				}
			}
		}
		return instance;
	}

	public MqttClient(){
		init("admin");
	}

	public void init(String clientId) {
		//初始化连接设置对象
		mqttConnectOptions = new MqttConnectOptions();
		//初始化MqttClient
		if(null != mqttConnectOptions) {
//			true可以安全地使用内存持久性作为客户端断开连接时清除的所有状态
			(true);
//			设置连接超时
			(30);
//			设置持久化方式
			memoryPersistence = new MemoryPersistence();
			if(null != memoryPersistence && null != clientId) {
				try {
					mqttClient = new .("tcp://47.111.102.176:1883", clientId,memoryPersistence);
				} catch (MqttException e) {
					// TODO Auto-generated catch block
					();
				}
			}else {

			}
		}else {
			("mqttConnectOptions对象为空");
		}
		//设置连接和回调
		if(null != mqttClient) {
			if(!()) {
				try {
					("创建连接:" + ());
					(mqttConnectOptions);
				} catch (MqttException e) {
					// TODO Auto-generated catch block
					();
				}

			}
		}else {
			("mqttClient为空");
		}
	}

	//	关闭连接
	public void closeConnect() {
		//关闭存储方式
		if(null != memoryPersistence) {
			try {
				();
			} catch (MqttPersistenceException e) {
				// TODO Auto-generated catch block
				();
			}
		}else {
			("memoryPersistence is null");
		}

//		关闭连接
		if(null != mqttClient) {
			if(()) {
				try {
					();
					();
				} catch (MqttException e) {
					// TODO Auto-generated catch block
					();
				}
			}else {
				("mqttClient is not connect");
			}
		}else {
			("mqttClient is null");
		}
	}

	//	发布消息
	public void publishMessage(String pubTopic,String message,int qos) {
		if(null != mqttClient&& ()) {
			MqttMessage mqttMessage = new MqttMessage();
			(qos);
			(());
			MqttTopic topic = (pubTopic);
			if(null != topic) {
				try {
					MqttDeliveryToken publish = (mqttMessage);
					if(!()) {
						//("消息发布成功");
					}
				} catch (MqttException e) {
					// TODO Auto-generated catch block
					();
				}
			}

		}else {
			reConnect();
		}

	}
	//	重新连接
	public  void reConnect() {
		if(null != mqttClient) {
			if(!()) {
				if(null != mqttConnectOptions) {
					try {
						(mqttConnectOptions);
					} catch (MqttException e) {
						// TODO Auto-generated catch block
						();
					}
				}else {
					("mqttConnectOptions is null");
				}
			}else {
				("mqttClient is null or connect");
			}
		}else {
			init("admin");
		}

	}
	//	订阅主题
	public void subTopic(String topic) {
		if(null != mqttClient&& ()) {
			try {
				(topic, 1);
			} catch (MqttException e) {
				// TODO Auto-generated catch block
				();
			}
		}else {
			("mqttClient is error");
		}
	}


	//	清空主题
	public void cleanTopic(String topic) {
		if(null != mqttClient&& !()) {
			try {
				(topic);
			} catch (MqttException e) {
				// TODO Auto-generated catch block
				();
			}
		}else {
			("mqttClient is error");
		}
	}

	public static  void main(String [] args){
		MqttClient mqttClient = new MqttClient();
		("marketAll", "12312312312", 1);
	}
}