Spring Boot 集成 WebSocket 实现消息推送功能

时间:2024-03-08 09:31:12

项目结构

导入依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.example</groupId>
    <artifactId>demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>demo</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <spring-boot.version>2.3.7.RELEASE</spring-boot.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- fastjson parses client-supplied WebSocket text; versions up to 1.2.80 are affected by the
             autoType deserialization vulnerability (CVE-2022-25845), so use the patched 1.2.83 release. -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <!--websocket-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>

        <!-- JUnit 4 is only needed by tests; without an explicit scope it would be placed on the
             compile/runtime classpath and packaged into the application jar. -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>

        <!--工具包-->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.5</version>
        </dependency>


    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring-boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <!-- Repackages the build output as an executable jar. The version reuses the
                 spring-boot.version property so the plugin cannot drift from the imported BOM. -->
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>${spring-boot.version}</version>
                <configuration>
                    <mainClass>com.example.demo.DemoApplication</mainClass>
                </configuration>
                <executions>
                    <execution>
                        <id>repackage</id>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

application.yml配置文件

server:
  port: 8080
  servlet:
    # HTTP request/response charset. Spring Boot 2.3 reads this from server.servlet.encoding.*
    # (the older spring.http.encoding.* keys are deprecated). The key and value must be separated
    # by ": " -- written without the space, YAML treats the whole text as a single string.
    encoding:
      charset: UTF-8

spring:
  # FreeMarker view settings.
  # NOTE(review): these only take effect if a FreeMarker starter is on the classpath -- confirm.
  freemarker:
    request-context-attribute: request
    #prefix: /templates/
    suffix: .html
    content-type: text/html
    enabled: true
    cache: false
    charset: UTF-8
    allow-request-override: false
    expose-request-attributes: true
    expose-session-attributes: true
    expose-spring-macro-helpers: true

WebSocketConfig: 用于开启WebSocket支持

package com.example.demo.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * Spring configuration that turns on WebSocket endpoint support by exposing a
 * {@link ServerEndpointExporter} bean.
 *
 * @author lyd
 */
@Configuration
public class WebSocketConfig {

	/**
	 * Creates the exporter bean.
	 *
	 * @return a new {@link ServerEndpointExporter}
	 */
	@Bean
	public ServerEndpointExporter serverEndpointExporter() {
		final ServerEndpointExporter exporter = new ServerEndpointExporter();
		return exporter;
	}
}

WebSocketServer: 核心类,用于开启、关闭连接,以及接收消息等

比较重要的注解:@ServerEndpoint("")、@OnOpen、@OnClose、@OnMessage

package com.example.demo.server;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @author lyd
 * @Description: 因为WebSocket是类似客户端服务端的形式(采用ws协议),那么这里的WebSocketServer其实就相当于一个ws协议的Controller,
 *               所以可以直接在前端调用@ServerEndpoint("")中的路径,相当于掉接口了
 * @date 15:48
 */
@Slf4j
@Component
@ServerEndpoint("/imserver/{userId}")
public class WebSocketServer {
	/**
	 * 静态变量,用来记录当前在线连接数
	 */
	private static int onlineCount = 0;

	/**
	 * Concurrent包的线程安全Set,用来存放每个客户端对应的WebSocket对象
	 */
	private static ConcurrentHashMap<String, WebSocketServer> webSocketMap = new ConcurrentHashMap<>();

	/**
	 * 连接会话,给客户端发送数据
	 */
	private Session session;

	/**
	 * 接收userId
	 */
	private String userId = "";

	/**
	 * 连接建立成功调用的方法
	 */
	@OnOpen
	public void onOpen(Session session, @PathParam("userId") String userId) {

		this.session = session;
		this.userId = userId;

		if (webSocketMap.containsKey(userId)) {
			webSocketMap.remove(userId);
			webSocketMap.put(userId, this);
			addOnlineCount();
		} else {
			webSocketMap.put(userId, this);
			addOnlineCount();
		}

		log.info("用户" + userId + "连接,当前在线人数为:" + getOnlineCount());

		try {
			sendMessage("连接成功");
		} catch (IOException e) {
			log.error("用户:" + userId + ",网络异常,哈哈哈");
			e.printStackTrace();
		}

	}

	/**
	 * 连接关闭调用的方法
	 */
	@OnClose
	public void onClose() {
		if (webSocketMap.containsKey(userId)) {
			webSocketMap.remove(userId);
			subOnlineCount();
		}
		log.info("用户:" + userId + "退出成功,当前在线人数为:" + getOnlineCount());

	}

	/**
	 * 收到客户端消息后调用的方法
	 */
	@OnMessage
	public void onMessage(String message, Session session) {
		log.info("用户消息:" + userId + ",报文:" + message);
		//可以群发消息
		//消息保存到数据库、redis
		if (StringUtils.isNotBlank(message)) {
			try {
				// 解析报文
				JSONObject jsonObject = JSON.parseObject(message);
				// 追加发送人(防止串改)
				jsonObject.put("fromUserId", this.userId);
				String toUserId = jsonObject.getString("toUserId");
				if (StringUtils.isNotBlank(toUserId) && webSocketMap.containsKey(toUserId)) {
					webSocketMap.get(toUserId).sendMessage(jsonObject.toJSONString());
				}else {
					log.error("请求的userId:"+toUserId+"不在该服务器上");
				}
			} catch (IOException e) {
				e.printStackTrace();
			}

		}
	}

	/**
	 * 实现服务器消息主动推送
	 *
	 * @param message
	 * @throws IOException
	 */
	public void sendMessage(String message) throws IOException {
		this.session.getBasicRemote().sendText(message);
	}

	/**
	 * 发送自定义消息
	 */
	public static void sendInfo(String message, @PathParam("userId") String userId) throws IOException {
		log.info("发送消息到:" + userId + ",报文:" + message);
		if (StringUtils.isNotBlank(userId) && webSocketMap.containsKey(userId)) {
			webSocketMap.get(userId).sendMessage(message);
		} else {
			log.error("用户" + userId + ",不在线!");
		}
	}

	/**
	 * 获得当前连接数
	 *
	 * @return
	 */
	public static synchronized int getOnlineCount() {
		return onlineCount;
	}

	/**
	 * 在线连接数加1
	 */
	public static synchronized void addOnlineCount() {
		WebSocketServer.onlineCount++;
	}

	/**
	 * 在线连接数减1
	 */
	public static synchronized void subOnlineCount() {
		WebSocketServer.onlineCount--;
	}

	/**
	 * @param session
	 * @param error
	 */
	@OnError
	public void onError(Session session, Throwable error) {
		log.error("用户错误:" + this.userId + ",原因:" + error.getMessage());
		error.printStackTrace();
	}
}

消息推送

写个controller接口调用WebSocketServer.sendInfo()

package com.example.demo.controller;

import com.example.demo.server.WebSocketServer;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.ModelAndView;

import javax.websocket.server.PathParam;
import java.io.IOException;

/**
 * REST controller with a health-check endpoint, a page endpoint for the test page, and an HTTP
 * endpoint that pushes a message to a connected WebSocket user.
 *
 * @author lyd
 */
@RestController
public class DemoController {

	/**
	 * Simple endpoint for checking that the application responds.
	 *
	 * @return a 200 response with a fixed success text
	 */
	@RequestMapping("index")
	public ResponseEntity<String> index() {
		return ResponseEntity.ok("请求成功");
	}

	/**
	 * Navigates to the WebSocket test page.
	 *
	 * @return a view reference named {@code testPage.html}
	 */
	@RequestMapping("page")
	public ModelAndView page() {
		return new ModelAndView("testPage.html");
	}

	/**
	 * Pushes a message to a user through {@link WebSocketServer#sendInfo(String, String)}.
	 *
	 * <p>Both arguments carry no binding annotation and are bound by name from the request.
	 * The previous {@code javax.websocket.server.PathParam} annotation belongs to the WebSocket
	 * API and is not interpreted by Spring MVC, so it was dropped.
	 *
	 * @param message  the text to push
	 * @param toUserId id of the receiving user
	 * @return a 200 response with a fixed text; it does not reflect whether the user was online
	 * @throws IOException if sending over the WebSocket fails
	 */
	@RequestMapping("pushToWeb")
	public ResponseEntity<String> pushToWeb(String message, String toUserId) throws IOException {
		WebSocketServer.sendInfo(message, toUserId);
		return ResponseEntity.ok("MSG SEND SUCCESS");
	}

}

前端页面代码

<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8">
    <title>websocket通讯</title>
</head>
<script src="https://cdn.bootcss.com/jquery/3.3.1/jquery.js"></script>
<script>
    var socket;

    /**
     * Opens a WebSocket connection to the server endpoint for the user id entered in #userId,
     * closing any connection that was opened earlier, and wires up logging event handlers.
     */
    function openSocket() {
        if (typeof (WebSocket) == "undefined") {
            console.log("您的浏览器不支持WebSocket");
            return;
        }
        console.log("您的浏览器支持WebSocket");

        // The user id becomes a path segment, so encode it.
        var socketUrl = "http://localhost:8080/imserver/" + encodeURIComponent($("#userId").val());
        // Swap only the leading "http": http -> ws and https -> wss.
        socketUrl = socketUrl.replace(/^http/, "ws");
        console.log(socketUrl);

        // Drop the previous connection before creating a new one.
        if (socket != null) {
            socket.close();
            socket = null;
        }
        socket = new WebSocket(socketUrl);

        // Connection opened.
        socket.onopen = function () {
            console.log("websocket已打开");
        };
        // Message received; front-end handling of pushed messages starts here.
        socket.onmessage = function (msg) {
            console.log(msg.data);
        };
        // Connection closed.
        socket.onclose = function () {
            console.log("websocket已关闭");
        };
        // Error occurred.
        socket.onerror = function () {
            console.log("websocket发生了错误");
        };
    }

    /**
     * Sends the content of #contentText to the user id in #toUserId over the open socket,
     * as a JSON object with the fields "toUserId" and "contentText".
     */
    function sendMessage() {
        if (typeof (WebSocket) == "undefined") {
            console.log("您的浏览器不支持WebSocket");
            return;
        }
        console.log("您的浏览器支持WebSocket");

        // send() fails unless a connection exists and is open.
        if (!socket || socket.readyState !== WebSocket.OPEN) {
            console.log("websocket未连接,请先开启socket");
            return;
        }

        // JSON.stringify escapes quotes and backslashes in the input, unlike string concatenation.
        var payload = JSON.stringify({
            toUserId: $("#toUserId").val(),
            contentText: $("#contentText").val()
        });
        console.log(payload);
        socket.send(payload);
    }
</script>
<body>
<p>【userId】:
<div><input id="userId" name="userId" type="text" value="10"></div>
<p>【toUserId】:
<div><input id="toUserId" name="toUserId" type="text" value="20"></div>
<p>【发送内容】:
<div><input id="contentText" name="contentText" type="text" value="hello websocket"></div>
<p>【操作】:
<div><a onclick="openSocket()">开启socket</a></div>
<p>【操作】:
<div><a onclick="sendMessage()">发送消息</a></div>
</body>

</html>

建立连接

发送消息


项目源码

https://github.com/Wranglery/test-websocket

感谢大佬的代码参考:https://blog.csdn.net/moshowgame/article/details/80275084