package ;
import .slf4j.Slf4j;
import .;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
public class MqttConfiguration {
public String appid;
//订阅主题
public String[] inputTopic;
// @Value("${}")
// private String[] outTopic;//发布主题
//服务器地址以及端口
public String[] mqttServices;
//用户名
public String user;
//密码
public String password;
//心跳时间
public Integer KeepAliveInterval;
//是否不保持session,默认为session保持
public Boolean CleanSession;
//是否自动重联,默认为开启自动重联
public Boolean AutomaticReconnect;
//连接超时,默认为30秒
public Long CompletionTimeout;
//通信质量
public Integer Qos;
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();//连接工厂类
MqttConnectOptions options = new MqttConnectOptions();//连接参数
(mqttServices);//连接地址
if (user != null) {
(user);//用户名
}
if (password != null) {
(());//密码
}
(KeepAliveInterval);//心跳时间
(AutomaticReconnect);//断开是否自动重联
(CleanSession);//保持session
(options);
return factory;
}
private MqttReceiveHandle mqttReceiveHandle;
public MessageProducerSupport mqttInbound() {
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(appid + "__consumer",
mqttClientFactory(), inputTopic);
(60000);
(new DefaultPahoMessageConverter());
(10000);
(0);
(mqttInputChannel());
return adapter;
}
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
/**
* mqtt订阅者使用信道名称 mqttInboundChannel
*
* @return
*/
public MessageHandler handler() {
return new MessageHandler() {
public void handleMessage(Message<?> message) throws MessagingException {
(message);
}
};
}
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(appid + "__producer", mqttClientFactory());
// 如果设置成true,发送消息时将不会阻塞。
(true);
(inputTopic[0]);
return messageHandler;
}
public MessageChannel mqttOutboundChannel() {
// DirectChannel dc =new DirectChannel();
// (mqttOutbound());
return new DirectChannel();
}
/**
* mqtt发布者信道名称 mqttOutboundChannel
* @return
*/
/* @Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler outHandler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
("AAAAA");
(message);
}
};
}*/
}