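Preface
Current business requirement: the client sends a POST request with content_type: application/json. The gateway has to take the body out of the request to perform authorization, then re-wrap the processed data into the body. After hitting all kinds of pitfalls while tracking down and solving this problem, I am writing this post in the hope that it helps others with the same need.
The version combination with which reading the body worked is listed below; other versions are not guaranteed to match this post.
springboot 2.0.6.RELEASE + springcloud Finchley.SR2 + spring cloud gateway
The Maven dependencies are as follows: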
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.6.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<!-- Spring Cloud dependencies -->
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Finchley.SR2</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
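Pitfall history
At first I used springboot 2.0.3.RELEASE + springcloud Finchley.RELEASE + spring cloud gateway, and read the body with the approach below, which is all over the internet. This is not meant as criticism; I simply think that anything technically questionable should be verified in person, and that out of respect for the craft, posts with errors should not be reposted or spread. The method that reads the body incorrectly: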
/**
* Get the string content of the request body
* @param serverHttpRequest
* @return
*/
private String resolveBodyFromRequest(ServerHttpRequest serverHttpRequest){
// Get the request body
Flux<DataBuffer> body = serverHttpRequest.getBody();
StringBuilder sb = new StringBuilder();
body.subscribe(buffer -> {
byte[] bytes = new byte[buffer.readableByteCount()];
buffer.read(bytes);
DataBufferUtils.release(buffer);
String bodyString = new String(bytes, StandardCharsets.UTF_8);
sb.append(bodyString);
});
return sb.toString();
}
When reading the body, this method often returns incomplete JSON; the problem is described in the troubleshooting process.
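A plausible explanation for the truncation is that the body arrives in several chunks over time, while the method above reads the StringBuilder immediately after subscribing. The sketch below models only that timing with plain JDK classes. `ChunkSource` is a hypothetical stand-in invented for this illustration; it is not a Reactor or Spring type, and real network delivery is asynchronous rather than driven by explicit calls.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.function.Consumer;

public class PartialBodyDemo {

    /** Hypothetical stand-in for a request body whose chunks arrive at different times. */
    static final class ChunkSource {
        private final Deque<String> pending;
        private Consumer<String> onChunk = chunk -> { };

        ChunkSource(String... chunks) {
            this.pending = new ArrayDeque<>(Arrays.asList(chunks));
        }

        /** Registers the callback and hands over only the chunk that has already arrived. */
        void subscribe(Consumer<String> callback) {
            this.onChunk = callback;
            deliverNext();
        }

        /** Simulates one more chunk arriving; returns false when nothing is left. */
        boolean deliverNext() {
            String chunk = pending.poll();
            if (chunk == null) {
                return false;
            }
            onChunk.accept(chunk);
            return true;
        }
    }

    /** Mirrors the flawed pattern: subscribe, then read the builder straight away. */
    static String readImmediately(ChunkSource source) {
        StringBuilder sb = new StringBuilder();
        source.subscribe(sb::append);
        return sb.toString(); // snapshot taken before later chunks arrive
    }

    /** Reads the builder only after the source has been fully drained. */
    static String readAfterCompletion(ChunkSource source) {
        StringBuilder sb = new StringBuilder();
        source.subscribe(sb::append);
        while (source.deliverNext()) {
            // keep receiving until the body is complete
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(readImmediately(new ChunkSource("{\"a\":1,", "\"b\":2}")));
        System.out.println(readAfterCompletion(new ChunkSource("{\"a\":1,", "\"b\":2}")));
    }
}
```

With a two-chunk body, `readImmediately` returns only the first chunk, while `readAfterCompletion` returns the whole JSON text. Small bodies that fit in one chunk would not show the problem, which may be why the flawed method appears to work in simple tests.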
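Troubleshooting process
The error: the body obtained by the server is incomplete
Checking the nginx log: the client data seen by nginx is complete
What followed was all kinds of debugging and hunting for the cause:
The official project also has many issues about this problem: https://github.com/spring-cloud/spring-cloud-gateway/issues/509. I asked a question there too, but it was ignored, possibly because of my clumsy English. Still, earlier questions in that thread and a look at the source code helped, and this sample on GitHub was a great inspiration; what follows combines it with my own verification: https://github.com/xiaozhiliaoo/gateway-sample2/blob/master/src/main/java/org/lili/ApiLocator.java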
Simulated scenario 1: form data submitted as application/x-www-form-urlencoded
postman:
Server side, Controller layer
Gateway handling:
Code
@Bean
public RouteLocator activityRouter(RouteLocatorBuilder builder) {
RouteLocatorBuilder.Builder routes = builder.routes();
RouteLocatorBuilder.Builder serviceProvider = routes
/**
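* Intercept all requests matching /test/**; lb:// means the request is routed through load balancing to the ORDER_SERVICE service
* Hystrix supports two parameters:
* name: the name of the HystrixCommand
* fallbackUri: the URI used for the fallback; only forward: scheme URIs are supported here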
*
*/
.route(RouteIdEnum.TEST_ROUTE_ID.getCode(),
r -> r.readBody(String.class, requestBody -> {
logger.info("requestBody is {}", requestBody);
// No checks are made on the body here
return true;
}).and().path(contextPath.concat("/test/**"))
.filters(f -> f.stripPrefix(1).requestRateLimiter(c -> c.setRateLimiter(apiRedisRateLimiter()).setKeyResolver(apiKeyResolver))
.filter(parameterWrapFilter).hystrix(h -> h.setName("activityInnerHystrixCommand").setFallbackUri("forward:/activityInner/hystrixFallback")))
.uri("lb://".concat("ORDER_SERVICE"))
)
;
RouteLocator routeLocator = serviceProvider.build();
logger.info("ActivityServiceRouter is loading ... {}", routeLocator);
return routeLocator;
}
Simulated scenario 2: data submitted as application/json
postman
Server side, controller
gateway
@Bean
public RouteLocator activityRouter(RouteLocatorBuilder builder) {
RouteLocatorBuilder.Builder routes = builder.routes();
RouteLocatorBuilder.Builder serviceProvider = routes
/**
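* Intercept all requests matching /test/**; lb:// means the request is routed through load balancing to the ISHANGJIE_ORDER_SERVICE service
* Hystrix supports two parameters:
* name: the name of the HystrixCommand
* fallbackUri: the URI used for the fallback; only forward: scheme URIs are supported here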
*
*/
.route(RouteIdEnum.TEST_ROUTE_ID.getCode(),
r -> r.readBody(Object.class, requestBody -> {
logger.info("requestBody is {}", requestBody);
// No checks are made on the body here
return true;
}).and().path(contextPath.concat("/test/**"))
.filters(f -> f.stripPrefix(1).requestRateLimiter(c -> c.setRateLimiter(apiRedisRateLimiter()).setKeyResolver(apiKeyResolver))
.filter(parameterWrapFilter).hystrix(h -> h.setName("activityInnerHystrixCommand").setFallbackUri("forward:/activityInner/hystrixFallback")))
.uri("lb://".concat(ServiceConstant.ISHANGJIE_ORDER_SERVICE))
)
;
RouteLocator routeLocator = serviceProvider.build();
logger.info("ActivityServiceRouter is loading ... {}", routeLocator);
return routeLocator;
}
In both scenarios the filter obtains the body data via Object object = exchange.getAttribute("cachedRequestBodyObject");
Sample code:
package com.ald.ishangjie.controller.filter.common;
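// fastjson import, inferred from the JSONObject.toJSON / parseObject / toJSONString calls below
import com.alibaba.fastjson.JSONObject;
import io.netty.buffer.ByteBufAllocator;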
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.gateway.filter.GatewayFilter;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.core.Ordered;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.net.InetSocketAddress;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
/**
* @desc: Normalize client requests and re-wrap their parameters
* @date: 2019/1/3 17:52
*/
@Component
public class ParameterWrapFilter implements GatewayFilter, Ordered {
private Logger logger = LoggerFactory.getLogger(ParameterWrapFilter.class);
@Value("${default.app-version}")
private int appVersion;
@Value("${wechat.applet.token_secret}")
private String TOKEN_SECRET;
/**
* Filter business logic
* @param exchange
* @param chain
* @return
*/
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
ServerHttpRequest serverHttpRequest = exchange.getRequest();
ServerHttpResponse serverHttpResponse = exchange.getResponse();
HttpMethod method = serverHttpRequest.getMethod();
String contentType = serverHttpRequest.getHeaders().getFirst(HttpHeaders.CONTENT_TYPE);
String userName = serverHttpRequest.getHeaders().getFirst(CommonConstant.KEY_USERNAME);
String netType = serverHttpRequest.getHeaders().getFirst(CommonConstant.KEY_NET_TYPE);
String mapToken = serverHttpRequest.getHeaders().getFirst("mapToken");
String id = "";
Long userId = null;
URI uri = serverHttpRequest.getURI();
String path = uri.getPath();
InetSocketAddress remoteAddress = exchange.getRequest().getRemoteAddress();
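// getHostString() returns the literal address without the reverse DNS lookup that getHostName() may trigger
String ip = remoteAddress.getHostString();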
// POST request whose content-type is application/json or application/x-www-form-urlencoded
if (method == HttpMethod.POST) {
// Get the POST body from the request; see https://github.com/xiaozhiliaoo/gateway-sample2/tree/master
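Object object = exchange.getAttribute("cachedRequestBodyObject");
// readBody(String.class, ...) caches a String, readBody(Object.class, ...) caches a parsed object;
// casting JSONObject.toJSON(object) to JSONObject fails with a ClassCastException in the String case
String bodyStr = object instanceof String ? (String) object : JSONObject.toJSONString(object);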
/* String bodyStr = resolveBodyFromRequest(serverHttpRequest);
logger.info("RequestWrapFilter uri={},bodyStr={}", uri, bodyStr);
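// In-site H5 request
if (path.contains(CommonConstant.INNER_H5) || path.contains(CommonConstant.INNER_H5_OLD)) {
// Validate the in-site parameters and parse the relevant ones into an AppInfoBo object
AppInfoBo appInfoBo = InnerParamCheckUtil.doParamCheck(exchange, tokenRedisCache);
// Parameter conversion returned nothing: respond with an error directly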
if (null == appInfoBo) {
return serverHttpResponse.writeWith(Mono.just(this.wrapResponse(exchange, "", IShangJieExceptionCode.H5_AUTH_CHECK_ERROR)));
}
userName = appInfoBo.getUserName();
netType = appInfoBo.getNetType();
id = appInfoBo.getId();
// Off-site H5 request
} else if (path.contains(CommonConstant.OUTER_H5) || path.contains(CommonConstant.OUTER_H5_OLD)) {
String token = serverHttpRequest.getHeaders().getFirst("token");
if (StringUtils.isNotBlank(token)) {
userName = BaseTokenUtil.getUserName(token, TOKEN_SECRET);
}
}else{
logger.info("RequestWrapFilter exception uri={},bodyStr={}", uri, bodyStr);
}
if (StringUtils.isNotBlank(userName)) {
UserDo userDo = this.userService.getUserByUserName(userName);
if (null != userDo) {
userId = userDo.getRid();
}
}
// Re-wrap bodyStr
logger.info("gateway filter contentType={}", contentType);
if (StringUtils.isNotBlank(bodyStr)) {
// application/json format
if (MediaType.APPLICATION_JSON_VALUE.equalsIgnoreCase(contentType) || MediaType.APPLICATION_JSON_UTF8_VALUE.equalsIgnoreCase(contentType)) {
// Parse bodyStr into a JSONObject
JSONObject json = JSONObject.parseObject(bodyStr);
bodyStr = this.fillJsonBody(json, userName, netType, userId, ip, id);
} else if (MediaType.APPLICATION_FORM_URLENCODED_VALUE.equalsIgnoreCase(contentType)) {
// Plain key-value pairs: append the extra parameters
bodyStr = String.format("%s&%s", bodyStr, this.fillParamBody(userName, netType, userId, ip, id));
}
} else {
logger.error("POST body is null,url={}", serverHttpRequest.getURI().getRawPath());
// JSON format: application/json
if (MediaType.APPLICATION_JSON_VALUE.equalsIgnoreCase(contentType) || MediaType.APPLICATION_JSON_UTF8_VALUE.equalsIgnoreCase(contentType)) {
preWarnUtil.warn("ParameterWrapFilter/filter", IShangJieExceptionCode.BODY_IS_NULL_ERROR);
// Return a parameter-error response
return serverHttpResponse.writeWith(Mono.just(this.wrapResponse(exchange, "", IShangJieExceptionCode.PARAM_ERROR)));
// Form format: application/x-www-form-urlencoded
} else if (MediaType.APPLICATION_FORM_URLENCODED_VALUE.equalsIgnoreCase(contentType)) {
// Plain key-value pairs: append the extra parameters
bodyStr = String.format("?%s", this.fillParamBody(userName, netType, userId, ip, id));
}
}
*/
// Re-wrap the request and pass it to the next stage
ServerHttpRequest request = serverHttpRequest.mutate().uri(uri).build();
DataBuffer bodyDataBuffer = stringBuffer(bodyStr);
Flux<DataBuffer> bodyFlux = Flux.just(bodyDataBuffer);
// Define the new headers
HttpHeaders headers = new HttpHeaders();
headers.putAll(exchange.getRequest().getHeaders());
// Add a header
headers.set(CommonConstant.KEY_MAP_TOKEN, mapToken);
// The body was modified, so CONTENT_LENGTH must be reset; it is the length in bytes, not in characters.
// Use the same charset (UTF-8) that stringBuffer() uses to encode the body.
int length = bodyStr.getBytes(StandardCharsets.UTF_8).length;
headers.remove(HttpHeaders.CONTENT_LENGTH);
headers.setContentLength(length);
// Set CONTENT_TYPE
if (StringUtils.isNotBlank(contentType)) {
headers.set(HttpHeaders.CONTENT_TYPE, contentType);
} else {
logger.info("=====StringUtils.isBlank(contentType)=====");
preWarnUtil.warn("ParameterWrapFilter/filter", IShangJieExceptionCode.CONTENT_TYPE_IS_NULL_ERROR);
}
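// A POST body can only be subscribed to once, and the code above has already subscribed to it, so the request has to be wrapped again; otherwise an error reports that the request was already subscribed
request = new ServerHttpRequestDecorator(request) {
// Expose the rebuilt headers; without this override the headers object prepared above is never applied to the forwarded request
@Override
public HttpHeaders getHeaders() {
return headers;
}
@Override
public Flux<DataBuffer> getBody() {
return bodyFlux;
}
};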
return chain.filter(exchange.mutate().request(request).build());
} else if (method == HttpMethod.GET) {
LogUtil.addLog(LogLevelEnum.INFO, "ParameterWrapFilter failed,path={}", serverHttpRequest.getURI().getPath());
return serverHttpResponse.writeWith(Mono.just(this.wrapResponse(exchange, "", IShangJieExceptionCode.GET_NOT_ALLOW_ERROR)));
}
return chain.filter(exchange);
}
/**
* Fill in the extra fields for a JSON body
*/
private String fillJsonBody(JSONObject json, String userName, String netType, Long userId, String ip, String id) {
if (null == json) {
json = new JSONObject();
}
json.put(CommonConstant.KEY_APP_VERSION, appVersion);
json.put(CommonConstant.KEY_USERNAME, userName);
json.put(CommonConstant.KEY_NET_TYPE, netType);
json.put(CommonConstant.KEY_USER_ID, userId);
json.put(CommonConstant.KEY_IP, ip);
json.put(CommonConstant.KEY_ID, id);
// Convert back to a string
return JSONObject.toJSONString(json);
}
/**
* Build the extra parameters for a key-value (form) body
*/
private String fillParamBody(String userName, String netType, Long userId, String ip, String id) {
StringBuffer buffer = new StringBuffer();
buffer.append(CommonConstant.KEY_USERNAME).append("=").append(userName)
.append("&").append(CommonConstant.KEY_NET_TYPE).append("=").append(netType)
.append("&").append(CommonConstant.KEY_APP_VERSION).append("=").append(appVersion)
.append("&").append(CommonConstant.KEY_USER_ID).append("=").append(userId)
.append("&").append(CommonConstant.KEY_IP).append("=").append(ip)
.append("&").append(CommonConstant.KEY_ID).append("=").append(id);
return buffer.toString();
}
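public String getBody(ServerHttpRequest serverHttpRequest){
String body = null;
try {
// Join every chunk first: Mono.from(getBody()) would take only the first DataBuffer.
// Note that toFuture().get() blocks the calling thread; filter() above does not use this helper.
DataBuffer joined = DataBufferUtils.join(serverHttpRequest.getBody()).toFuture().get();
if (joined != null) {
byte[] bytes = new byte[joined.readableByteCount()];
joined.read(bytes);
DataBufferUtils.release(joined);
body = new String(bytes, StandardCharsets.UTF_8);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error("Interrupted while reading the request body", e);
} catch (ExecutionException e) {
logger.error("Failed to read the request body", e);
}
return body;
}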
/**
* Convert a String to a DataBuffer
*
* @param value
* @return
*/
private DataBuffer stringBuffer(String value) {
byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
NettyDataBufferFactory nettyDataBufferFactory = new NettyDataBufferFactory(ByteBufAllocator.DEFAULT);
DataBuffer buffer = nettyDataBufferFactory.allocateBuffer(bytes.length);
buffer.write(bytes);
return buffer;
}
/**
* Highest precedence
*
* @return
*/
@Override
public int getOrder() {
return Ordered.HIGHEST_PRECEDENCE;
}
}
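The filter's own comment notes that Content-Length must be the byte length of the rewritten body, not its character count. A minimal JDK-only sketch of the difference follows; the class name `ContentLengthDemo` and the sample body are made up for illustration and are not part of the gateway code.

```java
import java.nio.charset.StandardCharsets;

public class ContentLengthDemo {

    /** Number of bytes the body occupies on the wire when encoded as UTF-8. */
    static int utf8ContentLength(String body) {
        return body.getBytes(StandardCharsets.UTF_8).length;
    }

    public static void main(String[] args) {
        // Hypothetical JSON body with two Chinese characters (written as Unicode escapes)
        String body = "{\"name\":\"\u7f51\u5173\"}";
        System.out.println("chars = " + body.length());           // 13 characters
        System.out.println("bytes = " + utf8ContentLength(body)); // 17 bytes in UTF-8
    }
}
```

For pure ASCII bodies the two numbers coincide, so a character-based length can go unnoticed in tests and only cause trouble once a request carries non-ASCII text.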
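Results of running both scenarios in Postman:
Parameters received by the server:
In the tests above the service received the data in every case. A Postman stress run of 1000 requests produced no abnormal data.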
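One detail worth checking before reusing `fillParamBody` from the filter above: it concatenates values as-is, so a value containing `&`, `=` or a space could change how the form body is parsed downstream. The sketch below shows one way to percent-encode each pair with the JDK's `URLEncoder`. The class `FormBodyDemo`, the method `encodeForm`, and the sample keys are assumptions for illustration only and are not taken from the original project.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.LinkedHashMap;
import java.util.Map;

public class FormBodyDemo {

    /** Joins the pairs as key=value&key=value, encoding every key and value. */
    static String encodeForm(Map<String, String> params) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> entry : params.entrySet()) {
            if (sb.length() > 0) {
                sb.append('&');
            }
            sb.append(encode(entry.getKey())).append('=').append(encode(entry.getValue()));
        }
        return sb.toString();
    }

    /** application/x-www-form-urlencoded encoding in UTF-8. */
    private static String encode(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is always available on a compliant JVM
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Map<String, String> params = new LinkedHashMap<>();
        params.put("userName", "tom&jerry"); // hypothetical values
        params.put("netType", "4G wifi");
        System.out.println(encodeForm(params));
    }
}
```

A `LinkedHashMap` keeps the parameters in insertion order, which makes the output predictable when comparing requests in logs.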