一篇带给你 Sentinel 和常用流控算法

时间:2022-04-06 06:11:34

一篇带给你 Sentinel 和常用流控算法

本文主要讲述常见的几种限流算法:计数器算法、漏桶算法、令牌桶算法。然后结合我对 Sentinel 1.8.0 的理解,给大家分享 Sentinel 在源码中如何使用这些算法进行流控判断。

计数器限流算法

我们可以直接通过一个计数器,限制每一秒钟能够接收的请求数。比如说 qps定为 1000,那么实现思路就是从第一个请求进来开始计时,在接下去的 1s 内,每来一个请求,就把计数加 1,如果累加的数字达到了 1000,那么后续的请求就会被全部拒绝。等到 1s 结束后,把计数恢复成 0 ,重新开始计数。

一篇带给你 Sentinel 和常用流控算法

优点:实现简单

缺点:如果1s 内的前半秒,已经通过了 1000 个请求,那后面的半秒只能请求拒绝,我们把这种现象称为“突刺现象”。

实现代码案例:

  1. public class Counter { 
  2.     public long timeStamp = getNowTime(); 
  3.     public int reqCount = 0; 
  4.     public final int limit = 100; // 时间窗口内最大请求数 
  5.     public final long interval = 1000; // 时间窗口ms 
  6.  
  7.     public boolean limit() { 
  8.         long now = getNowTime(); 
  9.         if (now < timeStamp + interval) { 
  10.             // 在时间窗口内 
  11.             reqCount++; 
  12.             // 判断当前时间窗口内是否超过最大请求控制数 
  13.             return reqCount <= limit; 
  14.         } else { 
  15.             timeStamp = now; 
  16.             // 超时后重置 
  17.             reqCount = 1; 
  18.             return true
  19.         } 
  20.     } 
  21.  
  22.     public long getNowTime() { 
  23.         return System.currentTimeMillis(); 
  24.     } 
滑动时间窗算法

滑动窗口,又称 Rolling Window。为了解决计数器算法的缺陷,我们引入了滑动窗口算法。下面这张图,很好地解释了滑动窗口算法:

一篇带给你 Sentinel 和常用流控算法

在上图中,整个红色的矩形框表示一个时间窗口,在我们的例子中,一个时间窗口就是一分钟。然后我们将时间窗口进行划分,比如图中,我们就将滑动窗口 划成了6格,所以每格代表的是10秒钟。每过10秒钟,我们的时间窗口就会往右滑动一格。每一个格子都有自己独立的计数器counter,比如当一个请求 在0:35秒的时候到达,那么0:30~0:39对应的counter就会加1。

那么滑动窗口怎么解决刚才的临界问题的呢?我们可以看上图,0:59到达的100个请求会落在灰色的格子中,而1:00到达的请求会落在橘黄色的格子中。当时间到达1:00时,我们的窗口会往右移动一格,那么此时时间窗口内的总请求数量一共是200个,超过了限定的100个,所以此时能够检测出来触发了限流。

我再来回顾一下刚才的计数器算法,我们可以发现,计数器算法其实就是滑动窗口算法。只是它没有对时间窗口做进一步地划分,所以只有1格。

由此可见,当滑动窗口的格子划分的越多,那么滑动窗口的滚动就越平滑,限流的统计就会越精确。

实现代码案例:

  1. public class SlideWindow { 
  2.  
  3.     /** 队列id和队列的映射关系,队列里面存储的是每一次通过时候的时间戳,这样可以使得程序里有多个限流队列 */ 
  4.     private volatile static Map<String, List<Long>> MAP = new ConcurrentHashMap<>(); 
  5.  
  6.     private SlideWindow() {} 
  7.  
  8.     public static void main(String[] args) throws InterruptedException { 
  9.         while (true) { 
  10.             // 任意10秒内,只允许2次通过 
  11.             System.out.println(LocalTime.now().toString() + SlideWindow.isGo("ListId", 2, 10000L)); 
  12.             // 睡眠0-10秒 
  13.             Thread.sleep(1000 * new Random().nextInt(10)); 
  14.         } 
  15.     } 
  16.  
  17.     /** 
  18.      * 滑动时间窗口限流算法 
  19.      * 在指定时间窗口,指定限制次数内,是否允许通过 
  20.      * 
  21.      * @param listId     队列id 
  22.      * @param count      限制次数 
  23.      * @param timeWindow 时间窗口大小 
  24.      * @return 是否允许通过 
  25.      */ 
  26.     public static synchronized boolean isGo(String listId, int count, long timeWindow) { 
  27.         // 获取当前时间 
  28.         long nowTime = System.currentTimeMillis(); 
  29.         // 根据队列id,取出对应的限流队列,若没有则创建 
  30.         List<Long> list = MAP.computeIfAbsent(listId, k -> new LinkedList<>()); 
  31.         // 如果队列还没满,则允许通过,并添加当前时间戳到队列开始位置 
  32.         if (list.size() < count) { 
  33.             list.add(0, nowTime); 
  34.             return true
  35.         } 
  36.  
  37.         // 队列已满(达到限制次数),则获取队列中最早添加的时间戳 
  38.         Long farTime = list.get(count - 1); 
  39.         // 用当前时间戳 减去 最早添加的时间戳 
  40.         if (nowTime - farTime <= timeWindow) { 
  41.             // 若结果小于等于timeWindow,则说明在timeWindow内,通过的次数大于count 
  42.             // 不允许通过 
  43.             return false
  44.         } else { 
  45.             // 若结果大于timeWindow,则说明在timeWindow内,通过的次数小于等于count 
  46.             // 允许通过,并删除最早添加的时间戳,将当前时间添加到队列开始位置 
  47.             list.remove(count - 1); 
  48.             list.add(0, nowTime); 
  49.             return true
  50.         } 
  51.     } 
  52.  

在 Sentinel 中 通过 LeapArray 结构来实现时间窗算法, 它的核心代码如下(只列举获取时间窗方法):

  1. /** 
  2.      * 获取当前的时间窗 
  3.      * 
  4.      * Get bucket item at provided timestamp
  5.      * 
  6.      * @param timeMillis a valid timestamp in milliseconds 
  7.      * @return current bucket item at provided timestamp if the time is valid; null if time is invalid 
  8.      */ 
  9. public WindowWrap<T> currentWindow(long timeMillis) { 
  10.   if (timeMillis < 0) { 
  11.     return null
  12.   } 
  13.  
  14.   int idx = calculateTimeIdx(timeMillis); 
  15.   // Calculate current bucket start time
  16.   // 计算窗口的开始时间,计算每个格子的开始时间 
  17.   long windowStart = calculateWindowStart(timeMillis); 
  18.  
  19.   /* 
  20.          * Get bucket item at given time from the array. 
  21.          * 
  22.          * (1) Bucket is absent, then just create a new bucket and CAS update to circular array. 
  23.          * (2) Bucket is up-to-datethen just return the bucket. 
  24.          * (3) Bucket is deprecated, then reset current bucket and clean all deprecated buckets. 
  25.          */ 
  26.   while (true) { 
  27.     WindowWrap<T> old = array.get(idx); 
  28.     // 如果没有窗格,创建窗格 
  29.     if (old == null) { 
  30.       /* 
  31.                  *     B0       B1      B2    NULL      B4 
  32.                  * ||_______|_______|_______|_______|_______||___ 
  33.                  * 200     400     600     800     1000    1200  timestamp 
  34.                  *                             ^ 
  35.                  *                          time=888 
  36.                  *            bucket is empty, so create new and update 
  37.                  * 
  38.                  * If the old bucket is absent, then we create a new bucket at {@code windowStart}, 
  39.                  * then try to update circular array via a CAS operation. Only one thread can 
  40.                  * succeed to update, while other threads yield its time slice. 
  41.                  */ 
  42.       WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis)); 
  43.       if (array.compareAndSet(idx, null, window)) { 
  44.         // Successfully updated, return the created bucket. 
  45.         return window; 
  46.       } else { 
  47.         // Contention failed, the thread will yield its time slice to wait for bucket available. 
  48.         Thread.yield(); 
  49.       } 
  50.       // 当前窗格存在,返回历史窗格 
  51.     } else if (windowStart == old.windowStart()) { 
  52.       /* 
  53.                  *     B0       B1      B2     B3      B4 
  54.                  * ||_______|_______|_______|_______|_______||___ 
  55.                  * 200     400     600     800     1000    1200  timestamp 
  56.                  *                             ^ 
  57.                  *                          time=888 
  58.                  *            startTime of Bucket 3: 800, so it's up-to-date 
  59.                  * 
  60.                  * If current {@code windowStart} is equal to the start timestamp of old bucket, 
  61.                  * that means the time is within the bucket, so directly return the bucket. 
  62.                  */ 
  63.       return old; 
  64.       // 
  65.     } else if (windowStart > old.windowStart()) { 
  66.       /* 
  67.                  *   (old) 
  68.                  *             B0       B1      B2    NULL      B4 
  69.                  * |_______||_______|_______|_______|_______|_______||___ 
  70.                  * ...    1200     1400    1600    1800    2000    2200  timestamp 
  71.                  *                              ^ 
  72.                  *                           time=1676 
  73.                  *          startTime of Bucket 2: 400, deprecated, should be reset 
  74.                  * 
  75.                  * If the start timestamp of old bucket is behind provided time, that means 
  76.                  * the bucket is deprecated. We have to reset the bucket to current {@code windowStart}. 
  77.                  * Note that the reset and clean-up operations are hard to be atomic, 
  78.                  * so we need a update lock to guarantee the correctness of bucket update
  79.                  * 
  80.                  * The update lock is conditional (tiny scope) and will take effect only when 
  81.                  * bucket is deprecated, so in most cases it won't lead to performance loss. 
  82.                  */ 
  83.       if (updateLock.tryLock()) { 
  84.         try { 
  85.           // Successfully get the update lock, now we reset the bucket. 
  86.           // 清空所有的窗格数据 
  87.           return resetWindowTo(old, windowStart); 
  88.         } finally { 
  89.           updateLock.unlock(); 
  90.         } 
  91.       } else { 
  92.         // Contention failed, the thread will yield its time slice to wait for bucket available. 
  93.         Thread.yield(); 
  94.       } 
  95.       // 如果时钟回拨,重新创建时间格 
  96.     } else if (windowStart < old.windowStart()) { 
  97.       // Should not go through here, as the provided time is already behind. 
  98.       return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis)); 
  99.     } 
  100.   } 
漏桶算法

漏桶算法(Leaky Bucket)是网络世界中流量整形(Traffic Shaping)或速率限制(Rate Limiting)时经常使用的一种算法,它的主要目的是控制数据注入到网络的速率,平滑网络上的突发流量。漏桶算法提供了一种机制,通过它,突发流量可以被整形以便为网络提供一个稳定的流量, 执行过程如下图所示。

一篇带给你 Sentinel 和常用流控算法

实现代码案例:

  1. public class LeakyBucket { 
  2.   public long timeStamp = System.currentTimeMillis();  // 当前时间 
  3.   public long capacity; // 桶的容量 
  4.   public long rate; // 水漏出的速度 
  5.   public long water; // 当前水量(当前累积请求数) 
  6.  
  7.   public boolean grant() { 
  8.     long now = System.currentTimeMillis(); 
  9.     // 先执行漏水,计算剩余水量 
  10.     water = Math.max(0, water - (now - timeStamp) * rate);  
  11.  
  12.     timeStamp = now; 
  13.     if ((water + 1) < capacity) { 
  14.       // 尝试加水,并且水还未满 
  15.       water += 1; 
  16.       return true
  17.     } else { 
  18.       // 水满,拒绝加水 
  19.       return false
  20.     } 
  21.   } 

说明:

(1)未满加水:通过代码 water +=1进行不停加水的动作。

(2)漏水:通过时间差来计算漏水量。

(3)剩余水量:总水量-漏水量。

在 Sentine 中RateLimiterController 实现了了漏桶算法 , 核心代码如下

  1. @Override 
  2. public boolean canPass(Node node, int acquireCount, boolean prioritized) { 
  3.   // Pass when acquire count is less or equal than 0. 
  4.   if (acquireCount <= 0) { 
  5.     return true
  6.   } 
  7.   // Reject when count is less or equal than 0. 
  8.   // Otherwise,the costTime will be max of long and waitTime will overflow in some cases. 
  9.   if (count <= 0) { 
  10.     return false
  11.   } 
  12.  
  13.   long currentTime = TimeUtil.currentTimeMillis(); 
  14.   // Calculate the interval between every two requests. 
  15.   // 计算时间间隔 
  16.   long costTime = Math.round(1.0 * (acquireCount) / count * 1000); 
  17.  
  18.   // Expected pass time of this request. 
  19.   // 期望的执行时间 
  20.   long expectedTime = costTime + latestPassedTime.get(); 
  21.  
  22.   // 当前时间 > 期望时间 
  23.   if (expectedTime <= currentTime) { 
  24.     // Contention may exist here, but it's okay. 
  25.     // 可以通过,并且设置最后通过时间 
  26.     latestPassedTime.set(currentTime); 
  27.     return true
  28.   } else { 
  29.     // Calculate the time to wait. 
  30.     // 等待时间 = 期望时间 - 最后时间 - 当前时间 
  31.     long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis(); 
  32.     // 等待时间 > 最大排队时间 
  33.     if (waitTime > maxQueueingTimeMs) { 
  34.       return false
  35.     } else { 
  36.       // 上次时间 + 间隔时间 
  37.       long oldTime = latestPassedTime.addAndGet(costTime); 
  38.       try { 
  39.         // 等待时间 
  40.         waitTime = oldTime - TimeUtil.currentTimeMillis(); 
  41.         // 等待时间 > 最大排队时间 
  42.         if (waitTime > maxQueueingTimeMs) { 
  43.           latestPassedTime.addAndGet(-costTime); 
  44.           return false
  45.         } 
  46.         // in race condition waitTime may <= 0 
  47.         // 休眠等待 
  48.         if (waitTime > 0) { 
  49.           Thread.sleep(waitTime); 
  50.         } 
  51.         // 等待完了,就放行 
  52.         return true
  53.       } catch (InterruptedException e) { 
  54.       } 
  55.     } 
  56.   } 
  57.   return false
令牌桶算法

令牌桶算法是网络流量整形(Traffic Shaping)和速率限制(Rate Limiting)中最常使用的一种算法。典型情况下,令牌桶算法用来控制发送到网络上的数据的数目,并允许突发数据的发送。如下图所示:

一篇带给你 Sentinel 和常用流控算法

简单的说就是,一边请求时会消耗桶内的令牌,另一边会以固定速率往桶内放令牌。当消耗的请求大于放入的速率时,进行相应的措施,比如等待,或者拒绝等。

实现代码案例:

  1. public class TokenBucket { 
  2.   public long timeStamp = System.currentTimeMillis();  // 当前时间 
  3.   public long capacity; // 桶的容量 
  4.   public long rate; // 令牌放入速度 
  5.   public long tokens; // 当前令牌数量 
  6.  
  7.   public boolean grant() { 
  8.     long now = System.currentTimeMillis(); 
  9.     // 先添加令牌 
  10.     tokens = Math.min(capacity, tokens + (now - timeStamp) * rate); 
  11.     timeStamp = now; 
  12.     if (tokens < 1) { 
  13.       // 若不到1个令牌,则拒绝 
  14.       return false
  15.     } else { 
  16.       // 还有令牌,领取令牌 
  17.       tokens -= 1; 
  18.       return true
  19.     } 
  20.   } 

Sentinel 在 WarmUpController 中运用到了令牌桶算法,在这里可以实现对系统的预热,设定预热时间和水位线,对于预热期间多余的请求直接拒绝掉。

  1. public boolean canPass(Node node, int acquireCount, boolean prioritized) { 
  2.   long passQps = (long) node.passQps(); 
  3.  
  4.   long previousQps = (long) node.previousPassQps(); 
  5.   syncToken(previousQps); 
  6.  
  7.   // 开始计算它的斜率 
  8.   // 如果进入了警戒线,开始调整他的qps 
  9.   long restToken = storedTokens.get(); 
  10.   if (restToken >= warningToken) { 
  11.     long aboveToken = restToken - warningToken; 
  12.     // 消耗的速度要比warning快,但是要比慢 
  13.     // current interval = restToken*slope+1/count 
  14.     double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count)); 
  15.     if (passQps + acquireCount <= warningQps) { 
  16.       return true
  17.     } 
  18.   } else { 
  19.     if (passQps + acquireCount <= count) { 
  20.       return true
  21.     } 
  22.   } 
  23.  
  24.   return false
限流算法总结 计数器 VS 时间窗

时间窗算法的本质也是通过计数器算法实现的。

时间窗算法格子划分的越多,那么滑动窗口的滚动就越平滑,限流的统计就会越精确,但是也会占用更多的内存存储。

漏桶 VS 令牌桶

漏桶算法和令牌桶算法本质上是为了做流量整形或速率限制,避免系统因为大流量而被打崩,但是两者的核心差异在于限流的方向是相反的

漏桶:限制的是流量的流出速率,是相对固定的。

令牌桶 :限制的是流量的平均流入速率,并且允许一定程度的突然性流量,最大速率为桶的容量和生成token的速率。

在某些场景中,漏桶算法并不能有效的使用网络资源,因为漏桶的漏出速率是相对固定的,所以在网络情况比较好并且没有拥塞的状态下,漏桶依然是会有限制的,并不能放开量,因此并不能有效的利用网络资源。而令牌桶算法则不同,其在限制平均速率的同时,支持一定程度的突发流量。

参考文档

https://www.cnblogs.com/linjiqin/p/9707713.html

https://www.cnblogs.com/dijia478/p/13807826.html

原文链接:https://mp.weixin.qq.com/s/0fxrAFpK4Aqm6qtFLoEAZw