一、简单的限流
基本原理
当系统处理能力有限,如何组织计划外的请求对系统施压。首先我们先看下一些简单的限流策略,防止暴力攻击。比如要对ip访问,没5s只能访问10次,超过进行拦截。
如上图,一般使用滑动窗口来统计区间时间内的访问次数。
使用 zset 记录 ip 访问次数,每个 ip 通过 key 保存下来,score 保存当前时间戳,value 唯一用时间戳或者uuid来实现
代码实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
|
public class redislimitertest {
private jedis jedis;
public redislimitertest(jedis jedis) {
this .jedis = jedis;
}
/**
* @param ipaddress ip地址
* @param period 特定的时间内,单位秒
* @param maxcount 最大允许的次数
* @return
*/
public boolean isiplimit(string ipaddress, int period, int maxcount) {
string key = string.format( "ip:%s" , ipaddress);
// 毫秒时间戳
long currenttimemillis = system.currenttimemillis();
pipeline pipe = jedis.pipelined();
// redis事务,保证原子性
pipe.multi();
// 存放数据,value 和 score 都使用毫秒时间戳
pipe.zadd(key, currenttimemillis, "" + uuid.randomuuid());
// 移除窗口区间所有的元素
pipe.zremrangebyscore(key, 0 , currenttimemillis - period * 1000 );
// 获取时间窗口内的行为数量
response< long > count = pipe.zcard(key);
// 设置 zset 过期时间,避免冷用户持续占用内存,这里宽限1s
pipe.expire(key, period + 1 );
// 提交事务
pipe.exec();
pipe.close();
// 比较数量是否超标
return count.get() > maxcount;
}
public static void main(string[] args) {
jedis jedis = new jedis( "localhost" , 6379 );
redislimitertest limiter = new redislimitertest(jedis);
for ( int i = 1 ; i <= 20 ; i++) {
// 验证ip 10秒钟之内只能访问5次
boolean islimit = limiter.isiplimit( "222.73.55.22" , 10 , 5 );
system.out.println( "访问第" + i + "次, 结果:" + (islimit ? "限制访问" : "允许访问" ));
}
}
}
|
执行结果
访问第1次, 结果:允许访问
访问第2次, 结果:允许访问
访问第3次, 结果:允许访问
访问第4次, 结果:允许访问
访问第5次, 结果:允许访问
访问第6次, 结果:限制访问
访问第7次, 结果:限制访问
... ...
缺点:要记录时间窗口所有的行为记录,量很大,比如,限定60s内不能超过100万次这种场景,不太适合这样限流,因为会消耗大量的储存空间。
二、漏斗限流
基本原理
- 漏斗的容量是限定的,如果满了,就装不进去了。
- 如果将漏嘴放开,水就会往下流,流走一部分之后,就又可以继续往里面灌水。
- 如果漏嘴流水的速率大于灌水的速率,那么漏斗永远都装不满。
- 如果漏嘴流水速率小于灌水的速率,那么一旦漏斗满了,灌水就需要暂停并等待漏斗腾空。
示例代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
|
public class funnellimitertest {
static class funnel {
int capacity; // 漏斗容量
float leakingrate; // 漏嘴流水速率
int leftquota; // 漏斗剩余空间
long leakingts; // 上一次漏水时间
public funnel( int capacity, float leakingrate) {
this .capacity = capacity;
this .leakingrate = leakingrate;
this .leftquota = capacity;
this .leakingts = system.currenttimemillis();
}
void makespace() {
long nowts = system.currenttimemillis();
long deltats = nowts - leakingts; // 距离上一次漏水过去了多久
int deltaquota = ( int ) (deltats * leakingrate); // 腾出的空间 = 时间*漏水速率
if (deltaquota < 0 ) { // 间隔时间太长,整数数字过大溢出
this .leftquota = capacity;
this .leakingts = nowts;
return ;
}
if (deltaquota < 1 ) { // 腾出空间太小 就等下次,最小单位是1
return ;
}
this .leftquota += deltaquota; // 漏斗剩余空间 = 漏斗剩余空间 + 腾出的空间
this .leakingts = nowts;
if ( this .leftquota > this .capacity) { // 剩余空间不得高于容量
this .leftquota = this .capacity;
}
}
boolean watering( int quota) {
makespace();
if ( this .leftquota >= quota) { // 判断剩余空间是否足够
this .leftquota -= quota;
return true ;
}
return false ;
}
}
// 所有的漏斗
private map<string, funnel> funnels = new hashmap<>();
/**
* @param capacity 漏斗容量
* @param leakingrate 漏嘴流水速率 quota/s
*/
public boolean isiplimit(string ipaddress, int capacity, float leakingrate) {
string key = string.format( "ip:%s" , ipaddress);
funnel funnel = funnels.get(key);
if (funnel == null ) {
funnel = new funnel(capacity, leakingrate);
funnels.put(key, funnel);
}
return !funnel.watering( 1 ); // 需要1个quota
}
public static void main(string[] args) throws exception{
funnellimitertest limiter = new funnellimitertest();
for ( int i = 1 ; i <= 50 ; i++) {
// 每1s执行一次
thread.sleep( 1000 );
// 漏斗容量是2 ,漏嘴流水速率是0.5每秒,
boolean islimit = limiter.isiplimit( "222.73.55.22" , 2 , ( float ) 0.5 / 1000 );
system.out.println( "访问第" + i + "次, 结果:" + (islimit ? "限制访问" : "允许访问" ));
}
}
}
|
执行结果
访问第1次, 结果:允许访问 # 第1次,容量剩余2,执行后1
访问第2次, 结果:允许访问 # 第2次,容量剩余1,执行后0
访问第3次, 结果:允许访问 # 第3次,由于过了2s, 漏斗流水剩余1个空间,所以容量剩余1,执行后0
访问第4次, 结果:限制访问 # 第4次,过了1s, 剩余空间小于1, 容量剩余0
访问第5次, 结果:允许访问 # 第5次,由于过了2s, 漏斗流水剩余1个空间,所以容量剩余1,执行后0
访问第6次, 结果:限制访问 # 以此类推...
访问第7次, 结果:允许访问
访问第8次, 结果:限制访问
访问第9次, 结果:允许访问
访问第10次, 结果:限制访问
我们观察 funnel 对象的几个字段,我们发现可以将 funnel 对象的内容按字段存储到一个 hash 结构中,灌水的时候将 hash 结构的字段取出来进行逻辑运算后,再将新值回填到 hash 结构中就完成了一次行为频度的检测。
但是有个问题,我们无法保证整个过程的原子性。从 hash 结构中取值,然后在内存里运算,再回填到 hash 结构,这三个过程无法原子化,意味着需要进行适当的加锁控制。而一旦加锁,就意味着会有加锁失败,加锁失败就需要选择重试或者放弃。
如果重试的话,就会导致性能下降。如果放弃的话,就会影响用户体验。同时,代码的复杂度也跟着升高很多。这真是个艰难的选择,我们该如何解决这个问题呢?redis-cell 救星来了!
redis-cell
redis 4.0 提供了一个限流 redis 模块,它叫 redis-cell。该模块也使用了漏斗算法,并提供了原子的限流指令。
该模块只有1条指令cl.throttle,它的参数和返回值都略显复杂,接下来让我们来看看这个指令具体该如何使用。
1
|
> cl.throttle key:xxx 15 30 60 1
|
15 : 15 capacity 这是漏斗容量
30 60 : 30 operations / 60 seconds 这是漏水速率
1 : need 1 quota (可选参数,默认值也是1)
1
2
3
4
5
6
|
> cl.throttle laoqian:reply 15 30 60
1) (integer) 0 # 0 表示允许,1表示拒绝
2) (integer) 15 # 漏斗容量capacity
3) (integer) 14 # 漏斗剩余空间left_quota
4) (integer) -1 # 如果拒绝了,需要多长时间后再试(漏斗有空间了,单位秒)
5) (integer) 2 # 多长时间后,漏斗完全空出来(left_quota==capacity,单位秒)
|
在执行限流指令时,如果被拒绝了,就需要丢弃或重试。cl.throttle 指令考虑的非常周到,连重试时间都帮你算好了,直接取返回结果数组的第四个值进行 sleep 即可,如果不想阻塞线程,也可以异步定时任务来重试。
参考来源
《redis深度历险 核心原理与应用实践》_钱文品
到此这篇关于redis限流的几种实现的文章就介绍到这了,更多相关redis限流内容请搜索服务器之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持服务器之家!
原文链接:https://juejin.cn/post/7040631021343080478