Spring WebSocket Source Code Analysis

Date: 2022-06-15 06:29:07

What is WebSocket?

Excerpted from Wikipedia 【1】:

WebSocket is a protocol providing full-duplex communication channels over a single TCP connection. The WebSocket protocol was standardized by the IETF as RFC 6455 in 2011, and the WebSocket API in Web IDL is being standardized by the W3C.

WebSocket is designed to be implemented in web browsers and web servers, but it can be used by any client or server application. The WebSocket Protocol is an independent TCP-based protocol. Its only relationship to HTTP is that its handshake is interpreted by HTTP servers as an Upgrade request. The WebSocket protocol makes more interaction between a browser and a website possible, facilitating the real-time data transfer from and to the server. This is made possible by providing a standardized way for the server to send content to the browser without being solicited by the client, and allowing for messages to be passed back and forth while keeping the connection open. In this way a two-way (bi-directional) ongoing conversation can take place between a browser and the server. The communications are done over TCP port number 80, which is of benefit for those environments which block non-web Internet connections using a firewall. Similar two-way browser-server communications have been achieved in non-standardized ways using stopgap technologies such as Comet.

The WebSocket protocol is currently supported in most major browsers including Google Chrome, Internet Explorer, Firefox, Safari and Opera. WebSocket also requires web applications on the server to support it.

Unlike HTTP, WebSocket provides full-duplex communication. Additionally, WebSocket enables streams of messages on top of TCP. TCP alone deals with streams of bytes with no inherent concept of a message. Before WebSocket, port 80 full-duplex communication was attainable using Comet channels; however, Comet implementation is nontrivial, and due to the TCP handshake and HTTP header overhead, it is inefficient for small messages. WebSocket protocol aims to solve these problems without compromising security assumptions of the web.

The WebSocket protocol specification defines ws and wss as two new uniform resource identifier (URI) schemes that are used for unencrypted and encrypted connections, respectively. Apart from the scheme name and fragment (# is not supported), the rest of the URI components are defined to use URI generic syntax.

Using the Google Chrome Developer Tools, developers can inspect the WebSocket handshake as well as the WebSocket frames.
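To make the ws and wss schemes from the excerpt concrete, here is a minimal sketch of resolving the port a client would connect to. It assumes the conventional defaults (80 for ws, 443 for wss); the class name WsUriPorts and the method effectivePort are hypothetical and not part of Spring.

```java
import java.net.URI;
import java.util.Locale;

// Hypothetical helper, not a Spring class: resolves the port for a ws/wss URI.
final class WsUriPorts {

    // Returns the explicit port if the URI carries one; otherwise the
    // conventional default (assumed here): 80 for ws, 443 for wss.
    static int effectivePort(URI uri) {
        if (uri.getPort() != -1) {
            return uri.getPort();
        }
        String scheme = uri.getScheme();
        if (scheme == null) {
            throw new IllegalArgumentException("URI has no scheme: " + uri);
        }
        switch (scheme.toLowerCase(Locale.ROOT)) {
            case "ws":
                return 80;
            case "wss":
                return 443;
            default:
                throw new IllegalArgumentException("Not a WebSocket URI: " + uri);
        }
    }

    public static void main(String[] args) {
        System.out.println(effectivePort(URI.create("ws://example.com/hello")));    // 80
        System.out.println(effectivePort(URI.create("wss://example.com/hello")));   // 443
        System.out.println(effectivePort(URI.create("ws://localhost:8080/hello"))); // 8080
    }
}
```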

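Simply put, WebSocket is a full-duplex protocol built on TCP, with a client side and a server side, and it is distinct from HTTP. Its purpose is to make interaction between a browser and a site easier by letting real-time data flow both to and from the server.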
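The only point where WebSocket touches HTTP is the Upgrade handshake. As an illustration, the sketch below computes the Sec-WebSocket-Accept value that a server returns for a client's Sec-WebSocket-Key, following RFC 6455: append a fixed GUID to the key, hash with SHA-1, then Base64-encode. The class and method names are hypothetical; Spring and the servlet container do this internally, so application code normally never needs it.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

// Hypothetical helper illustrating the RFC 6455 handshake key derivation.
final class HandshakeAccept {

    // Fixed GUID defined by RFC 6455.
    private static final String GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

    // Derives the Sec-WebSocket-Accept header value from the client's key.
    static String acceptFor(String secWebSocketKey) {
        try {
            MessageDigest sha1 = MessageDigest.getInstance("SHA-1");
            byte[] digest = sha1.digest(
                    (secWebSocketKey + GUID).getBytes(StandardCharsets.US_ASCII));
            return Base64.getEncoder().encodeToString(digest);
        }
        catch (NoSuchAlgorithmException ex) {
            throw new IllegalStateException("SHA-1 is not available", ex);
        }
    }

    public static void main(String[] args) {
        // Sample key taken from RFC 6455.
        System.out.println(acceptFor("dGhlIHNhbXBsZSBub25jZQ=="));
    }
}
```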

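What is SockJS?

SockJS is a JavaScript library that runs in the browser. If the browser does not support WebSocket, the library emulates WebSocket support, providing a low-latency, full-duplex, cross-domain communication channel between the browser and the web server 【2】.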

WebSocket messages

The abstract interface WebSocketMessage: a message that can be handled or sent on a WebSocket connection.

public interface WebSocketMessage<T> {

    /**
     * Returns the message payload. This will never be {@code null}.
     */
    T getPayload();

    /**
     * Return the number of bytes contained in the message.
     */
    int getPayloadLength();

    /**
     * When partial message support is available and requested via
     * {@link org.springframework.web.socket.WebSocketHandler#supportsPartialMessages()},
     * this method returns {@code true} if the current message is the last part of the
     * complete WebSocket message sent by the client. Otherwise {@code false} is returned
     * if partial message support is either not available or not enabled.
     */
    boolean isLast();

}

Here, the payload is the actual content carried by the message.
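To show what the three methods mean in practice, here is a minimal, self-contained sketch of a text message. SketchMessage is a local stand-in that mirrors the interface quoted above so the example compiles without Spring, and SketchTextMessage is a hypothetical implementation. Measuring the payload length in UTF-8 bytes is an assumption of this sketch.

```java
import java.nio.charset.StandardCharsets;

// Local stand-in mirroring the quoted WebSocketMessage interface (not the Spring type).
interface SketchMessage<T> {
    T getPayload();
    int getPayloadLength();
    boolean isLast();
}

// Hypothetical text message: the payload is a String, the length is counted in bytes.
final class SketchTextMessage implements SketchMessage<String> {

    private final String payload;
    private final boolean last;

    SketchTextMessage(String payload, boolean last) {
        if (payload == null) {
            throw new IllegalArgumentException("payload must not be null");
        }
        this.payload = payload;
        this.last = last;
    }

    @Override
    public String getPayload() {
        return this.payload;
    }

    // Assumption of this sketch: length is the number of UTF-8 bytes, not characters.
    @Override
    public int getPayloadLength() {
        return this.payload.getBytes(StandardCharsets.UTF_8).length;
    }

    @Override
    public boolean isLast() {
        return this.last;
    }
}
```

Note that the byte length and the character count differ as soon as the payload contains non-ASCII characters, which is why the interface documents the length in bytes.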

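WebSocket client

The abstract interface WebSocketClient defines the contract for initiating a WebSocket request. If the application should open a WebSocket connection to a preconfigured URL at startup, consider using WebSocketConnectionManager.

Given a URI, a WebSocketClient, and a WebSocketHandler, WebSocketConnectionManager connects to the WebSocket server through its start() method and disconnects through stop(). If setAutoStartup(boolean) is set to true, it connects automatically when the Spring ApplicationContext is refreshed.

ConnectionManagerSupport is the base class of WebSocketConnectionManager.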

    /**
     * Start the websocket connection. If already connected, the method has no impact.
     */
    @Override
    public final void start() {
        synchronized (this.lifecycleMonitor) {
            if (!isRunning()) {
                startInternal();
            }
        }
    }

    protected void startInternal() {
        synchronized (lifecycleMonitor) {
            if (logger.isDebugEnabled()) {
                logger.debug("Starting " + this.getClass().getSimpleName());
            }
            this.isRunning = true;
            openConnection();
        }
    }

    protected abstract void openConnection();
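The listing above is a template method: start() guards the lifecycle and delegates the actual connect to a subclass. A stripped-down sketch of the same pattern, without Spring, might look like this. SketchConnectionManager and CountingConnectionManager are hypothetical names, and the stop()/closeConnection() pair is an assumed counterpart added for symmetry.

```java
// Hypothetical base class illustrating the guarded start/stop pattern.
abstract class SketchConnectionManager {

    private final Object lifecycleMonitor = new Object();
    private boolean running;

    // Opens the connection once; further calls while running have no effect.
    public final void start() {
        synchronized (this.lifecycleMonitor) {
            if (!this.running) {
                this.running = true;
                openConnection();
            }
        }
    }

    // Closes the connection once; further calls while stopped have no effect.
    public final void stop() {
        synchronized (this.lifecycleMonitor) {
            if (this.running) {
                this.running = false;
                closeConnection();
            }
        }
    }

    public final boolean isRunning() {
        synchronized (this.lifecycleMonitor) {
            return this.running;
        }
    }

    protected abstract void openConnection();

    protected abstract void closeConnection();
}

// Hypothetical subclass that only counts how often the hooks are invoked.
final class CountingConnectionManager extends SketchConnectionManager {

    int opened;
    int closed;

    @Override
    protected void openConnection() {
        this.opened++;
    }

    @Override
    protected void closeConnection() {
        this.closed++;
    }
}
```

Calling start() twice on a CountingConnectionManager invokes openConnection() only once, which is the "no impact if already connected" behavior described in the Javadoc.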

How does StandardWebSocketClient work?

StandardWebSocketClient programmatically initiates a WebSocket request to a WebSocket server through the standard Java WebSocket API.

It has two private fields:

    private final WebSocketContainer webSocketContainer;

    private AsyncListenableTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();

Here, the WebSocketContainer is obtained through the standard Java WebSocket API:

    public StandardWebSocketClient() {
        this.webSocketContainer = ContainerProvider.getWebSocketContainer();
    }

Here, SimpleAsyncTaskExecutor starts a new thread for each task and runs the task asynchronously.

    /**
     * Executes the given task, within a concurrency throttle
     * if configured (through the superclass's settings).
     * @see #doExecute(Runnable)
     */
    @Override
    public void execute(Runnable task) {
        execute(task, TIMEOUT_INDEFINITE);
    }

    /**
     * Executes the given task, within a concurrency throttle
     * if configured (through the superclass's settings).
     * <p>Executes urgent tasks (with 'immediate' timeout) directly,
     * bypassing the concurrency throttle (if active). All other
     * tasks are subject to throttling.
     * @see #TIMEOUT_IMMEDIATE
     * @see #doExecute(Runnable)
     */
    @Override
    public void execute(Runnable task, long startTimeout) {
        Assert.notNull(task, "Runnable must not be null");
        if (isThrottleActive() && startTimeout > TIMEOUT_IMMEDIATE) {
            this.concurrencyThrottle.beforeAccess();
            doExecute(new ConcurrencyThrottlingRunnable(task));
        }
        else {
            doExecute(task);
        }
    }

    /**
     * Template method for the actual execution of a task.
     * <p>The default implementation creates a new Thread and starts it.
     * @param task the Runnable to execute
     * @see #setThreadFactory
     * @see #createThread
     * @see java.lang.Thread#start()
     */
    protected void doExecute(Runnable task) {
        Thread thread = (this.threadFactory != null ? this.threadFactory.newThread(task) : createThread(task));
        thread.start();
    }
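The idea in the listing above (one new thread per task, optionally behind a concurrency throttle) can be sketched with plain JDK classes. This is not Spring's implementation: NewThreadExecutor is a hypothetical class, and a Semaphore is swapped in for Spring's concurrency throttle.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical executor: starts one new thread per task, with an optional limit
// on how many tasks may run at the same time.
final class NewThreadExecutor implements Executor {

    private final Semaphore permits; // null means "no throttle"
    private final AtomicInteger threadsStarted = new AtomicInteger();

    // A limit <= 0 disables throttling (an assumption of this sketch).
    NewThreadExecutor(int concurrencyLimit) {
        this.permits = (concurrencyLimit > 0 ? new Semaphore(concurrencyLimit) : null);
    }

    @Override
    public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("Runnable must not be null");
        }
        if (this.permits == null) {
            startThread(task);
            return;
        }
        // Block the caller until a slot is free, then release it when the task ends.
        this.permits.acquireUninterruptibly();
        startThread(() -> {
            try {
                task.run();
            }
            finally {
                this.permits.release();
            }
        });
    }

    private void startThread(Runnable task) {
        new Thread(task, "sketch-exec-" + this.threadsStarted.incrementAndGet()).start();
    }

    int threadsStarted() {
        return this.threadsStarted.get();
    }

    // Demo helper: runs the given number of no-op tasks and reports how many threads were started.
    static int runAndCount(int tasks, int concurrencyLimit) {
        NewThreadExecutor executor = new NewThreadExecutor(concurrencyLimit);
        CountDownLatch done = new CountDownLatch(tasks);
        for (int i = 0; i < tasks; i++) {
            executor.execute(done::countDown);
        }
        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return executor.threadsStarted();
    }
}
```

Because no thread is ever reused, this style suits a handful of short-lived tasks such as a single connect call; for many tasks a pooled executor is usually the better choice.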

Next, let us look at the handshake process in StandardWebSocketClient:

    @Override
    protected ListenableFuture<WebSocketSession> doHandshakeInternal(WebSocketHandler webSocketHandler,
            HttpHeaders headers, final URI uri, List<String> protocols,
            List<WebSocketExtension> extensions, Map<String, Object> attributes) {

        int port = getPort(uri);
        InetSocketAddress localAddress = new InetSocketAddress(getLocalHost(), port);
        InetSocketAddress remoteAddress = new InetSocketAddress(uri.getHost(), port);

        final StandardWebSocketSession session = new StandardWebSocketSession(headers,
                attributes, localAddress, remoteAddress);

        final ClientEndpointConfig.Builder configBuilder = ClientEndpointConfig.Builder.create();
        configBuilder.configurator(new StandardWebSocketClientConfigurator(headers));
        configBuilder.preferredSubprotocols(protocols);
        configBuilder.extensions(adaptExtensions(extensions));
        final Endpoint endpoint = new StandardWebSocketHandlerAdapter(webSocketHandler, session);

        Callable<WebSocketSession> connectTask = new Callable<WebSocketSession>() {
            @Override
            public WebSocketSession call() throws Exception {
                webSocketContainer.connectToServer(endpoint, configBuilder.build(), uri);
                return session;
            }
        };

        if (this.taskExecutor != null) {
            return this.taskExecutor.submitListenable(connectTask);
        }
        else {
            ListenableFutureTask<WebSocketSession> task = new ListenableFutureTask<WebSocketSession>(connectTask);
            task.run();
            return task;
        }
    }
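The last part of the method decides where the connect task runs: on the task executor if one is set, otherwise directly on the calling thread. A minimal sketch of that dispatch with JDK types only is shown below. ConnectDispatch is a hypothetical class, and FutureTask stands in for Spring's ListenableFutureTask.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

// Hypothetical helper illustrating the "executor or inline" dispatch.
final class ConnectDispatch {

    // Runs the task on the executor if one is given, otherwise on the caller's thread.
    // Either way the caller receives a Future for the result.
    static <T> Future<T> submit(Callable<T> connectTask, Executor executor) {
        FutureTask<T> task = new FutureTask<>(connectTask);
        if (executor != null) {
            executor.execute(task);
        }
        else {
            task.run();
        }
        return task;
    }

    // Demo helper: reports whether the task was executed on the calling thread.
    static boolean ranOnCallerThread(Executor executor) {
        Thread caller = Thread.currentThread();
        Future<Thread> result = submit(Thread::currentThread, executor);
        try {
            return result.get() == caller;
        }
        catch (InterruptedException | ExecutionException ex) {
            throw new IllegalStateException(ex);
        }
    }

    public static void main(String[] args) {
        System.out.println(ranOnCallerThread(null));                         // true
        System.out.println(ranOnCallerThread(r -> new Thread(r).start()));  // false
    }
}
```

With an executor the handshake does not block the caller, while the inline branch returns a future that is already completed.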

WebSocket server

Annotation: EnableWebSocketMessageBroker

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Documented
@Import(DelegatingWebSocketMessageBrokerConfiguration.class)
public @interface EnableWebSocketMessageBroker { }

Annotation: EnableWebSocket

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Documented
@Import(DelegatingWebSocketConfiguration.class)
public @interface EnableWebSocket {
}

Example program

1. Client 【3】: index.html

<!DOCTYPE html>
<html>
<head>
<title>Hello WebSocket</title>
<script src="sockjs-0.3.4.js"></script>
<script src="stomp.js"></script>
<script type="text/javascript">
var stompClient = null;

function setConnected(connected) {
    document.getElementById('connect').disabled = connected;
    document.getElementById('disconnect').disabled = !connected;
    document.getElementById('conversationDiv').style.visibility = connected ? 'visible' : 'hidden';
    document.getElementById('response').innerHTML = '';
}

function connect() {
    var socket = new SockJS('/hello');
    stompClient = Stomp.over(socket);
    stompClient.connect({}, function(frame) {
        setConnected(true);
        console.log('Connected: ' + frame);
        stompClient.subscribe('/topic/greetings', function(greeting){
            showGreeting(JSON.parse(greeting.body).content);
        });
    });
}

function disconnect() {
    if (stompClient != null) {
        stompClient.disconnect();
    }
    setConnected(false);
    console.log("Disconnected");
}

function sendName() {
    var name = document.getElementById('name').value;
    stompClient.send("/app/hello", {}, JSON.stringify({ 'name': name }));
}

function showGreeting(message) {
    var response = document.getElementById('response');
    var p = document.createElement('p');
    p.style.wordWrap = 'break-word';
    p.appendChild(document.createTextNode(message));
    response.appendChild(p);
}
</script>
</head>
<body onload="disconnect()">
<noscript><h2 style="color: #ff0000">Seems your browser doesn't support Javascript! Websocket relies on Javascript being enabled. Please enable
Javascript and reload this page!</h2></noscript>
<div>
<div>
<button id="connect" onclick="connect();">Connect</button>
<button id="disconnect" disabled="disabled" onclick="disconnect();">Disconnect</button>
</div>
<div id="conversationDiv">
<label>What is your name?</label><input type="text" id="name" />
<button id="sendName" onclick="sendName();">Send</button>
<p id="response"></p>
</div>
</div>
</body>
</html>

2. Server program 【3】: WebSocketConfig.java

package hello;

import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.AbstractWebSocketMessageBrokerConfigurer;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.enableSimpleBroker("/topic");
        config.setApplicationDestinationPrefixes("/app");
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/hello").withSockJS();
    }

}
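To connect the client and the server configuration: the stompClient.send("/app/hello", ...) call in index.html puts a STOMP SEND frame on the wire, and the two prefixes configured above decide where it goes. The sketch below builds such a frame by hand and classifies a destination by prefix. StompFrameSketch and its methods are hypothetical; the real stomp.js client may add further headers, and Spring's actual routing is more involved than a prefix check.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: builds a STOMP frame and classifies destinations by prefix.
final class StompFrameSketch {

    // STOMP wire format: COMMAND, header lines, an empty line, the body, then a NUL byte.
    static String frame(String command, Map<String, String> headers, String body) {
        StringBuilder sb = new StringBuilder(command).append('\n');
        for (Map.Entry<String, String> header : headers.entrySet()) {
            sb.append(header.getKey()).append(':').append(header.getValue()).append('\n');
        }
        return sb.append('\n').append(body).append('\0').toString();
    }

    // Frame comparable to the one sent by sendName() in index.html.
    // Assumption: the name contains no characters that need JSON escaping.
    static String sendHello(String name) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("destination", "/app/hello");
        return frame("SEND", headers, "{\"name\":\"" + name + "\"}");
    }

    // Simplified view of the prefixes from WebSocketConfig:
    // "/app" goes to application message handlers, "/topic" to the simple broker.
    static String route(String destination) {
        if (destination.startsWith("/app")) {
            return "application";
        }
        if (destination.startsWith("/topic")) {
            return "broker";
        }
        return "unhandled";
    }
}
```

Under this simplified view, /app/hello is handled by application code, and anything the application publishes to /topic/greetings is delivered by the broker to the subscription made in connect().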

References:

【1】https://en.wikipedia.org/wiki/WebSocket

【2】http://www.oschina.net/p/sockjs/similar_projects?lang=0&sort=time&p=1

【3】https://spring.io/guides/gs/messaging-stomp-websocket/