NIO框架之MINA源码解析(一):背景
NIO框架之MINA源码解析(二):mina核心引擎
NIO框架之MINA源码解析(三):底层通信与责任链模式应用
1、粘包与段包
粘包:指TCP协议中,发送方发送的若干包数据到接收方接收时粘成一包,从接收缓冲区看,后一包数据的头紧接着前一包数据的尾。
造成的可能原因:
发送端需要等缓冲区满才发送出去,造成粘包
接收方不及时接收缓冲区的包,造成多个包接收
断包:也就是数据不全,比如包太大,就把包分解成多个小包,多次发送,导致每次接收数据都不全。
2、消息传输的格式
消息长度+消息头+消息体 即前N个字节用于存储消息的长度,用于判断当前消息什么时候结束。
消息头+消息体 即固定长度的消息,前几个字节为消息头,后面的是消息头。
在MINA中用的是
消息长度+消息体 即前4个字节用于存储消息的长度,用于判断当前消息什么时候结束。
3、编码与解码
在网络中,信息的传输都是通过字节的形式传输的,而我们在编写自己的代码时,则都是具体的对象,那么要想我们的对象能够在网络中传输,就需要编码与解码。
编码:即把我们的消息编码成二进制形式,能以字节的形式在网络中传输。
解码:即把我们收到的字节解码成我们代码中的对象。
在MINA中对象的编码与解码用的都是JDK提供的ObjectOutputStream来实现的。
4、MINA中消息的处理实现
消息的接受处理,我们常用的是TCP协议,而TCP协议会分片的,在下面的代码中,具体功能就是循环从通道里面读取数据,直到没有数据可读,或者buffer满了,然后就把接受到的数据发给解码工厂进行处理。
4.1、消息的接收
//class AbstractPollingIoProcessorprivate void read(S session) {
IoSessionConfig config = session.getConfig();
int bufferSize = config.getReadBufferSize();
IoBuffer buf = IoBuffer.allocate(bufferSize);
final boolean hasFragmentation = session.getTransportMetadata().hasFragmentation();
try {
int readBytes = 0;
int ret;
try {
//是否有分片 tcp传输会有分片,即把大消息分片成多个小消息再传输
if (hasFragmentation) {
//read方法非阻塞,没有读到数据的时候返回0
while ((ret = read(session, buf)) > 0) {
readBytes += ret;
//buffer 满了
if (!buf.hasRemaining()) {
break;
}
}
} else {
ret = read(session, buf);
if (ret > 0) {
readBytes = ret;
}
}
} finally {
buf.flip();
}
if (readBytes > 0) {
IoFilterChain filterChain = session.getFilterChain();
//处理消息
filterChain.fireMessageReceived(buf);
buf = null;
if (hasFragmentation) {
if (readBytes << 1 < config.getReadBufferSize()) {
session.decreaseReadBufferSize();
} else if (readBytes == config.getReadBufferSize()) {
session.increaseReadBufferSize();
}
}
}
if (ret < 0) {
scheduleRemove(session);
}
} catch (Throwable e) {
if (e instanceof IOException) {
if (!(e instanceof PortUnreachableException)
|| !AbstractDatagramSessionConfig.class.isAssignableFrom(config.getClass())
|| ((AbstractDatagramSessionConfig) config).isCloseOnPortUnreachable()) {
scheduleRemove(session);
}
}
IoFilterChain filterChain = session.getFilterChain();
filterChain.fireExceptionCaught(e);
}
}
4.2、解码与编码
//class AbstractIoBuffer public Object getObject(final ClassLoader classLoader) throws ClassNotFoundException {//首先判断当前buffer中消息长度是否完整,不完整的话直接返回 if (!prefixedDataAvailable(4)) { throw new BufferUnderflowException(); }//消息长度 int length = getInt(); if (length <= 4) { throw new BufferDataException("Object length should be greater than 4: " + length); } int oldLimit = limit();//limit到消息结尾处 limit(position() + length); try { ObjectInputStream in = new ObjectInputStream(asInputStream()) { @Override protected ObjectStreamClass readClassDescriptor() throws IOException, ClassNotFoundException { int type = read(); if (type < 0) { throw new EOFException(); } switch (type) { case 0: // NON-Serializable class or Primitive types return super.readClassDescriptor(); case 1: // Serializable class String className = readUTF(); Class<?> clazz = Class.forName(className, true, classLoader); return ObjectStreamClass.lookup(clazz); default: throw new StreamCorruptedException("Unexpected class descriptor type: " + type); } } @Override protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException { String name = desc.getName(); try { return Class.forName(name, false, classLoader); } catch (ClassNotFoundException ex) { return super.resolveClass(desc); } } }; return in.readObject(); } catch (IOException e) { throw new BufferDataException(e); } finally { limit(oldLimit); } }//判断当前消息是否完整 public boolean prefixedDataAvailable(int prefixLength, int maxDataLength) { if (remaining() < prefixLength) { return false; } int dataLength; switch (prefixLength) { case 1: dataLength = getUnsigned(position()); break; case 2: dataLength = getUnsignedShort(position()); break; case 4: dataLength = getInt(position()); break; default: throw new IllegalArgumentException("prefixLength: " + prefixLength); } if (dataLength < 0 || dataLength > maxDataLength) { throw new BufferDataException("dataLength: " + dataLength); }//判断当前消息是否完整 return remaining() - prefixLength >= dataLength; }//编码 public IoBuffer putObject(Object o) { int oldPos = position(); skip(4); // Make a room for the length field.预留4个字节用于存储消息长度 try { ObjectOutputStream out = new ObjectOutputStream(asOutputStream()) { @Override protected void writeClassDescriptor(ObjectStreamClass desc) throws IOException { try { Class<?> clz = Class.forName(desc.getName()); if (!Serializable.class.isAssignableFrom(clz)) { // NON-Serializable class write(0); super.writeClassDescriptor(desc); } else { // Serializable class write(1); writeUTF(desc.getName()); } } catch (ClassNotFoundException ex) { // Primitive types write(0); super.writeClassDescriptor(desc); } } }; out.writeObject(o); out.flush(); } catch (IOException e) { throw new BufferDataException(e); } // Fill the length field int newPos = position(); position(oldPos);//存储消息长度 putInt(newPos - oldPos - 4); position(newPos); return this; }
4.3、断包与粘包处理
// class CumulativeProtocolDecoder public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {//是否有分片,tcp 有分片 if (!session.getTransportMetadata().hasFragmentation()) { while (in.hasRemaining()) { if (!doDecode(session, in, out)) { break; } } return; }// 1、断包处理// 2、处理粘包 boolean usingSessionBuffer = true;//session中是否有断包情况(上次处理后),断包保存在session中 IoBuffer buf = (IoBuffer) session.getAttribute(BUFFER); // If we have a session buffer, append data to that; otherwise // use the buffer read from the network directly. if (buf != null) {//有断包,则把当前包拼接到断包里面 boolean appended = false; // Make sure that the buffer is auto-expanded. if (buf.isAutoExpand()) { try { buf.put(in); appended = true; } catch (IllegalStateException e) { // A user called derivation method (e.g. slice()), // which disables auto-expansion of the parent buffer. } catch (IndexOutOfBoundsException e) { // A user disabled auto-expansion. } } if (appended) { buf.flip(); } else { // Reallocate the buffer if append operation failed due to // derivation or disabled auto-expansion. buf.flip(); IoBuffer newBuf = IoBuffer.allocate(buf.remaining() + in.remaining()).setAutoExpand(true); newBuf.order(buf.order()); newBuf.put(buf); newBuf.put(in); newBuf.flip(); buf = newBuf; // Update the session attribute. session.setAttribute(BUFFER, buf); } } else { buf = in; usingSessionBuffer = false; }//2 粘包处理,可能buffer中有多个消息,需要多次处理(解码)每个消息,直到消息处理完,或者剩下的消息不是一个完整的消息或者buffer没有数据了 for (;;) { int oldPos = buf.position(); boolean decoded = doDecode(session, buf, out); if (decoded) {//解码 成功 if (buf.position() == oldPos) { throw new IllegalStateException("doDecode() can't return true when buffer is not consumed."); }//buffer空了 if (!buf.hasRemaining()) {//buffer没有数据了 break; } } else {//剩下的消息不是一个完整的消息,断包出现了 break; } } // if there is any data left that cannot be decoded, we store // it in a buffer in the session and next time this decoder is // invoked the session buffer gets appended to if (buf.hasRemaining()) {//剩下的消息不是一个完整的消息,断包出现了//如果断包已经保存在session中,则更新buffer,没有的话,就把剩下的断包保存在session中 if (usingSessionBuffer && buf.isAutoExpand()) { buf.compact(); } else { storeRemainingInSession(buf, session); } } else { if (usingSessionBuffer) { removeSessionBuffer(session); } } }
//class ObjectSerializationDecoder protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {//首先判断当前buffer中消息长度是否完整,不完整的话直接返回 if (!in.prefixedDataAvailable(4, maxObjectSize)) { return false; } out.write(in.getObject(classLoader)); return true; }