工作中的一个项目,我们的一个应用与银行系统进行tcp通信的时候,银行下送的报文有时会分多次返回。在tcp中这种数据包分多次小数据包发送的情况成为拆包问题。
其中一个,也是最常见的思路就是在报文的报文头部分规定某一段代表本次发送的完整报文的长度,这样接收方就会心中有数,在没有接收到这个长度的报文之前,认为本次通信未完成,数据包还不完整,从而继续等待下去。之前曾经遇到过这样的问题,那时候是用的java socket逐个字节对报文进行接收,直到看到结尾符为止。
只是这次项目原来的程序员用的netty框架,一开始没有注意到如何在netty正确处理拆包问题。导致后续投产后,银行返回的报文出现没有完整接收的情况,截断在中文汉字处产生乱码,导致异常。
下面介绍如何在netty中处理拆包问题。
server端代码:
public class NettyServer {
public static void main(String[] args) {
ServerBootstrap bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool()));
// Set up the default event pipeline.
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() throws Exception {
return Channels.pipeline(new StringDecoder(), new StringEncoder(), new ServerHandler());
}
});
// Bind and start to accept incoming connections.
Channel bind = bootstrap.bind(new InetSocketAddress(8000));
System.out.println("Server已经启动,监听端口: " + bind.getLocalAddress() + ", 等待客户端注册。。。");
}
private static class ServerHandler extends SimpleChannelHandler {
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
if (e.getMessage() instanceof String) {
String message = (String) e.getMessage();
System.out.println("Client发来:" + message);
// e.getChannel().write("Server已收到刚发送的:" + message+"\n");
e.getChannel().write("000287<?xml version=\"1.0\" encoding=\"GB18030\"?><root><head><TransCode>1002</TransCode><TransDate>20161025</TransDate><TransTime>092745</TransTime>"+
"<SeqNo>2016110542160157</SeqNo><ZoneCode>HZCQ</ZoneCode><TransRltCode>-25330</TransRltCode><TransRltMsg>000</TransRltMsg></head><body></body></root>");
System.out.println("\n等待客户端输入。。。");
}
super.messageReceived(ctx, e);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
super.exceptionCaught(ctx, e);
}
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
System.out.println("有一个客户端注册上来了。。。");
System.out.println("Client:" + e.getChannel().getRemoteAddress());
System.out.println("Server:" + e.getChannel().getLocalAddress());
System.out.println("\n等待客户端输入。。。");
super.channelConnected(ctx, e);
}
}
}
client端代码:
public class NettyClient {
public static void main(String[] args) {
// Configure the client.
ClientBootstrap bootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool()));
// Set up the default event pipeline.
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() throws Exception {
return Channels.pipeline(new DeliDecoder(),
new StringEncoder(),
new ClientHandler());
}
});
// Start the connection attempt.
ChannelFuture future = bootstrap.connect(new InetSocketAddress("localhost", 8000));
// Wait until the connection is closed or the connection attempt fails.
future.getChannel().getCloseFuture().awaitUninterruptibly();
// Shut down thread pools to exit.
bootstrap.releaseExternalResources();
}
private static class ClientHandler extends SimpleChannelHandler {
private BufferedReader sin = new BufferedReader(new InputStreamReader(System.in));
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
if (e.getMessage() instanceof String) {
String message = (String) e.getMessage();
System.out.println(message);
e.getChannel().write(sin.readLine());
}
super.messageReceived(ctx, e);
}
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
System.out.println("已经与Server建立连接。。。。");
System.out.println("\n请输入要发送的信息:");
super.channelConnected(ctx, e);
e.getChannel().write("ddddd");
}
}
}
拆包问题解决的关键:
public class DeliDecoder extends FrameDecoder{
private static final Logger LOG = Logger.getLogger(DeliDecoder.class);
private final int headLen = 6; //银行回传的报文前6位为报文长度,前6位不计算在长度内
@Override
protected Object decode(ChannelHandlerContext chc, Channel channel,
ChannelBuffer buffer) throws Exception {
LOG.info("进入DeliDecoder.decode()");
if (buffer.readableBytes() < headLen) {
return null; //return null表示继续读取,下同
}
LOG.info("buffer copy...");
ChannelBuffer buffer2 = buffer.copy(); //直接用buffer.array()可能会报UnsupportedOperationException,故使用其copy
LOG.info("buffer copy done");
byte[] arr = buffer2.array();
LOG.info("buffer array init");
String temStr = new String(arr, "GB18030");
LOG.info(temStr);
int dataLength = Integer.parseInt(temStr.substring(0, 6));
LOG.info("dataLength : " + dataLength);
if (buffer.readableBytes() < dataLength + headLen) {
return null;
}
buffer.skipBytes(headLen); //从第7位开始读取报文正文
byte[] decoded = new byte[dataLength];
buffer.readBytes(decoded);
String msg = new String(decoded, "GB18030");
return msg;
}
}