springboot集成SocketIO实时通信

时间:2024-10-13 07:59:16

概述

基于 socket.io 来说,采用 node 实现更加合适,本文使用两个后端的Java开源框架实现。

  • 服务端使用 netty-socketio
  • 客户端使用 -client-java

业务需求是将之前通过轮询方式调动RESTFul API改成使用WebSocket长连接方式,实现要服务器实时的推送消息,另外还要实时监控POS机的在线状态等。

引入依赖

 

  1. <!-- netty-socketio-->
  2. <dependency>
  3. <groupId></groupId>
  4. <artifactId>netty-socketio</artifactId>
  5. <version>1.7.13</version>
  6. </dependency>
  7. <dependency>
  8. <groupId></groupId>
  9. <artifactId>netty-resolver</artifactId>
  10. <version>4.1.</version>
  11. </dependency>
  12. <dependency>
  13. <groupId></groupId>
  14. <artifactId>netty-transport</artifactId>
  15. <version>4.1.</version>
  16. </dependency>
  17. <dependency>
  18. <groupId></groupId>
  19. <artifactId>-client</artifactId>
  20. <version>1.0.0</version>
  21. </dependency>

 


先来服务端程序爽一把,话不多说,先上代码:

服务器代码

 

  1. public class NamespaceSocketServer {
  2. private static final Logger logger = ();
  3. public static void main(String[] args) {
  4. /*
  5. * 创建Socket,并设置监听端口
  6. */
  7. Configuration config = new Configuration();
  8. //设置主机名
  9. ("localhost");
  10. //设置监听端口
  11. (9092);
  12. // 协议升级超时时间(毫秒),默认10秒。HTTP握手升级为ws协议超时时间
  13. (10000);
  14. // Ping消息超时时间(毫秒),默认60秒,这个时间间隔内没有接收到心跳消息就会发送超时事件
  15. (180000);
  16. // Ping消息间隔(毫秒),默认25秒。客户端向服务器发送一条心跳消息间隔
  17. (60000);
  18. // 连接认证,这里使用token更合适
  19. (new AuthorizationListener() {
  20. @Override
  21. public boolean isAuthorized(HandshakeData data) {
  22. // String token = ("token");
  23. // String username = (token);
  24. // return (token, "secret");
  25. return true;
  26. }
  27. });
  28. final SocketIOServer server = new SocketIOServer(config);
  29. /*
  30. * 添加连接监听事件,监听是否与客户端连接到服务器
  31. */
  32. (new ConnectListener() {
  33. @Override
  34. public void onConnect(SocketIOClient client) {
  35. // 判断是否有客户端连接
  36. if (client != null) {
  37. ("连接成功。clientId=" + ().toString());
  38. (().getSingleUrlParam("appid"));
  39. } else {
  40. ("并没有人连接上。。。");
  41. }
  42. }
  43. });
  44. /*
  45. * 添加监听事件,监听客户端的事件
  46. * 1.第一个参数eventName需要与客户端的事件要一致
  47. * 2.第二个参数eventClase是传输的数据类型
  48. * 3.第三个参数listener是用于接收客户端传的数据,数据类型需要与eventClass一致
  49. */
  50. ("login", , new DataListener<LoginRequest>() {
  51. @Override
  52. public void onData(SocketIOClient client, LoginRequest data, AckRequest ackRequest) {
  53. ("接收到客户端login消息:code = " + () + ",body = " + ());
  54. // check is ack requested by client, but it's not required check
  55. if (()) {
  56. // send ack response with data to client
  57. ("已成功收到客户端登录请求", "yeah");
  58. }
  59. // 向客户端发送消息
  60. List<String> list = new ArrayList<>();
  61. ("登录成功,sessionId=" + ());
  62. // 第一个参数必须与eventName一致,第二个参数data必须与eventClass一致
  63. String room = ().getSingleUrlParam("appid");
  64. (room).sendEvent("login", ());
  65. }
  66. });
  67. //启动服务
  68. ();
  69. }
  70. }


老规矩,先上代码爽爽

Android客户端

 

  1. public class SocketClient {
  2. private static Socket socket;
  3. private static final Logger logger = ();
  4. public static void main(String[] args) throws URISyntaxException {
  5. IO.Options options = new IO.Options();
  6. = new String[]{"websocket"};
  7. = 2; // 重连尝试次数
  8. = 1000; // 失败重连的时间间隔(ms)
  9. = 20000; // 连接超时时间(ms)
  10. = true;
  11. = "username=test1&password=test1&appid=.apay2";
  12. socket = ("http://localhost:9092/", options);
  13. (Socket.EVENT_CONNECT, new Emitter.Listener() {
  14. @Override
  15. public void call(Object... args) {
  16. // 客户端一旦连接成功,开始发起登录请求
  17. LoginRequest message = new LoginRequest(12, "这是客户端消息体");
  18. ("login", (message), (Ack) args1 -> {
  19. ("回执消息=" + (args1).map(Object::toString).collect((",")));
  20. });
  21. }
  22. }).on("login", new Emitter.Listener() {
  23. @Override
  24. public void call(Object... args) {
  25. ("接受到服务器房间广播的登录消息:" + (args));
  26. }
  27. }).on(Socket.EVENT_CONNECT_ERROR, new Emitter.Listener() {
  28. @Override
  29. public void call(Object... args) {
  30. ("Socket.EVENT_CONNECT_ERROR");
  31. ();
  32. }
  33. }).on(Socket.EVENT_CONNECT_TIMEOUT, new Emitter.Listener() {
  34. @Override
  35. public void call(Object... args) {
  36. ("Socket.EVENT_CONNECT_TIMEOUT");
  37. ();
  38. }
  39. }).on(Socket.EVENT_PING, new Emitter.Listener() {
  40. @Override
  41. public void call(Object... args) {
  42. ("Socket.EVENT_PING");
  43. }
  44. }).on(Socket.EVENT_PONG, new Emitter.Listener() {
  45. @Override
  46. public void call(Object... args) {
  47. ("Socket.EVENT_PONG");
  48. }
  49. }).on(Socket.EVENT_MESSAGE, new Emitter.Listener() {
  50. @Override
  51. public void call(Object... args) {
  52. ("-----------接受到消息啦--------" + (args));
  53. }
  54. }).on(Socket.EVENT_DISCONNECT, new Emitter.Listener() {
  55. @Override
  56. public void call(Object... args) {
  57. ("客户端断开连接啦。。。");
  58. ();
  59. }
  60. });
  61. ();
  62. }
  63. }

关于心跳机制

根据 文档 解释, 客户端会定期发送心跳包,并触发一个ping事件和一个pong事件,如下:

  • ping Fired when a ping packet is written out to the server.
  • pong Fired when a pong is received from the server. Parameters:
    • Number number of ms elapsed since ping packet (.: latency)

这里最重要的两个服务器参数如下:

  1. pingTimeout (Number): how many ms without a pong packet to consider the connection closed (60000)
  2. pingInterval (Number): how many ms before sending a new ping packet (25000).

也就是说握手协议的时候,客户端从服务器拿到这两个参数,一个是ping消息的发送间隔时间,一个是从服务器返回pong消息的超时时间, 客户端会在超时后断开连接。心跳包发送方向是客户端向服务器端发送,以维持在线状态。

关于断线和超时

关闭浏览器、直接关闭客户端程序、kill进程、主动执行disconnect方法都会导致立刻产生断线事件。 而客户端把网络断开,服务器端在 pingTimeout ms后产生断线事件、客户端在 pingTimeout ms后也产生断线事件。

实际上,超时后会产生一个断线事件,叫”disconnect”。客户端和服务器端都可以对这个事件作出应答,释放连接。

客户端代码:

 

  1. .on(Socket.EVENT_DISCONNECT, new Emitter.Listener() {
  2. @Override
  3. public void call(Object... args) {
  4. ("客户端断开连接啦。。。");
  5. ();
  6. }
  7. });


下面是客户端日志:连上服务器后,断开网络。超过了心跳超时时间后,产生断线事件。如果客户端不主动断开连接的话,会自动重连, 这时候发现连接不上,又产生连接错误事件,然后重试2次,都失败后自动断开连接了。

  1. 1
  2. 2
  3. 3
  4. 4
  5. 5
  1. SocketClient - 回执消息=服务器已成功收到客户端登录请求,yeah
  2. SocketClient - Socket.EVENT_PING
  3. SocketClient - Socket.EVENT_PONG
  4. SocketClient - 客户端断开连接啦。。。
  5. SocketClient - Socket.EVENT_CONNECT_ERROR

服务器端代码:

  1. (new DisconnectListener() {
  2. @Override
  3. public void onDisconnect(SocketIOClient client) {
  4. ("服务器收到断线通知... sessionId=" + ());
  5. }
  6. });

 

服务器逻辑是,如果在心跳超时后,就直接断开这个连接,并且产生一个断开连接事件。

服务器通过netty处理心跳包ping/pong的日志如下:

  1. 1
  2. 2
  3. 3
  1. WebSocket08FrameDecoder - Decoding WebSocket Frame opCode=1
  2. WebSocket08FrameDecoder - Decoding WebSocket Frame length=1
  3. WebSocket08FrameEncoder - Encoding WebSocket Frame opCode=1 length=1

浏览器客户端演示

对于netty-socketio有一个demo工程,里面通过一个网页的聊天小程序演示了各种使用方法。

demo地址:netty-socketio-demo

SpringBoot集成

最后重点讲一下如何在SpringBoot中集成。

修改配置

首先maven依赖之前已经讲过了,先修改下配置文件来配置下几个参数,比如主机、端口、心跳时间等等。

  1. 1
  2. 2
  3. 3
  4. 4
  1. ################### 自定义项目配置 ###################
  2. xncoding:
  3. socket-hostname: localhost
  4. socket-port: 9096

添加Bean配置

然后增加一个SocketServer的Bean配置:

 

  1. @Configuration
  2. public class NettySocketConfig {
  3. @Resource
  4. private MyProperties myProperties;
  5. @Resource
  6. private ApiService apiService;
  7. @Resource
  8. private ManagerInfoService managerInfoService;
  9. private static final Logger logger = ();
  10. @Bean
  11. public SocketIOServer socketIOServer() {
  12. /*
  13. * 创建Socket,并设置监听端口
  14. */
  15. .Configuration config = new com.();
  16. // 设置主机名,默认是0.0.0.0
  17. // ("localhost");
  18. // 设置监听端口
  19. (9096);
  20. // 协议升级超时时间(毫秒),默认10000。HTTP握手升级为ws协议超时时间
  21. (10000);
  22. // Ping消息间隔(毫秒),默认25000。客户端向服务器发送一条心跳消息间隔
  23. (60000);
  24. // Ping消息超时时间(毫秒),默认60000,这个时间间隔内没有接收到心跳消息就会发送超时事件
  25. (180000);
  26. // 这个版本0.9.0不能处理好namespace和query参数的问题。所以为了做认证必须使用全局默认命名空间
  27. (new AuthorizationListener() {
  28. @Override
  29. public boolean isAuthorized(HandshakeData data) {
  30. // 可以使用如下代码获取用户密码信息
  31. String username = ("username");
  32. String password = ("password");
  33. ("连接参数:username=" + username + ",password=" + password);
  34. ManagerInfo managerInfo = (username);
  35. // MD5盐
  36. String salt = ();
  37. String encodedPassword = ShiroKit.md5(password, username + salt);
  38. // 如果认证不通过会返回一个Socket.EVENT_CONNECT_ERROR事件
  39. return (());
  40. }
  41. });
  42. final SocketIOServer server = new SocketIOServer(config);
  43. return server;
  44. }
  45. @Bean
  46. public SpringAnnotationScanner springAnnotationScanner(SocketIOServer socketServer) {
  47. return new SpringAnnotationScanner(socketServer);
  48. }
  49. }


后面还有个SpringAnnotationScanner的定义不能忘记。注意,我在AuthorizationListener里面通过调用service做了用户名和密码的认证。通过注解方式可以注入service, 执行相应的连接授权动作。

添加消息结构类

预先定义好客户端和服务器端直接传递的消息类型,使用简单的JavaBean即可,比如

 

  1. public class ReportParam {
  2. /**
  3. * IMEI码
  4. */
  5. private String imei;
  6. /**
  7. * 位置
  8. */
  9. private String location;
  10. public String getImei() {
  11. return imei;
  12. }
  13. public void setImei(String imei) {
  14. this.imei = imei;
  15. }
  16. public String getLocation() {
  17. return location;
  18. }
  19. public void setLocation(String location) {
  20. this.location = location;
  21. }
  22. }


这里才是最核心的接口处理类,所有接口处理逻辑都应该写在这里面,我只举了一个例子,就是POS上传位置接口:

添加消息处理类

 

  1. /**
  2. * 消息事件处理器
  3. *
  4. * @version 1.0
  5. * @since 2018/1015
  6. */
  7. @Component
  8. public class MessageEventHandler {
  9. private final SocketIOServer server;
  10. private final ApiService apiService;
  11. private static final Logger logger = ();
  12. @Autowired
  13. public MessageEventHandler(SocketIOServer server, ApiService apiService) {
  14. this.server = server;
  15. this.apiService = apiService;
  16. }
  17. //添加connect事件,当客户端发起连接时调用
  18. @OnConnect
  19. public void onConnect(SocketIOClient client) {
  20. if (client != null) {
  21. String imei = ().getSingleUrlParam("imei");
  22. String applicationId = ().getSingleUrlParam("appid");
  23. ("连接成功, applicationId=" + applicationId + ", imei=" + imei +
  24. ", sessionId=" + ().toString() );
  25. (applicationId);
  26. // 更新POS监控状态为在线
  27. ReportParam param = new ReportParam();
  28. (imei);
  29. (param, ().toString(), 1);
  30. } else {
  31. ("客户端为空");
  32. }
  33. }
  34. //添加@OnDisconnect事件,客户端断开连接时调用,刷新客户端信息
  35. @OnDisconnect
  36. public void onDisconnect(SocketIOClient client) {
  37. String imei = ().getSingleUrlParam("imei");
  38. ("客户端断开连接, imei=" + imei + ", sessionId=" + ().toString());
  39. // 更新POS监控状态为离线
  40. ReportParam param = new ReportParam();
  41. (imei);
  42. (param, "", 2);
  43. ();
  44. }
  45. // 消息接收入口
  46. @OnEvent(value = Socket.EVENT_MESSAGE)
  47. public void onEvent(SocketIOClient client, AckRequest ackRequest, Object data) {
  48. ("接收到客户端消息");
  49. if (()) {
  50. // send ack response with data to client
  51. ("服务器回答Socket.EVENT_MESSAGE", "好的");
  52. }
  53. }
  54. // 广播消息接收入口
  55. @OnEvent(value = "broadcast")
  56. public void onBroadcast(SocketIOClient client, AckRequest ackRequest, Object data) {
  57. ("接收到广播消息");
  58. // 房间广播消息
  59. String room = ().getSingleUrlParam("appid");
  60. (room).sendEvent("broadcast", "广播啦啦啦啦");
  61. }
  62. /**
  63. * 报告地址接口
  64. * @param client 客户端
  65. * @param ackRequest 回执消息
  66. * @param param 报告地址参数
  67. */
  68. @OnEvent(value = "doReport")
  69. public void onDoReport(SocketIOClient client, AckRequest ackRequest, ReportParam param) {
  70. ("报告地址接口 start....");
  71. BaseResponse result = postReport(param);
  72. (result);
  73. }
  74. /*----------------------------------------下面是私有方法-------------------------------------*/
  75. private BaseResponse postReport(ReportParam param) {
  76. BaseResponse result = new BaseResponse();
  77. int r = (param);
  78. if (r > 0) {
  79. (true);
  80. ("报告地址成功");
  81. } else {
  82. (false);
  83. ("该POS机还没有入网,报告地址失败。");
  84. }
  85. return result;
  86. }
  87. }


还有一个步骤就是添加启动器,在SpringBoot启动之后立马执行:

 

添加ServerRunner

 

  1. /**
  2. * SpringBoot启动之后执行
  3. *
  4. *
  5. * @version 1.0
  6. * @since 2018/10/15
  7. */
  8. @Component
  9. @Order(1)
  10. public class ServerRunner implements CommandLineRunner {
  11. private final SocketIOServer server;
  12. private static final Logger logger = ();
  13. @Autowired
  14. public ServerRunner(SocketIOServer server) {
  15. this.server = server;
  16. }
  17. @Override
  18. public void run(String... args) throws Exception {
  19. ("ServerRunner 开始启动啦...");
  20. ();
  21. }
  22. }


要实现通过域名并走标准80或443端口的话,最好使用nginx做反向代理,跟正常的http反向代理基本一致, 不过websocket需要增加一个upgrade的配置。

nginx反向代理

下面我以一个实际使用例子来说明如何配置nginx的https访问websocket,并且开启301自动http跳转https。

首先要有一个域名,比如,然后申请letsencrypt的免费证书,这个过程我不讲了

配置如下:

  1. 1
  2. 2
  3. 3
  4. 4
  5. 5
  6. 6
  7. 7
  8. 8
  9. 9
  10. 10
  11. 11
  12. 12
  13. 13
  14. 14
  15. 15
  16. 16
  17. 17
  18. 18
  19. 19
  20. 20
  21. 21
  22. 22
  23. 23
  24. 24
  25. 25
  26. 26
  27. 27
  28. 28
  29. 29
  30. 30
  31. 31
  32. 32
  33. 33
  34. 34
  35. 35
  36. 36
  37. 37
  38. 38
  39. 39
  40. 40
  1. map $http_upgrade $connection_upgrade {
  2. default upgrade;
  3. '' close;
  4. }
  5. server {
  6. server_name ;
  7. location / {
  8. proxy_pass http://localhost:9096;
  9. proxy_read_timeout 300s;
  10. proxy_set_header Host $host;
  11. proxy_set_header X-Real-IP $remote_addr;
  12. proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
  13. proxy_http_version 1.1;
  14. proxy_set_header Upgrade $http_upgrade;
  15. proxy_set_header Connection $connection_upgrade;
  16. }
  17. #root /opt/www/;
  18. #index ;
  19. error_page 404 /;
  20. location = / {
  21. }
  22. error_page 500 502 503 504 /;
  23. location = / {
  24. }
  25. listen 443 ssl; # managed by Certbot
  26. ssl_certificate /etc/letsencrypt/live//; # managed by Certbot
  27. ssl_certificate_key /etc/letsencrypt/live//; # managed by Certbot
  28. include /etc/letsencrypt/; # managed by Certbot
  29. ssl_dhparam /etc/letsencrypt/; # managed by Certbot
  30. }
  31. server {
  32. listen 80;
  33. server_name ;
  34. return 301 https://$host$request_uri; # managed by Certbot
  35. }

注意这其中和普通HTTP代理的关键不同是:

  1. 1
  2. 2
  1. proxy_set_header Upgrade $http_upgrade;
  2. proxy_set_header Connection $connection_upgrade;

参考文章

  • Android端与Java服务端交互——SocketIO
  • Spring Boot 集成 后端实现消息实时通信
  • Spring Boot实战之netty-socketio实现简单聊天室

GitHub源码

springboot-socketio

相关文章