概述
基于 socket.io 来说,采用 node 实现更加合适,本文使用两个后端的Java开源框架实现。
- 服务端使用 netty-socketio
- 客户端使用 -client-java
业务需求是将之前通过轮询方式调动RESTFul API改成使用WebSocket长连接方式,实现要服务器实时的推送消息,另外还要实时监控POS机的在线状态等。
引入依赖
-
<!-- netty-socketio-->
-
<dependency>
-
<groupId></groupId>
-
<artifactId>netty-socketio</artifactId>
-
<version>1.7.13</version>
-
</dependency>
-
<dependency>
-
<groupId></groupId>
-
<artifactId>netty-resolver</artifactId>
-
<version>4.1.</version>
-
</dependency>
-
<dependency>
-
<groupId></groupId>
-
<artifactId>netty-transport</artifactId>
-
<version>4.1.</version>
-
</dependency>
-
<dependency>
-
<groupId></groupId>
-
<artifactId>-client</artifactId>
-
<version>1.0.0</version>
-
</dependency>
先来服务端程序爽一把,话不多说,先上代码:
服务器代码
-
public class NamespaceSocketServer {
-
private static final Logger logger = ();
-
-
public static void main(String[] args) {
-
/*
-
* 创建Socket,并设置监听端口
-
*/
-
Configuration config = new Configuration();
-
//设置主机名
-
("localhost");
-
//设置监听端口
-
(9092);
-
// 协议升级超时时间(毫秒),默认10秒。HTTP握手升级为ws协议超时时间
-
(10000);
-
// Ping消息超时时间(毫秒),默认60秒,这个时间间隔内没有接收到心跳消息就会发送超时事件
-
(180000);
-
// Ping消息间隔(毫秒),默认25秒。客户端向服务器发送一条心跳消息间隔
-
(60000);
-
// 连接认证,这里使用token更合适
-
(new AuthorizationListener() {
-
@Override
-
public boolean isAuthorized(HandshakeData data) {
-
// String token = ("token");
-
// String username = (token);
-
// return (token, "secret");
-
return true;
-
}
-
});
-
-
final SocketIOServer server = new SocketIOServer(config);
-
-
/*
-
* 添加连接监听事件,监听是否与客户端连接到服务器
-
*/
-
(new ConnectListener() {
-
@Override
-
public void onConnect(SocketIOClient client) {
-
// 判断是否有客户端连接
-
if (client != null) {
-
("连接成功。clientId=" + ().toString());
-
(().getSingleUrlParam("appid"));
-
} else {
-
("并没有人连接上。。。");
-
}
-
}
-
});
-
-
/*
-
* 添加监听事件,监听客户端的事件
-
* 1.第一个参数eventName需要与客户端的事件要一致
-
* 2.第二个参数eventClase是传输的数据类型
-
* 3.第三个参数listener是用于接收客户端传的数据,数据类型需要与eventClass一致
-
*/
-
("login", , new DataListener<LoginRequest>() {
-
@Override
-
public void onData(SocketIOClient client, LoginRequest data, AckRequest ackRequest) {
-
("接收到客户端login消息:code = " + () + ",body = " + ());
-
// check is ack requested by client, but it's not required check
-
if (()) {
-
// send ack response with data to client
-
("已成功收到客户端登录请求", "yeah");
-
}
-
// 向客户端发送消息
-
List<String> list = new ArrayList<>();
-
("登录成功,sessionId=" + ());
-
// 第一个参数必须与eventName一致,第二个参数data必须与eventClass一致
-
String room = ().getSingleUrlParam("appid");
-
(room).sendEvent("login", ());
-
}
-
});
-
//启动服务
-
();
-
}
-
}
老规矩,先上代码爽爽
Android客户端
-
public class SocketClient {
-
private static Socket socket;
-
private static final Logger logger = ();
-
-
public static void main(String[] args) throws URISyntaxException {
-
IO.Options options = new IO.Options();
-
= new String[]{"websocket"};
-
= 2; // 重连尝试次数
-
= 1000; // 失败重连的时间间隔(ms)
-
= 20000; // 连接超时时间(ms)
-
= true;
-
= "username=test1&password=test1&appid=.apay2";
-
socket = ("http://localhost:9092/", options);
-
(Socket.EVENT_CONNECT, new Emitter.Listener() {
-
@Override
-
public void call(Object... args) {
-
// 客户端一旦连接成功,开始发起登录请求
-
LoginRequest message = new LoginRequest(12, "这是客户端消息体");
-
("login", (message), (Ack) args1 -> {
-
("回执消息=" + (args1).map(Object::toString).collect((",")));
-
});
-
}
-
}).on("login", new Emitter.Listener() {
-
@Override
-
public void call(Object... args) {
-
("接受到服务器房间广播的登录消息:" + (args));
-
}
-
}).on(Socket.EVENT_CONNECT_ERROR, new Emitter.Listener() {
-
@Override
-
public void call(Object... args) {
-
("Socket.EVENT_CONNECT_ERROR");
-
();
-
}
-
}).on(Socket.EVENT_CONNECT_TIMEOUT, new Emitter.Listener() {
-
@Override
-
public void call(Object... args) {
-
("Socket.EVENT_CONNECT_TIMEOUT");
-
();
-
}
-
}).on(Socket.EVENT_PING, new Emitter.Listener() {
-
@Override
-
public void call(Object... args) {
-
("Socket.EVENT_PING");
-
}
-
}).on(Socket.EVENT_PONG, new Emitter.Listener() {
-
@Override
-
public void call(Object... args) {
-
("Socket.EVENT_PONG");
-
}
-
}).on(Socket.EVENT_MESSAGE, new Emitter.Listener() {
-
@Override
-
public void call(Object... args) {
-
("-----------接受到消息啦--------" + (args));
-
}
-
}).on(Socket.EVENT_DISCONNECT, new Emitter.Listener() {
-
@Override
-
public void call(Object... args) {
-
("客户端断开连接啦。。。");
-
();
-
}
-
});
-
();
-
}
-
}
关于心跳机制
根据 文档 解释, 客户端会定期发送心跳包,并触发一个ping事件和一个pong事件,如下:
-
ping
Fired when a ping packet is written out to the server. -
pong
Fired when a pong is received from the server. Parameters:-
Number
number of ms elapsed sinceping
packet (.: latency)
-
这里最重要的两个服务器参数如下:
- pingTimeout (Number): how many ms without a pong packet to consider the connection closed (60000)
- pingInterval (Number): how many ms before sending a new ping packet (25000).
也就是说握手协议的时候,客户端从服务器拿到这两个参数,一个是ping消息的发送间隔时间,一个是从服务器返回pong消息的超时时间, 客户端会在超时后断开连接。心跳包发送方向是客户端向服务器端发送,以维持在线状态。
关于断线和超时
关闭浏览器、直接关闭客户端程序、kill进程、主动执行disconnect方法都会导致立刻产生断线事件。 而客户端把网络断开,服务器端在 pingTimeout
ms后产生断线事件、客户端在 pingTimeout
ms后也产生断线事件。
实际上,超时后会产生一个断线事件,叫”disconnect”。客户端和服务器端都可以对这个事件作出应答,释放连接。
客户端代码:
-
.on(Socket.EVENT_DISCONNECT, new Emitter.Listener() {
-
@Override
-
public void call(Object... args) {
-
("客户端断开连接啦。。。");
-
();
-
}
-
});
下面是客户端日志:连上服务器后,断开网络。超过了心跳超时时间后,产生断线事件。如果客户端不主动断开连接的话,会自动重连, 这时候发现连接不上,又产生连接错误事件,然后重试2次,都失败后自动断开连接了。
|
|
服务器端代码:
-
(new DisconnectListener() {
-
@Override
-
public void onDisconnect(SocketIOClient client) {
-
("服务器收到断线通知... sessionId=" + ());
-
}
-
});
服务器逻辑是,如果在心跳超时后,就直接断开这个连接,并且产生一个断开连接事件。
服务器通过netty处理心跳包ping/pong的日志如下:
|
|
浏览器客户端演示
对于netty-socketio
有一个demo工程,里面通过一个网页的聊天小程序演示了各种使用方法。
demo地址:netty-socketio-demo
SpringBoot集成
最后重点讲一下如何在SpringBoot中集成。
修改配置
首先maven依赖之前已经讲过了,先修改下配置文件来配置下几个参数,比如主机、端口、心跳时间等等。
|
|
添加Bean配置
然后增加一个SocketServer的Bean配置:
-
@Configuration
-
public class NettySocketConfig {
-
-
@Resource
-
private MyProperties myProperties;
-
-
@Resource
-
private ApiService apiService;
-
@Resource
-
private ManagerInfoService managerInfoService;
-
-
private static final Logger logger = ();
-
-
@Bean
-
public SocketIOServer socketIOServer() {
-
/*
-
* 创建Socket,并设置监听端口
-
*/
-
.Configuration config = new com.();
-
// 设置主机名,默认是0.0.0.0
-
// ("localhost");
-
// 设置监听端口
-
(9096);
-
// 协议升级超时时间(毫秒),默认10000。HTTP握手升级为ws协议超时时间
-
(10000);
-
// Ping消息间隔(毫秒),默认25000。客户端向服务器发送一条心跳消息间隔
-
(60000);
-
// Ping消息超时时间(毫秒),默认60000,这个时间间隔内没有接收到心跳消息就会发送超时事件
-
(180000);
-
// 这个版本0.9.0不能处理好namespace和query参数的问题。所以为了做认证必须使用全局默认命名空间
-
(new AuthorizationListener() {
-
@Override
-
public boolean isAuthorized(HandshakeData data) {
-
// 可以使用如下代码获取用户密码信息
-
String username = ("username");
-
String password = ("password");
-
("连接参数:username=" + username + ",password=" + password);
-
ManagerInfo managerInfo = (username);
-
// MD5盐
-
String salt = ();
-
String encodedPassword = ShiroKit.md5(password, username + salt);
-
// 如果认证不通过会返回一个Socket.EVENT_CONNECT_ERROR事件
-
return (());
-
}
-
});
-
-
final SocketIOServer server = new SocketIOServer(config);
-
-
return server;
-
}
-
-
@Bean
-
public SpringAnnotationScanner springAnnotationScanner(SocketIOServer socketServer) {
-
return new SpringAnnotationScanner(socketServer);
-
}
-
}
后面还有个SpringAnnotationScanner
的定义不能忘记。注意,我在AuthorizationListener
里面通过调用service做了用户名和密码的认证。通过注解方式可以注入service, 执行相应的连接授权动作。
添加消息结构类
预先定义好客户端和服务器端直接传递的消息类型,使用简单的JavaBean即可,比如
-
public class ReportParam {
-
/**
-
* IMEI码
-
*/
-
private String imei;
-
/**
-
* 位置
-
*/
-
private String location;
-
-
public String getImei() {
-
return imei;
-
}
-
-
public void setImei(String imei) {
-
this.imei = imei;
-
}
-
-
public String getLocation() {
-
return location;
-
}
-
-
public void setLocation(String location) {
-
this.location = location;
-
}
-
}
这里才是最核心的接口处理类,所有接口处理逻辑都应该写在这里面,我只举了一个例子,就是POS上传位置接口:
添加消息处理类
-
/**
-
* 消息事件处理器
-
*
-
* @version 1.0
-
* @since 2018/1015
-
*/
-
@Component
-
public class MessageEventHandler {
-
-
private final SocketIOServer server;
-
private final ApiService apiService;
-
-
private static final Logger logger = ();
-
-
@Autowired
-
public MessageEventHandler(SocketIOServer server, ApiService apiService) {
-
this.server = server;
-
this.apiService = apiService;
-
}
-
-
//添加connect事件,当客户端发起连接时调用
-
@OnConnect
-
public void onConnect(SocketIOClient client) {
-
if (client != null) {
-
String imei = ().getSingleUrlParam("imei");
-
String applicationId = ().getSingleUrlParam("appid");
-
("连接成功, applicationId=" + applicationId + ", imei=" + imei +
-
", sessionId=" + ().toString() );
-
(applicationId);
-
// 更新POS监控状态为在线
-
ReportParam param = new ReportParam();
-
(imei);
-
(param, ().toString(), 1);
-
} else {
-
("客户端为空");
-
}
-
}
-
-
//添加@OnDisconnect事件,客户端断开连接时调用,刷新客户端信息
-
@OnDisconnect
-
public void onDisconnect(SocketIOClient client) {
-
String imei = ().getSingleUrlParam("imei");
-
("客户端断开连接, imei=" + imei + ", sessionId=" + ().toString());
-
// 更新POS监控状态为离线
-
ReportParam param = new ReportParam();
-
(imei);
-
(param, "", 2);
-
();
-
}
-
-
// 消息接收入口
-
@OnEvent(value = Socket.EVENT_MESSAGE)
-
public void onEvent(SocketIOClient client, AckRequest ackRequest, Object data) {
-
("接收到客户端消息");
-
if (()) {
-
// send ack response with data to client
-
("服务器回答Socket.EVENT_MESSAGE", "好的");
-
}
-
}
-
-
// 广播消息接收入口
-
@OnEvent(value = "broadcast")
-
public void onBroadcast(SocketIOClient client, AckRequest ackRequest, Object data) {
-
("接收到广播消息");
-
// 房间广播消息
-
String room = ().getSingleUrlParam("appid");
-
(room).sendEvent("broadcast", "广播啦啦啦啦");
-
}
-
-
/**
-
* 报告地址接口
-
* @param client 客户端
-
* @param ackRequest 回执消息
-
* @param param 报告地址参数
-
*/
-
@OnEvent(value = "doReport")
-
public void onDoReport(SocketIOClient client, AckRequest ackRequest, ReportParam param) {
-
("报告地址接口 start....");
-
BaseResponse result = postReport(param);
-
(result);
-
}
-
-
/*----------------------------------------下面是私有方法-------------------------------------*/
-
private BaseResponse postReport(ReportParam param) {
-
BaseResponse result = new BaseResponse();
-
int r = (param);
-
if (r > 0) {
-
(true);
-
("报告地址成功");
-
} else {
-
(false);
-
("该POS机还没有入网,报告地址失败。");
-
}
-
return result;
-
}
-
}
还有一个步骤就是添加启动器,在SpringBoot启动之后立马执行:
添加ServerRunner
-
/**
-
* SpringBoot启动之后执行
-
*
-
*
-
* @version 1.0
-
* @since 2018/10/15
-
*/
-
@Component
-
@Order(1)
-
public class ServerRunner implements CommandLineRunner {
-
private final SocketIOServer server;
-
private static final Logger logger = ();
-
-
@Autowired
-
public ServerRunner(SocketIOServer server) {
-
this.server = server;
-
}
-
-
@Override
-
public void run(String... args) throws Exception {
-
("ServerRunner 开始启动啦...");
-
();
-
}
-
}
要实现通过域名并走标准80或443端口的话,最好使用nginx做反向代理,跟正常的http反向代理基本一致, 不过websocket需要增加一个upgrade的配置。
nginx反向代理
下面我以一个实际使用例子来说明如何配置nginx的https访问websocket,并且开启301自动http跳转https。
首先要有一个域名,比如,然后申请letsencrypt的免费证书,这个过程我不讲了
配置如下:
|
|
注意这其中和普通HTTP代理的关键不同是:
|
|
参考文章
- Android端与Java服务端交互——SocketIO
- Spring Boot 集成 后端实现消息实时通信
- Spring Boot实战之netty-socketio实现简单聊天室
GitHub源码
springboot-socketio