依赖引入,主要引入下面的包,其它的包略过
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
配置类
@Slf4j
@Setter
@Configuration
@EnableWebSocketMessageBroker
@ConfigurationProperties(prefix = "websocket")
@RequiredArgsConstructor(onConstructor_ = {@Autowired})
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer, ApplicationListener<BrokerAvailabilityEvent> {
private final BrokerConfig brokerConfig;
private String[] allowOrigins;
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
// 继承DefaultHandshakeHandler并重写determineUser方法,可以自定义如何确定用户
// 添加方法:registry.addEndpoint("/ws").setHandshakeHandler(handshakeHandler)
registry.addEndpoint("/ws")
.setAllowedOrigins(allowOrigins)
.withSockJS();
}
/**
* 配置消息代理
*/
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.setApplicationDestinationPrefixes("/app");
if (brokerConfig.isUseSimpleBroker()) {
// 使用 SimpleBroker
// 配置前缀, 有这些前缀的消息会路由到broker
registry.enableSimpleBroker("/topic", "/queue")
//配置stomp协议里, server返回的心跳
.setHeartbeatValue(new long[]{10000L, 10000L})
//配置发送心跳的scheduler
.setTaskScheduler(new DefaultManagedTaskScheduler());
} else {
// 使用外部 Broker
// 指定前缀,有这些前缀的消息会路由到broker
registry.enableStompBrokerRelay("/topic", "/queue")
// 广播用户目标,如果要推送的用户不在本地,则通过 broker 广播给集群的其他成员
.setUserDestinationBroadcast("/topic/log-unresolved-user")
// 用户注册广播,一旦有用户登录,则广播给集群中的其他成员
.setUserRegistryBroadcast("/topic/log-user-registry")
// 虚拟地址
.setVirtualHost(brokerConfig.getVirtualHost())
// 用户密码
.setSystemLogin(brokerConfig.getUsername())
.setSystemPasscode(brokerConfig.getPassword())
.setClientLogin(brokerConfig.getUsername())
.setClientPasscode(brokerConfig.getPassword())
// 心跳间隔
.setSystemHeartbeatSendInterval(10000)
.setSystemHeartbeatReceiveInterval(10000)
// 使用 setTcpClient 以配置多个 broker 地址,setRelayHost/Port 只能配置一个
.setTcpClient(createTcpClient());
}
}
/**
* 创建 TcpClient 工厂,用于配置多个 broker 地址
*/
private ReactorNettyTcpClient<byte[]> createTcpClient() {
return new ReactorNettyTcpClient<>(
// BrokerAddressSupplier 用于获取中继地址,一次只使用一个,如果该中继出错,则会获取下一个
client -> client.addressSupplier(brokerConfig.getBrokerAddressSupplier()),
new StompReactorNettyCodec());
}
@Override
public void onApplicationEvent(BrokerAvailabilityEvent event) {
if (!event.isBrokerAvailable()) {
log.warn("stomp broker is not available!!!!!!!!");
} else {
log.info("stomp broker is available");
}
}
}
消息处理
@Slf4j
@Controller
@RequiredArgsConstructor(onConstructor_ = {@Autowired})
public class StompController {
private final SimpMessageSendingOperations msgOperations;
private final SimpUserRegistry simpUserRegistry;
/**
* 回音消息,将用户发来的消息内容加上 Echo 前缀后推送回客户端
*/
@MessageMapping("/echo")
public void echo(Principal principal, Msg msg) {
String username = principal.getName();
msg.setContent("Echo: " + msg.getContent());
msgOperations.convertAndSendToUser(username, "/topic/subNewMsg", msg);
int userCount = simpUserRegistry.getUserCount();
int sessionCount = simpUserRegistry.getUser(username).getSessions().size();
log.info("当前本系统总在线人数: {}, 当前用户: {}, 该用户的客户端连接数: {}", userCount, username, sessionCount);
}
}