proxy.go 源码阅读

时间:2020-12-30 07:41:09
package main

import (
    "net"
    "time"
)

func initProxy() {

    pLog.Infof("Proxying %s -> %s\n", pConfig.Bind, pConfig.Backend)  //输出服务地址   后端服务地址列表  

    server, err := net.Listen("tcp", pConfig.Bind)  //建立tcp连接
    if err != nil {
        pLog.Fatal(err)
    }

    waitQueue := make(chan net.Conn, pConfig.WaitQueueLen)  //建立连接队列  队列默认等待队列长度---channel  net.Conn  ==控制最大排队队列
    availPools := make(chan bool, pConfig.MaxConn)  //建立最大连接数--channel  bool=== 控制最大连接数
    for i := 0; i < pConfig.MaxConn; i++ {  //
        availPools <- true
    }

    go loop(waitQueue, availPools)

    for {
        connection, err := server.Accept()  //等待获取下一次连接
        if err != nil {
            pLog.Error(err)
        } else {
            pLog.Infof("Received connection from %s.\n", connection.RemoteAddr())
            waitQueue <- connection
        }
    }
}

func loop(waitQueue chan net.Conn, availPools chan bool) {
    for connection := range waitQueue { //循环等待队列中 排队等待需要处理的数据--连接
        <-availPools  //从通道中 连接池中取出一个
        go func(connection net.Conn) {
            handleConnection(connection)
            availPools <- true  //使用结束放回连接池中  目的控制连接数量  通道阻塞特性
            pLog.Infof("Closed connection from %s.\n", connection.RemoteAddr())
        }(connection)
    }
}

func handleConnection(connection net.Conn) {
    defer connection.Close()

    bksvr, ok := getBackendSvr(connection)
    if !ok {
        return
    }
    remote, err := net.Dial("tcp", bksvr.svrStr)

    if err != nil {
        pLog.Error(err)
        bksvr.failTimes++
        return
    }

    //等待双向连接完成
    complete := make(chan bool, 2)
    oneSide := make(chan bool, 1)
    otherSide := make(chan bool, 1)
    go pass(connection, remote, complete, oneSide, otherSide)
    go pass(remote, connection, complete, otherSide, oneSide)
    <-complete
    <-complete
    remote.Close()
}

// copy Content two-way
func pass(from net.Conn, to net.Conn, complete chan bool, oneSide chan bool, otherSide chan bool) {
    var err error
    var read int
    bytes := make([]byte, 256)

    for {
        select {

        case <-otherSide:
            complete <- true
            return

        default:

            from.SetReadDeadline(time.Now().Add(time.Duration(pConfig.Timeout) * time.Second))
            read, err = from.Read(bytes)
            if err != nil {
                complete <- true
                oneSide <- true
                return
            }

            to.SetWriteDeadline(time.Now().Add(time.Duration(pConfig.Timeout) * time.Second))
            _, err = to.Write(bytes[:read])
            if err != nil {
                complete <- true
                oneSide <- true
                return
            }
        }
    }
}