MQTT的学习研究(十四) MQTT moquette 的 Callback API 消息发布订阅的实现

时间:2023-03-08 21:52:53

moquette-mqtt 提供了回调(callback)模式的发布和订阅,但是订阅之后没有发现接收消息的方法。moquette-mqtt 中 Blocking 式和 Future 式的发布订阅都是以 callback 式发布订阅为基础实现的,但是本人在研究源代码并进行测试时,发现 callback 方式接收消息没有成功。所以本文只给出 callback 式的发布和订阅代码,没有消息接收的过程,原因尚未查到。

采用Callback式 发布主题

  1. package com.etrip.mqtt.callback;
  2. import java.net.URISyntaxException;
  3. import org.fusesource.hawtbuf.Buffer;
  4. import org.fusesource.hawtbuf.UTF8Buffer;
  5. import org.fusesource.mqtt.client.Callback;
  6. import org.fusesource.mqtt.client.CallbackConnection;
  7. import org.fusesource.mqtt.client.Listener;
  8. import org.fusesource.mqtt.client.MQTT;
  9. import org.fusesource.mqtt.client.QoS;
  10. import org.fusesource.mqtt.client.Topic;
  11. import org.slf4j.Logger;
  12. import org.slf4j.LoggerFactory;
  13. /**
  14. *
  15. * MQTT moquette 的Server 段用于并发布主题信息
  16. *
  17. * 采用Callback式 发布主题
  18. *
  19. * @author longgangbai
  20. */
  21. public class MQTTCallbackServer {
  22. private static final Logger LOG = LoggerFactory.getLogger(MQTTCallbackServer.class);
  23. private final static String CONNECTION_STRING = "tcp://localhost:1883";
  24. private final static boolean CLEAN_START = true;
  25. private final static short KEEP_ALIVE = 30;// 低耗网络,但是又需要及时获取数据,心跳30s
  26. public  static Topic[] topics = {
  27. new Topic("china/beijing", QoS.EXACTLY_ONCE),
  28. new Topic("china/tianjin", QoS.AT_LEAST_ONCE),
  29. new Topic("china/henan", QoS.AT_MOST_ONCE)};
  30. public final  static long RECONNECTION_ATTEMPT_MAX=6;
  31. public final  static long RECONNECTION_DELAY=2000;
  32. public final static int SEND_BUFFER_SIZE=2*1024*1024;//发送最大缓冲为2M
  33. public static void main(String[] args)   {
  34. //创建MQTT对象
  35. MQTT mqtt = new MQTT();
  36. try {
  37. //设置mqtt broker的ip和端口
  38. mqtt.setHost(CONNECTION_STRING);
  39. //连接前清空会话信息
  40. mqtt.setCleanSession(CLEAN_START);
  41. //设置重新连接的次数
  42. mqtt.setReconnectAttemptsMax(RECONNECTION_ATTEMPT_MAX);
  43. //设置重连的间隔时间
  44. mqtt.setReconnectDelay(RECONNECTION_DELAY);
  45. //设置心跳时间
  46. mqtt.setKeepAlive(KEEP_ALIVE);
  47. //设置缓冲的大小
  48. mqtt.setSendBufferSize(SEND_BUFFER_SIZE);
  49. //获取mqtt的连接对象BlockingConnection
  50. final CallbackConnection connection = mqtt.callbackConnection();
  51. //添加连接的监听事件
  52. connection.listener(new Listener() {
  53. public void onDisconnected() {
  54. }
  55. public void onConnected() {
  56. }
  57. public void onPublish(UTF8Buffer topic, Buffer payload, Runnable ack) {
  58. // You can now process a received message from a topic.
  59. // Once process execute the ack runnable.
  60. ack.run();
  61. System.out.println("topic"+topic.toString()+"="+new String(payload.getData()));
  62. }
  63. public void onFailure(Throwable value) {
  64. }
  65. });
  66. //添加连接事件
  67. connection.connect(new Callback<Void>() {
  68. /**
  69. * 连接失败的操作
  70. */
  71. public void onFailure(Throwable value) {
  72. // If we could not connect to the server.
  73. System.out.println("MQTTCallbackServer.CallbackConnection.connect.onFailure"+"连接失败......"+value.getMessage());
  74. value.printStackTrace();
  75. }
  76. /**
  77. * 连接成功的操作
  78. * @param v
  79. */
  80. public void onSuccess(Void v) {
  81. int count=1;
  82. while(true){
  83. count++;
  84. // 用于发布消息,目前手机段不需要向服务端发送消息
  85. //主题的内容
  86. final String message="hello "+count+"chinese people !";
  87. final String topic = "china/beijing";
  88. System.out.println("MQTTCallbackServer  publish  topic="+topic+" message :"+message);
  89. connection.publish(topic, message.getBytes(), QoS.AT_LEAST_ONCE, false, new Callback<Void>() {
  90. public void onSuccess(Void v) {
  91. // the pubish operation completed successfully.
  92. }
  93. public void onFailure(Throwable value) {
  94. value.printStackTrace();
  95. }
  96. });
  97. try {
  98. Thread.sleep(2000);
  99. } catch (InterruptedException e) {
  100. // TODO Auto-generated catch block
  101. e.printStackTrace();
  102. }
  103. }
  104. //                  //连接断开
  105. //                  connection.disconnect(new Callback<Void>() {
  106. //                      public void onSuccess(Void v) {
  107. //                        // called once the connection is disconnected.
  108. //                          System.out.println("MQTTSubscribeClient.CallbackConnection.connect.disconnect.onSuccess", "called once the connection is disconnected.");
  109. //                      }
  110. //                      public void onFailure(Throwable value) {
  111. //                        // Disconnects never fail.
  112. //                          System.out.println("MQTTSubscribeClient.CallbackConnection.connect.disconnect.onFailure", "Disconnects never fail."+value.getMessage());
  113. //                          value.printStackTrace();
  114. //                      }
  115. //                  });
  116. }
  117. });
  118. Thread.sleep(10000000000L);
  119. } catch (URISyntaxException e) {
  120. // TODO Auto-generated catch block
  121. e.printStackTrace();
  122. } catch (Exception e) {
  123. // TODO Auto-generated catch block
  124. e.printStackTrace();
  125. }finally{
  126. }
  127. }
  128. }

采用Callback式 订阅主题

  1. package com.etrip.mqtt.callback;
  2. import java.net.URISyntaxException;
  3. import org.fusesource.hawtbuf.Buffer;
  4. import org.fusesource.hawtbuf.UTF8Buffer;
  5. import org.fusesource.mqtt.client.Callback;
  6. import org.fusesource.mqtt.client.CallbackConnection;
  7. import org.fusesource.mqtt.client.Listener;
  8. import org.fusesource.mqtt.client.MQTT;
  9. import org.fusesource.mqtt.client.QoS;
  10. import org.fusesource.mqtt.client.Topic;
  11. import org.slf4j.Logger;
  12. import org.slf4j.LoggerFactory;
  13. /**
  14. *
  15. * MQTT moquette 的Client 段用于订阅主题,并接收主题信息
  16. *
  17. * 采用Callback式 订阅主题
  18. *
  19. * @author longgangbai
  20. */
  21. public class MQTTCallbackClient {
  22. private static final Logger LOG = LoggerFactory.getLogger(MQTTCallbackClient.class);
  23. private final static String CONNECTION_STRING = "tcp://localhost:1883";
  24. private final static boolean CLEAN_START = true;
  25. private final static short KEEP_ALIVE = 30;// 低耗网络,但是又需要及时获取数据,心跳30s
  26. public  static Topic[] topics = {
  27. new Topic("china/beijing", QoS.AT_MOST_ONCE),
  28. new Topic("china/tianjin", QoS.AT_LEAST_ONCE),
  29. new Topic("china/henan", QoS.AT_MOST_ONCE)};
  30. public final  static long RECONNECTION_ATTEMPT_MAX=6;
  31. public final  static long RECONNECTION_DELAY=2000;
  32. final String topic = "china/beijing";
  33. public final static int SEND_BUFFER_SIZE=2*1024*1024;//发送最大缓冲为2M
  34. public static void main(String[] args)   {
  35. //创建MQTT对象
  36. MQTT mqtt = new MQTT();
  37. //设置mqtt broker的ip和端口
  38. try {
  39. mqtt.setHost(CONNECTION_STRING);
  40. } catch (URISyntaxException e1) {
  41. e1.printStackTrace();
  42. }
  43. //连接前清空会话信息
  44. mqtt.setCleanSession(CLEAN_START);
  45. //设置重新连接的次数
  46. mqtt.setReconnectAttemptsMax(RECONNECTION_ATTEMPT_MAX);
  47. //设置重连的间隔时间
  48. mqtt.setReconnectDelay(RECONNECTION_DELAY);
  49. //设置心跳时间
  50. mqtt.setKeepAlive(KEEP_ALIVE);
  51. //设置缓冲的大小
  52. mqtt.setSendBufferSize(SEND_BUFFER_SIZE);
  53. //获取mqtt的连接对象CallbackConnection
  54. final CallbackConnection connection= mqtt.callbackConnection();
  55. try {
  56. //添加连接的监听事件
  57. connection.listener(new Listener() {
  58. public void onDisconnected() {
  59. }
  60. public void onConnected() {
  61. System.out.println(" 连接成功!");
  62. }
  63. public void onPublish(UTF8Buffer topic, Buffer payload, Runnable onComplete) {
  64. }
  65. public void onFailure(Throwable value) {
  66. }
  67. });
  68. //添加连接事件
  69. connection.connect(new Callback<Void>() {
  70. /**
  71. * 连接失败的操作
  72. */
  73. public void onFailure(Throwable value) {
  74. // If we could not connect to the server.
  75. System.out.println("MQTTSubscribeClient.CallbackConnection.connect.onFailure  连接失败......"+value.getMessage());
  76. value.printStackTrace();
  77. }
  78. /**
  79. * 连接成功的操作
  80. * @param v
  81. */
  82. public void onSuccess(Void v) {
  83. System.out.println("MQTTSubscribeClient.CallbackConnection.connect.onSuccess 订阅连接成功......");
  84. //订阅相关的主题
  85. connection.subscribe(topics, new Callback<byte[]>() {
  86. public void onSuccess(byte[] qoses) {
  87. System.out.println("MQTTSubscribeClient.CallbackConnection.connect.subscribe.onSuccess 订阅主题成功......");
  88. }
  89. public void onFailure(Throwable value) {
  90. // subscribe failed.
  91. System.out.println("MQTTSubscribeClient.CallbackConnection.connect.subscribe.onSuccess 订阅主题失败!"+value.getMessage());
  92. value.printStackTrace();
  93. }
  94. });
  95. }
  96. });
  97. Thread.sleep(100000000000L);
  98. } catch (Exception e) {
  99. // TODO Auto-generated catch block
  100. e.printStackTrace();
  101. }finally{
  102. //            //连接断开
  103. connection.disconnect(new Callback<Void>() {
  104. public void onSuccess(Void v) {
  105. // called once the connection is disconnected.
  106. System.out.println("MQTTSubscribeClient.CallbackConnection.connect.disconnect.onSuccess called once the connection is disconnected.");
  107. }
  108. public void onFailure(Throwable value) {
  109. // Disconnects never fail.
  110. System.out.println("MQTTSubscribeClient.CallbackConnection.connect.disconnect.onFailure  Disconnects never fail."+value.getMessage());
  111. value.printStackTrace();
  112. }
  113. });
  114. }
  115. }
  116. }