1.简介
一个java写的mqtt客户端。项目地址:
https://github.com/fusesource/mqtt-client
2.引入fusesource-mqtt-client库
- File--->Project Structure--->Dependencies
- 点绿色+
- 在弹出的窗口中输入“‘mqtt-client”回车搜索
- 在结果中选择org.fusesource.mqtt-client:mqtt-client:1.xxx
3.示例代码
3.1 参考代码
activeMQ服务端软件内提供的示例代码 | apache-activemq-5.15.0/examples/mqtt/java/ |
dzone提供的示例 | https://dzone.com/articles/android-mqtt-activemq |
github上的示例代码 | https://github.com/fusesource/mqtt-client#using-the-callbackcontinuation-passing-based-api |
3.2 效果
3.3 源码
1 package com.example.tt.mqtt;
2
3 import android.app.NotificationManager;
4 import android.app.PendingIntent;
5 import android.content.Context;
6 import android.content.Intent;
7 import android.os.Bundle;
8 import android.support.v4.app.TaskStackBuilder;
9 import android.support.v7.app.AppCompatActivity;
10 import android.support.v7.app.NotificationCompat;
11 import android.util.Log;
12 import android.view.View;
13 import android.widget.Button;
14 import android.widget.CheckBox;
15 import android.widget.CompoundButton;
16 import android.widget.EditText;
17 import android.widget.TextView;
18 import android.widget.ToggleButton;
19
20 import org.fusesource.hawtbuf.Buffer;
21 import org.fusesource.hawtbuf.UTF8Buffer;
22 import org.fusesource.mqtt.client.BlockingConnection;
23 import org.fusesource.mqtt.client.Callback;
24 import org.fusesource.mqtt.client.CallbackConnection;
25 import org.fusesource.mqtt.client.Listener;
26 import org.fusesource.mqtt.client.MQTT;
27 import org.fusesource.mqtt.client.Message;
28 import org.fusesource.mqtt.client.QoS;
29 import org.fusesource.mqtt.client.Topic;
30
31 import java.net.URISyntaxException;
32
33
34 public class MainActivity extends AppCompatActivity implements View.OnClickListener,CompoundButton.OnCheckedChangeListener {
35
36 final static String TAG = "MQTTClient";
37
38 //UI
39 ToggleButton btnConnect;
40 Button btnPublish, btnSubscribe;
41 EditText edtServer,edtMessage,edtTopic,edtClientID;
42 TextView received;
43 CheckBox cbxPersist;
44
45 //MQTT
46 final static String clientId = "android";
47 final static short keepAlive = 255;
48 final static String host = "192.168.1.101";
49 final static String user = "guest";
50 final static int port = 1883;
51 final static String password = "admin";
52
53 MQTT mqtt ;
54 Listener listener ;
55 CallbackConnection callbackConnection ;
56 Callback<Void> connectCallback ;
57 Callback<byte[]> subscribeCallback ;
58 Callback<Void> publishCallback ;
59 Callback<Void> disconnectCallback ;
60
61 {
62 connectCallback = new Callback<Void>(){
63
64 @Override
65 public void onSuccess(Void value) {
66 Log.d(TAG, "connectCallback : onSuccess");
67 received.post(new Runnable() {
68 @Override
69 public void run() {
70 received.setText("connectCallback success");
71 }
72 });
73
74 }
75 @Override
76 public void onFailure(Throwable value) {
77 value.printStackTrace();
78 Log.d(TAG, "connectCallback : failure");
79 received.post(new Runnable() {
80 @Override
81 public void run() {
82 received.setText("connectCallback failure");
83 }
84 });
85 System.exit(-2);
86 }
87 };
88 disconnectCallback = new Callback<Void>(){
89
90 public void onSuccess(Void value) {
91 received.post(new Runnable() {
92 @Override
93 public void run() {
94 received.setText("disconnect success");
95 }
96 });
97 }
98 public void onFailure(Throwable e) {
99 received.post(new Runnable() {
100 @Override
101 public void run() {
102 received.setText("disconnect failure");
103 }
104 });
105 }
106 };
107
108 listener = new Listener() {
109
110 @Override
111 public void onConnected() {
112 Log.d(TAG, "listener onConnected");
113 received.post(new Runnable() {
114 @Override
115 public void run() {
116 received.setText("listener onConnected");
117 }
118 });
119 }
120
121 @Override
122 public void onDisconnected() {
123 Log.d(TAG, "listener onDisconnected");
124 received.post(new Runnable() {
125 @Override
126 public void run() {
127 received.setText("listener onDisconnected");
128 }
129 });
130 }
131
132 @Override
133 public void onPublish(final UTF8Buffer topic, Buffer msg, Runnable ack) {
134 final String body = msg.utf8().toString();
135 Log.d(TAG, "onPublish: " + body);
136 received.post(new Runnable() {
137 @Override
138 public void run() {
139 makeNotification(topic.toString(),body);
140 received.append("\nreceived : " + body);
141 }
142 });
143 }
144
145 @Override
146 public void onFailure(Throwable value) {
147 Log.d(TAG, "listener failure");
148 received.post(new Runnable() {
149 @Override
150 public void run() {
151 received.setText("listener failure");
152 }
153 });
154 }
155 };
156
157 subscribeCallback = new Callback<byte[]>() {
158
159 public void onSuccess(byte[] qoses) {
160 Log.d(TAG, "subscribe : success");
161
162 received.post(new Runnable() {
163 @Override
164 public void run() {
165 received.setText("subscribe " + edtTopic.getText().toString() + ": success");
166 }
167 });
168 }
169 public void onFailure(Throwable value) {
170 value.printStackTrace();
171 Log.d(TAG, "subscribe : failure");
172 received.post(new Runnable() {
173 @Override
174 public void run() {
175 received.setText("subscribe " + edtTopic.getText().toString() + ": failure");
176 }
177 });
178 System.exit(-2);
179 }
180 };
181 publishCallback = new Callback<Void>() {
182 @Override
183 public void onSuccess(Void value) {
184 Log.d(TAG, "onSuccess: ");
185 }
186
187 @Override
188 public void onFailure(Throwable value) {
189 Log.d(TAG, "onFailure: ");
190 }
191 };
192 }
193
194 void connect(){
195 callbackConnection.connect(connectCallback);
196 }
197
198 void disconnect(){
199 callbackConnection.disconnect(disconnectCallback);
200 }
201
202 void subscribe(){
203
204 String topicName = edtTopic.getText().toString().trim();
205
206 Topic topics[] = new Topic[]{new Topic(topicName,QoS.AT_LEAST_ONCE)};
207
208 callbackConnection.subscribe(topics,subscribeCallback);
209
210 }
211
212 void publish(){
213
214 String data = edtMessage.getText().toString();
215
216 String topicName = edtTopic.getText().toString().trim();
217
218 callbackConnection.publish(topicName,data.getBytes(),QoS.AT_LEAST_ONCE,false,publishCallback);
219
220 }
221
222 void initMqtt(){
223
224 mqtt = new MQTT();
225 try {
226 mqtt.setHost(host, port);
227 mqtt.setUserName(user);
228 mqtt.setPassword(password);
229 mqtt.setKeepAlive(keepAlive);
230 mqtt.getClientId();
231 callbackConnection = mqtt.callbackConnection();
232 callbackConnection.listener(listener);
233
234 } catch (URISyntaxException e) {
235 e.printStackTrace();
236 Log.e(TAG,"-=-=-=-=-=-=------------====\n initMqtt exception : " + e.getMessage());
237 }
238 }
239
240 @Override
241 protected void onCreate(Bundle savedInstanceState) {
242 super.onCreate(savedInstanceState);
243 setContentView(R.layout.activity_main);
244
245 received = (TextView) findViewById(R.id.txt_received);
246 btnSubscribe= (Button) findViewById(R.id.btn_subscribe);
247 btnConnect = (ToggleButton)findViewById(R.id.btn_connect);
248 btnPublish = (Button) findViewById(R.id.btn_publish);
249 edtServer = (EditText) findViewById(R.id.edt_server);
250 edtTopic = (EditText) findViewById(R.id.edt_topic);
251 edtMessage = (EditText) findViewById(R.id.edt_message);
252 edtClientID = (EditText) findViewById(R.id.edt_clientID);
253 cbxPersist = (CheckBox) findViewById(R.id.cbx_persist);
254
255 btnConnect .setOnClickListener(this);
256 btnConnect .setOnCheckedChangeListener(this);
257 cbxPersist .setOnCheckedChangeListener(this);
258 btnSubscribe.setOnClickListener(this);
259 btnPublish .setOnClickListener(this);
260
261 initMqtt();
262
263 }
264 void makeNotification(final String title,final String content){
265
266 NotificationCompat.Builder mBuilder = new NotificationCompat.Builder(this);
267 mBuilder.setSmallIcon(R.drawable.mail_3_small);//must
268 mBuilder.setContentTitle(title);
269 mBuilder.setContentText(content);
270 // Creates an explicit intent for an Activity in your app
271 Intent resultIntent = new Intent(this, MainActivity.class);
272
273 // The stack builder object will contain an artificial back stack for the
274 // started Activity.
275 // This ensures that navigating backward from the Activity leads out of
276 // your app to the Home screen.
277 TaskStackBuilder stackBuilder = TaskStackBuilder.create(this);
278 // Adds the back stack for the Intent (but not the Intent itself)
279 stackBuilder.addParentStack(MainActivity.class);
280 // Adds the Intent that starts the Activity to the top of the stack
281 stackBuilder.addNextIntent(resultIntent);
282 PendingIntent resultPendingIntent = stackBuilder.getPendingIntent(0,PendingIntent.FLAG_UPDATE_CURRENT);
283 mBuilder.setContentIntent(resultPendingIntent);
284 NotificationManager mNotificationManager =
285 (NotificationManager) getSystemService(Context.NOTIFICATION_SERVICE);
286
287 // mNotificationId is a unique integer your app uses to identify the
288 // notification. For example, to cancel the notification, you can pass its ID
289 // number to NotificationManager.cancel().
290 mNotificationManager.notify(R.string.app_name, mBuilder.build());
291 }
292
293 void blocking(){
294 BlockingConnection connection = mqtt.blockingConnection();
295 try {
296 connection.connect();
297 //publish
298 connection.publish("foo", "Hello".getBytes(), QoS.AT_LEAST_ONCE, false);
299
300 //subscribe
301 Topic[] topics = {new Topic("foo", QoS.AT_LEAST_ONCE)};
302 byte[] qoses = connection.subscribe(topics);
303
304 //receive message
305 Message message = connection.receive();
306 System.out.println(message.getTopic());
307 byte[] payload = message.getPayload();
308 // process the message then:
309 message.ack();
310
311 //disconnect
312 connection.disconnect();
313 } catch (Exception e) {
314 e.printStackTrace();
315 }
316 }
317
318 @Override
319 public void onClick(View view) {
320 switch (view.getId()){
321 case R.id.btn_publish : publish(); break;
322 case R.id.btn_subscribe : subscribe(); break;
323 }
324
325 }
326
327 @Override
328 public void onCheckedChanged(CompoundButton compoundButton, boolean b) {
329 switch (compoundButton.getId()){
330 case R.id.btn_connect:
331 if (!b){
332 connect();
333 }else{
334 disconnect();
335 }
336 break;
337 case R.id.cbx_persist:
338 if (mqtt != null) {
339 mqtt.setClientId(edtClientID.getText().toString().trim());
340 mqtt.setCleanSession(!b);
341 }
342 break;
343 }
344 }
345
346 }
3.4 完整下载地址
https://git.oschina.net/xi/mqtt-client-demo.git
4.MQTT 常用方法介绍
setClientId |
Use to set the client Id of the session. This is what an MQTT server uses to identify a session where The id must be 23 characters or less. Defaults to auto generated id (based on your socket address, port and timestamp). 每个客户端id不要相同
|
setCleanSession | Set to false if you want the MQTT server to persist topic subscriptions and ack positions across client sessions. Defaults to true.设置false时,服务端将不清除会话,这样就可以持久保存订阅关系。
|
setKeepAlive |
Configures the Keep Alive timer in seconds. Defines the maximum time interval between messages received from a client. It enables the server to detect that the network connection to a client has dropped, without having to wait for the long TCP/IP timeout. 设置保活时间,单位是秒,默认为tpc连接时间。
|
setUserName | Sets the user name used to authenticate against the server.设置服务端验证的用户名
|
setPassword | Sets the password used to authenticate against the server.设置验证用户的密码
|
setWillTopic |
If set the server will publish the client's Will message to the specified topics if the client has an unexpected disconnection. 当客户端异常断开时,服务器按这里指定的主题发意愿消息。
|
setWillMessage | The Will message to send. Defaults to a zero length message.意愿消息
|
setWillQos | Sets the quality of service to use for the Will message. Defaults to QoS.AT_MOST_ONCE.意愿消息的QoS
|
setWillRetain | Set to true if you want the Will to be published with the retain option. |
setVersion | Set to "3.1.1" to use MQTT version 3.1.1. Otherwise defaults to the 3.1 protocol version.设置MQTT协议版本
|