【stomp实战】Springboot+Stomp协议实现聊天功能-后端代码

时间:2024-02-16 16:36:38

依赖引入,主要引入下面的包,其它的包略过

 <dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-websocket</artifactId>
 </dependency>

配置类

@Slf4j
@Setter
@Configuration
@EnableWebSocketMessageBroker
@ConfigurationProperties(prefix = "websocket")
@RequiredArgsConstructor(onConstructor_ = {@Autowired})
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer, ApplicationListener<BrokerAvailabilityEvent> {
    private final BrokerConfig brokerConfig;
    private String[] allowOrigins;

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        // 继承DefaultHandshakeHandler并重写determineUser方法,可以自定义如何确定用户
        // 添加方法:registry.addEndpoint("/ws").setHandshakeHandler(handshakeHandler)
        registry.addEndpoint("/ws")
                .setAllowedOrigins(allowOrigins)
                .withSockJS();
    }

    /**
     * 配置消息代理
     */
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.setApplicationDestinationPrefixes("/app");

        if (brokerConfig.isUseSimpleBroker()) {
            // 使用 SimpleBroker
            // 配置前缀, 有这些前缀的消息会路由到broker
            registry.enableSimpleBroker("/topic", "/queue")
                    //配置stomp协议里, server返回的心跳
                    .setHeartbeatValue(new long[]{10000L, 10000L})
                    //配置发送心跳的scheduler
                    .setTaskScheduler(new DefaultManagedTaskScheduler());
        } else {
            // 使用外部 Broker
            // 指定前缀,有这些前缀的消息会路由到broker
            registry.enableStompBrokerRelay("/topic", "/queue")
                    // 广播用户目标,如果要推送的用户不在本地,则通过 broker 广播给集群的其他成员
                    .setUserDestinationBroadcast("/topic/log-unresolved-user")
                    // 用户注册广播,一旦有用户登录,则广播给集群中的其他成员
                    .setUserRegistryBroadcast("/topic/log-user-registry")
                    // 虚拟地址
                    .setVirtualHost(brokerConfig.getVirtualHost())
                    // 用户密码
                    .setSystemLogin(brokerConfig.getUsername())
                    .setSystemPasscode(brokerConfig.getPassword())
                    .setClientLogin(brokerConfig.getUsername())
                    .setClientPasscode(brokerConfig.getPassword())
                    // 心跳间隔
                    .setSystemHeartbeatSendInterval(10000)
                    .setSystemHeartbeatReceiveInterval(10000)
                    // 使用 setTcpClient 以配置多个 broker 地址,setRelayHost/Port 只能配置一个
                    .setTcpClient(createTcpClient());
        }
    }

    /**
     * 创建 TcpClient 工厂,用于配置多个 broker 地址
     */
    private ReactorNettyTcpClient<byte[]> createTcpClient() {
        return new ReactorNettyTcpClient<>(
                // BrokerAddressSupplier 用于获取中继地址,一次只使用一个,如果该中继出错,则会获取下一个
                client -> client.addressSupplier(brokerConfig.getBrokerAddressSupplier()),
                new StompReactorNettyCodec());
    }

    @Override
    public void onApplicationEvent(BrokerAvailabilityEvent event) {
        if (!event.isBrokerAvailable()) {
            log.warn("stomp broker is not available!!!!!!!!");
        } else {
            log.info("stomp broker is available");
        }
    }
}

消息处理

@Slf4j
@Controller
@RequiredArgsConstructor(onConstructor_ = {@Autowired})
public class StompController {
    private final SimpMessageSendingOperations msgOperations;
    private final SimpUserRegistry simpUserRegistry;

    /**
     * 回音消息,将用户发来的消息内容加上 Echo 前缀后推送回客户端
     */
    @MessageMapping("/echo")
    public void echo(Principal principal, Msg msg) {
        String username = principal.getName();
        msg.setContent("Echo: " + msg.getContent());
        msgOperations.convertAndSendToUser(username, "/topic/subNewMsg", msg);
        int userCount = simpUserRegistry.getUserCount();
        int sessionCount = simpUserRegistry.getUser(username).getSessions().size();
        log.info("当前本系统总在线人数: {}, 当前用户: {}, 该用户的客户端连接数: {}", userCount, username, sessionCount);
    }
}