Springboot整合MQTT
1.首先在中添加mqtt的配置
2.在文件中倒入依赖
<dependency>
<groupId></groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId></groupId>
<artifactId>spring-integration-mqtt</artifactId>
<version>5.0.</version>
</dependency>
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
3.编写mqtt推送消息类
@Configuration
@IntegrationComponentScan
public class MqttSenderConfig {
@Value("${}")
private String username;
@Value("${}")
private String password;
@Value("${}")
private String hostUrl;
@Value("${}")
private String clientId;
@Value("${}")
private String defaultTopic;
@Value("${}")
private int completionTimeout ; //连接超时
@Bean
public MqttConnectOptions getMqttConnectOptions(){
MqttConnectOptions mqttConnectOptions=new MqttConnectOptions();
(true);
(10);
(90);
(true);
(username);
(());
(new String[]{hostUrl});
(2);
return mqttConnectOptions;
}
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
(getMqttConnectOptions());
return factory;
}
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId, mqttClientFactory());
(true);
(defaultTopic);
return messageHandler;
}
@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
4.编写mqtt订阅类
@Configuration
@IntegrationComponentScan
@Slf4j
public class MqttReceiveConfig {
@Value("${}")
private String username;
@Value("${}")
private String password;
@Value("${}")
private String hostUrl;
@Value("${}")
private String clientId;
@Value("${}")
private String defaultTopic;
@Value("${}")
private int completionTimeout ; //连接超时
@Autowired
private MqttGateway mqttGateway;
@Autowired
private BTempChargesService bTempChargesService;
@Bean
public MqttConnectOptions getMqttConnectOptions(){
MqttConnectOptions mqttConnectOptions=new MqttConnectOptions();
(true);
(10);
(90);
(true);
(username);
(());
(new String[]{hostUrl});
(2);
return mqttConnectOptions;
}
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
(getMqttConnectOptions());
return factory;
}
//接收通道
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
//配置client,监听的topic
@Bean
public MessageProducer inbound() {
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter(clientId+"_inbound", mqttClientFactory(),
defaultTopic);
(completionTimeout);
(new DefaultPahoMessageConverter());
(1);
(mqttInputChannel());
return adapter;
}
//通过通道获取数据
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
("主题:{},消息接收到的数据:{}", ().get("mqtt_receivedTopic"), ());
}
};
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
5.测试,这里使用的是工具
publish就可以看到订阅回复的信息(这里我省略了部分代码)。