Ribbon客户端负载均衡策略测试及其改进

时间:2024-10-20 19:15:13
import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.http.MediaType; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.reactive.function.client.WebClient; import com.itmuch.cloud.study.user.entity.User; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Mono; @Slf4j @Api(tags = "负载均衡节点") @RestController @RequestMapping("/node") public class NodeController { @Autowired private WebClient webClient; @Value("${microservice-ribbon-user.ribbon.listOfServers}") private List<String> listOfServers; private ExecutorService executorService = Executors.newFixedThreadPool(10); @ApiOperation("查询用户") @GetMapping("/user/{id}") public List<User> findById(@PathVariable Long id) throws InterruptedException { // WebClient支持异步 List<User> users = new CopyOnWriteArrayList<User>(); listOfServers.stream() .forEach(hostWithPort -> webClient.get() .uri(String.format("http://%s/%s", hostWithPort, id))// URI .acceptCharset(StandardCharsets.UTF_8) .accept(MediaType.APPLICATION_JSON) .retrieve() .bodyToMono(User.class) .subscribe(resp -> users.add(resp))); int index = 0; while (users.isEmpty() && (index++) < 100) { TimeUnit.MILLISECONDS.sleep(10); log.info("index:{}, waitting......", index); } if (users.isEmpty()) { throw new RuntimeException("查询超时,无返回值"); } return users; } @ApiOperation("查询用户 by execute") @GetMapping("/v0/user/{id}") public List<User> findByExecute(@PathVariable Long id) throws InterruptedException { // List<User> users = new ArrayList<User>(); // TODO ArrayList users一定概率有null值 // 原因:通过new ArrayList<>()初始化的大小是0,首次插入触发扩容,并发可能导致出现null值 List<User> users = new CopyOnWriteArrayList<User>(); listOfServers.stream() .forEach(hostWithPort -> executorService.execute(() -> webClient.get() .uri(String.format("http://%s/%s", hostWithPort, id))// URI .acceptCharset(StandardCharsets.UTF_8) .accept(MediaType.APPLICATION_JSON) .retrieve() .bodyToMono(User.class) .subscribe(resp -> users.add(resp)))); int index = 0; while (users.isEmpty() && (index++) < 100) { TimeUnit.MILLISECONDS.sleep(10); log.info("index:{}, waitting......", index); } if (users.isEmpty()) { throw new RuntimeException("查询超时,无返回值"); } return users; } @ApiOperation("查询用户 by submit") @GetMapping("/v1/user/{id}") public List<User> findBySubmit(@PathVariable Long id) throws InterruptedException { List<User> users = new CopyOnWriteArrayList<User>(); listOfServers.stream() .forEach(hostWithPort -> executorService.submit(() -> webClient.get() .uri(String.format("http://%s/%s", hostWithPort, id))// URI .acceptCharset(StandardCharsets.UTF_8) .accept(MediaType.APPLICATION_JSON) .retrieve() .bodyToMono(User.class) .subscribe(resp -> users.add(resp)), users)); int index = 0; while (users.isEmpty() && (index++) < 100) { TimeUnit.MILLISECONDS.sleep(10); log.info("index:{}, waitting......", index); } if (users.isEmpty()) { throw new RuntimeException("查询超时,无返回值"); } return users; } @ApiOperation("查询用户 by invokeAny") @GetMapping("/v2/user/{id}") public User findByInvokeAny(@PathVariable Long id) throws InterruptedException, ExecutionException, TimeoutException { return executorService.invokeAny(listOfServers.stream().map(hostWithPort -> new Callable<User>() { @Override public User call() { Mono<User> mono = webClient.get() .uri(String.format("http://%s/%s", hostWithPort, id))// URI .acceptCharset(StandardCharsets.UTF_8) .accept(MediaType.APPLICATION_JSON) .retrieve() .bodyToMono(User.class); return mono.block(); } }).collect(Collectors.toList()), 1000, TimeUnit.MILLISECONDS); } @ApiOperation("查询用户 by invokeAll") @GetMapping("/v3/user/{id}") public List<User> findByInvokeAll(@PathVariable Long id) throws InterruptedException { List<Future<User>> futures = executorService.invokeAll(listOfServers.stream().map(hostWithPort -> new Callable<User>() { @Override public User call() { Mono<User> mono = webClient.get() .uri(String.format("http://%s/%s", hostWithPort, id))// URI .acceptCharset(StandardCharsets.UTF_8) .accept(MediaType.APPLICATION_JSON) .retrieve() .bodyToMono(User.class); return mono.block(); } }).collect(Collectors.toList()), 1000, TimeUnit.MILLISECONDS); List<User> users = new ArrayList<User>(); for (Future<User> future : futures) { try { users.add(future.get()); } catch (Exception e) { log.error(e.getMessage(), e); } } return users; } }