springboot学习(二十一) springboot中websocket使用@MessageMapping接收各种类型数据

时间:2025-03-16 07:36:46

1、发送数据携带用户ID
2、发送JSON数据体
3、将参数携带到发送请求的URL路径中
4、发送header
5、发送Httpsession中的数据

springboot中websocket配置见
/u011943534/article/details/81007002

1、发送数据携带用户ID
携带的用户ID可以直接拿到给MessageMapping注解的函数注入,后端可以使用这个ID双向通信
需要定义一个实体实现Principal,实现getName()方法

@Getter
@Setter
public class User implements Principal {

    private String username;

    private String password;

    private String role;

    private List<Url> urls;

    @Override
    public String getName() {
        return username;
    }
}

定义用户拦截器做认证,并生成User,注入StompHeaderAccessor

/**
 *用户拦截器
 **/
public class UserInterceptor implements ChannelInterceptor {
    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {

        StompHeaderAccessor accessor = (message, );
        if ((())) {
            Object raw = ().get(SimpMessageHeaderAccessor.NATIVE_HEADERS);
            if (raw instanceof Map) {
                //这里就是token
                Object name = ((Map) raw).get(Constants.TOKEN_KEY);
                if (name instanceof LinkedList) {
                    // 设置当前访问器的认证用户
                    String token = ((LinkedList) name).get(0).toString();
                    String username = null;
                    try {
                        Map<String, Claim> claimMap = (token);
                        username = ("username").asString();
                        if(username == null){
                            throw new RuntimeException("websocket认证失败");
                        }
                    } catch (UnsupportedEncodingException e) {
                        ();
                        throw new RuntimeException("websocket认证失败", e);
                    } catch (ValidTokenException e) {
                        ();
                        throw new RuntimeException("websocket认证失败", e);
                    }
                    User user = new User();
                    (username);
                    (user);

//                    User user = new User();
//                    ("lalala");
//                    (user);

                }
            }
        }

        return message;
    }

    @Override
    public void postSend(Message<?> message, MessageChannel channel, boolean sent) {

    }

    @Override
    public void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex) {

    }

    @Override
    public boolean preReceive(MessageChannel channel) {
        return false;
    }

    @Override
    public Message<?> postReceive(Message<?> message, MessageChannel channel) {
        return null;
    }

    @Override
    public void afterReceiveCompletion(Message<?> message, MessageChannel channel, Exception ex) {

    }
}
/*将客户端渠道拦截器加入spring ioc容器*/
    @Bean
    public UserInterceptor createUserInterceptor() {
        return new UserInterceptor();
    }

服务端

/**
     * 接收用户信息
     * */
    @MessageMapping(value = "/principal")
    public void test(Principal principal) {
        ("当前在线人数:" + ());
        int i = 1;
        for (SimpUser user : ()) {
            ("用户" + i++ + "---" + user);
        }
        //发送消息给指定用户
        ((), "/queue/message","服务器主动推的数据");
    }

客户端:

 /**
         * 发送用户信息
         * */
        function send0() {
            ("/app/principal", {},
                {});
        }

2、发送JSON数据体
服务端可以直接在函数中注入JavaBean或者Map,List或者String接收

服务端:

/*点对点通信*/
    @MessageMapping(value = "/P2P")
    public void templateTest(Principal principal, Map<String,String> data) {
        ("当前在线人数:" + ());
        int i = 1;
        for (SimpUser user : ()) {
            ("用户" + i++ + "---" + user);
        }
        //发送消息给指定用户
        ((), "/queue/message","服务器主动推的数据");
    }

客户端:

        /**
         * 发送JSON数据体
         * */
        function send() {
            ("/app/P2P", {},
                ({ 'name': 'test' }));
        }

3、将参数携带到发送请求的URL路径中
使用@DestinationVariable注解,类似SpringMVC的@PathVirable

服务端:

/**
     * 接收路径参数
     * */
    @MessageMapping(value = "/path/{name}/{company}")
    public void pathTest(Principal principal, @DestinationVariable String name, @DestinationVariable String company) {
        ("当前在线人数:" + ());
        int i = 1;
        for (SimpUser user : ()) {
            ("用户" + i++ + "---" + user);
        }
        //发送消息给指定用户
        ((), "/queue/message","服务器主动推的数据");
    }

客户端:

        /**
         * 发送路径参数
         * */
        function send2() {
            ("/app/path/zhangsan/XXX公司", {},
                {});
        }

4、发送header
使用@Header注解

服务端:

/**
     * 接收header参数
     * */
    @MessageMapping(value = "/header")
    public void headerTest(Principal principal, @Header String one, @Header String two) {
        ("当前在线人数:" + ());
        int i = 1;
        for (SimpUser user : ()) {
            ("用户" + i++ + "---" + user);
        }
        //发送消息给指定用户
        ((), "/queue/message","服务器主动推的数据");
    }

客户端:

        /**
         * 发送header参数
         * */
        function send3() {
            ("/app/header", {"one":"lalala", "two":"中国"},
                {});
        }

5、发送Httpsession中的数据
这里有一点儿小问题,我理解的是只能发送握手连接时的HttpSession中的数据

注册HttpSessionHandshakeIntercepror

    /**
     * 注册stomp的端点
     */
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        // 允许使用socketJs方式访问,访问点为webSocketServer,允许跨域
        // 在网页上我们就可以通过这个链接
        // http://localhost:8080/webSocketServer
        // 来和服务器的WebSocket连接
        ("/webSocketServer")
                .addInterceptors(new HttpSessionHandshakeInterceptor())
                .setAllowedOrigins("*")
                .withSockJS();

    }

服务端:

    /**
     * 接收HttpSession数据
     * */
    @MessageMapping(value = "/httpsession")
    public void httpsession( StompHeaderAccessor accessor) {
        String name = (String) ().get("name");
        (1111);
    }

客户端:

        /**
         * 发送httpsession
         * */
        function send4() {
            ("/app/httpsession", {},
                {});
        }

所有代码

前端JS:

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
    <script src="//2.3.3/"></script>
    <script src="/sockjs-client/1.1.4/"></script>
    <script>
        var socket = new SockJS("http://192.168.100.88:7601/demo/webSocketServer");
        var stompClient = (socket);
         = function () {
            connect();
        }
        //订阅消息
        function subscribe() {
            ('/user/queue/message', function (response) {
                ("/user/queue/message 你接收到的消息为:" + response);
            });

        }

        /**
         * 发送用户信息
         * */
        function send0() {
            ("/app/principal", {},
                {});
        }

        /**
         * 发送JSON数据体
         * */
        function send() {
            ("/app/P2P", {},
                ({ 'name': 'test' }));
        }

        /**
         * 发送路径参数
         * */
        function send2() {
            ("/app/path/zhangsan/XXX公司", {},
                {});
        }

        /**
         * 发送header参数
         * */
        function send3() {
            ("/app/header", {"one":"lalala", "two":"中国"},
                {});
        }

        /**
         * 发送httpsession
         * */
        function send4() {
            ("/app/httpsession", {},
                {});
        }

        // /**
        //  * 发送URL中?&参数
        //  * */
        // function send5() {
        //     ("/app/param?name=张三", {},
        //         {});
        // }

        function connect() {

            ({
                Authorization:"eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjIxOTg1NjQxNjAsImlhdCI6MTUzMTg5NzUwMCwidXNlcm5hbWUiOiJ6cXcxMSJ9.VFR2EKUx5BTYLDkDogiLA9LfNVoPjOzQ3rTWoEy7He4"
                    //这里可以改成token
                    // name: 'admin' // 携带客户端信息
                },
                function connectCallback(frame) {
                    // 连接成功时(服务器响应 CONNECTED 帧)的回调方法
                    alert("success");
                    subscribe();
                },
                function errorCallBack(error) {
                    // 连接失败时(服务器响应 ERROR 帧)的回调方法
                    alert("error");
                });
        }
        function disconnect() {
            if (stompClient != null) {
                ();
            }
//            setConnected(false);
            ("Disconnected");
        }
    </script>
</head>
<body>
    <input type="text" /><button onclick="send5();">发送</button>
</body>
</html>

后端MessaeMapping处:

package ;

import ;
import .slf4j.Slf4j;
import ;
import .*;
import ;
import ;
import ;
import ;
import ;

import ;
import ;

/**
 * 如有要看例子,请打开注释
 *
 **/
@RestController
@Slf4j
public class WebSoketDemoController {

    //spring提供的发送消息模板
    @Autowired
    private SimpMessagingTemplate messagingTemplate;

    @Autowired
    private SimpUserRegistry userRegistry;


    /**
     * 接收用户信息
     * */
    @MessageMapping(value = "/principal")
    public void test(Principal principal) {
        ("当前在线人数:" + ());
        int i = 1;
        for (SimpUser user : ()) {
            ("用户" + i++ + "---" + user);
        }
        //发送消息给指定用户
        ((), "/queue/message","服务器主动推的数据");
    }


    /**
     * 接收数据体
    * */
    @MessageMapping(value = "/P2P")
    public void templateTest(Principal principal, Map<String,String> data) {
        ("当前在线人数:" + ());
        int i = 1;
        for (SimpUser user : ()) {
            ("用户" + i++ + "---" + user);
        }
        //发送消息给指定用户
        ((), "/queue/message","服务器主动推的数据");
    }


    /**
     * 接收路径参数
     * */
    @MessageMapping(value = "/path/{name}/{company}")
    public void pathTest(Principal principal, @DestinationVariable String name, @DestinationVariable String company) {
        ("当前在线人数:" + ());
        int i = 1;
        for (SimpUser user : ()) {
            ("用户" + i++ + "---" + user);
        }
        //发送消息给指定用户
        ((), "/queue/message","服务器主动推的数据");
    }

    /**
     * 接收header参数
     * */
    @MessageMapping(value = "/header")
    public void headerTest(Principal principal, @Header String one, @Header String two) {
        ("当前在线人数:" + ());
        int i = 1;
        for (SimpUser user : ()) {
            ("用户" + i++ + "---" + user);
        }
        //发送消息给指定用户
        ((), "/queue/message","服务器主动推的数据");
    }

    /**
     * 接收HttpSession数据
     * */
    @MessageMapping(value = "/httpsession")
    public void httpsession( StompHeaderAccessor accessor) {
        String name = (String) ().get("name");
        (1111);
    }

//    /**
//     * 接收param数据
//     * */
//    @MessageMapping(value = "/param")
//    public void param(String name) {
//        (1111);
//    }

    /*广播*/
    @MessageMapping("/broadcast")
    @SendTo("/topic/getResponse")
    public ResponseEntity topic() throws Exception {
        return new ResponseEntity(200,"success");
    }

}

Websocket配置类:

package ;

import ;
import ;
import ;
import .*;
import ;

/**
 * webscoket配置
 *
* @auth zhuquanwen
 *
 **/

//@Configuration
@EnableWebSocketMessageBroker
public class WebSocketStompConfig /*extends AbstractWebSocketMessageBrokerConfigurer*/ implements WebSocketMessageBrokerConfigurer {

    /**
     * 注册stomp的端点
     */
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        // 允许使用socketJs方式访问,访问点为webSocketServer,允许跨域
        // 在网页上我们就可以通过这个链接
        // http://localhost:8080/webSocketServer
        // 来和服务器的WebSocket连接
        ("/webSocketServer")
                .addInterceptors(new HttpSessionHandshakeInterceptor())
                .setAllowedOrigins("*")
                .withSockJS();

    }

    /**
     * 配置信息代理
     */
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        // 订阅Broker名称
        ("/queue", "/topic");
        // 全局使用的消息前缀(客户端订阅路径上会体现出来)
        ("/app");
        // 点对点使用的订阅前缀(客户端订阅路径上会体现出来),不设置的话,默认也是/user/
        ("/user/");
    }

    /**
     * 配置客户端入站通道拦截器
     */
    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        (createUserInterceptor());

    }



     /*将客户端渠道拦截器加入spring ioc容器*/
    @Bean
    public UserInterceptor createUserInterceptor() {
        return new UserInterceptor();
    }


    @Override
    public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
        (500 * 1024 * 1024);
        (1024 * 1024 * 1024);
        (200000);
    }


}

用户拦截器:

package ;

import com.;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;

import ;
import ;
import ;
import ;

/**
 *用户拦截器
 **/
public class UserInterceptor implements ChannelInterceptor {
    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {

        StompHeaderAccessor accessor = (message, );
        if ((())) {
            Object raw = ().get(SimpMessageHeaderAccessor.NATIVE_HEADERS);
            if (raw instanceof Map) {
                //这里就是token
                Object name = ((Map) raw).get(Constants.TOKEN_KEY);
                if (name instanceof LinkedList) {
                    // 设置当前访问器的认证用户
//                    String token = ((LinkedList) name).get(0).toString();
//                    String username = null;
//                    try {
//                        Map<String, Claim> claimMap = (token);
//                        username = ("username").asString();
//                        if(username == null){
//                            throw new RuntimeException("websocket认证失败");
//                        }
//                    } catch (UnsupportedEncodingException e) {
//                        ();
//                        throw new RuntimeException("websocket认证失败", e);
//                    } catch (ValidTokenException e) {
//                        ();
//                        throw new RuntimeException("websocket认证失败", e);
//                    }
//                    User user = new User();
//                    (username);
//                    (user);

                    User user = new User();
                    ("lalala");
                    (user);

                }
            }
        } else if ((())) {
            //发送数据

        }

        return message;
    }

    @Override
    public void postSend(Message<?> message, MessageChannel channel, boolean sent) {

    }

    @Override
    public void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex) {

    }

    @Override
    public boolean preReceive(MessageChannel channel) {
        return false;
    }

    @Override
    public Message<?> postReceive(Message<?> message, MessageChannel channel) {
        return null;
    }

    @Override
    public void afterReceiveCompletion(Message<?> message, MessageChannel channel, Exception ex) {

    }
}