Go实现基于WebSocket的弹幕服务
## 拉模式和推模式 ### 拉模式 1、数据更新频率低,则大多数请求是无效的 2、在线用户量多,则服务端的查询负载高 3、定时轮询拉取,实时性低 ### 推模式 1、仅在数据更新时才需要推送 2、需要维护大量的在线长连接 3、数据更新后可以立即推送 ## 基于webSocket推送 1、浏览器支持的socket编程,轻松维持服务端长连接 2、基于TCP可靠传输之上的协议,无需开发者关心通讯细节 3、提供了高度抽象的编程接口,业务开发成本较低 ## webSocket协议与交互 ### 通讯流程 客户端->upgrade->服务端 客户端<-switching<-服务端 客户端->message->服务端 客户端<-message<-服务端 ## 实现http服务端 1、webSocket是http协议upgrade而来 2、使用http标准库快速实现空接口:/ws ## webSocket握手 1、使用webSocket.Upgrader完成协议握手,得到webSocket长连接 2、操作webSocket api,读取客户端消息,然后原样发送回去 ## 封装webSocket ### 缺乏工程化设计 1、其他代码模块,无法直接操作webSocket连接 2、webSocket连接非线程安全,并发读/写需要同步手段 ### 隐藏细节,封装api 1、封装Connection结构,隐藏webSocket底层连接 2、封装Connection的api,提供Send/Read/Close等线程安全接口 ### api原理(channel是线程安全的) 1、SendMessage将消息投递到out channel 2、ReadMessage从in channel读取消息 ## 内部原理 1、启动读协程,循环读取webSocket,将消息投递到in channel 2、启动写协程,循环读取out channel,将消息写给webSocket ```go // server.go package main import ( "net/http" "github.com/gorilla/websocket" "./impl" "time" ) var ( upgrader = websocket.Upgrader{ //允许跨域 CheckOrigin: func(r *http.Request) bool { return true }, } ) func wsHandler(w http.ResponseWriter, r *http.Request) { var ( wsConn *websocket.Conn err error conn *impl.Connection data []byte ) //Upgrade:websocket if wsConn, err = upgrader.Upgrade(w, r, nil); err != nil { return } if conn, err = impl.InitConnection(wsConn); err != nil { goto ERR } go func() { var ( err error ) for { if err =conn.WriteMessage([]byte("heartbeat")); err != nil { return } time.Sleep(1 * time.Second) } }() for { if data, err = conn.ReadMessage(); err != nil { goto ERR } if err = conn.WriteMessage(data); err != nil { goto ERR } } ERR: //关闭连接 conn.Close() } func main() { //http:localhost:7777/ws http.HandleFunc("/ws", wsHandler) http.ListenAndServe("0.0.0.0:7777", nil) } ``` ```go // connection.go package impl import ( "github.com/gorilla/websocket" "sync" "github.com/influxdata/platform/kit/errors" ) var once sync.Once type Connection struct { wsConn *websocket.Conn inChan chan []byte outChan chan []byte closeChan chan byte isClosed bool mutex sync.Mutex } func InitConnection(wsConn *websocket.Conn) (conn *Connection, err error) { conn = &Connection{ wsConn:wsConn, inChan:make(chan []byte, 1000), outChan:make(chan []byte, 1000), closeChan:make(chan byte, 1), } //启动读协程 go conn.readLoop() //启动写协程 go conn.writeLoop() return } //API func (conn *Connection) ReadMessage() (data []byte, err error) { select { case data = <- conn.inChan: case <- conn.closeChan: err = errors.New("connection is closed") } return } func (conn *Connection) WriteMessage(data []byte) (err error) { select { case conn.outChan <- data: case <- conn.closeChan: err = errors.New("connection is closed") } return } func (conn *Connection) Close() { // 线程安全的close,可重入 conn.wsConn.Close() conn.mutex.Lock() if !conn.isClosed { close(conn.closeChan) conn.isClosed = true } conn.mutex.Unlock() } //内部实现 func (conn *Connection) readLoop() { var ( data []byte err error ) for { if _, data, err = conn.wsConn.ReadMessage(); err != nil { goto ERR } //阻塞在这里,等待inChan有空位置 //但是如果writeLoop连接关闭了,这边无法得知 //conn.inChan <- data select { case conn.inChan <- data: case <-conn.closeChan: //closeChan关闭的时候,会进入此分支 goto ERR } } ERR: conn.Close() } func (conn *Connection) writeLoop() { var ( data []byte err error ) for { select { case data = <- conn.outChan: case <- conn.closeChan: goto ERR } if err = conn.wsConn.WriteMessage(websocket.TextMessage, data); err != nil { goto ERR } conn.outChan <- data } ERR: conn.Close() } ```