@Component
@ServerEndpoint(value = "/ws/{userId}")
public class WebSocketServer
{
/**
* 日志
*/
private static final Logger LOGGER = (WebSocketServer.class);
// 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
private static int onlineCount = 0;
// concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
private static ConcurrentHashMap<String, WebSocketServer> webSocketMap = new ConcurrentHashMap<>();
// 保存允许建立连接的id
private static List<String> idList = ();
private String id = "";
// 与某个客户端的连接会话,需要通过它来给客户端发送数据
private Session session;
/**
* <关闭连接>
*
* @param userId userId
* @throws
*/
public void closeConn(String userId)
{
// 关闭连接
try
{
WebSocketServer socket = webSocketMap.get(userId);
if (null != socket)
{
if (())
{
.close();
}
}
}
catch (IOException e)
{
LOGGER.error("error, cause: ", e);
}
(userId);
(userId);
}
/**
* <连接/注册时去重>
*
* @param userId userId
* @throws
*/
public void conn(String userId)
{
// 去重
if (!idList.contains(userId))
{
idList.add(userId);
}
}
/**
* <获取注册在websocket进行连接的id>
*
* @return 结果
* @throws
*/
public static List<String> getIdList()
{
return idList;
}
/**
* <初始化方法>
*
* @throws
*/
@PostConstruct
public void init()
{
try
{
/**
* TODO 这里的设计是在项目启动时从DB或者缓存中获取注册了允许建立连接的id
*/
// TODO 初始化时将刚注入的对象进行静态保存
}
catch (Exception e)
{
// TODO 项目启动错误信息
LOGGER.error("error, cause: ", e);
}
}
// /**
// * 连接启动时查询是否有滞留的新邮件提醒
// *
// * @param id
// * @throws IOException
// * @author caoting
// * @date 2019年2月28日
// */
// private void selectOfflineMail(String id)
// throws IOException
// {
// // 查询缓存中是否存在离线邮件消息
// jedis = ();
// try
// {
// List<String> mails = (Constant.MAIL_OFFLINE + id, 0, -1);
// if (!(mails))
// {
// for (String mailuuid : mails)
// {
// String mail = jedis.get(mailuuid);
// if (!(mail))
// {
// sendToUser(Constant.MESSAGE_MAIL + mail, id);
// }
// (1000);
// }
// // 发送完成从缓存中移除
// (Constant.MAIL_OFFLINE + id);
// }
// }
// catch (InterruptedException e)
// {
// ();
// }
// finally
// {
// jedis.close();
// }
// }
/**
* <连接建立成功调用的方法>
*
* @param userId userId
* @param session session
* @throws
*/
@OnOpen
public void onOpen(@PathParam(value = "userId") String userId, Session session)
{
try
{
// 注:admin是管理员内部使用通道 不受监控 谨慎使用
// if (!id.contains("admin"))
// {
// = session;
// = id;//接收到发送消息的人员编号
// // 验证id是否在允许
// if (idList.contains(id))
// {
// // 判断是否已存在相同id
// WebSocketServer socket = webSocketSet.get(id);
// if (socket == null)
// {
// (id, this); //加入set中
// addOnlineCount(); // 在线数加1
//
// ("Hello:::" + id);
// ("用户" + id + "加入!当前在线人数为" + getOnlineCount());
//
// // 检查是否存在离线推送消息
// selectOfflineMail(id);
// }
// else
// {
// ("连接id重复--连接即将关闭");
// .close();
// }
// }
// else
// {
// // 查询数据库中是否存在数据
// WsIds wsIds = (id);
// if (null != wsIds)
// {
// idList.add(id);
// (id, this); //加入set中
//
// addOnlineCount(); // 在线数加1
// ("Hello:::" + id);
// ("用户" + id + "加入!当前在线人数为" + getOnlineCount());
//
// // 检查是否存在离线推送消息
// selectOfflineMail(id);
//
// }
// else
// {
// // 关闭
// ("暂无连接权限,连接即将关闭,请确认连接申请是否过期!");
// .close();
// ("有异常应用尝试与服务器进行长连接 使用id为:" + id);
// }
// }
// }
// else
// {
= session;
= userId;//接收到发送消息的人员编号
(userId, this); //加入set中
addOnlineCount(); // 在线数加1
("Hello:::" + userId);
("用户" + userId + "加入!当前在线人数为" + getOnlineCount());
// }
}
catch (IOException e)
{
LOGGER.error("error, cause: ", e);
}
}
/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose()
{
(); // 从set中删除
();
subOnlineCount(); // 在线数减1
("有一连接关闭!当前在线人数为" + getOnlineCount());
}
/**
* 收到客户端消息后调用的方法
*
* @param message 客户端发送过来的消息
*/
@OnMessage
public void onMessage(String message, Session session)
{
("来自客户端的消息:" + message);
// TODO 收到客户端消息后的操作
}
/**
* 发生错误时调用
*/
@OnError
public void onError(Session session, Throwable error)
{
("发生错误");
LOGGER.error("error, cause: ", error);
}
/**
* <发送message>
*
* @param message message
* @throws
*/
public synchronized void sendMessage(String message)
throws IOException
{
().sendText(message);
}
/**
* 发送信息给指定ID用户,如果用户不在线则返回不在线信息给自己
*
* @param message
* @param sendUserId
* @throws IOException
*/
public Boolean sendToUser(String message, String sendUserId)
throws IOException
{
Boolean flag = true;
WebSocketServer socket = webSocketMap.get(sendUserId);
if (socket != null)
{
try
{
if (())
{
(message);
}
else
{
flag = false;
}
}
catch (Exception e)
{
flag = false;
LOGGER.error("error, cause: ", e);
}
}
else
{
flag = false;
("【" + sendUserId + "】 该用户不在线");
}
return flag;
}
/**
* <群发自定义消息>
*
* @param message message
* @throws
*/
public void sendToAll(String message)
{
List<String> userIdList = new ArrayList<>(());
for (int i = 0; i < userIdList.size(); i++)
{
try
{
WebSocketServer socket = webSocketMap.get(userIdList.get(i));
if (())
{
(message);
}
}
catch (Exception e)
{
LOGGER.error("sendToAll error. cause: ", e);
//异常重传
i--;
continue;
}
}
}
/**
* <获取在线人数>
*
* @return 在线人数
* @throws
*/
public static synchronized int getOnlineCount()
{
return onlineCount;
}
/**
* <增加在线人数>
*
* @throws
*/
public static synchronized void addOnlineCount()
{
++;
}
/**
* <减少在线人数>
*
* @throws
*/
public static synchronized void subOnlineCount()
{
if ( > 0)
{
--;
}
}
}