Integrating MQTT with Spring Boot: a practical walkthrough

Date: 2021-11-11 06:49:59

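MQTT (Message Queuing Telemetry Transport) is a binary publish/subscribe messaging protocol. It is well suited to IoT scenarios that need low power consumption and have limited network bandwidth. This article briefly shows how to integrate it into a Spring Boot application.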
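To make the publish/subscribe model concrete, here is a minimal in-memory sketch in plain Java. It is not MQTT and not part of Spring; `TinyBroker` is a hypothetical class that only illustrates how publishers and subscribers are decoupled through a topic name. A real broker would also handle the network transport, wildcards, and QoS.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical in-memory stand-in for a broker (illustration only).
class TinyBroker {

  // topic name -> subscribers registered for that topic
  private final Map<String, List<Consumer<String>>> subscribers = new ConcurrentHashMap<>();

  // Register a callback for one exact topic name (no wildcard support).
  void subscribe(String topic, Consumer<String> subscriber) {
    subscribers.computeIfAbsent(topic, t -> new CopyOnWriteArrayList<>()).add(subscriber);
  }

  // Deliver the payload to every subscriber of the topic; others never see it.
  void publish(String topic, String payload) {
    List<Consumer<String>> targets =
        subscribers.getOrDefault(topic, Collections.<Consumer<String>>emptyList());
    for (Consumer<String> subscriber : targets) {
      subscriber.accept(payload);
    }
  }

  public static void main(String[] args) {
    TinyBroker broker = new TinyBroker();
    List<String> received = new ArrayList<>();
    broker.subscribe("siSampleTopic", received::add);
    broker.publish("siSampleTopic", "hello");
    broker.publish("otherTopic", "dropped"); // no subscriber on this topic
    System.out.println(received); // prints [hello]
  }
}
```

The publisher only knows the topic name, never the subscribers. The Spring Integration configuration in this article keeps the same separation between the producer and the consumer.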

Maven dependencies

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.integration</groupId>
  <artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.integration</groupId>
  <artifactId>spring-integration-mqtt</artifactId>
</dependency>

Configure the client factory

@Bean
public MqttPahoClientFactory mqttClientFactory() {
  DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
  // Broker address; replace "demo" with your broker's host name.
  factory.setServerURIs("tcp://demo:1883");
  // Uncomment if the broker requires authentication.
  // factory.setUserName("guest");
  // factory.setPassword("guest");
  return factory;
}
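
The setter calls on the factory (`setServerURIs`, `setUserName`, `setPassword`) match the Spring Integration version this article was written against. To my knowledge, from Spring Integration 5.1 onward these setters were deprecated in favour of passing a Paho `MqttConnectOptions` object to the factory. A minimal sketch of the equivalent bean under that assumption; the class name `MqttClientConfig` is hypothetical, and the host and credentials are the same placeholders as in the article:

```java
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;

@Configuration
public class MqttClientConfig { // hypothetical class name

  @Bean
  public MqttPahoClientFactory mqttClientFactory() {
    // Connection settings now live on the Paho options object.
    MqttConnectOptions options = new MqttConnectOptions();
    options.setServerURIs(new String[] {"tcp://demo:1883"});
    // Uncomment if the broker requires authentication.
    // options.setUserName("guest");
    // options.setPassword("guest".toCharArray());

    DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
    factory.setConnectionOptions(options);
    return factory;
  }
}
```

Check which style your Spring Integration version expects before choosing one. The rest of the configuration does not depend on how the factory is built.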

Configure the consumer

// Inbound flow: MQTT message -> append a suffix -> write to the log.
@Bean
public IntegrationFlow mqttInFlow() {
  return IntegrationFlows.from(mqttInbound())
      .transform(p -> p + ", received from MQTT")
      .handle(logger())
      .get();
}

// Logs each message at INFO level under the logger name "siSample".
private LoggingHandler logger() {
  LoggingHandler loggingHandler = new LoggingHandler("INFO");
  loggingHandler.setLoggerName("siSample");
  return loggingHandler;
}

// Subscribes to "siSampleTopic" with client ID "siSampleConsumer".
@Bean
public MessageProducerSupport mqttInbound() {
  MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("siSampleConsumer",
      mqttClientFactory(), "siSampleTopic");
  adapter.setCompletionTimeout(5000); // milliseconds
  adapter.setConverter(new DefaultPahoMessageConverter());
  adapter.setQos(1); // QoS 1: at-least-once delivery
  return adapter;
}

Configure the producer

// Outbound flow: messages sent to outChannel are published to MQTT.
@Bean
public IntegrationFlow mqttOutFlow() {
  // Alternative source: read lines from console input (needs spring-integration-stream).
  // return IntegrationFlows.from(CharacterStreamReadingMessageSource.stdin(),
  //     e -> e.poller(Pollers.fixedDelay(1000)))
  //     .transform(p -> p + " sent to MQTT")
  //     .handle(mqttOutbound())
  //     .get();
  return IntegrationFlows.from(outChannel())
      .handle(mqttOutbound())
      .get();
}

@Bean
public MessageChannel outChannel() {
  return new DirectChannel();
}

// Publishes to "siSampleTopic" with client ID "siSamplePublisher".
@Bean
public MessageHandler mqttOutbound() {
  MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler("siSamplePublisher", mqttClientFactory());
  messageHandler.setAsync(true); // do not block waiting for delivery confirmation
  messageHandler.setDefaultTopic("siSampleTopic");
  return messageHandler;
}

Configure the MessagingGateway

@MessagingGateway(defaultRequestChannel = "outChannel")
public interface MsgWriter {
  void write(String note);
}
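
To publish a message, inject the gateway and call `write`. The following is a minimal sketch, not part of the original article. It assumes `MsgWriter` lives in the same package and that the gateway interface is picked up by Spring Boot's integration auto-configuration; if it is not, adding `@IntegrationComponentScan` to a configuration class should register it. `MsgWriterDemo` is a hypothetical class name.

```java
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

// Hypothetical demo component: sends one message through the gateway at startup.
@Component
public class MsgWriterDemo implements CommandLineRunner {

  private final MsgWriter msgWriter;

  // Spring injects the gateway proxy generated for the MsgWriter interface.
  public MsgWriterDemo(MsgWriter msgWriter) {
    this.msgWriter = msgWriter;
  }

  @Override
  public void run(String... args) {
    // The String becomes the message payload on outChannel,
    // which the outbound flow publishes to "siSampleTopic".
    msgWriter.write("hello from Spring Boot");
  }
}
```

Because the producer and the consumer in this article use the same topic, the consumer flow should, with a reachable broker, log the same text with ", received from MQTT" appended.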

That completes the integration.

doc

spring-integration-mqtt 

spring-integration-samples-mqtt


Original article: https://segmentfault.com/a/1190000010601548