本示例为Paho Java客户端,主体实现行情数据推送
Paho Java客户端提供了两个API:MqttAsyncClient提供了一个完全异步的API,通过已注册的回调通知完成活动。 MqttClient是MqttAsyncClient的一个同步包装,其中函数与应用程序同步。
- 项目网站:Eclipse Paho | The Eclipse Foundation
- Paho Java:/paho/clients/java/
- GitHub:GitHub - eclipse/: Eclipse Paho Java MQTT client library. Paho is an Eclipse IoT project.
发送消息
import .slf4j.Slf4j;
import .;
import .;
import .;
import .;
import .;
import .;
import .;
@Slf4j
public class MqttClient {
public static . mqttClient = null;
private static MemoryPersistence memoryPersistence = null;
private static MqttConnectOptions mqttConnectOptions = null;
private static MqttClient instance = null;
public static MqttClient getInstance() throws Exception {
if (instance == null) {
synchronized () {
if (instance == null) {
instance = new MqttClient();
}
}
}
return instance;
}
public MqttClient(){
init("admin");
}
public void init(String clientId) {
//初始化连接设置对象
mqttConnectOptions = new MqttConnectOptions();
//初始化MqttClient
if(null != mqttConnectOptions) {
// true可以安全地使用内存持久性作为客户端断开连接时清除的所有状态
(true);
// 设置连接超时
(30);
// 设置持久化方式
memoryPersistence = new MemoryPersistence();
if(null != memoryPersistence && null != clientId) {
try {
mqttClient = new .("tcp://47.111.102.176:1883", clientId,memoryPersistence);
} catch (MqttException e) {
// TODO Auto-generated catch block
();
}
}else {
}
}else {
("mqttConnectOptions对象为空");
}
//设置连接和回调
if(null != mqttClient) {
if(!()) {
try {
("创建连接:" + ());
(mqttConnectOptions);
} catch (MqttException e) {
// TODO Auto-generated catch block
();
}
}
}else {
("mqttClient为空");
}
}
// 关闭连接
public void closeConnect() {
//关闭存储方式
if(null != memoryPersistence) {
try {
();
} catch (MqttPersistenceException e) {
// TODO Auto-generated catch block
();
}
}else {
("memoryPersistence is null");
}
// 关闭连接
if(null != mqttClient) {
if(()) {
try {
();
();
} catch (MqttException e) {
// TODO Auto-generated catch block
();
}
}else {
("mqttClient is not connect");
}
}else {
("mqttClient is null");
}
}
// 发布消息
public void publishMessage(String pubTopic,String message,int qos) {
if(null != mqttClient&& ()) {
MqttMessage mqttMessage = new MqttMessage();
(qos);
(());
MqttTopic topic = (pubTopic);
if(null != topic) {
try {
MqttDeliveryToken publish = (mqttMessage);
if(!()) {
//("消息发布成功");
}
} catch (MqttException e) {
// TODO Auto-generated catch block
();
}
}
}else {
reConnect();
}
}
// 重新连接
public void reConnect() {
if(null != mqttClient) {
if(!()) {
if(null != mqttConnectOptions) {
try {
(mqttConnectOptions);
} catch (MqttException e) {
// TODO Auto-generated catch block
();
}
}else {
("mqttConnectOptions is null");
}
}else {
("mqttClient is null or connect");
}
}else {
init("admin");
}
}
// 订阅主题
public void subTopic(String topic) {
if(null != mqttClient&& ()) {
try {
(topic, 1);
} catch (MqttException e) {
// TODO Auto-generated catch block
();
}
}else {
("mqttClient is error");
}
}
// 清空主题
public void cleanTopic(String topic) {
if(null != mqttClient&& !()) {
try {
(topic);
} catch (MqttException e) {
// TODO Auto-generated catch block
();
}
}else {
("mqttClient is error");
}
}
public static void main(String [] args){
MqttClient mqttClient = new MqttClient();
("marketAll", "12312312312", 1);
}
}