序
MQTT(Message Queuing Telemetry Transport)是基于二进制消息的发布/订阅编程模式的消息协议,非常适合需要低功耗和网络带宽有限的IoT场景。这里简单介绍一下如何在springboot中集成。
maven
1
2
3
4
5
6
7
8
9
10
11
12
|
< dependency >
< groupId >org.springframework.boot</ groupId >
< artifactId >spring-boot-starter-integration</ artifactId >
</ dependency >
< dependency >
< groupId >org.springframework.integration</ groupId >
< artifactId >spring-integration-stream</ artifactId >
</ dependency >
< dependency >
< groupId >org.springframework.integration</ groupId >
</ dependency >
|
配置client factory
1
2
3
4
5
6
7
8
|
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setServerURIs( "tcp://demo:1883" );
// factory.setUserName("guest");
// factory.setPassword("guest");
return factory;
}
|
配置consumer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
@Bean
public IntegrationFlow mqttInFlow() {
return IntegrationFlows.from(mqttInbound())
.transform(p -> p + ", received from MQTT" )
.handle(logger())
.get();
}
private LoggingHandler logger() {
LoggingHandler loggingHandler = new LoggingHandler( "INFO" );
loggingHandler.setLoggerName( "siSample" );
return loggingHandler;
}
@Bean
public MessageProducerSupport mqttInbound() {
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter( "siSampleConsumer" ,
mqttClientFactory(), "siSampleTopic" );
adapter.setCompletionTimeout( 5000 );
adapter.setConverter( new DefaultPahoMessageConverter());
adapter.setQos( 1 );
return adapter;
}
|
配置producer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
@Bean
public IntegrationFlow mqttOutFlow() {
//console input
// return IntegrationFlows.from(CharacterStreamReadingMessageSource.stdin(),
// e -> e.poller(Pollers.fixedDelay(1000)))
// .transform(p -> p + " sent to MQTT")
// .handle(mqttOutbound())
// .get();
return IntegrationFlows.from(outChannel())
.handle(mqttOutbound())
.get();
}
@Bean
public MessageChannel outChannel() {
return new DirectChannel();
}
@Bean
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler( "siSamplePublisher" , mqttClientFactory());
messageHandler.setAsync( true );
messageHandler.setDefaultTopic( "siSampleTopic" );
return messageHandler;
}
|
配置MessagingGateway
1
2
3
4
|
@MessagingGateway (defaultRequestChannel = "outChannel" )
public interface MsgWriter {
void write(String note);
}
|
这样就大功告成了
doc
spring-integration-samples-mqtt
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。
原文链接:https://segmentfault.com/a/1190000010601548