写过web的同学们应该对Session这个东西很熟悉。浏览器第一次与服务器建立连接的时候,服务器就会自动为之分配一个Session。Session可以用来判断用户是否经过登录验证,也可以保存用户的各种信息。
其实,Session是很常用的技术。不管是WEB,还是游戏服务,还是联网的桌面程序,都有session的身影。有了Session,我们可以向里面保存各种个人参数,还可以利用session来向客户端发送消息。极大方便了程序对客户端的管理。
Mina IO框架默认有IoSession这个对象,Netty可就没有了。所以我们可以自己创建一个Session抽象。
关于Session,我们希望它有这样的作用。
1. 客户端链路第一次激活时,服务端为之创建一个Session;
2. 可以使用Session向用户发送消息;
3. 可以保存一些重要的且不需要持久化的用户信息;
4. 只能由服务端控制它的生命周期消亡;
public class IoSession {
private static final Logger logger = LoggerFactory.getLogger(IoSession.class);
/** 网络连接channel */
private Channel channel;
private User user;
/** ip地址 */
private String ipAddr;
private boolean reconnected;
/** 拓展用,保存一些个人数据 */
private Map<String, Object> attrs = new HashMap<>();
public IoSession() {
}
public IoSession(Channel channel) {
this.channel = channel;
this.ipAddr = ChannelUtils.getIp(channel);
}
public void setUser(User user) {
this.user = user;
}
/**
* 向客户端发送消息
* @param packet
*/
public void sendPacket(Packet packet) {
if (packet == null) {
return;
}
if (channel != null) {
channel.writeAndFlush(packet);
}
}
public String getIpAddr() {
return ipAddr;
}
public void setIpAddr(String ipAddr) {
this.ipAddr = ipAddr;
}
public boolean isReconnected() {
return reconnected;
}
public void setReconnected(boolean reconnected) {
this.reconnected = reconnected;
}
public User getUser() {
return user;
}
public boolean isClose() {
if (channel == null) {
return true;
}
return !channel.isActive() ||
!channel.isOpen();
}
/**
* 关闭session
* @param reason {@link SessionCloseReason}
*/
public void close(SessionCloseReason reason) {
try{
if (this.channel == null) {
return;
}
if (channel.isOpen()) {
channel.close();
logger.info("close session[{}], reason is {}", getUser().getUserId(), reason);
}else{
logger.info("session[{}] already close, reason is {}", getUser().getUserId(), reason);
}
}catch(Exception e){
}
}
}
Session被关闭可以有一系列原因,所以我们最后有一个枚举保存各种原因,像这样
package com.kingston.net;public enum SessionCloseReason {/** 正常退出 */NORMAL,/** 链接超时 */OVER_TIME,}
在Netty,channel是通讯的载体,为了方便对channel的各种操作,加了一个channel的工具类(ChannelUtils.java)
public final class ChannelUtils {public static AttributeKey<IoSession> SESSION_KEY = AttributeKey.valueOf("session");/** * 添加新的会话 * @param channel * @param session * @return */public static boolean addChannelSession(Channel channel, IoSession session) {Attribute<IoSession> sessionAttr = channel.attr(SESSION_KEY);return sessionAttr.compareAndSet(null, session);}public static IoSession getSessionBy(Channel channel) {Attribute<IoSession> sessionAttr = channel.attr(SESSION_KEY);return sessionAttr.get() ;}public static String getIp(Channel channel) {return ((InetSocketAddress)channel.remoteAddress()).getAddress().toString().substring(1);}}
使用了IoSession,先前用于管理用户通讯的工具类,也相应发生变化
public enum ServerManager {INSTANCE;private Logger logger = LoggerFactory.getLogger(ServerManager.class);/** 缓存通信上下文环境对应的登录用户(主要用于服务) */ private Map<IoSession, Long> session2UserIds = new ConcurrentHashMap<>();/** 缓存用户id与对应的会话 */private ConcurrentMap<Long, IoSession> userId2Sessions = new ConcurrentHashMap<>();public void sendPacketTo(Packet pact,Long userId){if(pact == null || userId <= 0) return;IoSession session = userId2Sessions.get(userId);if (session != null) {session.sendPacket(pact);}}/** * 向所有在线用户发送数据包 */public void sendPacketToAllUsers(Packet pact){if(pact == null ) return;userId2Sessions.values().forEach( (session) -> session.sendPacket(pact));}/** * 向单一在线用户发送数据包 */public void sendPacketTo(Packet pact,ChannelHandlerContext targetContext ){if(pact == null || targetContext == null) return;targetContext.writeAndFlush(pact);}public IoSession getSessionBy(long userId) {return this.userId2Sessions.get(userId);}public boolean registerSession(User user, IoSession session) {session.setUser(user);userId2Sessions.put(user.getUserId(), session);logger.info("[{}] registered...", user.getUserId());return true;}/** * 注销用户通信渠道 */public void ungisterUserContext(Channel context ){if(context == null){return;}IoSession session = ChannelUtils.getSessionBy(context);Long userId = session2UserIds.remove(session);userId2Sessions.remove(userId);if (session != null) {session.close(SessionCloseReason.OVER_TIME);}}}
加入IoSession后,先前的业务需要做点修改,比如在客户端链路建立后,需要创建新的session对象。
在MessageTransportHandler类增加方法
@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {if (!ChannelUtils.addChannelSession(ctx.channel(), new IoSession(ctx.channel()))) {ctx.channel().close();logger.error("Duplicate session,IP=[{}]",ChannelUtils.getIp(ctx.channel()));}}
全部代码已在github上托管
服务端代码请移步 --> netty聊天室服务器
客户端代码请移步 --> netty聊天室客户端