eclipse paho java实现mqtt消息的发送与订阅

时间:2025-03-23 09:50:28

eclipse paho java实现mqtt消息的发送与订阅

  • 定义
  • 代码实现
    • 2.1发送端
    • 2.2客户端

定义

mqtt是什么这里就不做解释了,自行百度。这里直接上代码。

代码实现

2.1发送端

发送端工具类:

public class SendMQTT {
	private static Logger logger = ();
	  // 配置类,自动读取properties中的配置信息,例如:url,用户民,密码等 
	private static MqttConfig mqttConfig;
	
	static{
		ApplicationContext a = ();
		mqttConfig = ();
	}
	// 客户端
	private static MqttClient client;
	private static MqttTopic topic11;

	// 创建链接
	public static void connect(MqttConfig mqttConfig){
		try {
			// MemoryPersistence设置clientid的保存形式,默认为以内存保存
			if(client == null){
				synchronized () {
					if(client == null){
							// 建立链接,()获得服务器地址,()获取clientId,客户端唯一标示,new MemoryPersistence()设置clientId的保存形式,默认为以内存保存
							
							client = new MqttClient((), (), new MemoryPersistence());
							// 设置回调类,后面会有详细说明
							(new PushCallback());
					}
				}
			}
			MqttConnectOptions options = getOption();
			(options);
			("成功链接服务器。。。");
			topic11 = (());
		} catch (MqttException e) {
			// TODO Auto-generated catch block
			();
		}
	}

	/**
	 * 推送消息
	 * @param topic
	 * @param message
	 * @throws MqttPersistenceException
	 * @throws MqttException
	 */
	public static void publish(String body)
			throws MqttPersistenceException, MqttException {
		MqttMessage message = new MqttMessage();
		(1);// 消息质量
		(true);// 断开链接是否保存消息,true保存
		(());
		MqttDeliveryToken token = (message);
		();
		("message is published completely! "
				+ ());
	}
	
	// 重连
	public static void reconnect(){
		try {
			();
		} catch (MqttException e) {
			// TODO Auto-generated catch block
			();
		}
	}
	
	private static MqttConnectOptions getOption(){
		MqttConnectOptions options = new MqttConnectOptions();
		(false);
		(());
		(().toCharArray());
		// 设置超时时间
		(10);
		// 设置会话心跳时间
		(20);
		return options;
	}

}

回调函数:

public class PushCallback implements MqttCallbackExtended{
	
	Logger logger = ();

	@Override
	public void connectionLost(Throwable cause) {
		("=======================连接断开,进行重连");
		();
		
	}
    	  
	@Override
	public void deliveryComplete(IMqttDeliveryToken token) {
      //		("======================推送成功触发该方法");
		
	}

	@Override
	public void connectComplete(boolean reconnect, String serverURI) {
	//		("======================链接成功后触发该方法");	   		
		("断开重连成功");
		
	}
	
	@Override
	public void messageArrived(String topic, MqttMessage message)
			throws Exception {
	  //		("======================这里是客户端接受消息的地方");	   		
	}
}

测试:

可以写一个线程池循环发送消息进行测试。

 public void testMqtt() {
	 String topic = "topic/gj/respirator_data";
	 (topic);
	 (mqttConfig);
 	 ScheduledExecutorService timer = ();
	 Task timerTask = new Task(topic); // 任务需要 2000 ms 才能执行完毕
	 // 延时 1 秒后,按 3 秒的周期执行任务
	 (timerTask, 1000, 5000, );
		
}
 

public class Task implements Runnable{
@Override
		public void run() {
			(“消息内容。。。。。。”);
		}
}

2.2客户端

客户端订阅入口:

ClientMQTT .connect(); ,调用完改法会触发回调函数进行订阅,详情看()类的实现。

客户端工具类:

public class ClientMQTT {
    	
    	private static Logger logger = ();
    	
    	// 订阅主题
    	private static String[] topic = {"topic/gj/#"};
    	// 消息质量
    	private static int[] Qos  = {1};
    	// 配置类,读取properties文件配置,包括:url,用户名,密码等信息
    	private static MqttConfig mqttConfig;
    	
    	static{
    		ApplicationContext a = ();
    		mqttConfig = ();
    	}
    	
    	private static MqttClient client = null;
    	
    	// 连接mqtt服务器
    	public static void connect(){
    	    try {
    	    	if(client == null){
    	    		synchronized () {
    	    			if(client == null){
    	    				// host为主机名,clientid即连接MQTT的客户端ID,一般以唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
    				        client = new MqttClient((), (), new MemoryPersistence());
    				        // 设置回调
    				        (new PushCallback());
    	    			}
    				}
    	    	}
    	    	MqttConnectOptions options = getOption();
    	    	if (!()) {
    	    		(options);
    	    		("客户端成功建立连接。。。");
    	    	}else {//这里的逻辑是如果连接成功就重新连接
    	    		();
    	    		(options);
    	    		("客户端成功建立连接。。。");
    	    	}
    	        
    	    } catch (Exception e) {
    	        ();
    	    }
    	    
    	}
    	
    	// 订阅
    	public static void subscribe() {
    		try {
    			(topic, Qos);
    			("客户端成功订阅。。。");
    		} catch (MqttException e) {
    			();
    		}
           
         }
    	
    	// 重新链接
    	public static void reconnect(){
    		try {
    			();
    		} catch (MqttException e) {
    			();
    		}
    	}
    	        	      	
    	private static MqttConnectOptions getOption(){
    		// MQTT的连接设置
    		MqttConnectOptions options = new MqttConnectOptions();
            // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
            (true);
            (());
            (().toCharArray());
            // 设置超时时间 单位为秒
            (10);
            // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
            (20);
            //设置断开后重新连接 
            (true);
            return options;
    	}
    }

详细的过程都在注释里说明,大致就是先创建链接,然后发起订阅。我的订阅放在回调方法()中调用的,每当连接建立,都会触发该方法。并且回调类中做了重连的处理connectionLost(),如果重连成功,同样会调用connectComplete()方法,重新订阅。
注意,订阅多个主题,则是通过通配符#(表示n层)或+(表示一层)来实现,例如:"topic/#"可以订阅:“topic” ,“topic/data_1”,“topic/data_2”,“topic/secod/data_3”

回调类:

public class PushCallback implements MqttCallbackExtended{
	
	Logger logger = ();

	@Override
	public void connectionLost(Throwable cause) {
		("=======================连接断开,进行重连");
		();
		
	}

	@Override
	public void messageArrived(String topic, MqttMessage message)
			throws Exception {
//		("接收消息主题 : " + topic);
//    	("接收消息Qos : " + ());
    	("接收消息内容 : " + new String(()));
    	Message mess = (new String(()), );
    	(());
		
	}

	@Override
	public void deliveryComplete(IMqttDeliveryToken token) {
		
		
	}

	@Override
	public void connectComplete(boolean reconnect, String serverURI) {
		();//具体订阅代码
		
	}

}

配置类:

@Configuration
@PropertySource(value={"classpath:"})
public class MqttConfig {

	@Value("${}")
	private String host;
	
	@Value("${}")
	private String userName;

	@Value("${}")
	private String passWord;
	
	@Value("${}")
	private String clientId;

	@Value("${}")
	private String receiveTopic;


	@Bean
	public static PropertySourcesPlaceholderConfigurer propertyConfigInDev() {
		return new PropertySourcesPlaceholderConfigurer();
	}


	public String getHost() {
		return host;
	}


	public void setHost(String host) {
		 = host;
	}


	public String getUserName() {
		return userName;
	}


	public void setUserName(String userName) {
		 = userName;
	}


	public String getPassWord() {
		return passWord;
	}


	public void setPassWord(String passWord) {
		 = passWord;
	}


	public String getClientId() {
		return clientId;
	}


	public void setClientId(String clientId) {
		 = clientId;
	}


	public String getReceiveTopic() {
		return receiveTopic;
	}


	public void setReceiveTopic(String receiveTopic) {
		 = receiveTopic;
	}
	
}

其实发送端和客户端的回调类和配置类是可以共用的,我这里是分开写的,起了两个服务来分别作为发送端和客户端。