在类似网关这种海量连接, 很高的并发的场景, 比如有 10W+ 连接, go 开始变得吃力. 因为频繁的 goroutine 调度和 gc 导致程序性能很差. 这个时候我们可以考虑用经典的 reactor 网络模型来应对这种需求
常见网络模型
下面是目前常见的网络模型
go 原生网络模型
go 通过 IO 多路复用构建了一套简洁而高性能原生网络模型, 让开发者可以使用同步的模式编写异步的逻辑(给每个连接开一个协程处理), 极大降低开发的难度
goroutine 从 fd Read() 数据但是又没有数据的时候, 会将当前的 goroutine 给 pack 住, 直到这个 fd 发生读事件这个 goroutine 才会重新 ready 激活(再次发生读事件就是 epoll 之类 IO 多路复用触发的)
贴一个 给每个连接开一个协程处理 的 demo 代码
package main
import "net"
func main() {
l, _ := net.Listen("tcp", ":5000")
for {
// 新的连接
conn, _ := l.Accept()
// 给每个连接开启一个协程处理
go handler(conn)
}
}
func handler(conn net.Conn) {
for {
// 读取数据
// 业务处理
// 写入返回数据
}
}
但是在类似网关这种海量连接, 很高的并发的场景, 比如有 10W+ 连接, go 开始变得吃力. 因为频繁的 goroutine 调度和 gc 导致程序性能很差. 这个时候我们可以考虑用经典的 reactor 来应对这种需求
go 原生网络模型不是讨论的重点, 学习请参考 reference
Proactor
第五个 Proactor 是从底层支持的真正的异步 IO, 底层在 IO 完成后会通知应用层
其他都是同步 IO, 因为没有像同步 IO 一样成熟和广泛使用, 所以不常用
Reactor 网络模型
单 Reactor 单线程
-
Reactor 通过 IO多路复用 监听 IO 事件, 然后将 IO 事件分配给 Acceptor 或者对应的 Handler 处理
-
如果是建立连接事件: 交给 Acceptor 处理, Acceptor 会通过 accept 方法获得一个连接, 然后再创建一个 handler 来处理对应的响应事件
-
如果不是建立连接事件: 交个当前连接对应的 Handler 对象进行响应
-
Handler 对象需要从连接 read 数据 -> 业务处理 -> 将返回数据 write 到连接中
Redis 单 Reactor 单线程模型
- 使用 IO多路复用 循环获取事件并进行事件分发
- 事件分发器会通过回调函数 串行 执行所有的事件(accept 连接创建, read, 业务处理, write)
单 Reactor 多线程
-
Reactor 通过 IO多路复用 监听 IO 事件, 然后将 IO 事件分配给 Acceptor 或者对应的 Handler 处理
-
如果不是建立连接事件: 交给 Acceptor 处理, Acceptor 会通过 accept 方法获得一个连接, 然后再创建一个 handler 来处理对应的响应事件
-
如果不是是事件, 就交个当前连接对应的 Handler 对象进行响应
前面都是一样的
- 主线程 Handler 对象需要从连接 read 数据 , 然后给线程池 processor 执行
- 线程池执行业务完毕后会添加写事件到 Reactor
- Reactor 下一次事件循环的时候会处理写事件
其实上面的模型给人一种 handler 要等待 processor 执行完成再执行 write 的感觉, 其实这样和单线程没啥区别, 所以不能这样理解
正确的是 processor 执行完后将写事件添加到 Reactor 然后 Reactor 下一次循环的时候处理写事件
下面是数据流动正确的图
redis 6.0 的多线程 IO
不同的是
- redis 6.0 : IO 是多线程, 业务处理是主线程
- 单 Reactor 多线程网络模型: IO 是主线程, 业务处理是多线程
- 主线程 使用 Reactor epoll_wait (IO 多路复用) 获取可读的 socket fd, 并分配给 IO 线程池读取数据, 主线程阻塞等待 IO 线程池将数据读取完毕
- IO 线程层读取数据并解析为 redis 命令, 放到读缓冲区
- 主线程顺序执行解析好的命令, 并将返回的结果放到写缓冲区, 阻塞等待 IO 线程数据写入完毕
- IO 线程写入返回的数据到 fd
- 清空缓冲区和队列, 继续事件循环
多 Reactor 多线程
因为一个 Reactor 对象承担所有事件的监听和响应, 而且只在主线程中运行, 在面对瞬间高并发的场景时, 容易成为性能的瓶颈
所以就考虑多 Reactor 来解决这个问题
- 主线程的 Reactor 负责 accept 连接, 然后将连接交给线程池的 Reactor
- 线程池的 Reactor 负责获取连接的读事件, 然后交给 handler 执行
reference
如何深刻理解Reactor和Proactor? - 知乎 (zhihu.com)
如何用Go实现一个异步网络库? - 知乎 (zhihu.com)
Go netpoller 网络模型之源码全面解析 - 知乎 (zhihu.com)
Redis中的Reactor模型 - 掘金 (juejin.cn)
Redis 6.0 多线程IO处理过程详解 - 知乎 (zhihu.com)
Redis 6.0 新特性:带你 100% 掌握多线程模型 - SegmentFault 思否
gev 源码分析
阅读这篇文章前, 请一定先了解 Reactor 网络模型
简介
gev
是一个轻量、快速的基于 Reactor 模式的非阻塞 TCP 网络库 / websocket server,支持自定义协议,轻松快速搭建高性能服务器。
特点
- 基于 epoll 和 kqueue 实现的高性能事件循环
- 支持多核多线程
- 动态扩容 Ring Buffer 实现的读写缓冲区
- 异步读写
- 自动清理空闲连接
- SO_REUSEPORT 端口重用支持
- 支持 WebSocket/Protobuf, 自定义协议
- 支持定时任务,延时任务
- 开箱即用的高性能 websocket server
网络模型
gev
只使用极少的 goroutine, 一个 goroutine 负责监听客户端连接,其他 goroutine (work 协程)负责处理已连接客户端的读写事件,work 协程数量可以配置,默认与运行主机 CPU 数量相同。
多 Reactor 多线程
gev 是基于多 Reactor 多线程网络模型构建的, listener(MainReactor) 通过负载均衡策略将新连接分配给 worker(子线程Reactor)
gev.NewServer 初始化
// NewServer 创建 Server
// @param handler 用户定义的 Handler 处理逻辑
// @param opts 配置
func NewServer(handler Handler, opts ...Option) (server *Server, err error)
Handler 用户自定义的处理逻辑的函数
// Handler Server 注册接口
type Handler interface {
CallBack
OnConnect(c *Connection)
}
type CallBack interface {
OnMessage(c *Connection, ctx interface{}, data []byte) interface{}
OnClose(c *Connection)
}
配置参数
// Options 服务配置
type Options struct {
Network string // 目前只支持 tcp
Address string // 监听地址
NumLoops int // worker 的数量
ReusePort bool // 是否可重用端口
IdleTime time.Duration // 最大空闲时间
Protocol Protocol // 自定义数据包的拆包解包 Pack, UnPack
Strategy LoadBalanceStrategy // 负载均衡策略, 目前支持轮询和最小连接数
tick time.Duration // 层级时间轮的 tick
wheelSize int64 // 层级时间轮 size
metricsPath, metricsAddress string // metric 指标暴露地址
}
Server
// Server gev Server
type Server struct {
listener *listener // 监听的主 Reactor
workLoops []*eventloop.EventLoop // worker Reactor
callback Handler // 用户定义的 Handler 函数
timingWheel *timingwheel.TimingWheel // 类似 kafka 中的层级定时器
opts *Options // Options 配置
running atomic.Bool // 状态是否为正在运行
}
listener
// listener 监听TCP连接, 作为多Reactor多线程网络模型中的主 Reactor
type listener struct {
file *os.File // 监听的 socket 文件
fd int // 监听的 socket 文件 fd
handleC handleConnFunc // 处理新连接 Conn 的函数(默认是通过负载均衡交给 worker)
listener net.Listener // Listener 对象
loop *eventloop.EventLoop // 事件循环
}
eventloop: listener 和 worker 都是通过 eventloop 来实现的
type eventLoopLocal struct {
ConnCunt atomic.Int64 // 连接数
needWake *atomic.Bool // 是否需要唤醒
poll *poller.Poller // poller linux 下由 epoll 实现
mu spinlock.SpinLock // 自旋锁
sockets map[int]Socket // fd -> Socket
packet []byte // 内部使用,临时缓冲区
taskQueueW []func() // 写事件队列
taskQueueR []func() // 读事件队列
UserBuffer *[]byte // 用户缓冲区
}
poller.Poller 在 linux 下由 epoll 实现, eventloop 通过 poller.Poller 来实现事件循环
type Poller struct {
fd int
eventFd int
buf []byte
running atomic.Bool
waitDone chan struct{}
}
总结:
- NewServer 初始化 Server, 会初始化一个 listener (主 Reactor), 一个或多个 worker Reactor, 一个层级时间轮, 一个用户自定义的 Handler
- listener, 和 worker 通过 poller.Poller 来实现事件循环, listener 通过负载均衡策略将新连接分配给 worker (Reactor 网络模型是: 一个主 Reactor, 多个 worker Reactor)
s.Start() 运行
// Start 启动 Server
// 在协程中启动主 Reactor 和 worker Reactor, 开始事件循环
func (s *Server) Start() {
sw := sync.WaitGroupWrapper{}
// 启动定时器
s.timingWheel.Start()
// 启动 worker
length := len(s.workLoops)
for i := 0; i < length; i++ {
sw.AddAndRun(s.workLoops[i].Run)
}
// 启动主 Reactor
sw.AddAndRun(s.listener.Run)
s.running.Set(true)
sw.Wait()
}
listener.Run 和 worker.Run 都是通过 eventloop.Run 来实现的, 传入的回调函数不同
// Run 启动事件循环
func (l *EventLoop) Run() {
l.poll.Poll(l.handlerEvent)
}
eventloop.Run 是由 poll 实现的
// Poll 启动 epoll wait 循环
func (ep *Poller) Poll(handler func(fd int, event Event)) {
defer func() {
close(ep.waitDone)
}()
events := make([]unix.EpollEvent, waitEventsBegin)
var (
wake bool
msec int
)
ep.running.Set(true)
for {
// wait 等待事件并读取
n, err := unix.EpollWait(ep.fd, events, msec)
if err != nil && err != unix.EINTR {
log.Error("EpollWait: ", err)
continue
}
if n <= 0 {
msec = -1
runtime.Gosched()
continue
}
msec = 0
// 遍历读取的事件
for i := 0; i < n; i++ {
fd := int(events[i].Fd)
if fd != ep.eventFd {
var rEvents Event
if ((events[i].Events & unix.POLLHUP) != 0) && ((events[i].Events & unix.POLLIN) == 0) {
rEvents |= EventErr
}
if (events[i].Events&unix.EPOLLERR != 0) || (events[i].Events&unix.EPOLLOUT != 0) {
rEvents |= EventWrite
}
if events[i].Events&(unix.EPOLLIN|unix.EPOLLPRI|unix.EPOLLRDHUP) != 0 {
rEvents |= EventRead
}
// 回调函数处理事件
handler(fd, rEvents)
} else {
ep.wakeHandlerRead()
wake = true
}
}
if wake {
handler(-1, 0)
wake = false
if !ep.running.Get() {
return
}
}
if n == len(events) {
events = make([]unix.EpollEvent, n*2)
}
}
}
poll 会从 EpollWait 中等待读/写事件, 并交给回调函数处理
我们看看 listener.Run 的回调函数 handleNewConnection
func (s *Server) handleNewConnection(fd int, sa unix.Sockaddr) {
// 通过负载均衡找到一个 worker
loop := s.opts.Strategy(s.workLoops)
// 创建连接
c := NewConnection(fd, loop, sa, s.opts.Protocol, s.timingWheel, s.opts.IdleTime, s.callback)
// 添加一个事件, 将连接放到 worker 里面
loop.QueueInLoop(func() {
s.callback.OnConnect(c)
if err := loop.AddSocketAndEnableRead(fd, c); err != nil {
log.Error("[AddSocketAndEnableRead]", err)
}
})
}
我们看看 workLoops[i].Run 的回调函数 handleLoop
// workLoops[i].Run 的回调函数
func (l *EventLoop) handlerEvent(fd int, events poller.Event) {
if fd != -1 {
// 找到 fd 对应的 socket 调用 socket.HandleEvent
s, ok := l.sockets[fd]
if ok {
s.HandleEvent(fd, events)
}
} else {
l.needWake.Set(true)
l.doPendingFunc()
}
}
EventLoop.handleLoop 回调的是 Socket.HandleEvent
Socket 是一个接口, Connection 是 Socket 的实现类
我们看看 Collection.HandleEvent
// HandleEvent 内部使用,event loop 回调
func (c *Connection) HandleEvent(fd int, events poller.Event) {
// ...
// 处理错误事件, 关闭连接
if events&poller.EventErr != 0 {
c.handleClose(fd)
return
}
// 处理写事件
if !c.outBuffer.IsEmpty() {
if events&poller.EventWrite != 0 {
// if return true, it means closed
if c.handleWrite(fd) {
return
}
// ...
}
} else if events&poller.EventRead != 0 {
// 处理读事件
if c.handleRead(fd) {
return
}
// ...
}
// ...
}
我们看看处理读事件 c.handleRead(fd)
// 处理读事件
func (c *Connection) handleRead(fd int) (closed bool) {
// TODO 避免这次内存拷贝
// 将数据读取到 buf
buf := c.loop.PacketBuf()
n, err := unix.Read(c.fd, buf)
// ...
// read buffer 读完了
if c.inBuffer.IsEmpty() {
// 将 buf 中的数据写入 buffer
c.buffer.WithData(buf[:n])
buf = buf[n:n]
// 协议解码 UnPacket -> 协议处理 OnMessage -> 返回的数据编码 Packet
c.handlerProtocol(&buf, c.buffer)
if !c.buffer.IsEmpty() {
first, _ := c.buffer.PeekAll()
_, _ = c.inBuffer.Write(first)
}
} else { // read buffer 还有数据
// 将 buf 中的数据写入 read buffer
_, _ = c.inBuffer.Write(buf[:n])
buf = buf[:0]
// 协议解码 UnPacket -> 协议处理 OnMessage -> 返回的数据编码 Packet
c.handlerProtocol(&buf, c.inBuffer)
}
// 将返回的数据发送写给客户端 fd
if len(buf) != 0 {
closed = c.sendInLoop(buf)
}
return
}
我们看看 handlerProtocol 怎么做的
// 协议解码 UnPacket -> 协议处理 OnMessage -> 返回的数据编码 Packet
// @param tmpBuffer 临时 buffer, 用于存储返回的数据
// @param buffer 已经读取好的 buffer
func (c *Connection) handlerProtocol(tmpBuffer *[]byte, buffer *ringbuffer.RingBuffer) {
ctx, receivedData := c.protocol.UnPacket(c, buffer)
for ctx != nil || len(receivedData) != 0 {
sendData := c.callBack.OnMessage(c, ctx, receivedData)
if sendData != nil {
*tmpBuffer = append(*tmpBuffer, c.protocol.Packet(c, sendData)...)
}
// 如果有多个数据包,继续解析处理
ctx, receivedData = c.protocol.UnPacket(c, buffer)
}
}
OnMessage 就是我们为了实现 Handler 接口定义的回调函数, 例如
type example struct {
Count atomic.Int64
}
func (s *example) OnConnect(c *gev.Connection) {
s.Count.Add(1)
//log.Println(" OnConnect : ", c.PeerAddr())
}
func (s *example) OnMessage(c *gev.Connection, ctx interface{}, data []byte) (out interface{}) {
//log.Println("OnMessage")
out = data
return
}
func (s *example) OnClose(c *gev.Connection) {
s.Count.Add(-1)
//log.Println("OnClose")
}
我们看看 OnMessage 处理完后返回的数据怎么返回给客户端的 c.sendInLoop(buf)
buf 是 OnMessage 处理完后返回的数据, 并且 Packet 之后的数据
// 将返回的数据放到 outBuffer 中
func (c *Connection) sendInLoop(data []byte) (closed bool) {
// 如果 outBuffer 为空, 直接将数据放到 outBuffer 中
if !c.outBuffer.IsEmpty() {
_, _ = c.outBuffer.Write(data)
} else {
// 如果 outBuffer 不为空, 尝试写入数据
n, err := unix.Write(c.fd, data)
if err != nil && err != unix.EAGAIN {
c.handleClose(c.fd)
closed = true
return
}
// 把没写完的数据放到 outBuffer 中
if n <= 0 {
_, _ = c.outBuffer.Write(data)
} else if n < len(data) {
_, _ = c.outBuffer.Write(data[n:])
}
// 如果 outBuffer 不为空, 则注册写事件
if !c.outBuffer.IsEmpty() {
_ = c.loop.EnableReadWrite(c.fd)
}
}
return
}
gev 处理请求后, 会将返回的数据储存到 outBuffer 里面 (如果 outBuffer 为空会先尝试直接将数据发送到网卡), 然后注册写事件到 poller (linux 使用 epoll 实现), 循环 epoll 事件的时候, 如果有写事件, 就会将 outBuffer 中的数据发送到网卡
快速开始
参考 GitHub 库中的快速开始和 example, 文档里面已经写的很好了, 这里就不再赘述了
贴一个 echo 的快速开始
package main
import (
"flag"
"net/http"
_ "net/http/pprof"
"strconv"
"time"
"github.com/Allenxuxu/gev"
"github.com/Allenxuxu/gev/log"
"github.com/Allenxuxu/toolkit/sync/atomic"
)
type example struct {
Count atomic.Int64
}
func (s *example) OnConnect(c *gev.Connection) {
s.Count.Add(1)
//log.Println(" OnConnect : ", c.PeerAddr())
}
func (s *example) OnMessage(c *gev.Connection, ctx interface{}, data []byte) (out interface{}) {
//log.Println("OnMessage")
out = data
return
}
func (s *example) OnClose(c *gev.Connection) {
s.Count.Add(-1)
//log.Println("OnClose")
}
func main() {
go func() {
if err := http.ListenAndServe(":6060", nil); err != nil {
panic(err)
}
}()
handler := new(example)
var port int
var loops int
flag.IntVar(&port, "port", 1833, "server port")
flag.IntVar(&loops, "loops", -1, "num loops")
flag.Parse()
s, err := gev.NewServer(handler,
gev.Network("tcp"),
gev.Address(":"+strconv.Itoa(port)),
gev.NumLoops(loops),
gev.MetricsServer("", ":9091"),
)
if err != nil {
panic(err)
}
s.RunEvery(time.Second*2, func() {
log.Info("connections :", handler.Count.Get())
})
s.Start()
}