Go 分布式令牌桶限流 + 兜底保障

时间:2021-09-05 02:38:22

Go 分布式令牌桶限流 + 兜底保障

上篇文章提到固定时间窗口限流无法处理突然请求洪峰情况,本文讲述的令牌桶线路算法则可以比较好的处理此场景。

工作原理

单位时间按照一定速率匀速的生产 token 放入桶内,直到达到桶容量上限。

处理请求,每次尝试获取一个或多个令牌,如果拿到则处理请求,失败则拒绝请求。

Go 分布式令牌桶限流 + 兜底保障

优缺点

优点

可以有效处理瞬间的突发流量,桶内存量 token 即可作为流量缓冲区平滑处理突发流量。

缺点

实现较为复杂。

代码实现

  1. core/limit/tokenlimit.go

分布式环境下考虑使用 redis 作为桶和令牌的存储容器,采用 lua 脚本实现整个算法流程。

redis lua 脚本

  1. -- 每秒生成token数量即token生成速度
  2. local rate = tonumber(ARGV[1])
  3. -- 桶容量
  4. local capacity = tonumber(ARGV[2])
  5. -- 当前时间戳
  6. local now = tonumber(ARGV[3])
  7. -- 当前请求token数量
  8. local requested = tonumber(ARGV[4])
  9. -- 需要多少秒才能填满桶
  10. local fill_time = capacity/rate
  11. -- 向下取整,ttl为填满时间的2倍
  12. local ttl = math.floor(fill_time*2)
  13. -- 当前时间桶容量
  14. local last_tokens = tonumber(redis.call("get", KEYS[1]))
  15. -- 如果当前桶容量为0,说明是第一次进入,则默认容量为桶的最大容量
  16. if last_tokens == nil then
  17. last_tokens = capacity
  18. end
  19. -- 上一次刷新的时间
  20. local last_refreshed = tonumber(redis.call("get", KEYS[2]))
  21. -- 第一次进入则设置刷新时间为0
  22. if last_refreshed == nil then
  23. last_refreshed = 0
  24. end
  25. -- 距离上次请求的时间跨度
  26. local delta = math.max(0, now-last_refreshed)
  27. -- 距离上次请求的时间跨度,总共能生产token的数量,如果超多最大容量则丢弃多余的token
  28. local filled_tokens = math.min(capacity, last_tokens+(delta*rate))
  29. -- 本次请求token数量是否足够
  30. local allowed = filled_tokens >= requested
  31. -- 桶剩余数量
  32. local new_tokens = filled_tokens
  33. -- 允许本次token申请,计算剩余数量
  34. if allowed then
  35. new_tokens = filled_tokens - requested
  36. end
  37. -- 设置剩余token数量
  38. redis.call("setex", KEYS[1], ttl, new_tokens)
  39. -- 设置刷新时间
  40. redis.call("setex", KEYS[2], ttl, now)
  41. return allowed

令牌桶限流器定义

  1. type TokenLimiter struct {
  2. // 每秒生产速率
  3. rate int
  4. // 桶容量
  5. burst int
  6. // 存储容器
  7. store *redis.Redis
  8. // redis key
  9. tokenKey string
  10. // 桶刷新时间key
  11. timestampKey string
  12. // lock
  13. rescueLock sync.Mutex
  14. // redis健康标识
  15. redisAlive uint32
  16. // redis故障时采用进程内 令牌桶限流器
  17. rescueLimiter *xrate.Limiter
  18. // redis监控探测任务标识
  19. monitorStarted bool
  20. }
  21. func NewTokenLimiter(rate, burst int, store *redis.Redis, key string) *TokenLimiter {
  22. tokenKey := fmt.Sprintf(tokenFormat, key)
  23. timestampKey := fmt.Sprintf(timestampFormat, key)
  24. return &TokenLimiter{
  25. rate: rate,
  26. burst: burst,
  27. store: store,
  28. tokenKey: tokenKey,
  29. timestampKey: timestampKey,
  30. redisAlive: 1,
  31. rescueLimiter: xrate.NewLimiter(xrate.Every(time.Second/time.Duration(rate)), burst),
  32. }
  33. }

获取令牌

Go 分布式令牌桶限流 + 兜底保障

  1. func (lim *TokenLimiter) reserveN(now time.Time, n int) bool {
  2. // 判断redis是否健康
  3. // redis故障时采用进程内限流器
  4. // 兜底保障
  5. if atomic.LoadUint32(&lim.redisAlive) == 0 {
  6. return lim.rescueLimiter.AllowN(now, n)
  7. }
  8. // 执行脚本获取令牌
  9. resp, err := lim.store.Eval(
  10. script,
  11. []string{
  12. lim.tokenKey,
  13. lim.timestampKey,
  14. },
  15. []string{
  16. strconv.Itoa(lim.rate),
  17. strconv.Itoa(lim.burst),
  18. strconv.FormatInt(now.Unix(), 10),
  19. strconv.Itoa(n),
  20. })
  21. // redis allowed == false
  22. // Lua boolean false -> r Nil bulk reply
  23. // 特殊处理key不存在的情况
  24. if err == redis.Nil {
  25. return false
  26. } else if err != nil {
  27. logx.Errorf("fail to use rate limiter: %s, use in-process limiter for rescue", err)
  28. // 执行异常,开启redis健康探测任务
  29. // 同时采用进程内限流器作为兜底
  30. lim.startMonitor()
  31. return lim.rescueLimiter.AllowN(now, n)
  32. }
  33. code, ok := resp.(int64)
  34. if !ok {
  35. logx.Errorf("fail to eval redis script: %v, use in-process limiter for rescue", resp)
  36. lim.startMonitor()
  37. return lim.rescueLimiter.AllowN(now, n)
  38. }
  39. // redis allowed == true
  40. // Lua boolean true -> r integer reply with value of 1
  41. return code == 1
  42. }

redis 故障时兜底策略

兜底策略的设计考虑得非常细节,当 redis 不可用的时候,启动单机版的 ratelimit 做备用限流,确保基本的限流可用,服务不会被冲垮。

  1. // 开启redis健康探测
  2. func (lim *TokenLimiter) startMonitor() {
  3. lim.rescueLock.Lock()
  4. defer lim.rescueLock.Unlock()
  5. // 防止重复开启
  6. if lim.monitorStarted {
  7. return
  8. }
  9. // 设置任务和健康标识
  10. lim.monitorStarted = true
  11. atomic.StoreUint32(&lim.redisAlive, 0)
  12. // 健康探测
  13. go lim.waitForRedis()
  14. }
  15. // redis健康探测定时任务
  16. func (lim *TokenLimiter) waitForRedis() {
  17. ticker := time.NewTicker(pingInterval)
  18. // 健康探测成功时回调此函数
  19. defer func() {
  20. ticker.Stop()
  21. lim.rescueLock.Lock()
  22. lim.monitorStarted = false
  23. lim.rescueLock.Unlock()
  24. }()
  25. for range ticker.C {
  26. // ping属于redis内置健康探测命令
  27. if lim.store.Ping() {
  28. // 健康探测成功,设置健康标识
  29. atomic.StoreUint32(&lim.redisAlive, 1)
  30. return
  31. }
  32. }
  33. }

项目地址

https://github.com/zeromicro/go-zero

欢迎使用 go-zero 并 star 支持我们!

原文链接:https://mp.weixin.qq.com/s/ulGRw4qkWbGKdF83VaIb7A