本文介绍一个基于websocket实现的远程实时日志系统,可以通过浏览器查看远程移动设备的实时运行日志。
系统由三个部分组成:
1. 服务器:与移动设备和浏览器建立websocket连接,将移动设备websocket上读取的实时日志转发到对应的浏览器的websocket中
2. 浏览器日志查看页面:与服务器建立websocket连接,通过websocket接收指定设备的实时运行日志并显示
3. 移动设备:与服务器建立websocket连接,将运行日志通过websocket连接上传至服务器
服务器端实现
Tomcat 7.0.27 开始支持Websocket了。本文的服务器端Servlet程序是搭建在Tomcat上的。关于在Tomcat上面实现支持websocket的servlet,可以参考
服务器Servlet源码:
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServletRequest;
import org.apache.catalina.websocket.MessageInbound;
import org.apache.catalina.websocket.StreamInbound;
import org.apache.catalina.websocket.WebSocketServlet;
import org.apache.catalina.websocket.WsOutbound;
/**
* Servlet implementation class WebLogcat
*/
@WebServlet("/WebLogcat")
public class WebLogcat extends WebSocketServlet {
private static final long serialVersionUID = 1L;
private final Setconnections =
new CopyOnWriteArraySet();
private final Mapdevices =
new ConcurrentHashMap();
private final Map<String, Set> browsers =new ConcurrentHashMap<String, Set >();
@Override
protected StreamInbound createWebSocketInbound(String arg0,
HttpServletRequest arg1) {
String id = arg1.getParameter("id");
String type = arg1.getParameter("type");
if( id != null && type != null ) {
if( type.equalsIgnoreCase("device") ) {
return new DeviceMessageInbound( id );
} else if( type.equalsIgnoreCase("browser") ) {
return new BrowserMessageInbound( id );
}
}
// return NULL will lead to Exception
return new LogMessageInbound();
}
private final class DeviceMessageInbound extends MessageInbound {
private String _id;
DeviceMessageInbound(String id) {
_id = id;
}
@Override
protected void onClose(int status) {
// remove me from device hash map
devices.remove(_id);
super.onClose(status);
}
@Override
protected void onOpen(WsOutbound outbound) {
// add me to device hash map
devices.put(_id, this);
super.onOpen(outbound);
}
@Override
protected void onBinaryMessage(ByteBuffer arg0) throws IOException {
}
@Override
protected void onTextMessage(CharBuffer arg0) throws IOException {
// broadcast to all browser with the same id as me
String message = new String(arg0.array());
Setlist = browsers.get( _id );if( list != null ) {
for (BrowserMessageInbound connection : list) {
try {
CharBuffer buffer = CharBuffer.wrap(message);
connection.getWsOutbound().writeTextMessage(buffer);
} catch (IOException ignore) {
// Ignore
}
}
}
}
}
private final class BrowserMessageInbound extends MessageInbound {
private String _id;
@Override
protected void onClose(int status) {
synchronized( browsers ) {
Setlist = browsers.get( _id ); if( list != null ) {
list.remove(this);
if( list.isEmpty() ) {
browsers.remove(_id);
}
}
}
super.onClose(status);
}
@Override
protected void onOpen(WsOutbound outbound) {
synchronized( browsers ) {
if( browsers.containsKey(_id) ) {
browsers.get(_id).add(this);
} else {
Setlist = new CopyOnWriteArraySet ();list.add(this);
browsers.put(_id, list);
}
}
super.onOpen(outbound);
}
BrowserMessageInbound(String id) {
_id = id;
}
@Override
protected void onBinaryMessage(ByteBuffer arg0) throws IOException {
}
@Override
protected void onTextMessage(CharBuffer arg0) throws IOException {
}
}
private final class LogMessageInbound extends MessageInbound {
@Override
protected void onClose(int status) {
connections.remove(this);
super.onClose(status);
}
@Override
protected void onOpen(WsOutbound outbound) {
super.onOpen(outbound);
connections.add(this);
}
@Override
protected void onBinaryMessage(ByteBuffer arg0) throws IOException {
}
@Override
protected void onTextMessage(CharBuffer arg0) throws IOException {
}
}
}
要实现支持websocket的servlet,需要继承WebSocketServlet, 实现createWebSocketInbound函数,返回代表websocket连接的对象。
本文中,程序接收请求中传递的id和type参数,iid代表了设备的id,或者浏览器想要查看实时日志的设备的id。type代表了连接请求是来自设备还是浏览器。Servlet根据type的值创建对应类型的websocket连接。如果连接请求是来自设备device,则创建DeviceMessageInbound。如果设备请求是来自浏览器browser,则创建BrowserMessageInbound。
DeviceMessageInbound 和 BrowserMessageInbound 都继承自MessageInbound
对于DeviceMessageInbound,
在onOpen()中,将自己加入到设备Map表,
在onClose()中,将自己从设备Map表中移除。
onTextMessage()就是Servlet收到了来自设备的文本消息。Servlet根据设备id,找到所有正在监听此设备实时日志的browser,把文本消息转发给browser。
对于BrowserMessageInbound,
在onOpen()中,要将自己加入到监听对于id的browser列表中。由于针对同一个设备id,允许多个浏览器同时查看其实时日志。所以要先判断是否已经存在对应id的浏览器连接。如果不存在,则创建一个列表,并将此列表插入到browsers这个Map中。如果已经存在,则根据id找到列表,将自己加入到此列表中。
在onClose中,通过id找到列表,将自己从列表中移除。移除后如果列表为空,在将列表从browsers Map中移除。
浏览器端实现
<!DOCTYPE html>
<html>
<head>
<title>WebLogcat</title>
<style type="text/css">
input#chat {
width: 410px
}
#console-container {
width: 400px;
}
#console {
border: 1px solid #CCCCCC;
border-right-color: #999999;
border-bottom-color: #999999;
height: 170px;
overflow-y: scroll;
padding: 5px;
width: 100%;
}
#console p {
padding: 0;
margin: 0;
}
</style>
<script type="text/javascript">
var Chat = {};
Chat.socket = null;
Chat.connect = (function(host) {
if ('WebSocket' in window) {
Chat.socket = new WebSocket(host);
} else if ('MozWebSocket' in window) {
Chat.socket = new MozWebSocket(host);
} else {
Console.log('Error: WebSocket is not supported by this browser.');
return;
}
Chat.socket.onopen = function () {
Console.log('Info: WebSocket connection opened.');
document.getElementById('chat').onkeydown = function(event) {
if (event.keyCode == 13) {
Chat.sendMessage();
}
};
};
Chat.socket.onclose = function () {
document.getElementById('chat').onkeydown = null;
Console.log('Info: WebSocket closed.');
};
Chat.socket.onmessage = function (message) {
Console.log(message.data);
};
});
Chat.initialize = function() {
if (window.location.protocol == 'http:') {
Chat.connect('ws://' + window.location.host + '/WebLogcat/WebLogcat?id=fv0557&type=browser');
} else {
Chat.connect('wss://' + window.location.host + '/WebLogcat/WebLogcat?id=fv0557&type=browser');
}
};
Chat.sendMessage = (function() {
var message = document.getElementById('chat').value;
if (message != '') {
Chat.socket.send(message);
document.getElementById('chat').value = '';
}
});
var Console = {};
Console.log = (function(message) {
var console = document.getElementById('console');
var p = document.createElement('p');
p.style.wordWrap = 'break-word';
p.innerHTML = message;
console.appendChild(p);
while (console.childNodes.length > 25) {
console.removeChild(console.firstChild);
}
console.scrollTop = console.scrollHeight;
});
Chat.initialize();
</script>
</head>
<body>
<noscript><h2 style="color: #ff0000">Seems your browser doesn't support Javascript! Websockets rely on Javascript being enabled. Please enable
Javascript and reload this page!</h2></noscript>
<div>
<p>
<input type="text" placeholder="type and press enter to chat" id="chat">
</p>
<div id="console-container">
<div id="console"></div>
</div>
</div>
</body>
</html>
这其实是一个HTML5的页面,同样是需要部署到服务器上的。只不过用户通过浏览器访问,在浏览器上运行而已。
这个其实修改自Tomcat自带的一个Websocket的例子,叫Chat。里面关于device id是hard code为fv0557。
设备端实现
本文中的设备指的是一个嵌入式设备,是运行linux的ARM系统。所以选用libwebsocket来实现一个websocket的客户端。
#include <libwebsockets.h>
#include <stdio.h>
static int was_closed = 0;
static int deny_deflate;
static int deny_mux;
/* dumb_increment protocol */
static int
callback_weblogcat(struct libwebsocket_context *this,
struct libwebsocket *wsi,
enum libwebsocket_callback_reasons reason,
void *user, void *in, size_t len)
{
unsigned char buf[LWS_SEND_BUFFER_PRE_PADDING + 4096 +
LWS_SEND_BUFFER_POST_PADDING];
int l;
switch (reason) {
case LWS_CALLBACK_CLOSED:
fprintf(stderr, "LWS_CALLBACK_CLOSED\n");
was_closed = 1;
break;
case LWS_CALLBACK_CLIENT_ESTABLISHED:
/*
* LWS_CALLBACK_CLIENT_WRITEABLE will come next service
*/
fprintf(stderr, "LWS_CALLBACK_CLIENT_ESTABLISHED\n");
libwebsocket_callback_on_writable(this, wsi);
break;
case LWS_CALLBACK_CLIENT_RECEIVE:
((char *)in)[len] = '\0';
fprintf(stderr, "rx %d '%s'\n", (int)len, (char *)in);
break;
case LWS_CALLBACK_CLIENT_WRITEABLE:
l = sprintf((char *)&buf[LWS_SEND_BUFFER_PRE_PADDING],
"c #%06X %d %d %d;",
(int)random() & 0xffffff,
(int)random() % 500,
(int)random() % 250,
(int)random() % 24);
libwebsocket_write(wsi,
&buf[LWS_SEND_BUFFER_PRE_PADDING], l, LWS_WRITE_TEXT);
/* get notified as soon as we can write again */
libwebsocket_callback_on_writable(this, wsi);
sleep(3);
break;
/* because we are protocols[0] ... */
case LWS_CALLBACK_CLIENT_CONFIRM_EXTENSION_SUPPORTED:
if ((strcmp(in, "deflate-stream") == 0) && deny_deflate) {
fprintf(stderr, "denied deflate-stream extension\n");
return 1;
}
if ((strcmp(in, "x-google-mux") == 0) && deny_mux) {
fprintf(stderr, "denied x-google-mux extension\n");
return 1;
}
break;
default:
break;
}
return 0;
}
static struct libwebsocket_protocols protocols[] = {
{
NULL,
callback_weblogcat,
0,
},
{ /* end of list */
NULL,
NULL,
0
}
};
int main( int argc, char* argv[])
{
struct libwebsocket_context *context;
struct libwebsocket *wsi_weblogcat;
const char *address = "192.168.xxx.xxx";
int port = 8080;
int use_ssl = 0;
int n = 0;
context = libwebsocket_create_context(CONTEXT_PORT_NO_LISTEN, NULL,
protocols, NULL,
NULL, NULL, NULL, -1, -1, 0, NULL);
if (context == NULL) {
fprintf(stderr, "Creating libwebsocket context failed\n");
return 1;
}
/* create a client websocket using weblogcat protocol */
wsi_weblogcat = libwebsocket_client_connect(context, address, port, use_ssl,
"/WebLogcat/WebLogcat?id=fv0557&type=device", address, address,
protocols[0].name, -1);
if (wsi_weblogcat == NULL) {
fprintf(stderr, "libwebsocket weblogcat connect failed\n");
return -1;
}
fprintf(stderr, "Websocket weblogcat connections opened\n");
n = 0;
while (n >= 0 && !was_closed) {
n = libwebsocket_service(context, 1000);
if (n < 0)
continue;
}
fprintf(stderr, "Exiting\n");
libwebsocket_context_destroy(context);
return 0;
}
这个也是修改自libwebsocket中test-client.c 文件。这个只是模拟每隔一段时间,发送一些数据到server。这里只是一个demo,用来测试在设备上运行一个websocket的client。要实现真正的实时日志系统,这个进程应该提供一个接口用来给系统中的其他进程向其发送日志。然后这个进程再通过websocket发送给服务器。或者象Android一样,系统中的程序将日志记录到Log设备中,然后这个进程从Log设备中读取数据,再通过Websocket发送给服务器。