SpringBoot实现MQTT消息发送和接收

时间:2025-04-02 21:50:34
  • package ;
  • import .slf4j.Slf4j;
  • import .;
  • import ;
  • import ;
  • import ;
  • import ;
  • import ;
  • import ;
  • import ;
  • import ;
  • import ;
  • import ;
  • import ;
  • import ;
  • import ;
  • import ;
  • import ;
  • import ;
  • import ;
  • @Configuration
  • @IntegrationComponentScan
  • @Slf4j
  • public class MqttConfiguration {
  • @Value("${}")
  • public String appid;
  • @Value("${}")
  • //订阅主题
  • public String[] inputTopic;
  • // @Value("${}")
  • // private String[] outTopic;//发布主题
  • @Value("${}")
  • //服务器地址以及端口
  • public String[] mqttServices;
  • @Value("${}")
  • //用户名
  • public String user;
  • @Value("${}")
  • //密码
  • public String password;
  • @Value("${}")
  • //心跳时间
  • public Integer KeepAliveInterval;
  • @Value("${}")
  • //是否不保持session,默认为session保持
  • public Boolean CleanSession;
  • @Value("${}")
  • //是否自动重联,默认为开启自动重联
  • public Boolean AutomaticReconnect;
  • @Value("${}")
  • //连接超时,默认为30秒
  • public Long CompletionTimeout;
  • @Value("${}")
  • //通信质量
  • public Integer Qos;
  • @Bean
  • public MqttPahoClientFactory mqttClientFactory() {
  • DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();//连接工厂类
  • MqttConnectOptions options = new MqttConnectOptions();//连接参数
  • (mqttServices);//连接地址
  • if (user != null) {
  • (user);//用户名
  • }
  • if (password != null) {
  • (());//密码
  • }
  • (KeepAliveInterval);//心跳时间
  • (AutomaticReconnect);//断开是否自动重联
  • (CleanSession);//保持session
  • (options);
  • return factory;
  • }
  • @Autowired
  • private MqttReceiveHandle mqttReceiveHandle;
  • @Bean
  • public MessageProducerSupport mqttInbound() {
  • MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(appid + "__consumer",
  • mqttClientFactory(), inputTopic);
  • (60000);
  • (new DefaultPahoMessageConverter());
  • (10000);
  • (0);
  • (mqttInputChannel());
  • return adapter;
  • }
  • @Bean
  • public MessageChannel mqttInputChannel() {
  • return new DirectChannel();
  • }
  • /**
  • * mqtt订阅者使用信道名称 mqttInboundChannel
  • *
  • * @return
  • */
  • @Bean
  • @ServiceActivator(inputChannel = "mqttInputChannel")
  • public MessageHandler handler() {
  • return new MessageHandler() {
  • @Override
  • public void handleMessage(Message<?> message) throws MessagingException {
  • (message);
  • }
  • };
  • }
  • @Bean
  • @ServiceActivator(inputChannel = "mqttOutboundChannel")
  • public MessageHandler mqttOutbound() {
  • MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(appid + "__producer", mqttClientFactory());
  • // 如果设置成true,发送消息时将不会阻塞。
  • (true);
  • (inputTopic[0]);
  • return messageHandler;
  • }
  • @Bean
  • public MessageChannel mqttOutboundChannel() {
  • // DirectChannel dc =new DirectChannel();
  • // (mqttOutbound());
  • return new DirectChannel();
  • }
  • /**
  • * mqtt发布者信道名称 mqttOutboundChannel
  • * @return
  • */
  • /* @Bean
  • @ServiceActivator(inputChannel = "mqttOutboundChannel")
  • public MessageHandler outHandler() {
  • return new MessageHandler() {
  • @Override
  • public void handleMessage(Message<?> message) throws MessagingException {
  • ("AAAAA");
  • (message);
  • }
  • };
  • }*/
  • }