1
package .one_way;
2
3
import .;
4
import .;
5
import .;
6
import .;
7
import .;
8
import .;
9
import .;
10
import .;
11
/**
12 *
13 * Title:Server
14 * Description: 服务器向多个客户端推送主题,即不同客户端可向服务器订阅相同主题
15 * @author chenrl
16 * 2016年1月6日下午3:29:28
17 */
18
public
class Server {
19
20
public
static
final String HOST =
"tcp://192.168.1.3:61613";
21
public
static
final String TOPIC =
"toclient/124";
22
public
static
final String TOPIC125 =
"toclient/125";
23
private
static
final String clientid =
"server";
24
25
private MqttClient client;
26
private MqttTopic topic;
27
private MqttTopic topic125;
28
private String userName =
"admin";
29
private String passWord =
"password";
30
31
private MqttMessage message;
32
33
public Server()
throws MqttException {
34
// MemoryPersistence设置clientid的保存形式,默认为以内存保存
35 client =
new MqttClient(HOST, clientid,
new MemoryPersistence());
36 connect();
37 }
38
39
private
void connect() {
40 MqttConnectOptions
options =
new MqttConnectOptions();
41
options.setCleanSession(
false);
42
options.setUserName(userName);
43
options.setPassword(());
44
// 设置超时时间
45
options.setConnectionTimeout(
10);
46
// 设置会话心跳时间
47
options.setKeepAliveInterval(
20);
48
try {
49 (
new PushCallback());
50 (
options);
51 topic = (TOPIC);
52 topic125 = (TOPIC125);
53 }
catch (Exception e) {
54 ();
55 }
56 }
57
58
public
void publish(MqttTopic topic , MqttMessage message)
throws MqttPersistenceException,
59 MqttException {
60 MqttDeliveryToken token = (message);
61 ();
62 .
println(
"message is published completely! "
63 + ());
64 }
65
66
public
static
void main(String[] args)
throws MqttException {
67 Server server =
new Server();
68
69 =
new MqttMessage();
70 (
2);
71 (
true);
72 (
"给客户端124推送的信息".getBytes());
73 ( , );
74
75 =
new MqttMessage();
76 (
2);
77 (
true);
78 (
"给客户端125推送的信息".getBytes());
79 (server.topic125 , );
80
81 .
println(() +
"------ratained状态");
82 }
83 }