前言
背景
最近被分配了一个站内信模块,由自己单独负责这个模块;这个模块主要功能就是提供一个接口给调用方,然后将传送的消息推送至登录的相关的用户的客户端;然后就是用户对这条消息的操作了,就是写一些curd的接口供前端调用;
技术选用
由于之前用netty做过一个项目,而且一位大佬也写了很多关于netty的文章,第一时间就想到去看他写的设计一个百万级的消息推送系统; 然后仔细对比了一下,我负责这个模块:
- 用户量不大,因为针对的是运维人员,而且不是所有运维人,是有针对性的;
- 不用安全验证,因为这个项目是在内网中运行;
- 这个模块不用分布式,只是一个微服务中的一部分;
最后选用了netty-socketio这个框架;而且网上的文章也不少;
正文
springboot整合netty-socketio
pom
首先导入包,我导入的版本是1.7.11;我最开始导入的是跟前边的一个版本,但是出现了一个问题,就是OnEvent事件无法监听,所以我换了更高的版本,然后就可以了;
<dependency>
<groupId></groupId>
<artifactId>netty-socketio</artifactId>
<version>1.7.11</version>
</dependency>
复制代码
整合
这个整合和之前的springboot整合netty一样的;
- 实现CommandLineRunner 接口,在springboot启动前启动服务
- 使用@Configuration注解,将服务以bean的方式启动
- 实现springboot中各种接口,如:实现InitializingBean接口,*Aware接口类等;
一句话总结起来就是netty的启动是以注入的方式启动,而不是以new的方式;当然也可以以new的方式启动,只是这样的话就无法直接以注入的方式调用其他类了;
新建ChatServer类
该类主要是启动服务; 我这里使用了实现InitializingBean接口启动服务,类上注意声明@Component,当然也可以用其他的方式;只要springboot在启动时能扫描到这个类就行了;
@Component
public class PushServer implements InitializingBean {
@Resource
private EventListenner eventListenner;
@Value("${}")
private int serverPort;
@Override
public void afterPropertiesSet() throws Exception {
Configuration config = new Configuration();
(serverPort);
SocketConfig socketConfig = new SocketConfig();
(true);
(true);
(0);
(socketConfig);
("localhost");
SocketIOServer server = new SocketIOServer(config);
(eventListenner);
();
("启动正常");
}
}
复制代码
新建EventListennerlei
该类主要是监听客户端的连接及断开,然后进行处理; 在这里,我对请求地址附带了用户的ID;
@Component
public class EventListenner {
@Resource
private ClientCache clientCache;
/**
* 客户端连接
* @param client
*/
@OnConnect
public void onConnect(SocketIOClient client) {
String userId = ().getSingleUrlParam("userId");
UUID sessionId = ();
(userId,sessionId,client);
("建立连接");
}
/**
* 客户端断开
* @param client
*/
@OnDisconnect
public void onDisconnect(SocketIOClient client) {
String userId = ().getSingleUrlParam("userId");
(userId,());
("关闭连接");
}
//消息接收入口,当接收到消息后,查找发送目标客户端,并且向该客户端发送消息,且给自己发送消息
// 暂未使用
@OnEvent("messageevent")
public void onEvent(SocketIOClient client, AckRequest request) {
}
}
复制代码
缓存
由于需要向指定用户推送消息,所以需要将连接信息与用户绑定,所以前端在登陆的同时需要发送用户ID;然后在连接事件的监听中将连接信息与用户绑定;连接信息与用户实现一对一对应,但是这样会出现一个问题,当用户打开多个页面时,新开的页面通道连接会将旧页面的通道连接信息覆盖,造成无法全部页面推送,所以将用户与通道信息改成一对多的关系; 如何将用户与通道信息保存,我这里使用了两个map集合:
- 里层map存储页面sessionID对应的通道信息,以sessionID为key,通道连接信息为value
- 外层map存储用户ID对应的里层数据,以用户ID为key,里层数据为value;
客户端断开时则需要用户ID及sessid; 代码如下:
@Component
public class ClientCache {
//本地缓存
private static Map<String, HashMap<UUID, SocketIOClient>> concurrentHashMap=new ConcurrentHashMap<>();
/**
* 存入本地缓存
* @param userId 用户ID
* @param sessionId 页面sessionID
* @param socketIOClient 页面对应的通道连接信息
*/
public void saveClient(String userId, UUID sessionId,SocketIOClient socketIOClient){
HashMap<UUID, SocketIOClient> sessionIdClientCache=(userId);
if(sessionIdClientCache==null){
sessionIdClientCache = new HashMap<>();
}
(sessionId,socketIOClient);
(userId,sessionIdClientCache);
}
/**
* 根据用户ID获取所有通道信息
* @param userId
* @return
*/
public HashMap<UUID, SocketIOClient> getUserClient(String userId){
return (userId);
}
/**
* 根据用户ID及页面sessionID删除页面链接信息
* @param userId
* @param sessionId
*/
public void deleteSessionClient(String userId,UUID sessionId){
(userId).remove(sessionId);
}
}
复制代码
推送接口PushController类
该类主要是提供给别人调用,向用户推送消息,这里就贴直接推送的代码了,具体的推送业务就不贴出来了;这里需要注意,推送事件的命名需要与web端监听命名一致;
@RestController
@RequestMapping("/push")
public class PushController {
@Resource
private ClientCache clientCache;
@GetMapping("/user/{userId}")
public String pushTuUser(@PathVariable("userId") String userId){
HashMap<UUID, SocketIOClient> userClient = (userId);
((uuid, socketIOClient) -> {
//向客户端推送消息
("chatevent","服务端推送消息");
});
return "success";
}
}
复制代码
客户端(web端)
由于web端代码及依赖较多;就不提供代码;可以去官网下载:/mrniko/nett… 下载下来后,打开client文件下的;我修改的地方:
运行结果
- 客户端 这里同时打开三个页面
- 服务端
- 推送消息
调用推送接口,三个页面同时收到推送消息
GitHub地址
总结
- 在刚开始设计时,我设计用户与通道的关系就是一对一的关系,最后在提交流程图的时候,项目组的各位大佬都给我提了许多建议,果然姜还是老的辣啊,在这里十分感谢项目组的各位大佬提出的宝贵的建议;
- 这是新年的第一篇博客;也是新的一个起点,希望自己能坚持把博客写下去;