【SpringCloud负载均衡】【源码+图解】【三】LoadBalancer的工作原理

时间:2022-12-13 21:56:13

【SpringCloud负载均衡】【源码+图解】【二】LoadBalancer配置

3. LoadBalancer的工作原理

当客户端发起请求后被LoadBalancerInterceptor拦截,看下拦截的方法

public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {
    // 即BlockingLoadBalancerClient
	private LoadBalancerClient loadBalancer;
    // 即LoadBalancerRequestFactory
	private LoadBalancerRequestFactory requestFactory;

	@Override
	public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
			final ClientHttpRequestExecution execution) throws IOException {
		final URI originalUri = request.getURI();
        // 获取url的host,如上文中UserController中的"http://product/prod",则serviceName(后文也指serviceId)为product
		String serviceName = originalUri.getHost();
		return this.loadBalancer.execute(serviceName, this.requestFactory.createRequest(request, body, execution));
	}

}

当请求被拦截后LoadBalancer的整个工作流程

【SpringCloud负载均衡】【源码+图解】【三】LoadBalancer的工作原理

具体过程如下:

  1. 将普通httpRequest封装成BlockingLoadBalancerRequest
  2. 如果当前serviceId对应的上下文不存在则创建上下文
  3. 从上下文中获取负载均衡器
  4. 利用负载均衡器选取合适的serviceInstance
  5. 向具体的serviceInstance发送请求

3.1 创建LoadBalancerRequest

public class LoadBalancerRequestFactory {
    public LoadBalancerRequest<ClientHttpResponse> createRequest(final HttpRequest request, final byte[] body,
			final ClientHttpRequestExecution execution) {
        // 将普通的HttpRequest包装成LoadBalancerRequest
		return new BlockingLoadBalancerRequest(loadBalancer, transformers,
				new BlockingLoadBalancerRequest.ClientHttpRequestData(request, body, execution));
	}
}

BlockingLoadBalancerRequest的关键在于apply函数,后续再详细分析

3.2 创建上下文

在LoadBalancer中LoadBalancerClientFactory负责管理yml文件中的配置、微服务的应用上下文ApplicationContext,每一个serviceId对应一个ApplicationContext,而这个ApplicationContext中最关键的是提供服务列表的Supplier和选择服务的ReactiveLoadBalancer。先看LoadBalancerClientFactory类图

【SpringCloud负载均衡】【源码+图解】【三】LoadBalancer的工作原理

然后我们将LoadBalancerClientFactory的属性具象化,方便理解,看下图

【SpringCloud负载均衡】【源码+图解】【三】LoadBalancer的工作原理

接下来详细介绍下这三个参数

3.2.1 properties

它的载体为LoadBalancerClientsProperties类,我们看下它的类图

【SpringCloud负载均衡】【源码+图解】【三】LoadBalancer的工作原理

结合类图,我们就可以列出关于loadBalance的相关配置

spring:
  cloud:
	loadbalancer:
      health-check:
        initial-delay: 0s
        interval: 25s
        refetch-instances: false
        refetch-instances-interval: 25s
        path:
          default: /actuator/health
        repeat-health-check: true
      hint:
        my-hint: hint-example
      hint-header-name: "X-SC-LB-Hint"
      retry:
        enabled: true
        retry-on-all-operations: false
        max-retries-on-same-service-instance: 0
        max-retries-on-next-service-instance: 1
        retryable-status-codes:
        - 404
        backoff:
          enabled: false
          min-backoff: 5
          max-backoff: 10 
          jitter: 0.5
      sticky-session:
        instance-id-cookie-name: sc-lb-instance-id
        add-service-instance-cookie: false
      use-raw-status-code-in-response-data: false
      x-forwarded:
        enabled: false
      clients:
        product: 
          # client的配置与前面相同,参考上面的类图

3.2.2 configurations

默认情况下会有下面四个配置类,也就是对于每一个ApplicationContext都会加载的配置,这些配置主要是注入负载均衡所需要的的Balancer和实例提供者Supplier

default.org.springframework.cloud.netflix.eureka.loadbalancer.LoadBalancerEurekaAutoConfiguration
org.springframework.cloud.netflix.eureka.loadbalancer.EurekaLoadBalancerClientConfiguration
default.org.springframework.cloud.loadbalancer.config.BlockingLoadBalancerClientAutoConfiguration
default.org.springframework.cloud.loadbalancer.config.LoadBalancerAutoConfiguration

当然也可以自己配置,如下

@LoadBalancerClient(name = "my-loadbalance-config")
public class MyLoadbalanceConfiguration {
    
}

然后在NamedContextFactory.setConfigurations方法打断点,启动后发起http请求既可以看到自定义的配置已经被添加到了configurations

【SpringCloud负载均衡】【源码+图解】【三】LoadBalancer的工作原理

那么,这些configurations用来干嘛呢?接下来的contexts揭晓

3.2.3 contexts

前面说过,对于每一个不同的serviceId都会创建一个不同的上下文,那么我们分析下创建的过程,即NamedContextFactory.createContext

createContext位于它的父类NamedContextFactory中,接下来看下源码

public abstract class NamedContextFactory<C extends NamedContextFactory.Specification>
		implements DisposableBean, ApplicationContextAware {
    // name即为serviceId
    protected AnnotationConfigApplicationContext createContext(String name) {
		AnnotationConfigApplicationContext context;
        // 默认下parent为AnnotationConfigServletWebServerApplicationContext
		if (this.parent != null) {
			......
			context = new AnnotationConfigApplicationContext(beanFactory);
			context.setClassLoader(this.parent.getClassLoader());
		}
		else {
			context = new AnnotationConfigApplicationContext();
		}
        // 也就是说可以通过设置@LoadBalancerClient的name属性给不同的serviceId配置不同的configuration
		if (this.configurations.containsKey(name)) {
			for (Class<?> configuration : this.configurations.get(name).getConfiguration()) {
				context.register(configuration);
			}
		}
        // 注册默认的配置,即前面说到的那四个以及@LoadBalancerClient的name不赋值的configuration
		for (Map.Entry<String, C> entry : this.configurations.entrySet()) {
			if (entry.getKey().startsWith("default.")) {
				for (Class<?> configuration : entry.getValue().getConfiguration()) {
					context.register(configuration);
				}
			}
		}
        // defaultConfigType: LoadBalancerClientConfiguration
		context.register(PropertyPlaceholderAutoConfiguration.class, this.defaultConfigType);
        // 设置loadbalancer.client.name = serviceId
		context.getEnvironment().getPropertySources().addFirst(new MapPropertySource(this.propertySourceName,
				Collections.<String, Object>singletonMap(this.propertyName, name)));
		if (this.parent != null) {
			context.setParent(this.parent);
		}
		context.setDisplayName(generateDisplayName(name));
        // 刷新上下文,注入configurations中的beans
		context.refresh();
		return context;
	}
}

默认情况下我们看下**context.refresh()**都注入了哪些依赖

【SpringCloud负载均衡】【源码+图解】【三】LoadBalancer的工作原理

3.3 获取ReactiveLoadBalancer

上一小节结束后对于**LoadBalancerClientFactory.getInstance(String serviceId)**就能明白其中的工作原理了,看下源码

public class LoadBalancerClientFactory extends NamedContextFactory<LoadBalancerClientSpecification>
		implements ReactiveLoadBalancer.Factory<ServiceInstance> {
    @Override
	public ReactiveLoadBalancer<ServiceInstance> getInstance(String serviceId) {
        // 1、首先从contexts中获取serviceId的ApplicationContext
        // 2、从ApplicationContext中获取ReactorServiceInstanceLoadBalancer的bean,默认下是RoundRobinLoadBalancer,即轮询选择器
		return getInstance(serviceId, ReactorServiceInstanceLoadBalancer.class);
	}
}

3.4 获取ServiceInstance

接下来就到**RoundRobinLoadBalancer.choose(Request request)**挑选实例,也就是负载均衡最关键的一步了。

首先看下RoundRobinLoadBalancer的类图,了解下它的功能与属性

【SpringCloud负载均衡】【源码+图解】【三】LoadBalancer的工作原理

position:上一次选取的实例的下标

serviceId:应用名字

serviceInstanceListSupplierProvider:获取serviceId的所有实例

其次看下RoundRobinLoadBalancer的配置

public class LoadBalancerClientConfiguration {
	@Bean
	@ConditionalOnMissingBean
	public ReactorLoadBalancer<ServiceInstance> reactorServiceInstanceLoadBalancer(Environment environment,
			LoadBalancerClientFactory loadBalancerClientFactory) {
        // 注意,这里的name就是前面创建context时在配置的loadbalancer.client.name,也即当前的serviceId
		String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
		return new RoundRobinLoadBalancer(
            // 这里返回的是ClientFactoryObjectProvider,它的作用其实就是LoadBalancerClientFactory的一个中间代理,
            // 这个代理只提供ServiceInstanceListSupplier类
				loadBalancerClientFactory.getLazyProvider(name, ServiceInstanceListSupplier.class), name);
	}
}

最后choose的过程

public class RoundRobinLoadBalancer implements ReactorServiceInstanceLoadBalancer {
    public Mono<Response<ServiceInstance>> choose(Request request) {
        // 默认下为CachingServiceInstanceListSupplier
		ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider
				.getIfAvailable(NoopServiceInstanceListSupplier::new);
		return supplier.get(request) // 对于Supplier的工作原理见后面的4.2节
            // 1、先获取符合条件的instances列表
            .next() 
            // 2、从列表中轮询获取单个serviceInstance
			.map(serviceInstances -> processInstanceResponse(supplier, serviceInstances)); 
	}
    private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> instances) {
		if (instances.isEmpty()) {
			return new EmptyResponse();
		}
		// 当前pos = lastPos + 1
		int pos = this.position.incrementAndGet() & Integer.MAX_VALUE;
        // 求余实现轮询
		ServiceInstance instance = instances.get(pos % instances.size());
		return new DefaultResponse(instance);
	}
}

3.5 向serviceInstance请求结果

上一步挑选到serviceinstance后就该到向它发送请求获取结果了,该请求由BlockingLoadBalancerRequest发出.

先看下BlockingLoadBalancerRequest的类图

【SpringCloud负载均衡】【源码+图解】【三】LoadBalancer的工作原理

BlockingLoadBalancerRequest发出请求是在apply方法,看下源码

class BlockingLoadBalancerRequest implements HttpRequestLoadBalancerRequest<ClientHttpResponse> {
	@Override
	public ClientHttpResponse apply(ServiceInstance instance) throws Exception {
		HttpRequest serviceRequest = new ServiceRequestWrapper(clientHttpRequestData.request, instance, loadBalancer);
		if (this.transformers != null) {
            // LoadBalancerServiceInstanceCookieTransformer和XForwardedHeadersTransformer
			for (LoadBalancerRequestTransformer transformer : this.transformers) {
                // 进一步封装 request
				serviceRequest = transformer.transformRequest(serviceRequest, instance);
			}
		}
        // 底层的InterceptingClientHttpRequest发送http请求获取结果,不再详细分析
		return clientHttpRequestData.execution.execute(serviceRequest, clientHttpRequestData.body);
	}
}

整体过程如下图

【SpringCloud负载均衡】【源码+图解】【三】LoadBalancer的工作原理

这里着重分析下LoadBalancerServiceInstanceCookieTransformerXForwardedHeadersTransformer,看看两者对request做了什么

3.5.1 LoadBalancerServiceInstanceCookieTransformer

public class LoadBalancerServiceInstanceCookieTransformer implements LoadBalancerRequestTransformer {
    @Override
	public HttpRequest transformRequest(HttpRequest request, ServiceInstance instance) {
        // 如果spring.cloud.loadbalancer.clients.{serviceId}存在,则获取spring.cloud.loadbalancer.clients.{serviceId}.stickySession
        // 否则获取默认的spring.cloud.loadbalancer.stickySession
		LoadBalancerProperties.StickySession stickySession = factory != null
				? factory.getProperties(instance.getServiceId()).getStickySession() : stickySessionProperties;
        // 获取stickySession.instanceIdCookieName,默认下为sc-lb-instance-id
		String instanceIdCookieName = stickySession.getInstanceIdCookieName();
		HttpHeaders headers = request.getHeaders();
		List<String> cookieHeaders = new ArrayList<>(request.getHeaders().getOrEmpty(HttpHeaders.COOKIE));
        // 创建cookie
		String serviceInstanceCookie = new HttpCookie(instanceIdCookieName, instance.getInstanceId()).toString();
		cookieHeaders.add(serviceInstanceCookie);
        // 将cookie放到request的headers
		headers.put(HttpHeaders.COOKIE, cookieHeaders);
		return request;
	}
}

3.5.2 XForwardedHeadersTransformer

public class XForwardedHeadersTransformer implements LoadBalancerRequestTransformer {
	@Override
	public HttpRequest transformRequest(HttpRequest request, ServiceInstance instance) {
        // 如果spring.cloud.loadbalancer.clients.{serviceId}存在,则获取spring.cloud.loadbalancer.clients.{serviceId}.xForwarded
        // 否则获取默认的spring.cloud.loadbalancer.xForwarded
		LoadBalancerProperties.XForwarded xForwarded = factory.getProperties(instance.getServiceId()).getXForwarded();
        // 默认为false
		if (xForwarded.isEnabled()) {
			HttpHeaders headers = request.getHeaders();
			String xForwardedHost = request.getURI().getHost();
			String xforwardedProto = request.getURI().getScheme();
			headers.add("X-Forwarded-Host", xForwardedHost);
			headers.add("X-Forwarded-Proto", xforwardedProto);
		}
		return request;
	}

}

3.5.3实现自定义的HeadersTransformer

// 1、@Component注入spring容器
@Component
// 2、实现LoadBalancerRequestTransformer
public class MyHeadersTransformer implements LoadBalancerRequestTransformer{

    // 3、实现transformRequest方法,添加自定义逻辑
    @Override
    public HttpRequest transformRequest(HttpRequest request, ServiceInstance instance) {
        System.out.println("MyHeadersTransformer");
        return request;
    }

}

未完待续