采用MQTT协议实现android消息推送(4)选fusesource

时间:2022-02-09 13:24:22

1.简介

  一个java写的mqtt客户端。项目地址:

  https://github.com/fusesource/mqtt-client

2.引入fusesource-mqtt-client库

  • File--->Project Structure--->Dependencies
  • 点绿色的“+”(加号)按钮
  • 在弹出的窗口中输入“mqtt-client”回车搜索
  • 在结果中选择org.fusesource.mqtt-client:mqtt-client:1.xxx

3.示例代码 

3.1 参考代码

activeMQ服务端软件内提供的示例代码 apache-activemq-5.15.0/examples/mqtt/java/
dzone提供的示例 https://dzone.com/articles/android-mqtt-activemq
github上的示例代码 https://github.com/fusesource/mqtt-client#using-the-callbackcontinuation-passing-based-api

3.2 效果

      采用MQTT协议实现android消息推送(4)选fusesource

3.3 源码

  1 package com.example.tt.mqtt;
2
3 import android.app.NotificationManager;
4 import android.app.PendingIntent;
5 import android.content.Context;
6 import android.content.Intent;
7 import android.os.Bundle;
8 import android.support.v4.app.TaskStackBuilder;
9 import android.support.v7.app.AppCompatActivity;
10 import android.support.v7.app.NotificationCompat;
11 import android.util.Log;
12 import android.view.View;
13 import android.widget.Button;
14 import android.widget.CheckBox;
15 import android.widget.CompoundButton;
16 import android.widget.EditText;
17 import android.widget.TextView;
18 import android.widget.ToggleButton;
19
20 import org.fusesource.hawtbuf.Buffer;
21 import org.fusesource.hawtbuf.UTF8Buffer;
22 import org.fusesource.mqtt.client.BlockingConnection;
23 import org.fusesource.mqtt.client.Callback;
24 import org.fusesource.mqtt.client.CallbackConnection;
25 import org.fusesource.mqtt.client.Listener;
26 import org.fusesource.mqtt.client.MQTT;
27 import org.fusesource.mqtt.client.Message;
28 import org.fusesource.mqtt.client.QoS;
29 import org.fusesource.mqtt.client.Topic;
30
31 import java.net.URISyntaxException;
32
33
34 public class MainActivity extends AppCompatActivity implements View.OnClickListener,CompoundButton.OnCheckedChangeListener {
35
36 final static String TAG = "MQTTClient";
37
38 //UI
39 ToggleButton btnConnect;
40 Button btnPublish, btnSubscribe;
41 EditText edtServer,edtMessage,edtTopic,edtClientID;
42 TextView received;
43 CheckBox cbxPersist;
44
45 //MQTT
46 final static String clientId = "android";
47 final static short keepAlive = 255;
48 final static String host = "192.168.1.101";
49 final static String user = "guest";
50 final static int port = 1883;
51 final static String password = "admin";
52
53 MQTT mqtt ;
54 Listener listener ;
55 CallbackConnection callbackConnection ;
56 Callback<Void> connectCallback ;
57 Callback<byte[]> subscribeCallback ;
58 Callback<Void> publishCallback ;
59 Callback<Void> disconnectCallback ;
60
61 {
62 connectCallback = new Callback<Void>(){
63
64 @Override
65 public void onSuccess(Void value) {
66 Log.d(TAG, "connectCallback : onSuccess");
67 received.post(new Runnable() {
68 @Override
69 public void run() {
70 received.setText("connectCallback success");
71 }
72 });
73
74 }
75 @Override
76 public void onFailure(Throwable value) {
77 value.printStackTrace();
78 Log.d(TAG, "connectCallback : failure");
79 received.post(new Runnable() {
80 @Override
81 public void run() {
82 received.setText("connectCallback failure");
83 }
84 });
85 System.exit(-2);
86 }
87 };
88 disconnectCallback = new Callback<Void>(){
89
90 public void onSuccess(Void value) {
91 received.post(new Runnable() {
92 @Override
93 public void run() {
94 received.setText("disconnect success");
95 }
96 });
97 }
98 public void onFailure(Throwable e) {
99 received.post(new Runnable() {
100 @Override
101 public void run() {
102 received.setText("disconnect failure");
103 }
104 });
105 }
106 };
107
108 listener = new Listener() {
109
110 @Override
111 public void onConnected() {
112 Log.d(TAG, "listener onConnected");
113 received.post(new Runnable() {
114 @Override
115 public void run() {
116 received.setText("listener onConnected");
117 }
118 });
119 }
120
121 @Override
122 public void onDisconnected() {
123 Log.d(TAG, "listener onDisconnected");
124 received.post(new Runnable() {
125 @Override
126 public void run() {
127 received.setText("listener onDisconnected");
128 }
129 });
130 }
131
132 @Override
133 public void onPublish(final UTF8Buffer topic, Buffer msg, Runnable ack) {
134 final String body = msg.utf8().toString();
135 Log.d(TAG, "onPublish: " + body);
136 received.post(new Runnable() {
137 @Override
138 public void run() {
139 makeNotification(topic.toString(),body);
140 received.append("\nreceived : " + body);
141 }
142 });
143 }
144
145 @Override
146 public void onFailure(Throwable value) {
147 Log.d(TAG, "listener failure");
148 received.post(new Runnable() {
149 @Override
150 public void run() {
151 received.setText("listener failure");
152 }
153 });
154 }
155 };
156
157 subscribeCallback = new Callback<byte[]>() {
158
159 public void onSuccess(byte[] qoses) {
160 Log.d(TAG, "subscribe : success");
161
162 received.post(new Runnable() {
163 @Override
164 public void run() {
165 received.setText("subscribe " + edtTopic.getText().toString() + ": success");
166 }
167 });
168 }
169 public void onFailure(Throwable value) {
170 value.printStackTrace();
171 Log.d(TAG, "subscribe : failure");
172 received.post(new Runnable() {
173 @Override
174 public void run() {
175 received.setText("subscribe " + edtTopic.getText().toString() + ": failure");
176 }
177 });
178 System.exit(-2);
179 }
180 };
181 publishCallback = new Callback<Void>() {
182 @Override
183 public void onSuccess(Void value) {
184 Log.d(TAG, "onSuccess: ");
185 }
186
187 @Override
188 public void onFailure(Throwable value) {
189 Log.d(TAG, "onFailure: ");
190 }
191 };
192 }
193
194 void connect(){
195 callbackConnection.connect(connectCallback);
196 }
197
198 void disconnect(){
199 callbackConnection.disconnect(disconnectCallback);
200 }
201
202 void subscribe(){
203
204 String topicName = edtTopic.getText().toString().trim();
205
206 Topic topics[] = new Topic[]{new Topic(topicName,QoS.AT_LEAST_ONCE)};
207
208 callbackConnection.subscribe(topics,subscribeCallback);
209
210 }
211
212 void publish(){
213
214 String data = edtMessage.getText().toString();
215
216 String topicName = edtTopic.getText().toString().trim();
217
218 callbackConnection.publish(topicName,data.getBytes(),QoS.AT_LEAST_ONCE,false,publishCallback);
219
220 }
221
222 void initMqtt(){
223
224 mqtt = new MQTT();
225 try {
226 mqtt.setHost(host, port);
227 mqtt.setUserName(user);
228 mqtt.setPassword(password);
229 mqtt.setKeepAlive(keepAlive);
230 mqtt.getClientId();
231 callbackConnection = mqtt.callbackConnection();
232 callbackConnection.listener(listener);
233
234 } catch (URISyntaxException e) {
235 e.printStackTrace();
236 Log.e(TAG,"-=-=-=-=-=-=------------====\n initMqtt exception : " + e.getMessage());
237 }
238 }
239
240 @Override
241 protected void onCreate(Bundle savedInstanceState) {
242 super.onCreate(savedInstanceState);
243 setContentView(R.layout.activity_main);
244
245 received = (TextView) findViewById(R.id.txt_received);
246 btnSubscribe= (Button) findViewById(R.id.btn_subscribe);
247 btnConnect = (ToggleButton)findViewById(R.id.btn_connect);
248 btnPublish = (Button) findViewById(R.id.btn_publish);
249 edtServer = (EditText) findViewById(R.id.edt_server);
250 edtTopic = (EditText) findViewById(R.id.edt_topic);
251 edtMessage = (EditText) findViewById(R.id.edt_message);
252 edtClientID = (EditText) findViewById(R.id.edt_clientID);
253 cbxPersist = (CheckBox) findViewById(R.id.cbx_persist);
254
255 btnConnect .setOnClickListener(this);
256 btnConnect .setOnCheckedChangeListener(this);
257 cbxPersist .setOnCheckedChangeListener(this);
258 btnSubscribe.setOnClickListener(this);
259 btnPublish .setOnClickListener(this);
260
261 initMqtt();
262
263 }
264 void makeNotification(final String title,final String content){
265
266 NotificationCompat.Builder mBuilder = new NotificationCompat.Builder(this);
267 mBuilder.setSmallIcon(R.drawable.mail_3_small);//must
268 mBuilder.setContentTitle(title);
269 mBuilder.setContentText(content);
270 // Creates an explicit intent for an Activity in your app
271 Intent resultIntent = new Intent(this, MainActivity.class);
272
273 // The stack builder object will contain an artificial back stack for the
274 // started Activity.
275 // This ensures that navigating backward from the Activity leads out of
276 // your app to the Home screen.
277 TaskStackBuilder stackBuilder = TaskStackBuilder.create(this);
278 // Adds the back stack for the Intent (but not the Intent itself)
279 stackBuilder.addParentStack(MainActivity.class);
280 // Adds the Intent that starts the Activity to the top of the stack
281 stackBuilder.addNextIntent(resultIntent);
282 PendingIntent resultPendingIntent = stackBuilder.getPendingIntent(0,PendingIntent.FLAG_UPDATE_CURRENT);
283 mBuilder.setContentIntent(resultPendingIntent);
284 NotificationManager mNotificationManager =
285 (NotificationManager) getSystemService(Context.NOTIFICATION_SERVICE);
286
287 // mNotificationId is a unique integer your app uses to identify the
288 // notification. For example, to cancel the notification, you can pass its ID
289 // number to NotificationManager.cancel().
290 mNotificationManager.notify(R.string.app_name, mBuilder.build());
291 }
292
293 void blocking(){
294 BlockingConnection connection = mqtt.blockingConnection();
295 try {
296 connection.connect();
297 //publish
298 connection.publish("foo", "Hello".getBytes(), QoS.AT_LEAST_ONCE, false);
299
300 //subscribe
301 Topic[] topics = {new Topic("foo", QoS.AT_LEAST_ONCE)};
302 byte[] qoses = connection.subscribe(topics);
303
304 //receive message
305 Message message = connection.receive();
306 System.out.println(message.getTopic());
307 byte[] payload = message.getPayload();
308 // process the message then:
309 message.ack();
310
311 //disconnect
312 connection.disconnect();
313 } catch (Exception e) {
314 e.printStackTrace();
315 }
316 }
317
318 @Override
319 public void onClick(View view) {
320 switch (view.getId()){
321 case R.id.btn_publish : publish(); break;
322 case R.id.btn_subscribe : subscribe(); break;
323 }
324
325 }
326
327 @Override
328 public void onCheckedChanged(CompoundButton compoundButton, boolean b) {
329 switch (compoundButton.getId()){
330 case R.id.btn_connect:
331 if (!b){
332 connect();
333 }else{
334 disconnect();
335 }
336 break;
337 case R.id.cbx_persist:
338 if (mqtt != null) {
339 mqtt.setClientId(edtClientID.getText().toString().trim());
340 mqtt.setCleanSession(!b);
341 }
342 break;
343 }
344 }
345
346 }

3.4 完整下载地址

  https://git.oschina.net/xi/mqtt-client-demo.git

4.MQTT 常用方法介绍

setClientId

Use to set the client Id of the session. This is what an MQTT server uses to identify a session where setCleanSession(false); is being used.

The id must be 23 characters or less. Defaults to auto generated id (based on your socket address, port and timestamp).

每个客户端id不要相同
指定id后,才可以调用setCleanSession(false)来持久保存订阅的会话:哪个客户端订阅了哪个主题,就保存在对应的会话中。
setCleanSession Set to false if you want the MQTT server to persist topic subscriptions and ack positions across client sessions. Defaults to true.
设置false时,服务端将不清除会话,这样就可以持久保存订阅关系。
setKeepAlive

Configures the Keep Alive timer in seconds. Defines the maximum time interval between messages received from a client.

It enables the server to detect that the network connection to a client has dropped, without having to wait for the long TCP/IP timeout.

设置保活时间,单位是秒,默认为TCP连接时间。
setUserName Sets the user name used to authenticate against the server.
设置服务端验证的用户名
setPassword Sets the password used to authenticate against the server.
设置验证用户的密码
setWillTopic

If set the server will publish the client's Will message to the specified topics if the client has an unexpected disconnection.

当客户端异常断开时,服务器按这里指定的主题发意愿消息。
setWillMessage The Will message to send. Defaults to a zero length message.
意愿消息
setWillQos Sets the quality of service to use for the Will message. Defaults to QoS.AT_MOST_ONCE.
意愿消息的QoS
setWillRetain Set to true if you want the Will to be published with the retain option.
setVersion Set to "3.1.1" to use MQTT version 3.1.1. Otherwise defaults to the 3.1 protocol version.
设置MQTT协议版本