详解spring cloud hystrix 请求合并collapsing

时间:2022-09-26 20:34:56

hystrixcommand之前可以使用请求合并器(hystrixcollapser就是一个抽象的父类)来把多个请求合并成一个然后对后端依赖系统发起调用。

下图显示了两种情况下线程的数量和网络的连接数的情况:第一种是不使用合并器,第二种是使用请求合并器(假设所有的链接都是在一个短的时间窗口内并行的,比如10ms内)。

详解spring cloud hystrix 请求合并collapsing

为什么要使用请求合并?

使用请求合并来减少执行并发hystrixcommand执行所需的线程数和网络连接数,请求合并是自动执行的,不会强制开发人员手动协调批处理请求。

全局上下文-global context(跨越所有tomcat线程)

这种合并类型是在全局应用级别上完成的,因此任何tomcat线程上的任何用户的请求都可以一起合并。

例如,如果您配置一个hystrixcommand支持任何用户请求依赖关系来检索电影评级,那么当同一个jvm中的任何用户线程发出这样的请求时,hystrix会将其请求与任何其他请求一起添加到同一个已折叠网络通话。

用户请求上下文-request context(单个tomcat线程)

如果你配置一个hystrixcommand仅仅为一个单个用户处理批量请求,hystrix可以在一个tomcat线程(请求)中合并请求。

例如,一个用户想要加载300个视频对象的书签,不是去执行300次网络请求,hystrix能够将他们合并成为一个。

hystrix默认是的就是request-scope,要使用request-scoped的功能(request caching,request collapsing, request log)你必须管理hystrixrequestcontext的生命周期(或者实现一个可替代的hystrixconcurrencystrategy
这就意味你在执行一个请求之前需要执行以下的代码:

 

复制代码 代码如下:
hystrixrequestcontext  context=hystrixrequestcontext.initializecontext();

 

并且在请求的结束位置执行:

?
1
context.shutdown();

在标准的javaweb应用中,你也可以使用一个servlet过滤器来初始化这个生命周期

?
1
2
3
4
5
6
7
8
9
10
11
12
public class hystrixrequestcontextservletfilter implements filter {
 
 public void dofilter(servletrequest request, servletresponse response, filterchain chain)
  throws ioexception, servletexception {
  hystrixrequestcontext context = hystrixrequestcontext.initializecontext();
  try {
   chain.dofilter(request, response);
  } finally {
   context.shutdown();
  }
 }
}

然后将它配置在web.xml中

?
1
2
3
4
5
6
7
8
9
<filter>
 <display-name>hystrixrequestcontextservletfilter</display-name>
 <filter-name>hystrixrequestcontextservletfilter</filter-name>
 <filter-class>com.netflix.hystrix.contrib.requestservlet.hystrixrequestcontextservletfilter</filter-class>
</filter>
<filter-mapping>
 <filter-name>hystrixrequestcontextservletfilter</filter-name>
 <url-pattern>/*</url-pattern>
</filter-mapping>

如果你是springboot开发的话代码如下:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@webfilter(filtername = "hystrixrequestcontextservletfilter",urlpatterns = "/*")
public class hystrixrequestcontextservletfilter implements filter {
 @override
 public void init(filterconfig filterconfig) throws servletexception {
 
 }
 
 @override
 public void dofilter(servletrequest servletrequest, servletresponse servletresponse, filterchain filterchain) throws ioexception, servletexception {
  hystrixrequestcontext context = hystrixrequestcontext.initializecontext();
  try{
   filterchain.dofilter(servletrequest,servletresponse);
  }finally {
   context.shutdown();
  }
 }
 
 @override
 public void destroy() {
 
 }
}
?
1
2
3
4
5
6
7
8
9
10
11
12
13
@springbootapplication
@enablediscoveryclient
@enablefeignclients
@enablehystrix
//这个是必须的,否则filter无效
@servletcomponentscan
public class application {
 
 public static void main(string[] args) {
  new springapplicationbuilder(application.class).web(true).run(args);
 }
 
}

请求合并的成本是多少?

启用请求合并的成本是在执行实际命令之前的延迟。最大的成本是批处理窗口的大小,默认是10ms。

如果你有一个命令需要花费5ms去执行并且有一个10ms的批处理窗口,执行的时间最坏的情况是15ms,一般情况下,请求不会在批处理窗口刚打开的时候发生,所以时间窗口的中间值是时间窗口的一半,在这种情况下是5ms。

这个成本是否值得取决于正在执行的命令,高延迟命令不会受到少量附加平均延迟的影响。而且,给定命令的并发量也是关键:如果很少有超过1个或2个请求被组合在一起,那么这个成本就是不值得的。事实上,在一个单线程的顺序迭代请求合并将会是一个主要的性能瓶颈,每一次迭代都会等待10ms的窗口等待时间。

但是,如果一个特定的命令同时被大量使用,并且可以同时批量打几十个甚至几百个呼叫,那么成本通常远远超过所达到的吞吐量的增加,因为hystrix减少了它所需的线程数量,依赖。(这段话不太好理解,其实就是说如果并发比较高,这个成本是值得的,因为hystrix可以节省很多线程和连接资源)。

请求合并的流程(如下图)

详解spring cloud hystrix 请求合并collapsing

理论知识已经讲完了,下面来看看例子,下面的例子集成了eureka+feign+hystrix,完整的例子请查看:https://github.com/jingangwang/micro-service

实体类

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public class user {
 
 private integer id;
 private string username;
 private integer age;
 
 public user() {
 }
 
 public user(integer id, string username, integer age) {
  this.id = id;
  this.username = username;
  this.age = age;
 }
 
 public integer getid() {
  return id;
 }
 
 public void setid(integer id) {
  this.id = id;
 }
 
 public string getusername() {
  return username;
 }
 
 public void setusername(string username) {
  this.username = username;
 }
 
 public integer getage() {
  return age;
 }
 
 public void setage(integer age) {
  this.age = age;
 }
 
 @override
 public string tostring() {
  final stringbuffer sb = new stringbuffer("user{");
  sb.append("id=").append(id);
  sb.append(", username='").append(username).append(''');
  sb.append(", age=").append(age);
  sb.append('}');
  return sb.tostring();
 }
}

服务提供者代码

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@restcontroller
@requestmapping("user")
public class usercontroller {
 
 @requestmapping("getuser")
 public user getuser(integer id) {
  return new user(id, "test", 29);
 }
 
 @requestmapping("getalluser")
 public list<user> getalluser(string ids){
  string[] split = ids.split(",");
  return arrays.aslist(split)
    .stream()
    .map(id -> new user(integer.valueof(id),"test"+id,30))
    .collect(collectors.tolist());
 }
}

消费者代码

userfeignclient

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@feignclient(name = "eureka-provider",configuration = feignconfiguration.class)
public interface userfeignclient {
 /**
  * 根据id查找用户
  * @param id 用户id
  * @return  user
  */
 @requestmapping(value = "user/getuser.json",method = requestmethod.get)
 user finduserbyid(@requestparam("id") integer id);
 
 
 /**
  * 超找用户列表
  * @param ids id列表
  * @return 用户的集合
  */
 @requestmapping(value = "user/getalluser.json",method = requestmethod.get)
 list<user> findalluser(@requestparam("ids") string ids);
}

userservice(设置为全局上下文)

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@service
public class userservice {
 @autowired
 private userfeignclient userfeignclient;
 
 
 
 /**
  * maxrequestsinbatch        该属性设置批量处理的最大请求数量,默认值为integer.max_value
  * timerdelayinmilliseconds       该属性设置多长时间之内算一次批处理,默认为10ms
  * @param id
  * @return
  */
 @hystrixcollapser(collapserkey = "findcollapserkey",scope = com.netflix.hystrix.hystrixcollapser.scope.global,batchmethod = "findalluser",collapserproperties = {
   @hystrixproperty(name = "timerdelayinmilliseconds",value = "5000" ),
   @hystrixproperty(name = "maxrequestsinbatch",value = "5" )
 })
 public future<user> find(integer id){
  return null;
 }
 
 @hystrixcommand(commandkey = "findalluser")
 public list<user> findalluser(list<integer> ids){
  return userfeignclient.findalluser(stringutils.join(ids,","));
 }
}

feigncollapsercontroller

?
1
2
3
4
5
6
7
8
9
@requestmapping("user")
@restcontroller
public class feigncollapsercontroller {
 @autowired
 private userservice userservice;
 @requestmapping("finduser")
 public user getuser(integer id) throws executionexception, interruptedexception {
  return userservice.find(id).get();
 }

上面的代码我们这是的是全局上下文(所有tomcat的线程的请求都可以合并),合并的时间窗口为5s(每一次请求都得等5s才发起请求),最大合并数为5。我们在postman中,5s之内发起两次请求,用户id不一样。

localhost:8082/user/finduser.json?id=123189891

localhost:8082/user/finduser.json?id=222222

结果如下图所示,两次请求合并为一次请求批量请求。

详解spring cloud hystrix 请求合并collapsing

我们再来测试一下请求上下文(request-scope)的情况,加入上面所提到的hystrixrequestcontextservletfilter,并修改userservice

hystrixrequestcontextservletfilter

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
 * @author wjg
 * @date 2017/12/22 15:15
 */
@webfilter(filtername = "hystrixrequestcontextservletfilter",urlpatterns = "/*")
public class hystrixrequestcontextservletfilter implements filter {
 @override
 public void init(filterconfig filterconfig) throws servletexception {
 
 }
 
 @override
 public void dofilter(servletrequest servletrequest, servletresponse servletresponse, filterchain filterchain) throws ioexception, servletexception {
  hystrixrequestcontext context = hystrixrequestcontext.initializecontext();
  try{
   filterchain.dofilter(servletrequest,servletresponse);
  }finally {
   context.shutdown();
  }
 }
 
 @override
 public void destroy() {
 
 }
}

userservice(设置为请求上下文)

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@service
public class userservice {
 @autowired
 private userfeignclient userfeignclient;
 
 
 
 /**
  * maxrequestsinbatch        该属性设置批量处理的最大请求数量,默认值为integer.max_value
  * timerdelayinmilliseconds       该属性设置多长时间之内算一次批处理,默认为10ms
  * @param id
  * @return
  */
 @hystrixcollapser(collapserkey = "findcollapserkey",scope = com.netflix.hystrix.hystrixcollapser.scope.request,batchmethod = "findalluser",collapserproperties = {
   @hystrixproperty(name = "timerdelayinmilliseconds",value = "5000" ),
   @hystrixproperty(name = "maxrequestsinbatch",value = "5" )
 })
 public future<user> find(integer id){
  return null;
 }
 
 @hystrixcommand(commandkey = "findalluser")
 public list<user> findalluser(list<integer> ids){
  return userfeignclient.findalluser(stringutils.join(ids,","));
 }
}

feigncollapser2controller

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@requestmapping("user")
@restcontroller
public class feigncollapser2controller {
 @autowired
 private userservice userservice;
 
 @requestmapping("finduser2")
 public list<user> getuser() throws executionexception, interruptedexception {
  future<user> user1 = userservice.find(1989);
  future<user> user2= userservice.find(1990);
  list<user> users = new arraylist<>();
  users.add(user1.get());
  users.add(user2.get());
  return users;
 }
}

我们在postman中输入:localhost:8082/user/finduser2.json

详解spring cloud hystrix 请求合并collapsing

可以看到一个请求内的两次连续调用被合并了。这个地方要注意,不能直接使用userserver.find(1989).get(),否则直接按同步执行处理,不会合并。如果两个tab页同时调用上述地址,发现发起了两次批量请求,说明作用域是request范围。

参考资料如下:

https://github.com/netflix/hystrix/wiki/how-to-use

http://www.zzvips.com/article/160601.html

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。

原文链接:https://blog.csdn.net/heartroll/article/details/78872436