实时通讯技术是一项基于web开发的重要技术,网站是需要前后端通讯的,因此数据刷新的时间就是获取信息的时间,为了能准确而有快速的获取信息需要尽可能的提高信息的刷新效率。
常见的实时通讯技术:
通讯方式 | Ajax | Comet | WebSocket | SSE |
---|---|---|---|---|
描述 | 短轮询是浏览器端提交表单查询 | 长轮询是服务器收到请求后如果有数据, 立刻响应请求; 如果没有数据就会 等待 一段时间,直到有数据后立刻响应请求; 如果时间到了还没有数据, 则响应 http 请求(定时刷新) | WebSocket的实现了一次连接,双方通信的功能。首先由客户端发出WebSocket请求,服务器端进行响应,TCP三次握手。这个连接一旦建立起来,就保持在客户端和服务器之间,两者之间可以直接的进行数据的互相传送。 | 在 sse 的场景下,客户端发起请求,连接一直保持,服务端有数据就可以返回数据给客户端,这个返回可以是多次间隔的方式。sse 是单通道,只能服务端向客户端发消息 |
通讯协议 | http | http | websocket | http |
Ajax实现方案
Ajax及时通讯是比较方便实现的,通过浏览器DOM元素设定页面定时刷新就可以动态获取后端的数据。
- 添加setinterval()函数
<script language="javascript">
setInterval(function(){
window.location.reload();
},3000); //每隔3000毫秒刷新一次
</script>
- mata添加content元素间隔刷新
<meta http-equiv="refresh" content="20">
- 定时跳转转页面
<mata http-equiv="refresh" content"3,url=#">
//定时跳转也会刷新数据
通过js的动态刷新技术,再在刷新同添加Ajax通讯技术就可实现动态刷新了。
基于Ajax的长轮询方式
/*
setInterval(() => {
mychart2.clear();
axios({
method:'get',
url:'http://localhost:8100/json',
}).then(function (response) {
console.log(response)
temp=response.data
})
i=i+1;
option.xAxis.data.push(i);
option.xAxis.data.shift();
option.series[0].data.push(temp.data);
option.series[0].data.shift();
mychart2.setOption(option);
}, 3000);
*/
上函数通过在定时刷新里面添加了axios技术在间隔时间内查询数据并添加到列表中以实现动态刷新。如下图所示:
这种Ajax长轮询技术(Comet)的缺点是无法满足即时通信等富交互式应用的实时更新数据的要求。
基于Iframe的长轮询方式
该方法就是返回一个新的html,标记为src属性的变化。服务端将返回的数据作为回调函数的参数,浏览器在收到数据后就会执行脚本。
WebSocket实现方案
websocket通过握手连接,形成半双工的全通道通讯,由服务器和客户端都可以主动互相通讯。WebSocket可以让服务器直接给客户端发送信息,而不是先等客户端发起请求后、服务器才返回信息。(比起轮询,能更好的节省服务器资源和带宽,并且能够更实时地进行通讯。
后台编写:
package com.example;
import java.io.IOException;
import java.util.logging.Logger;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
/**
* @Class: Test
* @Description: 简单websocket demo
*/
@ServerEndpoint(value="/websocketTest/{userId}")
public class WsTest {
private Logger logger = Logger.getLogger("WebSocket");
private static String userId;
int i=0;
//连接时执行
@OnOpen
public void onOpen(@PathParam("userId") String userId,Session session) throws IOException{
this.userId = userId;
logger.info("有新的链接!");
System.out.println("新连接:"+userId);
}
//关闭时执行
@OnClose
public void onClose(){
logger.info("有链接关闭!");
System.out.println("连接:"+this.userId);
}
//收到消息时执行
@OnMessage
public void onMessage(String message, Session session) throws IOException {
System.out.println("收到用户"+this.userId+"的消息"+message);
//session.getBasicRemote().sendText("服务器 "+this.userId+" 发送来一条消息的消息 "); //回复用户
i++;
session.getBasicRemote().sendText(String.valueOf(i)); //返回数据
}
//连接错误时执行
@OnError
public void onError(Session session, Throwable error){
System.out.println("用户id为:"+this.userId+"的连接发送错误");
error.printStackTrace();
}
}
前端编写:
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title></title>
</head>
<body>
websocket Demo---- user000 <br />
<input id="text" type="text" />
<button onclick="send()"> Send </button>
<button onclick="closeWebSocket()"> Close </button>
<div id="message"> </div>
<script type="text/javascript">
//判断当前浏览器是否支持WebSocket
if('WebSocket' in window){
websocket = new WebSocket("ws://localhost:8080/demo/websocketTest/user000");
console.log("link success")
}else{
alert('Not support websocket')
}
//连接发生错误的回调方法
websocket.onerror = function(){
setMessageInnerHTML("error");
};
//连接成功建立的回调方法
websocket.onopen = function(event){
setMessageInnerHTML("open");
}
console.log("-----")
//接收到消息的回调方法
websocket.onmessage = function(event){
setMessageInnerHTML(event.data);
}
//连接关闭的回调方法
websocket.onclose = function(){
setMessageInnerHTML("close");
}
//监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接,防止连接还没断开就关闭窗口,server端会抛异常。
window.onbeforeunload = function(){
websocket.close();
}
//将消息显示在网页上
function setMessageInnerHTML(innerHTML){
document.getElementById('message').innerHTML += innerHTML + '<br/>';
}
//关闭连接
function closeWebSocket(){
websocket.close();
}
//发送消息
function send(){
var message = document.getElementById('text').value;
websocket.send(message);
}
</script>
</body>
</html>
启动tomcat服务器访问websocket前端界面后返回数据:
SSE实现方案
SseEmitter
的用法是使用 HTTP 做服务端数据推送应用的技术。SSE ( Server-sent Events )是 WebSocket 的一种轻量代替方案,使用 HTTP 协议。
SSE 和 WebSocket 做的是同一件事情。当你需要用新数据局部更新网络应用时,SSE 可以做到不需要用户执行任何操作,便可以完成。SSE 是单向通道,只能服务器向客户端发送消息,如果客户端需要向服务器发送消息,则需要一个新的 HTTP 请求。 这对比 WebSocket 的双工通道来说,会有更大的开销。
这是SpringMVC提供的一种技术,可以实现服务端向客户端实时推送数据.用法非常简单,只需要在Controller提供一个接口,创建并返回SseEmitter对象,发送数据可以在另一个接口调用其send方法发送数据。因此sse需要在spring mvc或更集成的框架中使用,在servlet中无法使用。
SpringMVC内置SseEmitter类内置了几个方法,可以令我相当方便地使用服务器推送事件(Server Sent Event)。
在使用前有几个关于sse的前端知识需要了解:
-
sse机制不同于传统的“请求-响应”模型,在前端必须使用新建的
EventSource
对象请求一个sse,
然后监听此对象的message
事件以接收后端推送的值。 -
在前端请求一个sse时,在后端未主动关闭和事件未超时前服务器都不会对此请求做出响应以实现“连接”的效果, 一旦后端响应这个请求(后端主动或“连接”自动超时)则代表“连接”结束;
-
前端一旦接收到响应则EventSource对象会立刻自动重新连接以保证连接的有效性。
const source = new EventSource('http://localhost/sse');
source.addEventListener('message', (message: any) => {
console.log(message.data);
});
使用SseEmitter的步骤:
- 直接创建SseEmitter对象即可,创建时可设置一个超时时间(默认为30000毫秒),当达到此时间则立刻对请求响应(连接失效)。
SseEmitter sse() throws IOException {
SseEmitter event = new SseEmitter(10000L);
// 添加一些额外配置
event.send(
SseEmitter.event()
.reconnectTime(1000L)
.id("123")
);
concurrentHashMap.put(1, event);
return event;
}
-
发送事件可以调用创建好的对象的
send
方法,发送的数据即为在前端的接收到的message
事件中的data
属性值 -
发送的数据默认被识别为字符串,如果发送Map则在前端需要用JSON对象解析才能得到json数据
event.send(
SseEmitter.event()
.data("值")
// 更改原来的“message”事件名称
.name("event")
);
- 对于服务器端返回的响应,浏览器端需要在 JavaScript 中使用
EventSource
对象来进行处理。EventSource 使用的是标准的事件监听器方式,只需要在对象上添加相应的事件处理方法即可。EventSource 提供了三个标准事件。
var es = new EventSource('events');
es.onmessage = function(e) {
console.log(e.data);
};
es.addEventListener('myevent', function(e) {
console.log(e.data);
});
在指定 URL 创建出 EventSource 对象之后,可以通过 onmessage 和 addEventListener 方法来添加事件处理方法。当服务器端有新的事件产生,相应的事件处理方法会被调用。EventSource 对象的 onmessage 属性的作用类似于 addEventListener( ‘ message ’ ),不过 onmessage 属性只支持一个事件处理方法。
- SSE对象是服务端向客户端发送信息,因此需要周期性执行,需要借助线程功能。
ScheduledExecutorService
是基于ExecutorService的功能实现的延迟和周期执行任务的功能。每个任务以及每个任务的每个周期都会提交到线程池中由线程去执行,所以任务在不同周期内执行它的线程可能是不同的。ScheduledExecutorService接口的默认实现类是ScheduledThreadPoolExecutor。在周期执行的任务中,如果任务执行时间大于周期时间,则会以任务时间优先,等任务执行完毕后才会进入下一次周期。如下面所示:
/*
数据实时推送sse数据推送
*/
//ScheduledExecutorService```是基于ExecutorService的功能实现的延迟和周期执行任务的功能
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
@GetMapping(value = "/timeclick")
public SseEmitter subscribeCC(){
//获取数据库点击量数据
//List<ClickTimes> clickTimes = null;
SseEmitter sseEmitter = new SseEmitter(0L);
executorService.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
try {
sseEmitter.send(opt1Mapper.select_all_times());
} catch (IOException e) {
e.printStackTrace();
}
}
}, 10,5, TimeUnit.SECONDS); //每个任务10执行时间,每个5秒执行一次
return sseEmitter;
}
借助
ScheduledExecutorService
的scheduleWithFixedDelay
方法周期性执行任务。
SSE案例
//接口是在spring boot的controller下实现的
/*
数据实时推送sse数据推送
*/
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
@GetMapping(value = "/timeclick")
public SseEmitter subscribeCC(){
//获取数据库点击量数据
//List<ClickTimes> clickTimes = null;
SseEmitter sseEmitter = new SseEmitter(0L);
executorService.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
try {
sseEmitter.send(opt1Mapper.select_all_times());
} catch (IOException e) {
e.printStackTrace();
}
}
}, 10,5, TimeUnit.SECONDS); //每个任务10执行时间,每个5秒执行一次
return sseEmitter;
}
function initEventSource() {
//请求地址,静态
const url = "/timeclick"
//接受实时推送数据的监听EventSource对象接收
const dataSource = new EventSource(url);
dataSource.onmessage= function (event){
console.log("SSE--->收到数据")
const vo = JSON.parse(event.data)
//调用更新实时数据方法
updateClickChart(vo)
}
}
在timeclick接口下,后台借助
ScheduledExecutorService
周期性返回数据,前端借助EventSource
对象接收,再调用其他方法将接受的数据更新。所以sse技术适用用可视化方案。
上述的单车点击项目即sse的项目,放在资源里了。