fasthttp 的 goroutine pool 实现探究

时间:2022-02-15 09:49:22

引言

fasthttp是一个非常优秀的web server框架,号称比官方的net/http快10倍以上。fasthttp用了很多黑魔法。俗话说,源码面前,了无秘密,我们今天通过源码来看一看她的goroutine pool的实现。

热身

fasthttp写server和原生的net/http写法上基本没有区别,这里就不举例子。直接找到入口函数,在根目录下的server.go文件中,我们从函数ListenAndServe()跟踪进去。从端口监听到处理请求的函数调用链如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
func ListenAndServe(addr string, handler RequestHandler) error {
    s := &Server{
        Handler: handler,
    }
    return s.ListenAndServe(addr)
}
 
// ListenAndServe serves HTTP requests from the given TCP addr.
func (s *Server) ListenAndServe(addr string) error {
    ln, err := net.Listen("tcp", addr)
    if err != nil {
        return err
    }
    return s.Serve(ln)
}
 
// Serve blocks until the given listener returns permanent error.
func (s *Server) Serve(ln net.Listener) error {
    ...
    wp := &workerPool{
        WorkerFunc:      s.serveConn,
        MaxWorkersCount: maxWorkersCount,
        LogAllErrors:    s.LogAllErrors,
        Logger:          s.logger(),
    }
    wp.Start()  //启动worker pool
 
    for {
        if c, err = acceptConn(s, ln, &lastPerIPErrorTime); err != nil {
            wp.Stop()
            if err == io.EOF {
                return nil
            }
            return err
        }
        if !wp.Serve(c) {
            s.writeFastError(c, StatusServiceUnavailable,
                "The connection cannot be served because Server.Concurrency limit exceeded")
            c.Close()
            if time.Since(lastOverflowErrorTime) > time.Minute {
                s.logger().Printf("The incoming connection cannot be served, because %d concurrent connections are served. "+
                    "Try increasing Server.Concurrency", maxWorkersCount)
                lastOverflowErrorTime = time.Now()
            }
            time.Sleep(100 * time.Millisecond)
        }
        c = nil
    }
}

上面代码中workerPool就是一个线程池。相关代码在server.go文件的同级目录下的workerpool.go文件中。我们从上面代码涉及到的往下看。首先是workerPool struct

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
type workerPool struct {
    WorkerFunc func(c net.Conn) error
    MaxWorkersCount int
    LogAllErrors bool
    MaxIdleWorkerDuration time.Duration
    Logger Logger
    lock         sync.Mutex
    workersCount int
    mustStop     bool
    ready []*workerChan
 
    stopCh chan struct{}
    workerChanPool sync.Pool
}
 
type workerChan struct {
    lastUseTime time.Time
    ch          chan net.Conn
}

workerPool sturct中的WorkerFunc是conn的处理函数,类似net/http包中的ServeHTTP。因为所有conn的处理都是一样的,所以WorkerFunc不需要和传入的每个conn绑定,整个worker pool共用一个。workerChanPool是sync.Pool对象池。

MaxIdleWorkerDuration是worker空闲的最长时间,超过就将worker关闭。workersCount是worker的数量。ready是可用的worker列表,也就是说所有goroutine worker是存放在一个数组里面的。这个数组模拟一个类似栈的FILO队列,也就是说我们每次使用的worker都从队列的尾部开始取。wp.Start()启动worker pool。wp.Stop()是出错处理。wp.Serve(c)是对conn进行处理的函数。我们先看一下wp.Start()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
func (wp *workerPool) Start() {
    if wp.stopCh != nil {
        panic("BUG: workerPool already started")
    }
    wp.stopCh = make(chan struct{})
    stopCh := wp.stopCh
    go func() {
        var scratch []*workerChan
        for {
            wp.clean(&scratch)
            select {
            case <-stopCh:
                return
            default:
                time.Sleep(wp.getMaxIdleWorkerDuration())
            }
        }
    }()
}
 
func (wp *workerPool) Stop() {
    ...
    close(wp.stopCh)
    wp.stopCh = nil
 
    wp.lock.Lock()
    ready := wp.ready
    for i, ch := range ready {
        ch.ch <- nil
        ready[i] = nil
    }
    wp.ready = ready[:0]
    wp.mustStop = true
    wp.lock.Unlock()
}

简单来说,wp.Start()启动了一个goroutine,负责定期清理worker pool中过期worker(过期=未使用时间超过MaxIdleWorkerDuration)。清理操作都在wp.clean()函数中完成,这里就不继续往下看了。stopCh是一个标示worker pool停止的chan。上面的for-select-stop是很常用的方式。wp.Stop()负责停止worker pool的处理工作,包括关闭stopCh,清理闲置的worker列表(这时候还有一部分worker在处理conn,待其处理完成通过判断wp.mustStop来停止)。这里需要注意的一点是做资源清理的时候,对于channel需要置nil。下面看看最重要的函数wp.Serve()

核心

下面是wp.Serve()函数的调用链。wp.Serve()负责处理来自client的每一条连接。其中比较关键的函数是wp.getCh(),她从worker pool的可用空闲worker列表尾部取出一个可用的worker。这里有几个逻辑需要注意的是:1.如果没有可用的worker(比如处理第一个conn是,worker pool还是空的)则新建;2.如果worker达到上限,则直接不处理(这个地方感觉略粗糙啊!)。go func()那几行代码就是新建worker,我们放到下面说。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
func (wp *workerPool) Serve(c net.Conn) bool {
    ch := wp.getCh()
    if ch == nil {
        return false
    }
    ch.ch <- c
    return true
}
 
func (wp *workerPool) getCh() *workerChan {
    var ch *workerChan
    createWorker := false
 
    wp.lock.Lock()
    ready := wp.ready
    n := len(ready) - 1
    if n < 0 {
        if wp.workersCount < wp.MaxWorkersCount {
            createWorker = true
            wp.workersCount++
        }
    } else {
        ch = ready[n]
        ready[n] = nil
        wp.ready = ready[:n]
    }
    wp.lock.Unlock()
 
    if ch == nil {
        if !createWorker {
            return nil
        }
        vch := wp.workerChanPool.Get()
        if vch == nil {
            vch = &workerChan{
                ch: make(chan net.Conn, workerChanCap),
            }
        }
        ch = vch.(*workerChan)
        go func() {
            wp.workerFunc(ch)
            wp.workerChanPool.Put(vch)
        }()
    }
    return ch
}

workerFunc()函数定义如下(去掉了很多不影响主线的逻辑),结合上一篇《如何裸写一个goroutine pool》,还是熟悉的配方,熟悉的味道。这里要看的wp.release()是干啥的。因为前面的wp.Serve()函数只处理一个conn,所以for循环执行一次我们就可以把worker放到空闲队列中去等待下一次conn过来,从代码中可以看出来放回果然是放到空闲队列的末尾(可算和上面呼应上了)。还有上面提到的mustStop,如果worker pool停止了,mustStop就为true,那么workerFunc就要跳出循环,也就是goroutine结束了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
func (wp *workerPool) workerFunc(ch *workerChan) {
    var c net.Conn
 
    var err error
    for c = range ch.ch {
        if c == nil {
            break
        }
 
        ...
        c = nil
 
        if !wp.release(ch) {
            break
        }
    }
 
    wp.lock.Lock()
    wp.workersCount--
    wp.lock.Unlock()
}
 
func (wp *workerPool) release(ch *workerChan) bool {
    ch.lastUseTime = time.Now()
    wp.lock.Lock()
    if wp.mustStop {
        wp.lock.Unlock()
        return false
    }
    wp.ready = append(wp.ready, ch)
    wp.lock.Unlock()
    return true
}

总结

除了fasthttp,我还看了github上其他开源且star数在100以上的goroutine pool的实现,基本核心原理都在我上一篇文章中说的那些。fasthttp的实现多了一层goroutine回收机制,不得不说确实挺巧妙。fasthttp性能这么好一定是有其原因的,源码之后再慢慢读。

fasthttp 的 goroutine pool 实现探究的更多相关文章

  1. 如果裸写一个goroutine pool

    引言 在上文中,我说到golang的原生http server处理client的connection的时候,每个connection起一个goroutine,这是一个相当粗暴的方法.为了感受更深一点, ...

  2. goroutine pool,WaitGroup,chan 示例

    服务端高并发编程经常需要写很多goroutine来服务每一个连接,如何正确使用goroutine池是又拍云的工程师们需要考虑的问题,今天这篇文章,分享给同样需要使用go语言的小伙伴们. 文/陶克路 本 ...

  3. 通过 Channel 实现 Goroutine Pool

    最近用到了 Go 从 Excel 导数据到服务器内部 用的是 http 请求 但是发现一个问题 从文件读取之后 新开 Goroutine 会无限制新增 导致全部卡在初始化请求 于是乎就卡死了 问题模拟 ...

  4. Goroutine并发调度模型深度解析之手撸一个协程池

    golanggoroutine协程池Groutine Pool高并发 并发(并行),一直以来都是一个编程语言里的核心主题之一,也是被开发者关注最多的话题:Go语言作为一个出道以来就自带 『高并发』光环 ...

  5. go中控制goroutine数量

    控制goroutine数量 前言 控制goroutine的数量 通过channel+sync 使用semaphore 线程池 几个开源的线程池的设计 fasthttp中的协程池实现 Start Sto ...

  6. Golang(九)简单 Goroutine 池实现

    0. 前言 最近使用 Golang 写一个并发执行的测试脚本 之前习惯使用 Java,习惯性想先建一个线程池.然后意识到 Golang 没有封装好的线程池 结合之前学习的 Goroutine 原理和 ...

  7. Awesome Go

    A curated list of awesome Go frameworks, libraries and software. Inspired by awesome-python. Contrib ...

  8. golang协程池设计

    Why Pool go自从出生就身带“高并发”的标签,其并发编程就是由groutine实现的,因其消耗资源低,性能高效,开发成本低的特性而被广泛应用到各种场景,例如服务端开发中使用的HTTP服务,在g ...

  9. Go 语言相关的优秀框架,库及软件列表

    If you see a package or project here that is no longer maintained or is not a good fit, please submi ...

随机推荐

  1. 2014年年度工作总结--IT狂人实录

    2014年也是我人生最重要的一年,她见证了我的成长与蜕变,让我从一个迷茫的旅者踏上一条柳暗花明的路. 春宇之行 从春宇短暂的9个月,却经历常人难以想想的风风雨雨,首先要感谢春宇公司给我带来了安逸宽松的 ...

  2. Android 自定义view (一)&mdash&semi;&mdash&semi;attr 理解

    前言: 自定义view是android自定义控件的核心之一,那么在学习自定义view之前,我们先来了解下自定义view的自定义属性的attr的用法吧 Android attr 是什么 (1)attr ...

  3. SOSO街景地图 API (Javascript)开发教程(1)- 街景

    SOSO街景地图 Javascript API 干什么用的? 你想在网页里嵌入个地图,就需要它了! 另外,它还支持:地点搜索.周边/附近查询.地图标注.公交/驾车路线规划.地理坐标与地址相互转换.地理 ...

  4. sql server CTE递归使用测试

    --CTE递归查询 if(object_id(N'menu') > 0) drop table menu CREATE TABLE MENU ( name nvarchar(50) NOT NU ...

  5. 从一个故障说说Java的三个BlockingQueue

    原文地址:http://hellojava.info/?p=464 最近出了个故障,排查的时候耗费了很长的时间,回顾整个排查过程,经验主义在这里起了不好的作用,直接导致了整个故障排查的时间非常长,这个 ...

  6. SDL2源码分析5:更新纹理(SDL&lowbar;UpdateTexture&lpar;&rpar;)

    ===================================================== SDL源码分析系列文章列表: SDL2源码分析1:初始化(SDL_Init()) SDL2源 ...

  7. oracle 解析json格式

    1. CREATE OR REPLACE PACKAGE PKG_ANALYSE_COMMON IS -- AUTHOR : YZCHEN -- CREATED : 2013/11/26 14:12: ...

  8. GTID 跳过事物

    Mysql5.7 stop slave; set @@SESSION.GTID_NEXT='507e80e9-3648-11e9-aa70-fa163e77a52d:20173'; begin; co ...

  9. Aizu2224 Save your cats(最大生成树)

    https://vjudge.net/problem/Aizu-2224 场景嵌入得很好,如果不是再最小生成树专题里,我可能就想不到解法了. 对所有的边(栅栏)求最大生成树,剩下来的长度即解(也就是需 ...

  10. zookeeper简单实战

    一.安装(单机模式.集群模式.伪集群模式) 1:安装JDK 2:解压zk压缩包 3:在conf目录下创建zoo.cfg配置文件. 设置超时时间,快照目录,事务日志文件目录,对外端口,服务IP 4:启动 ...