[置顶] NIO框架之MINA源码解析(四):粘包与断包处理及编码与解码

时间:2021-03-29 17:15:38

NIO框架之MINA源码解析(一):背景


NIO框架之MINA源码解析(二):mina核心引擎


NIO框架之MINA源码解析(三):底层通信与责任链模式应用



1、粘包与段包


粘包:指TCP协议中,发送方发送的若干包数据到接收方接收时粘成一包,从接收缓冲区看,后一包数据的头紧接着前一包数据的尾。
造成的可能原因:

    发送端需要等缓冲区满才发送出去,造成粘包

    接收方不及时接收缓冲区的包,造成多个包接收


断包:也就是数据不全,比如包太大,就把包分解成多个小包,多次发送,导致每次接收数据都不全。


2、消息传输的格式


消息长度+消息头+消息体  即前N个字节用于存储消息的长度,用于判断当前消息什么时候结束。

消息头+消息体    即固定长度的消息,前几个字节为消息头,后面的是消息头。

在MINA中用的是

消息长度+消息体 即前4个字节用于存储消息的长度,用于判断当前消息什么时候结束。


3、编码与解码


   在网络中,信息的传输都是通过字节的形式传输的,而我们在编写自己的代码时,则都是具体的对象,那么要想我们的对象能够在网络中传输,就需要编码与解码。


   编码:即把我们的消息编码成二进制形式,能以字节的形式在网络中传输。

   解码:即把我们收到的字节解码成我们代码中的对象。

   在MINA中对象的编码与解码用的都是JDK提供的ObjectOutputStream来实现的。


4、MINA中消息的处理实现


消息的接受处理,我们常用的是TCP协议,而TCP协议会分片的,在下面的代码中,具体功能就是循环从通道里面读取数据,直到没有数据可读,或者buffer满了,然后就把接受到的数据发给解码工厂进行处理。


4.1、消息的接收


//class AbstractPollingIoProcessorprivate void read(S session) {
IoSessionConfig config = session.getConfig();
int bufferSize = config.getReadBufferSize();
IoBuffer buf = IoBuffer.allocate(bufferSize);

final boolean hasFragmentation = session.getTransportMetadata().hasFragmentation();

try {
int readBytes = 0;
int ret;

try {
//是否有分片 tcp传输会有分片,即把大消息分片成多个小消息再传输
if (hasFragmentation) {
//read方法非阻塞,没有读到数据的时候返回0
while ((ret = read(session, buf)) > 0) {
readBytes += ret;
//buffer 满了
if (!buf.hasRemaining()) {
break;
}
}
} else {
ret = read(session, buf);

if (ret > 0) {
readBytes = ret;
}
}
} finally {
buf.flip();
}

if (readBytes > 0) {
IoFilterChain filterChain = session.getFilterChain();
//处理消息
filterChain.fireMessageReceived(buf);
buf = null;

if (hasFragmentation) {
if (readBytes << 1 < config.getReadBufferSize()) {
session.decreaseReadBufferSize();
} else if (readBytes == config.getReadBufferSize()) {
session.increaseReadBufferSize();
}
}
}

if (ret < 0) {
scheduleRemove(session);
}
} catch (Throwable e) {
if (e instanceof IOException) {
if (!(e instanceof PortUnreachableException)
|| !AbstractDatagramSessionConfig.class.isAssignableFrom(config.getClass())
|| ((AbstractDatagramSessionConfig) config).isCloseOnPortUnreachable()) {
scheduleRemove(session);
}
}

IoFilterChain filterChain = session.getFilterChain();
filterChain.fireExceptionCaught(e);
}
}


4.2、解码与编码


//class AbstractIoBuffer public Object getObject(final ClassLoader classLoader) throws ClassNotFoundException {//首先判断当前buffer中消息长度是否完整,不完整的话直接返回        if (!prefixedDataAvailable(4)) {            throw new BufferUnderflowException();        }//消息长度        int length = getInt();        if (length <= 4) {            throw new BufferDataException("Object length should be greater than 4: " + length);        }        int oldLimit = limit();//limit到消息结尾处        limit(position() + length);        try {            ObjectInputStream in = new ObjectInputStream(asInputStream()) {                @Override                protected ObjectStreamClass readClassDescriptor() throws IOException, ClassNotFoundException {                    int type = read();                    if (type < 0) {                        throw new EOFException();                    }                    switch (type) {                    case 0: // NON-Serializable class or Primitive types                        return super.readClassDescriptor();                    case 1: // Serializable class                        String className = readUTF();                        Class<?> clazz = Class.forName(className, true, classLoader);                        return ObjectStreamClass.lookup(clazz);                    default:                        throw new StreamCorruptedException("Unexpected class descriptor type: " + type);                    }                }                @Override                protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {                    String name = desc.getName();                    try {                        return Class.forName(name, false, classLoader);                    } catch (ClassNotFoundException ex) {                        return super.resolveClass(desc);                    }                }            };            return in.readObject();        } catch (IOException e) {            throw new BufferDataException(e);        } finally {            limit(oldLimit);        }    }//判断当前消息是否完整 public boolean prefixedDataAvailable(int prefixLength, int maxDataLength) {        if (remaining() < prefixLength) {            return false;        }        int dataLength;        switch (prefixLength) {        case 1:            dataLength = getUnsigned(position());            break;        case 2:            dataLength = getUnsignedShort(position());            break;        case 4:            dataLength = getInt(position());            break;        default:            throw new IllegalArgumentException("prefixLength: " + prefixLength);        }        if (dataLength < 0 || dataLength > maxDataLength) {            throw new BufferDataException("dataLength: " + dataLength);        }//判断当前消息是否完整         return remaining() - prefixLength >= dataLength;    }//编码 public IoBuffer putObject(Object o) {        int oldPos = position();        skip(4); // Make a room for the length field.预留4个字节用于存储消息长度        try {            ObjectOutputStream out = new ObjectOutputStream(asOutputStream()) {                @Override                protected void writeClassDescriptor(ObjectStreamClass desc) throws IOException {                    try {                        Class<?> clz = Class.forName(desc.getName());                        if (!Serializable.class.isAssignableFrom(clz)) { // NON-Serializable class                            write(0);                            super.writeClassDescriptor(desc);                        } else { // Serializable class                            write(1);                            writeUTF(desc.getName());                        }                    } catch (ClassNotFoundException ex) { // Primitive types                        write(0);                        super.writeClassDescriptor(desc);                    }                }            };            out.writeObject(o);            out.flush();        } catch (IOException e) {            throw new BufferDataException(e);        }        // Fill the length field        int newPos = position();        position(oldPos);//存储消息长度        putInt(newPos - oldPos - 4);        position(newPos);        return this;    }


4.3、断包与粘包处理

// class CumulativeProtocolDecoder public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {//是否有分片,tcp 有分片        if (!session.getTransportMetadata().hasFragmentation()) {            while (in.hasRemaining()) {                if (!doDecode(session, in, out)) {                    break;                }            }            return;        }// 1、断包处理// 2、处理粘包        boolean usingSessionBuffer = true;//session中是否有断包情况(上次处理后),断包保存在session中        IoBuffer buf = (IoBuffer) session.getAttribute(BUFFER);        // If we have a session buffer, append data to that; otherwise        // use the buffer read from the network directly.        if (buf != null) {//有断包,则把当前包拼接到断包里面            boolean appended = false;            // Make sure that the buffer is auto-expanded.            if (buf.isAutoExpand()) {                try {                    buf.put(in);                    appended = true;                } catch (IllegalStateException e) {                    // A user called derivation method (e.g. slice()),                    // which disables auto-expansion of the parent buffer.                } catch (IndexOutOfBoundsException e) {                    // A user disabled auto-expansion.                }            }            if (appended) {                buf.flip();            } else {                // Reallocate the buffer if append operation failed due to                // derivation or disabled auto-expansion.                buf.flip();                IoBuffer newBuf = IoBuffer.allocate(buf.remaining() + in.remaining()).setAutoExpand(true);                newBuf.order(buf.order());                newBuf.put(buf);                newBuf.put(in);                newBuf.flip();                buf = newBuf;                // Update the session attribute.                session.setAttribute(BUFFER, buf);            }        } else {            buf = in;            usingSessionBuffer = false;        }//2 粘包处理,可能buffer中有多个消息,需要多次处理(解码)每个消息,直到消息处理完,或者剩下的消息不是一个完整的消息或者buffer没有数据了        for (;;) {            int oldPos = buf.position();            boolean decoded = doDecode(session, buf, out);            if (decoded) {//解码 成功                if (buf.position() == oldPos) {                    throw new IllegalStateException("doDecode() can't return true when buffer is not consumed.");                }//buffer空了                if (!buf.hasRemaining()) {//buffer没有数据了                    break;                }            } else {//剩下的消息不是一个完整的消息,断包出现了                break;            }        }        // if there is any data left that cannot be decoded, we store        // it in a buffer in the session and next time this decoder is        // invoked the session buffer gets appended to        if (buf.hasRemaining()) {//剩下的消息不是一个完整的消息,断包出现了//如果断包已经保存在session中,则更新buffer,没有的话,就把剩下的断包保存在session中            if (usingSessionBuffer && buf.isAutoExpand()) {                buf.compact();            } else {                storeRemainingInSession(buf, session);            }        } else {            if (usingSessionBuffer) {                removeSessionBuffer(session);            }        }    }


//class  ObjectSerializationDecoder protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {//首先判断当前buffer中消息长度是否完整,不完整的话直接返回        if (!in.prefixedDataAvailable(4, maxObjectSize)) {            return false;        }        out.write(in.getObject(classLoader));        return true;    }