基于SpringBoot实现MQTT消息收发

时间:2024-10-14 20:05:14
@Log4j2 @Data @Configuration @IntegrationComponentScan public class Subscriber { @Resource MqttConfiguration configuration; @Bean public MqttConnectOptions connectOptions() { return configuration.connectionOptions(); } // 初始化连接工厂 @Bean public MqttPahoClientFactory mqttPahoClientFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); factory.setConnectionOptions(connectOptions()); return factory; } // 建立输入通道 @Bean("mqttInboundChannel") public MessageChannel mqttInboundChannel() { return new DirectChannel() {{ this.subscribe(handler()); }}; } private static String clusterClientId() { try { return "subscriber" + "-" + InetAddress.getLocalHost().getHostAddress(); } catch (UnknownHostException he) { log.warn("unknown ip address, cause: {}", he.getMessage()); } return "subscriber" + "-" + System.currentTimeMillis(); } // 绑定TOPICs @Bean public MessageProducer inbound() { MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter( clusterClientId(), mqttPahoClientFactory(), configuration.getTopics().toArray(new String[0])); adapter.setCompletionTimeout(5000); adapter.setConverter(new DefaultPahoMessageConverter()); adapter.setQos(configuration.getQos()); adapter.setOutputChannel(mqttInboundChannel()); return adapter; } // 消息处理定义 @Bean public MessageHandler handler() { return new MessageHandler() { @Override public void handleMessage(Message<?> message) throws MessagingException { log.info("HandleMessage headers: {}", message.getHeaders()); log.info("HandleMessage payload: {}", message.getPayload()); } }; }

相关文章