1、循环使用缓存
每条日志需要开辟缓存块来存储内容,以减少频繁的内存分配与回收。日志结构体定义如下:
type MLogger struct { // freeList is a list of byte buffers, maintained under freeListMu. freeList *buffer // freeListMu maintains the free list. It is separate from the main mutex // so buffers can be grabbed and printed to without holding the main lock, // for better parallelization. freeListMu sync.Mutex // ... }
已经开辟出的多个内存空间形成一个单链表,其中的freeList指向这个链表的头部。由于有多个go协程同时要操作这个单链表,如打印完日志后回收缓存,或者要求一个缓存块来存储日志内容。
首先看一下缓存的回收方法,如下::
// putBuffer returns a buffer to the free list. func (l *MLogger) putBuffer(b *buffer) { if b.Len() >= 1500 { fmt.Println("buffer:%d\n",b.Len()) return // Let big buffers die a natural death(自然死). } l.freeListMu.Lock() // 上锁 b.next = l.freeList // 为下一个可用的buffer设置值 l.freeList = b // 为当前可用的freeList设置buffer l.freeListMu.Unlock() // 解锁 }
当开辟的缓存块过大时不进行重复利用,以释放这些内存空间。
最主要的操作就是将不需要的缓存块插入到单链表的头部,然后让freeList指针指向新插入的缓存块即可。
获取缓存块:
// getBuffer returns a new, ready-to-use buffer. (获取一个新的,可使用的缓存) func (l *MLogger) getBuffer() *buffer { l.freeListMu.Lock() // 上锁 b := l.freeList if b != nil { l.freeList = b.next } l.freeListMu.Unlock() // 解锁 if b == nil { b = new(buffer) } else { b.next = nil b.Reset() } return b }
在获取缓存块时需要优先考虑freeList中可用的缓存块,如果有就从链表头部取一个块返回(注意:必须调用Reset()方法,因为这个块中还缓存有上一次日志的信息),否则就创建一个新的缓存块返回。
2、创意检索算法
由于广告创意有多个定向条件,而Redis中缓存的数据结构为定向条件到创意集合的映射,如:
s_area_%d // 地域编码,如861100表示北京 s_network_%d // 网络定向,1表示2G、2表示3G、3表示4G、4表示WIFI s_os_%d // 1为ios,2为android
假设要检索出地域编码为861100、操作系统定向为ios,网络定向为WIFI的创意,而Redis中已经存在的数据集如下:
s_area_861100 = {1,4,6,7} s_network_1 = {4,5,6} s_os_1 = {1,5,6}
当检索创意时,新创建一个map,其key为创意ID,而值为创意出现的次数,如ID为4的创意在s_area_861100、s_network_1与s_os_1中出现了3次,则map为:
map[4] = 3
统计所有的创意出现的次数:
map[1] = 2
map[4] = 2
map[5] = 2
map[6] = 3
map[7] = 1
迭代这个map,检索出创意ID出现3次的所有创意,这些创意就是符合地域编码为861100、操作系统定向为ios,网络定向为WIFI的创意集合。
3、避免缓存穿透
为了在并发高的环境下减少IO次数,可以在本地缓存中缓存一些数据,当缓存一段时长时,重新从其它的数据源拉取最新的数据,这样减少IO的同时也能保证数据的时效性。但是当并发很高时,缓存在本地的数据在某个时刻过期,这里就会有多个请求同时做数据的拉取更新操作。如果数据源为MySQL数据库时,则无法承担很高的并发。
所以在数据过期时,需要控制只有一个请求做更新操作,其它请求等待或者直接返回,这取决于具体的业务
func (c *PowerCache) GetWithValueLoader(ctx context.Context, key string, valueLoader ValueLoader) (interface{}, error) { v, err := c.GetIfPresent(key) // 获取本地缓存中的数据 if err == nil { // 表示从本地缓存中取出了不过期的数据,直接返回即可 return v, err } // 代码点1 if atomic.CompareAndSwapInt32(&c.loadFlag, 0,1) { // 进行数据更新 v, err := c.GetIfPresent(key) if err == nil { c.loadFlag = 0 return v, err } // 从数据源拉取最新代码同时也会更新本地缓存数据 temp,tempErr :=c.loadWithValueLoader(ctx,key, valueLoader) // 代码点2 c.loadFlag = 1 // 代码点3 return temp,tempErr }else { time.Sleep(time.Millisecond * time.Duration(5)) v, err := c.GetIfPresent(key) if err == nil { return v, err } } return nil,errors.New("Data Empty!") // 等待5毫秒后仍然没有从本地缓存中获取到最新的数据 }
其中涉及到一个原子API CompareAndSwapInt32,对全局的loadFlag原子性的进行比较交换。也就是loadFlag与0比较,如果相等则将loadFlag置为1。由于是原子性操作,所以只会有一个go协程调用方法后返回true。
进入后还要再一次从本地缓存中取一次数据,因为这个go协程可能在代码点1处,而另外一个更新数据的go协程已经走到了代码点3,将loadFlag设置为1。这时候这个协程直接调用GetIfPresent()方法获取数据返回即可。
代码点2在代码点3之后,避免由于本地缓存数据还未更新时,loadFlag置为1,导致更多的协程进入这个循环体内。
不更新数据的协程直接等待5毫秒后再次从本地缓存获取数据,如果取到直接返回,否则返回空。
5、流量请求分发
首先需要理解双管道类型:
func main() { requestChan := make(chan chan string) go GetResponse(requestChan) go SendRequest(requestChan) time.Sleep(time.Second) // 防止主协程过早退出,子协程跟着退出 } func GetResponse(requestChan chan chan string) { // 创建一个接收信息的小管道 responseChan := make(chan string) // 将这个小管道通过公有的大管理传递给其它协程 requestChan <- responseChan // 从小管道中接收信息 response := <-responseChan fmt.Printf("Response: %v\n", response) } func SendRequest(requestChan chan chan string) { // 从大管道中取出小管道 responseChan := <-requestChan // 往小管道中写入内容 responseChan <- "helloworld!" }
可以将chan chan 理解为大管道嵌套的小管道。如上例子打印内容如下:
Response: helloworld!
可以使用管道特性来控制go协程,因为协程在获取管道数据时,如果获取不到数据将阻塞等待。利用这个特性结合双管道可以模拟一个协程池,类似于线程池。这样就可以重复利用协程及管道来减少创建协程带来的性能损耗了。
先来参阅一篇文章,如下:
http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang/
其基本的架构如下图:
多个Request将任务放入了管道中,Dispatcher从管道中取出任务并从WorkerPool的大管道中取出一个Worker对象,将任务放入worker对象的管道JobChannel中。有几点值得注意:
(1)在WorkerPool中取出的Worker对象,这个Worker对象的JobChannel管道后监听的go协程一定是阻塞等待的,也就是说go协程没有其它的任务在执行。
(2)Dispatcher从大管道中取Worker对象,当这个Worker对象中的Jobchannel被协程处理完后,这个协程负责再次将这个Worker对象放入WorkerPool中。
AdExchange由于对请求处理的时间要求比较苛刻,并且要对多个返回结果进行筛选,所以架构与上面有所不同。
这个架构没有JobQueue这个队列,而Dispatcher也不是单个协程,而只是一个Dispatcher函数。所有的请求来时,由于每个请求都由一个go协程来处理,所以让这些go协程尽可能的多做与自身相关的任务,如将自己的Job分发出去,减少由单个Dispatcher分发带来的风险,提升分发效率。
初始化一个指定大小的线程池,代码如下:
func init(){ var MaxWorker = 200 WorkerPool = make(chan chan Job, MaxWorker) for i := 0; i < MaxWorker; i++ { worker := NewWorker(i+1, WorkerPool) worker.Start() } }
调用了worker对象的Start()函数,这个函数的实现如下:
func (w Worker) Start() { go func() { for { // 处理任务的协程负责将w.JobChannel重新放回大管道内 w.WorkerPool <- w.JobChannel select { case job := <-w.JobChannel: // 处理具体的job任务 case <-w.QuitChan: return // we have received a signal to stop } } }() }
Start()启动了一个协程,将Worker对象的JobChannel放入大管道内,表示这个协程没有在做任务,而是在监听JobChannel。
当Dispatcher从大管道内取出一个w.JobChannel时,往其中放入Job对象,这样上面的协程就能接收到任务,处理具体的Job任务了,处理完成后再将这个JobChannel放回大管道,供Dispatcher继续获取分派任务。
如Dispatcher分派任务,如下:
for _, val := range jobs { select { case worker := <-WorkerPool: worker <- val case <-time.After(time.Millisecond * time.Duration(5)): return nil, errors.New("WorkerPool Blocking!") } }
从WorkerPool中取可用的JobChannel,如果取不到表示线程池中所有的协程都在做任务,已经没有空闲的协程了。
这样做的好处有:
(1)分离业务代码,如上代码分离了请求的分发、结果的筛选与请求的处理业务
(2)高效的并发模型 实现了线程安全,同时可以在每个协程内缓存数据块,无锁操作map等等