这里是weihubeats,觉得文章不错可以关注公众号小奏技术,文章首发。拒绝营销号,拒绝标题党
背景
由于一些老项目,使用的注册中心还是Zookeeper
,众所周知在Spring Cloud
组件中是有客户端的负载均衡组件Spring Cloud LoadBalancer
会存在客户端缓存。
那么就会出现一个问题:
由于服务提供者已经在Zookeeper
下线了,而客户端缓存了旧的ServiceInstance
数据,导致调用失败。
之前也在spring-cloud-zookeeper提过这个issues,不过没人理我,所以需要自己改造
改造思路
知道了问题所在改造起来就非常容易了,思路很简单,就是服务提供者在Zookeeper
下线后需要客户端去删除客户端的本地缓存
所以我们需要知道Zookeeper
本地缓存在哪。接下来就是我们源码分析找找看
客户端获取消费者(ServiceInstance
)源码分析
我们知道Spring Cloud
统一了服务变成模型,有一个DiscoveryClient
接口,所以我们直接看DiscoveryClient
接口的实现类
然后我们简单看看ZookeeperDiscoveryClient
获取服务的方法实现
这一段方法比较简单,就是去zookeeper
获取注册数据,没有缓存,那么客户端缓存是再哪里缓存的呢。我们必须找到调用的缓存的地方
可以看到这里是响应式获取数据,也没有缓存,我还需要向上寻找
功夫不负有心人,我们总算找到了这个缓存类
ObjectProvider<LoadBalancerCacheManager> cacheManagerProvider = context
.getBeanProvider(LoadBalancerCacheManager.class);
如果看过我之前的这篇Spring Cloud落地之Spring Cloud LoadBalancer 线上优化方案
就知道他的缓存用的什么缓存,这里我们就不再介绍使用的什么缓存了。只需要知道我们拿到了这个缓存,就可以做我们想做的事情了。
改造缓存分布式删除
首先这里的客户端缓存是本地缓存,我们的机器一般是部署了多个节点,我们需要删除所有节点的缓存。
所以我们可以这么设计
- 客户端(网关)直接使用
Zookeeper
的事件监听然后去删除缓存 - 由下线的服务提供者去调用客户端(网关的接口),然后客户端通知其他节点一起删除缓存
现在又两种方案,最简单的方案肯定是第一种
Zookeeper事件监听
实现代码大致如下
@Component
@Slf4j
public class ZookeeperListener implements ApplicationContextAware {
private ApplicationContext applicationContext;
@Resource
private CuratorFramework curatorClient;
@Value("${spring.cloud.zookeeper.discovery.root}")
private String path;
@PostConstruct
public void init() {
//当前节点
CuratorCache curatorCache = CuratorCache.builder(curatorClient, path).build();
//监听子节点,不监听当前节点
CuratorCacheListener pathCacheListener = CuratorCacheListener
.builder()
.forPathChildrenCache(path, curatorClient, (client, event) -> {
String type = event.getType().name();
log.info("PathChildrenCacheListener {}", type);
if (Objects.equals(event.getType(), PathChildrenCacheEvent.Type.CHILD_REMOVED)) {
ObjectProvider<LoadBalancerCacheManager> cacheManagerProvider = applicationContext
.getBeanProvider(LoadBalancerCacheManager.class);
LoadBalancerCacheManager ifAvailable = cacheManagerProvider.getIfAvailable();
assert ifAvailable != null;
Cache cache = ifAvailable.getCache(SERVICE_INSTANCE_CACHE_NAME);
if (Objects.nonNull(cache)) {
// todo 这里需要删除指定key 而不是全量清除缓存
cache.clear();
}
log.info("本地缓存清除完成");
}
}).build();
curatorCache.listenable().addListener(pathCacheListener);
curatorCache.start();
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
}
在写完代码上线测试发现比较多的问题,大致如下
- Zookeeper 不同版本导致事件监听失效
由于我们zk线上版本是3.5,测试是3.7.导致这段代码测试环境有效线上报错
- Zookeeper 事件延迟
- Zookeeper 事件存在丢失的情况
http删除缓存
Zookeeper
事件监听不靠谱我们就使用第二种方案
多节点的缓存删除我们使用redis作通知
- RedissonConfig
@Configuration
public class RedissonConfig {
@Value("${redis..host}")
private String redisLoginHost;
@Value("${redis..port}")
private Integer redisLoginPort;
@Value("${redis..password}")
private String redisLoginPassword;
@Bean
public RedissonClient redissonClient() {
return createRedis(redisLoginHost, redisLoginPort, redisLoginPassword);
}
private RedissonClient createRedis(String redisHost, Integer redisPort, String redisPassword) {
Config config = new Config();
SingleServerConfig singleServerConfig = config.useSingleServer();
singleServerConfig.setAddress("redis://" + redisHost + ":" + redisPort + "");
if (DataUtils.isNotEmpty(redisPassword)) {
singleServerConfig.setPassword(redisPassword);
}
return Redisson.create(config);
}
}
- RedisSubscriber
@Component
@Slf4j
public class RedisSubscriber implements ApplicationRunner, ApplicationContextAware {
public static final String GRACEFUL_SHUTDOWN = "graceful-shutdown";
private ApplicationContext applicationContext;
@Autowired
private RedissonClient redisson;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
@Override
public void run(ApplicationArguments args) {
RTopic topic = redisson.getTopic(GRACEFUL_SHUTDOWN);
topic.addListener(ClientDTO.class, (channel, clientDTO) -> {
String applicationName = clientDTO.getApplicationName();
ObjectProvider<LoadBalancerCacheManager> cacheManagerProvider = applicationContext
.getBeanProvider(LoadBalancerCacheManager.class);
LoadBalancerCacheManager ifAvailable = cacheManagerProvider.getIfAvailable();
assert ifAvailable != null;
Cache cache = ifAvailable.getCache(SERVICE_INSTANCE_CACHE_NAME);
if (Objects.nonNull(cache)) {
List<ZookeeperServiceInstance> serviceInstances = cache.get(applicationName, List.class);
if (DataUtils.isNotEmpty(serviceInstances)) {
List<ZookeeperServiceInstance> collect = serviceInstances.stream().filter(s -> {
ServiceInstance<ZookeeperInstance> serviceInstance = s.getServiceInstance();
String id = serviceInstance.getId();
return !Objects.equals(id, clientDTO.getId());
}).collect(Collectors.toList());
cache.put(applicationName, collect);
log.info("本地缓存清除完成 id {} ", clientDTO.getId());
}
else {
log.info("本地缓存null");
}
}
});
}
}
- controller
@GetMapping("/flushCache")
public Map<String, Object> flushCache(ClientDTO clientDTO) {
log.info("flushCache, applicationName : {}", clientDTO.getApplicationName());
if (DataUtils.isNotEmpty(clientDTO)) {
RTopic topic = redissonClient.getTopic(GRACEFUL_SHUTDOWN);
topic.publish(clientDTO);
log.info("flushCache 发送缓存topic, applicationName : {}", clientDTO.getApplicationName());
}
Map<String, Object> result = new HashMap<>();
result.put("code", 100);
result.put("message", "ok");
return result;
}
这样我们服务提供者在销毁的时候注销zk,然后调用该接口去删除客户端缓存,就可以解决如下问题。实现Spring Cloud Zookeeper
的优雅下线
客户端优雅下线sdk
我们可以给接入的服务消费者提供一个简单的sdk,在接受到Spring ContextClosedEvent事件后进行调用上面的接口清除缓存
核心代码如下
public void gracefulShutdown() {
this.serviceRegistry.deregister(this.serviceInstanceRegistration);
log.info("shutdown 注销Zookeeper服务");
this.serviceRegistry.close();
log.info("shutdown 关闭Zookeeper连接");
try {
ServiceInstance<ZookeeperInstance> instance = this.serviceInstanceRegistration.getServiceInstance();
String serviceName = this.serviceInstanceRegistration.getServiceInstance().getName();
String host = this.serviceInstanceRegistration.getServiceInstance().getAddress();
String id = this.serviceInstanceRegistration.getServiceInstance().getId();
String url = String.format("%s?applicationName=%s&host=%s&id=%s", this.flushCacheUrl, serviceName, host, id);
String ret = OkHttpUtils.get(url);
log.info("ret: {}", ret);
} catch (Exception var7) {
log.error("flush cache error : {}", this.flushCacheUrl);
}
}
总结
基于该方案改造后,线上服务发版下线就再也没有报错了,非常优雅