ribbon是netflix发布的开源项目,主要功能是提供客户端的软件负载均衡算法,将netflix的中间层服务连接在一起。ribbon客户端组件提供一系列完善的配置项如连接超时,重试等。简单的说,就是在配置文件中列出load balancer(简称lb)后面所有的机器,ribbon会自动的帮助你基于某种规则(如简单轮询,随即连接等)去连接这些机器。我们也很容易使用ribbon实现自定义的负载均衡算法。
说起负载均衡一般都会想到服务端的负载均衡,常用产品包括lbs硬件或云服务、nginx等,都是耳熟能详的产品。
而spring cloud提供了让服务调用端具备负载均衡能力的ribbon,通过和eureka的紧密结合,不用在服务集群内再架设负载均衡服务,很大程度简化了服务集群内的架构。
具体也不想多写虚的介绍,反正哪里都能看得到相关的介绍。
直接开撸代码,通过代码来看ribbon是如何实现的。
配置
详解:
1.ribbonautoconfiguration配置生成ribbonloadbalancerclient实例。
代码位置:
spring-cloud-netflix-core-1.3.5.release.jar
org.springframework.cloud.netflix.ribbon
ribbonautoconfiguration.class
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
@configuration
@conditionalonclass ({ iclient. class , resttemplate. class , asyncresttemplate. class , ribbon. class })
@ribbonclients
@autoconfigureafter (name = "org.springframework.cloud.netflix.eureka.eurekaclientautoconfiguration" )
@autoconfigurebefore ({loadbalancerautoconfiguration. class , asyncloadbalancerautoconfiguration. class })
@enableconfigurationproperties (ribboneagerloadproperties. class )
public class ribbonautoconfiguration {
// 略
@bean
@conditionalonmissingbean (loadbalancerclient. class )
public loadbalancerclient loadbalancerclient() {
return new ribbonloadbalancerclient(springclientfactory());
}
// 略
}
|
先看配置条件项,ribbonautoconfiguration配置必须在loadbalancerautoconfiguration配置前执行,因为在loadbalancerautoconfiguration配置中会使用ribbonloadbalancerclient实例。
ribbonloadbalancerclient继承自loadbalancerclient接口,是负载均衡客户端,也是负载均衡策略的调用方。
2.loadbalancerinterceptorconfig配置生成:
1).负载均衡拦截器loadbalancerinterceptor实例
包含:
loadbalancerclient实现类的ribbonloadbalancerclient实例
负载均衡的请求创建工厂loadbalancerrequestfactory:实例
2).resttemplate自定义的resttemplatecustomizer实例
代码位置:
spring-cloud-commons-1.2.4.release.jar
org.springframework.cloud.client.loadbalancer
loadbalancerautoconfiguration.class
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
@configuration
@conditionalonclass (resttemplate. class )
@conditionalonbean (loadbalancerclient. class )
@enableconfigurationproperties (loadbalancerretryproperties. class )
public class loadbalancerautoconfiguration {
// 略
@bean
@conditionalonmissingbean
public loadbalancerrequestfactory loadbalancerrequestfactory(
loadbalancerclient loadbalancerclient) {
return new loadbalancerrequestfactory(loadbalancerclient, transformers);
}
@configuration
@conditionalonmissingclass ( "org.springframework.retry.support.retrytemplate" )
static class loadbalancerinterceptorconfig {
@bean
public loadbalancerinterceptor ribboninterceptor(
loadbalancerclient loadbalancerclient,
loadbalancerrequestfactory requestfactory) {
return new loadbalancerinterceptor(loadbalancerclient, requestfactory);
}
@bean
@conditionalonmissingbean
public resttemplatecustomizer resttemplatecustomizer(
final loadbalancerinterceptor loadbalancerinterceptor) {
return new resttemplatecustomizer() {
@override
public void customize(resttemplate resttemplate) {
list<clienthttprequestinterceptor> list = new arraylist<>(
resttemplate.getinterceptors());
list.add(loadbalancerinterceptor);
resttemplate.setinterceptors(list);
}
};
}
}
// 略
}
|
先看配置条件项:
要求在项目环境中必须要有resttemplate类。
要求必须要有loadbalancerclient接口的实现类的实例,也就是上一步生成的ribbonloadbalancerclient。
3.通过上面一步创建的resttemplatecustomizer配置所有resttemplate实例,就是将负载均衡拦截器设置给resttemplate实例。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
|
@configuration
@conditionalonclass (resttemplate. class )
@conditionalonbean (loadbalancerclient. class )
@enableconfigurationproperties (loadbalancerretryproperties. class )
public class loadbalancerautoconfiguration {
// 略
@bean
public smartinitializingsingleton loadbalancedresttemplateinitializer(
final list<resttemplatecustomizer> customizers) {
return new smartinitializingsingleton() {
@override
public void aftersingletonsinstantiated() {
for (resttemplate resttemplate : loadbalancerautoconfiguration. this .resttemplates) {
for (resttemplatecustomizer customizer : customizers) {
customizer.customize(resttemplate);
}
}
}
};
}
// 略
@configuration
@conditionalonmissingclass ( "org.springframework.retry.support.retrytemplate" )
static class loadbalancerinterceptorconfig {
@bean
public loadbalancerinterceptor ribboninterceptor(
loadbalancerclient loadbalancerclient,
loadbalancerrequestfactory requestfactory) {
return new loadbalancerinterceptor(loadbalancerclient, requestfactory);
}
@bean
@conditionalonmissingbean
public resttemplatecustomizer resttemplatecustomizer(
final loadbalancerinterceptor loadbalancerinterceptor) {
return new resttemplatecustomizer() {
@override
public void customize(resttemplate resttemplate) {
list<clienthttprequestinterceptor> list = new arraylist<>(
resttemplate.getinterceptors());
list.add(loadbalancerinterceptor);
resttemplate.setinterceptors(list);
}
};
}
}
// 略
}
|
resttemplate.setinterceptors(list)这个地方就是注入负载均衡拦截器的地方loadbalancerinterceptor。
从这个地方实际上也可以猜出来,resttemplate可以通过注入的拦截器来构建相应的请求实现负载均衡。
也能看出来可以自定义拦截器实现其他目的。
4.ribbonclientconfiguration配置生成zoneawareloadbalancer实例
代码位置:
spring-cloud-netflix-core-1.3.5.release.jar
org.springframework.cloud.netflix.ribbon
ribbonclientconfiguration.class
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
@suppresswarnings ( "deprecation" )
@configuration
@enableconfigurationproperties
//order is important here, last should be the default, first should be optional
// see https://github.com/spring-cloud/spring-cloud-netflix/issues/2086#issuecomment-316281653
@import ({okhttpribbonconfiguration. class , restclientribbonconfiguration. class , httpclientribbonconfiguration. class })
public class ribbonclientconfiguration {
// 略
@bean
@conditionalonmissingbean
public iloadbalancer ribbonloadbalancer(iclientconfig config,
serverlist<server> serverlist, serverlistfilter<server> serverlistfilter,
irule rule, iping ping, serverlistupdater serverlistupdater) {
if ( this .propertiesfactory.isset(iloadbalancer. class , name)) {
return this .propertiesfactory.get(iloadbalancer. class , config, name);
}
return new zoneawareloadbalancer<>(config, rule, ping, serverlist,
serverlistfilter, serverlistupdater);
}
// 略
}
|
zoneawareloadbalancer继承自iloadbalancer接口,该接口有一个方法:
1
2
3
4
5
6
7
8
|
/**
* choose a server from load balancer.
*
* @param key an object that the load balancer may use to determine which server to return. null if
* the load balancer does not use this parameter.
* @return server chosen
*/
public server chooseserver(object key);
|
zoneawareloadbalancer就是一个具体的负载均衡实现类,也是默认的负载均衡类,通过对chooseserver方法的实现选取某个服务实例。
拦截&请求
1.使用resttemplate进行get、post等各种请求,都是通过doexecute方法实现
代码位置:
spring-web-4.3.12.release.jar
org.springframework.web.client
resttemplate.class
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
public class resttemplate extends interceptinghttpaccessor implements restoperations {
// 略
protected <t> t doexecute(uri url, httpmethod method, requestcallback requestcallback,
responseextractor<t> responseextractor) throws restclientexception {
assert .notnull(url, "'url' must not be null" );
assert .notnull(method, "'method' must not be null" );
clienthttpresponse response = null ;
try {
clienthttprequest request = createrequest(url, method);
if (requestcallback != null ) {
requestcallback.dowithrequest(request);
}
response = request.execute();
handleresponse(url, method, response);
if (responseextractor != null ) {
return responseextractor.extractdata(response);
}
else {
return null ;
}
}
catch (ioexception ex) {
string resource = url.tostring();
string query = url.getrawquery();
resource = (query != null ? resource.substring( 0 , resource.indexof( '?' )) : resource);
throw new resourceaccessexception( "i/o error on " + method.name() +
" request for \"" + resource + "\": " + ex.getmessage(), ex);
}
finally {
if (response != null ) {
response.close();
}
}
}
// 略
}
|
支持的各种http请求方法最终都是调用doexecute方法,该方法内调用创建方法创建请求实例,并执行请求得到响应对象。
2.生成请求实例创建工厂
上一步代码中,调用createrequest方法创建请求实例,这个方法是定义在父类中。
先整理出主要的继承关系:
createrequest方法实际是定义在httpaccessor抽象类中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
public abstract class httpaccessor {
private clienthttprequestfactory requestfactory = new simpleclienthttprequestfactory();
public void setrequestfactory(clienthttprequestfactory requestfactory) {
assert .notnull(requestfactory, "clienthttprequestfactory must not be null" );
this .requestfactory = requestfactory;
}
public clienthttprequestfactory getrequestfactory() {
return this .requestfactory;
}
protected clienthttprequest createrequest(uri url, httpmethod method) throws ioexception {
clienthttprequest request = getrequestfactory().createrequest(url, method);
if (logger.isdebugenabled()) {
logger.debug( "created " + method.name() + " request for \"" + url + "\"" );
}
return request;
}
}
|
在createrequest方法中调用getrequestfactory方法获得请求实例创建工厂,实际上getrequestfactory并不是当前httpaccessor类中定义的,而是在子类interceptinghttpaccessor中定义的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
public abstract class interceptinghttpaccessor extends httpaccessor {
private list<clienthttprequestinterceptor> interceptors = new arraylist<clienthttprequestinterceptor>();
public void setinterceptors(list<clienthttprequestinterceptor> interceptors) {
this .interceptors = interceptors;
}
public list<clienthttprequestinterceptor> getinterceptors() {
return interceptors;
}
@override
public clienthttprequestfactory getrequestfactory() {
clienthttprequestfactory delegate = super .getrequestfactory();
if (!collectionutils.isempty(getinterceptors())) {
return new interceptingclienthttprequestfactory(delegate, getinterceptors());
}
else {
return delegate;
}
}
}
|
在这里做了个小动作,首先还是通过httpaccessor类创建并获得simpleclienthttprequestfactory工厂,这个工厂主要就是在没有拦截器的时候创建基本请求实例。
其次,在有拦截器注入的情况下,创建interceptingclienthttprequestfactory工厂,该工厂就是创建带拦截器的请求实例,因为注入了负载均衡拦截器,所以这里就从interceptingclienthttprequestfactory工厂创建。
3.通过工厂创建请求实例
创建实例就看工厂的createrequest方法。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
public class interceptingclienthttprequestfactory extends abstractclienthttprequestfactorywrapper {
private final list<clienthttprequestinterceptor> interceptors;
public interceptingclienthttprequestfactory(clienthttprequestfactory requestfactory,
list<clienthttprequestinterceptor> interceptors) {
super (requestfactory);
this .interceptors = (interceptors != null ? interceptors : collections.<clienthttprequestinterceptor>emptylist());
}
@override
protected clienthttprequest createrequest(uri uri, httpmethod httpmethod, clienthttprequestfactory requestfactory) {
return new interceptingclienthttprequest(requestfactory, this .interceptors, uri, httpmethod);
}
}
|
就是new了个interceptingclienthttprequest实例,并且把拦截器、基本请求实例创建工厂注进去。
4.请求实例调用配置阶段注入的负载均衡拦截器的拦截方法intercept
可从第1步看出,创建完请求实例后,通过执行请求实例的execute方法执行请求。
1
2
3
4
5
|
clienthttprequest request = createrequest(url, method);
if (requestcallback != null ) {
requestcallback.dowithrequest(request);
}
response = request.execute();
|
实际请求实例是interceptingclienthttprequest,execute实际是在它的父类中。
类定义位置:
spring-web-4.3.12.release.jar
org.springframework.http.client
interceptingclienthttprequest.class
看一下它们的继承关系。
在execute方法中实际调用了子类实现的executeinternal方法。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
public abstract class abstractclienthttprequest implements clienthttprequest {
private final httpheaders headers = new httpheaders();
private boolean executed = false ;
@override
public final httpheaders getheaders() {
return ( this .executed ? httpheaders.readonlyhttpheaders( this .headers) : this .headers);
}
@override
public final outputstream getbody() throws ioexception {
assertnotexecuted();
return getbodyinternal( this .headers);
}
@override
public final clienthttpresponse execute() throws ioexception {
assertnotexecuted();
clienthttpresponse result = executeinternal( this .headers);
this .executed = true ;
return result;
}
protected void assertnotexecuted() {
assert .state(! this .executed, "clienthttprequest already executed" );
}
protected abstract outputstream getbodyinternal(httpheaders headers) throws ioexception;
protected abstract clienthttpresponse executeinternal(httpheaders headers) throws ioexception;
}
|
其实就是interceptingclienthttprequest类的executeinternal方法,其中,又调用了一个执行器interceptingrequestexecution的execute,通关判断如果有拦截器注入进来过,就调用拦截器的intercept方法。
这里的拦截器实际上就是在配置阶段注入进resttemplate实例的负载均衡拦截器loadbalancerinterceptor实例,可参考上面配置阶段的第2步。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
|
class interceptingclienthttprequest extends abstractbufferingclienthttprequest {
// 略
@override
protected final clienthttpresponse executeinternal(httpheaders headers, byte [] bufferedoutput) throws ioexception {
interceptingrequestexecution requestexecution = new interceptingrequestexecution();
return requestexecution.execute( this , bufferedoutput);
}
private class interceptingrequestexecution implements clienthttprequestexecution {
private final iterator<clienthttprequestinterceptor> iterator;
public interceptingrequestexecution() {
this .iterator = interceptors.iterator();
}
@override
public clienthttpresponse execute(httprequest request, byte [] body) throws ioexception {
if ( this .iterator.hasnext()) {
clienthttprequestinterceptor nextinterceptor = this .iterator.next();
return nextinterceptor.intercept(request, body, this );
}
else {
clienthttprequest delegate = requestfactory.createrequest(request.geturi(), request.getmethod());
for (map.entry<string, list<string>> entry : request.getheaders().entryset()) {
list<string> values = entry.getvalue();
for (string value : values) {
delegate.getheaders().add(entry.getkey(), value);
}
}
if (body.length > 0 ) {
streamutils.copy(body, delegate.getbody());
}
return delegate.execute();
}
}
}
}
|
5.负载均衡拦截器调用负载均衡客户端
在负载均衡拦截器loadbalancerinterceptor类的intercept方法中,又调用了负载均衡客户端loadbalancerclient实现类的execute方法。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
public class loadbalancerinterceptor implements clienthttprequestinterceptor {
private loadbalancerclient loadbalancer;
private loadbalancerrequestfactory requestfactory;
public loadbalancerinterceptor(loadbalancerclient loadbalancer, loadbalancerrequestfactory requestfactory) {
this .loadbalancer = loadbalancer;
this .requestfactory = requestfactory;
}
public loadbalancerinterceptor(loadbalancerclient loadbalancer) {
// for backwards compatibility
this (loadbalancer, new loadbalancerrequestfactory(loadbalancer));
}
@override
public clienthttpresponse intercept( final httprequest request, final byte [] body,
final clienthttprequestexecution execution) throws ioexception {
final uri originaluri = request.geturi();
string servicename = originaluri.gethost();
assert .state(servicename != null , "request uri does not contain a valid hostname: " + originaluri);
return this .loadbalancer.execute(servicename, requestfactory.createrequest(request, body, execution));
}
}
|
在配置阶段的第1步,可以看到实现类是ribbonloadbalancerclient。
6.负载均衡客户端调用负载均衡策略选取目标服务实例并发起请求
在ribbonloadbalancerclient的第一个execute方法以及getserver方法中可以看到,实际上是通过iloadbalancer的负载均衡器实现类作的chooseserver方法选取一个服务,交给接下来的请求对象发起一个请求。
这里的负载均衡实现类默认是zoneawareloadbalancer区域感知负载均衡器实例,其内部通过均衡策略选择一个服务。
zoneawareloadbalancer的创建可以参考配置阶段的第4步。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
|
public class ribbonloadbalancerclient implements loadbalancerclient {
@override
public <t> t execute(string serviceid, loadbalancerrequest<t> request) throws ioexception {
iloadbalancer loadbalancer = getloadbalancer(serviceid);
server server = getserver(loadbalancer);
if (server == null ) {
throw new illegalstateexception( "no instances available for " + serviceid);
}
ribbonserver ribbonserver = new ribbonserver(serviceid, server, issecure(server,
serviceid), serverintrospector(serviceid).getmetadata(server));
return execute(serviceid, ribbonserver, request);
}
@override
public <t> t execute(string serviceid, serviceinstance serviceinstance, loadbalancerrequest<t> request) throws ioexception {
server server = null ;
if (serviceinstance instanceof ribbonserver) {
server = ((ribbonserver)serviceinstance).getserver();
}
if (server == null ) {
throw new illegalstateexception( "no instances available for " + serviceid);
}
ribbonloadbalancercontext context = this .clientfactory
.getloadbalancercontext(serviceid);
ribbonstatsrecorder statsrecorder = new ribbonstatsrecorder(context, server);
try {
t returnval = request.apply(serviceinstance);
statsrecorder.recordstats(returnval);
return returnval;
}
// catch ioexception and rethrow so resttemplate behaves correctly
catch (ioexception ex) {
statsrecorder.recordstats(ex);
throw ex;
}
catch (exception ex) {
statsrecorder.recordstats(ex);
reflectionutils.rethrowruntimeexception(ex);
}
return null ;
}
// 略
protected server getserver(iloadbalancer loadbalancer) {
if (loadbalancer == null ) {
return null ;
}
return loadbalancer.chooseserver( "default" ); // todo: better handling of key
}
protected iloadbalancer getloadbalancer(string serviceid) {
return this .clientfactory.getloadbalancer(serviceid);
}
public static class ribbonserver implements serviceinstance {
private final string serviceid;
private final server server;
private final boolean secure;
private map<string, string> metadata;
public ribbonserver(string serviceid, server server) {
this (serviceid, server, false , collections.<string, string> emptymap());
}
public ribbonserver(string serviceid, server server, boolean secure,
map<string, string> metadata) {
this .serviceid = serviceid;
this .server = server;
this .secure = secure;
this .metadata = metadata;
}
// 略
}
}
|
代码撸完,总结下。
普通使用resttemplate请求其他服务时,内部使用的就是常规的http请求实例发送请求。
为resttemplate增加了@loanbalanced 注解后,实际上通过配置,为resttemplate注入负载均衡拦截器,让负载均衡器选择根据其对应的策略选择合适的服务后,再发送请求。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。
原文链接:http://www.cnblogs.com/kongxianghai/p/8445030.html