SpringCloud学习6-如何创建一个服务消费者consumer

时间:2021-04-28 21:29:02

上一节如何创建一个服务提供者provider已经启动了一个provider的server,提供用户信息查询接口。接下来,我们启动另一个provider,由于是同一台机器本地测试,我们换一个端口

--server.port=8084

通过启动传参数覆盖port。这样,我们就有两个provider实例了。接下来,可以使用我们consumer负载均衡的消费这两个provider。

升级eureka依赖

eureka之前的pom依赖过期了,需要修改为

spring-cloud-starter-netflix-eureka-server

同样的,所有的client都要修改为

spring-cloud-starter-netflix-eureka-client

创建一个consumer工程

创建一个子模块。

https://github.com/Ryan-Miao/spring-cloud-Edgware-demo/tree/master/consumer-demo

配置基本和provider一致

<dependencies>
<!--springboot 依赖start-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-ribbon</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
</dependency>
<!--springboot 依赖结束--> <dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
</dependency> <!--工具类 start-->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency> <dependency>
<groupId>net.logstash.logback</groupId>
<artifactId>logstash-logback-encoder</artifactId>
</dependency>
<!--工具类end--> <!--内部依赖-->
<dependency>
<groupId>com.test</groupId>
<artifactId>provider-api</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<!--内部依赖end--> </dependencies>
  • spring-cloud-starter-netflix-eureka-client eureka客户端,负责维护注册和心跳
  • spring-cloud-starter-openfeign 声明式的HttpClient Feign客户端
  • spring-cloud-starter-netflix-ribbon 客户端负载均衡
  • spring-cloud-starter-netflix-hystrix http请求健康熔断
  • provider-api 我们定义好的provider请求的客户端

启动类

启动类和provider相同,多了一行注解

@EnableFeignClients
@EnableDiscoveryClient
@SpringBootApplication
public class ConsumerApplication { public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
} }
  • EnableFeignClients 启用Feign

Swagger等基础配置

同provider,提供几个简单api。省略叙述。

FeignClient 远程调用

创建一个接口,继承我们provider-api里声明的接口

@FeignClient(value = "PROVIDER-DEMO", fallback = UserClientFallback.class)
public interface UserClient extends UserApi {
}
  • FeignClient会标注这是一个Feign的客户端,在项目启动的时候就会扫描到,value是连接的service的名称,这里即我们的provider, fallback则是当远程请求失败的时候,服务降级,我们来决定做什么。

如果不填写fallback,则请求遇到非200会报错,抛出一个RuntimeException, HystrixRuntimeException. 有可能是远程返回500, 400等,也有可能是连接超时,还有可能是hystrix 熔断。

而填写了fallback, 则会在服务调用失败的时候,转调用我们对应的fallback方法。

fallback就是实现我们这个UserClient接口。

@Component
@RequestMapping("/userClientFallback")
public class UserClientFallback implements UserClient { @Override
public List<UserVo> list() {
UserVo userVo = new UserVo();
userVo.setAge(1);
userVo.setBirth(LocalDate.now());
userVo.setId(1);
userVo.setName("fallback");
return Lists.newArrayList(userVo);
} @Override
public String fallback() {
return "访问失败后调用此方法,进行服务降级.";
}
}
  • Component是要把这个Fallback注册到spring容器里,FeignClient在项目启动的时候会读取fallback, 然后从context里读取这个instance,如果没有找到,就启动失败、

见org.springframework.cloud.netflix.feign.HystrixTargeter#getFromContext

private <T> T getFromContext(String fallbackMechanism, String feignClientName, FeignContext context,
Class<?> beanType, Class<T> targetType) {
Object fallbackInstance = context.getInstance(feignClientName, beanType);
if (fallbackInstance == null) {
throw new IllegalStateException(String.format(
"No " + fallbackMechanism + " instance of type %s found for feign client %s",
beanType, feignClientName));
} if (!targetType.isAssignableFrom(beanType)) {
throw new IllegalStateException(
String.format(
"Incompatible " + fallbackMechanism + " instance. Fallback/fallbackFactory of type %s is not assignable to %s for feign client %s",
beanType, targetType, feignClientName));
}
return (T) fallbackInstance;
}
  • @RequestMapping 则是不得已而为之了。前文provider-demo里,我们把api抽取成UserApi
@RequestMapping("/api/v1/users")
public interface UserApi { @GetMapping("/")
List<UserVo> list(); @GetMapping("/fallback")
String fallback();
}

这里的RequestMapping会被spring启动的到时候扫描到,在初始化RequestMappingHandlerMapping的时候,扫描所有的bean,把RequestMapping的bean给注册RequestMapping. 这时候,它不管你是不是controller的。我们FeignClient所声明的接口上有@RequestMapping,也会被扫描。而我们Fallback也继承,也会有@RequestMapping,这时候重复定义RequestMapping会报错

org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'requestMappingHandlerMapping' defined in class path resource [org/springframework/boot/autoconfigure/web/WebMvcAutoConfiguration$EnableWebMvcConfiguration.class]: Invocation of init method failed; nested exception is java.lang.IllegalStateException: Ambiguous mapping. Cannot map 'com.test.cloud.client.UserClient' method
public abstract java.util.List<com.test.cloud.vo.UserVo> com.test.cloud.api.UserApi.list()
to {[/api/v1/users/],methods=[GET]}: There is already 'userClientFallback' bean method
public java.util.List<com.test.cloud.vo.UserVo> com.test.cloud.client.UserClientFallback.list() mapped.

事实上,我们并不是要将FeignClient给注册到RequestMapping里的,而且OpenFeign也有自己的一套注解方案。只是spring-cloud为了方便集成和简化OpenFeign的用法,把Spring-Web的注解做了适配。不好的地方是RequestMapping的扫描并没有排除。

以下代码会找到方法注解@RequestMapping.

org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerMapping#createRequestMappingInfo(java.lang.reflect.AnnotatedElement)

private RequestMappingInfo createRequestMappingInfo(AnnotatedElement element) {
RequestMapping requestMapping = AnnotatedElementUtils.findMergedAnnotation(element, RequestMapping.class);
RequestCondition<?> condition = (element instanceof Class ?
getCustomTypeCondition((Class<?>) element) : getCustomMethodCondition((Method) element));
return (requestMapping != null ? createRequestMappingInfo(requestMapping, condition) : null);
}

而RequestMapping这个bean创建完后会扫描所有bean, 并注册

org.springframework.web.servlet.handler.AbstractHandlerMethodMapping.MappingRegistry#register

public void register(T mapping, Object handler, Method method) {
this.readWriteLock.writeLock().lock();
try {
HandlerMethod handlerMethod = createHandlerMethod(handler, method);
assertUniqueMethodMapping(handlerMethod, mapping); if (logger.isInfoEnabled()) {
logger.info("Mapped \"" + mapping + "\" onto " + handlerMethod);
}
this.mappingLookup.put(mapping, handlerMethod); List<String> directUrls = getDirectUrls(mapping);
for (String url : directUrls) {
this.urlLookup.add(url, mapping);
} String name = null;
if (getNamingStrategy() != null) {
name = getNamingStrategy().getName(handlerMethod, mapping);
addMappingName(name, handlerMethod);
} CorsConfiguration corsConfig = initCorsConfiguration(handler, method, mapping);
if (corsConfig != null) {
this.corsLookup.put(handlerMethod, corsConfig);
} this.registry.put(mapping, new MappingRegistration<T>(mapping, handlerMethod, directUrls, name));
}
finally {
this.readWriteLock.writeLock().unlock();
}
}
private void assertUniqueMethodMapping(HandlerMethod newHandlerMethod, T mapping) {
HandlerMethod handlerMethod = this.mappingLookup.get(mapping);
if (handlerMethod != null && !handlerMethod.equals(newHandlerMethod)) {
throw new IllegalStateException(
"Ambiguous mapping. Cannot map '" + newHandlerMethod.getBean() + "' method \n" +
newHandlerMethod + "\nto " + mapping + ": There is already '" +
handlerMethod.getBean() + "' bean method\n" + handlerMethod + " mapped.");
}
}

总之,由于这个冲突,fallback必须制定一个随意不相干的url地址。等后面我学会怎么手动排除RequestMapping的时候就不用了。

接下来,直接调用FeignClient

@Api
@RestController
@RequestMapping("/api/v1/users")
public class UserController { private final UserClient userClient; @Autowired
public UserController(UserClient userClient) {
this.userClient = userClient;
} @GetMapping("/feign")
public List<UserVo> feign() {
return userClient.list();
} @GetMapping("/feign-fallback")
public String fallback() {
return userClient.fallback();
} }

在provider-api里,我设计userClient.list()返回用户列表,userClient.fallback()随机报500. 这样,启动,访问两个api可以观察到服务降级了。

关于Feign,Hystrix,Ribbon的配置

我目前用到的配置有以下几种,不全,暂时有这些

#eureka客户端ribbon刷新时间
#默认30s
ribbon.ServerListRefreshInterval: 5000 # ribbon默认配置
#ribbon.ConnectTimeout=250
#ribbon.ReadTimeout=1000
#ribbon.OkToRetryOnAllOperations=true
#ribbon.MaxAutoRetriesNextServer=2
#ribbon.MaxAutoRetries=0 # feign日志配置, 指定某个service的日志级别
#logging.level.com.test.cloud.client.UserClient: info # ribbon全局默认连接和等待时间
ribbon.ConnectTimeout: 1000
ribbon.ReadTimeout: 10000 # ribbon指定service的连接和等待时间,注意service的名称要和在FeignClient注解里标注的内容一致, 要大写
PROVIDER-DEMO.ribbon.ConnectTimeout: 1000
PROVIDER-DEMO.ribbon.ReadTimeout: 1000 # feign全局开启hystrix支持,默认false
feign.hystrix.enabled: true
# hystrix全局默认超时时间
hystrix.command.default.execution.isolation.thread.timeoutInMilliseconds: 5000
# hystrix指定request的单独设置超时时间, commandkey的组成为ClientClassName#methodName(ParamTypeClassName..)
hystrix.command.UserClient#list().execution.isolation.thread.timeoutInMilliseconds: 5000

需要注意的是,需要理解几个超时的概念。即,需要明白hystrix是干啥的,ribbon又是干啥的,Feign如何把它们集成的。

Feign

OpenFeign可以配置超时,日志,序列化和反序列化,重试等。只要手动声明对应的bean即可。具体配置见

org.springframework.cloud.netflix.feign.FeignClientsConfiguration

值得注意的是,默认不会重试

@Bean
@ConditionalOnMissingBean
public Retryer feignRetryer() {
return Retryer.NEVER_RETRY;
}

以及,默认不会采用hystrix

@Configuration
@ConditionalOnClass({ HystrixCommand.class, HystrixFeign.class })
protected static class HystrixFeignConfiguration {
@Bean
@Scope("prototype")
@ConditionalOnMissingBean
@ConditionalOnProperty(name = "feign.hystrix.enabled", matchIfMissing = false)
public Feign.Builder feignHystrixBuilder() {
return HystrixFeign.builder();
}
}

需要引入hystrix class和配置

feign.hystrix.enabled: true

Hystrix

有关具体原理信息,参见官网。个人简单理解,Hystrix为每个依赖的服务创建一个线程池,服务在线程池里执行,hystrix会有一些策略决定什么时候执行超时,还可以获得执行结果的成功率。于是可以指定一些策略,比如超时后中断线程,比如成功率在某一段时间低于阀值后拒绝服务执行。这样就像一个保险丝一样,当不满足我们设置的策略时,直接烧断了,从而起到保护服务资源的作用。当然,实现会更复杂,还有恢复机制。

所以,hystrix会有个超时的配置,决定线程执行时间。

# hystrix全局默认超时时间
hystrix.command.default.execution.isolation.thread.timeoutInMilliseconds: 5000
# hystrix指定request的单独设置超时时间, commandkey的组成为ClientClassName#methodName(ParamTypeClassName..)
hystrix.command.UserClient#list().execution.isolation.thread.timeoutInMilliseconds: 5000

在Feign集成Hystrix的时候,把ClientClassName#methodName(ParamTypeClassName..)设置成Hystrix的CommandKey, CommandKey就是hystrix执行策略的最小单位,比如对应某个http请求,对应这个请求的最长时间即我们设置的超时。

feign.Feign#configKey(java.lang.Class, java.lang.reflect.Method)

public static String configKey(Class targetType, Method method) {
StringBuilder builder = new StringBuilder();
builder.append(targetType.getSimpleName());
builder.append('#').append(method.getName()).append('(');
for (Type param : method.getGenericParameterTypes()) {
param = Types.resolve(targetType, targetType, param);
builder.append(Types.getRawType(param).getSimpleName()).append(',');
}
if (method.getParameterTypes().length > 0) {
builder.deleteCharAt(builder.length() - 1);
}
return builder.append(')').toString();
}

Feign会把host当作groupkey, 这里则是我们的服务名。

SpringCloud学习6-如何创建一个服务消费者consumer

当然,还有更多细节的配置,比如线程池,时间窗口大小等。见官网Configuration

Ribbon

Ribbon采用客户端负载均衡。与服务端负载均衡对应,比如我们访问baidu.com, 域名解析器后转向某个负载均衡设备来决定我们的请求打到哪台机器上,对于我们请求者来说是透明的,我们不知道负载信息。

而Ribbon则是自己维护所有可用的服务列表,根据某种策略,去选择请求哪个服务实例。比如随机选取,线性轮询选取,在线性轮询的基础上重试选取,权重选取,Zone优先选取等。

在Feign集成Ribbon的时候,把两个超时时间委托给Ribbon。

public FeignLoadBalancer(ILoadBalancer lb, IClientConfig clientConfig,
ServerIntrospector serverIntrospector) {
super(lb, clientConfig);
this.setRetryHandler(RetryHandler.DEFAULT);
this.clientConfig = clientConfig;
this.connectTimeout = clientConfig.get(CommonClientConfigKey.ConnectTimeout);
this.readTimeout = clientConfig.get(CommonClientConfigKey.ReadTimeout);
this.serverIntrospector = serverIntrospector;
}

在不和Ribbon集成的时候,OpenFeign会设置连接超时和读取超时

feign.Client.Default#convertAndSend

 final HttpURLConnection
connection =
(HttpURLConnection) new URL(request.url()).openConnection();
if (connection instanceof HttpsURLConnection) {
HttpsURLConnection sslCon = (HttpsURLConnection) connection;
if (sslContextFactory != null) {
sslCon.setSSLSocketFactory(sslContextFactory);
}
if (hostnameVerifier != null) {
sslCon.setHostnameVerifier(hostnameVerifier);
}
}
connection.setConnectTimeout(options.connectTimeoutMillis());
connection.setReadTimeout(options.readTimeoutMillis());

而和Ribbon集成后,Feign会读取ribbon的两个时间设置,即

# ribbon全局默认连接和等待时间
ribbon.ConnectTimeout: 1000
ribbon.ReadTimeout: 10000 # ribbon指定service的连接和等待时间,注意service的名称要和在FeignClient注解里标注的内容一致, 要大写
PROVIDER-DEMO.ribbon.ConnectTimeout: 1
PROVIDER-DEMO.ribbon.ReadTimeout: 1

关于单独执行某个服务的超时配置,区别Ribbon全局时间配置,这个idea没有自动提示,debug了半天源码,找到配置为服务名大写+.ribbon.ConnectTimeout

com.netflix.client.config.DefaultClientConfigImpl#getInstancePropName(java.lang.String, java.lang.String)

public String getInstancePropName(String restClientName, String key) {
return restClientName + "." + this.getNameSpace() + "." + key;
}

这里设置为1只是为了测试超时设置。debug追踪发现,确实如此。这种最佳实践真的只能自己去实践。

调优

由于http rest请求的复杂性,可能需要调整超时时间,心跳时间,甚至根据当前服务的请求速率设置线程池大小和排队大小,设置熔断条件等。这个只能在监控上线后,根据监控信息去对应修改需要的配置。目前我还没有最佳实践,不乱说了。

结尾

到这里,在启动了eureka,provider之后,启动consumer就可以实现远程调用了。嗯,基本满足开发需求了。访问feign的接口,观察admin里两个provider的请求,可以发现我们的请求确实负载到不同的instance上了。访问fallback接口,可以看到失败的时候会执行我们的降级策略。

Miao语

基础很重要,基础很重要,基础非常重要。