java实现websocket server/client

时间:2025-03-28 14:24:46
  • @Component
  • @ServerEndpoint(value = "/ws/{userId}")
  • public class WebSocketServer
  • {
  • /**
  • * 日志
  • */
  • private static final Logger LOGGER = (WebSocketServer.class);
  • // 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
  • private static int onlineCount = 0;
  • // concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
  • private static ConcurrentHashMap<String, WebSocketServer> webSocketMap = new ConcurrentHashMap<>();
  • // 保存允许建立连接的id
  • private static List<String> idList = ();
  • private String id = "";
  • // 与某个客户端的连接会话,需要通过它来给客户端发送数据
  • private Session session;
  • /**
  • * <关闭连接>
  • *
  • * @param userId userId
  • * @throws
  • */
  • public void closeConn(String userId)
  • {
  • // 关闭连接
  • try
  • {
  • WebSocketServer socket = webSocketMap.get(userId);
  • if (null != socket)
  • {
  • if (())
  • {
  • .close();
  • }
  • }
  • }
  • catch (IOException e)
  • {
  • LOGGER.error("error, cause: ", e);
  • }
  • (userId);
  • (userId);
  • }
  • /**
  • * <连接/注册时去重>
  • *
  • * @param userId userId
  • * @throws
  • */
  • public void conn(String userId)
  • {
  • // 去重
  • if (!idList.contains(userId))
  • {
  • idList.add(userId);
  • }
  • }
  • /**
  • * <获取注册在websocket进行连接的id>
  • *
  • * @return 结果
  • * @throws
  • */
  • public static List<String> getIdList()
  • {
  • return idList;
  • }
  • /**
  • * <初始化方法>
  • *
  • * @throws
  • */
  • @PostConstruct
  • public void init()
  • {
  • try
  • {
  • /**
  • * TODO 这里的设计是在项目启动时从DB或者缓存中获取注册了允许建立连接的id
  • */
  • // TODO 初始化时将刚注入的对象进行静态保存
  • }
  • catch (Exception e)
  • {
  • // TODO 项目启动错误信息
  • LOGGER.error("error, cause: ", e);
  • }
  • }
  • // /**
  • // * 连接启动时查询是否有滞留的新邮件提醒
  • // *
  • // * @param id
  • // * @throws IOException
  • // * @author caoting
  • // * @date 2019228
  • // */
  • // private void selectOfflineMail(String id)
  • // throws IOException
  • // {
  • // // 查询缓存中是否存在离线邮件消息
  • // jedis = ();
  • // try
  • // {
  • // List<String> mails = (Constant.MAIL_OFFLINE + id, 0, -1);
  • // if (!(mails))
  • // {
  • // for (String mailuuid : mails)
  • // {
  • // String mail = jedis.get(mailuuid);
  • // if (!(mail))
  • // {
  • // sendToUser(Constant.MESSAGE_MAIL + mail, id);
  • // }
  • // (1000);
  • // }
  • // // 发送完成从缓存中移除
  • // (Constant.MAIL_OFFLINE + id);
  • // }
  • // }
  • // catch (InterruptedException e)
  • // {
  • // ();
  • // }
  • // finally
  • // {
  • // jedis.close();
  • // }
  • // }
  • /**
  • * <连接建立成功调用的方法>
  • *
  • * @param userId userId
  • * @param session session
  • * @throws
  • */
  • @OnOpen
  • public void onOpen(@PathParam(value = "userId") String userId, Session session)
  • {
  • try
  • {
  • // 注:admin是管理员内部使用通道 不受监控 谨慎使用
  • // if (!id.contains("admin"))
  • // {
  • // = session;
  • // = id;//接收到发送消息的人员编号
  • // // 验证id是否在允许
  • // if (idList.contains(id))
  • // {
  • // // 判断是否已存在相同id
  • // WebSocketServer socket = webSocketSet.get(id);
  • // if (socket == null)
  • // {
  • // (id, this); //加入set
  • // addOnlineCount(); // 在线数加1
  • //
  • // ("Hello:::" + id);
  • // ("用户" + id + "加入!当前在线人数为" + getOnlineCount());
  • //
  • // // 检查是否存在离线推送消息
  • // selectOfflineMail(id);
  • // }
  • // else
  • // {
  • // ("连接id重复--连接即将关闭");
  • // .close();
  • // }
  • // }
  • // else
  • // {
  • // // 查询数据库中是否存在数据
  • // WsIds wsIds = (id);
  • // if (null != wsIds)
  • // {
  • // idList.add(id);
  • // (id, this); //加入set
  • //
  • // addOnlineCount(); // 在线数加1
  • // ("Hello:::" + id);
  • // ("用户" + id + "加入!当前在线人数为" + getOnlineCount());
  • //
  • // // 检查是否存在离线推送消息
  • // selectOfflineMail(id);
  • //
  • // }
  • // else
  • // {
  • // // 关闭
  • // ("暂无连接权限,连接即将关闭,请确认连接申请是否过期!");
  • // .close();
  • // ("有异常应用尝试与服务器进行长连接 使用id为:" + id);
  • // }
  • // }
  • // }
  • // else
  • // {
  • = session;
  • = userId;//接收到发送消息的人员编号
  • (userId, this); //加入set
  • addOnlineCount(); // 在线数加1
  • ("Hello:::" + userId);
  • ("用户" + userId + "加入!当前在线人数为" + getOnlineCount());
  • // }
  • }
  • catch (IOException e)
  • {
  • LOGGER.error("error, cause: ", e);
  • }
  • }
  • /**
  • * 连接关闭调用的方法
  • */
  • @OnClose
  • public void onClose()
  • {
  • (); //set中删除
  • ();
  • subOnlineCount(); // 在线数减1
  • ("有一连接关闭!当前在线人数为" + getOnlineCount());
  • }
  • /**
  • * 收到客户端消息后调用的方法
  • *
  • * @param message 客户端发送过来的消息
  • */
  • @OnMessage
  • public void onMessage(String message, Session session)
  • {
  • ("来自客户端的消息:" + message);
  • // TODO 收到客户端消息后的操作
  • }
  • /**
  • * 发生错误时调用
  • */
  • @OnError
  • public void onError(Session session, Throwable error)
  • {
  • ("发生错误");
  • LOGGER.error("error, cause: ", error);
  • }
  • /**
  • * <发送message>
  • *
  • * @param message message
  • * @throws
  • */
  • public synchronized void sendMessage(String message)
  • throws IOException
  • {
  • ().sendText(message);
  • }
  • /**
  • * 发送信息给指定ID用户,如果用户不在线则返回不在线信息给自己
  • *
  • * @param message
  • * @param sendUserId
  • * @throws IOException
  • */
  • public Boolean sendToUser(String message, String sendUserId)
  • throws IOException
  • {
  • Boolean flag = true;
  • WebSocketServer socket = webSocketMap.get(sendUserId);
  • if (socket != null)
  • {
  • try
  • {
  • if (())
  • {
  • (message);
  • }
  • else
  • {
  • flag = false;
  • }
  • }
  • catch (Exception e)
  • {
  • flag = false;
  • LOGGER.error("error, cause: ", e);
  • }
  • }
  • else
  • {
  • flag = false;
  • ("【" + sendUserId + "】 该用户不在线");
  • }
  • return flag;
  • }
  • /**
  • * <群发自定义消息>
  • *
  • * @param message message
  • * @throws
  • */
  • public void sendToAll(String message)
  • {
  • List<String> userIdList = new ArrayList<>(());
  • for (int i = 0; i < userIdList.size(); i++)
  • {
  • try
  • {
  • WebSocketServer socket = webSocketMap.get(userIdList.get(i));
  • if (())
  • {
  • (message);
  • }
  • }
  • catch (Exception e)
  • {
  • LOGGER.error("sendToAll error. cause: ", e);
  • //异常重传
  • i--;
  • continue;
  • }
  • }
  • }
  • /**
  • * <获取在线人数>
  • *
  • * @return 在线人数
  • * @throws
  • */
  • public static synchronized int getOnlineCount()
  • {
  • return onlineCount;
  • }
  • /**
  • * <增加在线人数>
  • *
  • * @throws
  • */
  • public static synchronized void addOnlineCount()
  • {
  • ++;
  • }
  • /**
  • * <减少在线人数>
  • *
  • * @throws
  • */
  • public static synchronized void subOnlineCount()
  • {
  • if ( > 0)
  • {
  • --;
  • }
  • }
  • }