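These days, building a network application is remarkably convenient: pull in a framework, set a few parameters, write the business code, and you're done.
Writing business code matters, of course, but do you know:
Where does your data come from? Over the network, obviously.
And how does it travel over the network? Fiber, TCP/IP, obviously.
None of that will stump you, but those questions are not today's focus. What we care about today is how TCP/IP gets the data to our application server and then hands it, accurately, to the right piece of business code.
We won't dwell on TCP's three-way handshake or four-way teardown either. We only need one fact: TCP is a byte stream, so data flows continuously from client to server with no message boundaries. How, then, does the application layer know what those bytes mean? That is the job of the application protocol on top, such as HTTP, SMTP, or FTP.
Setting the rest aside: when we build an application on netty, netty sits exactly where that application-layer protocol gets implemented, so watching how it recognizes incoming data gives us a glimpse of how application-layer protocols work.
The big picture is simple: the client serializes the data with some serialization protocol and sends it over the network; the server deserializes it with the matching protocol and hands the result to the business code.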
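To make the serialize, transmit, deserialize round trip concrete, here is a minimal sketch using only the JDK. The `Greeting` type and its wire layout (an int id, an int text length, then UTF-8 bytes) are made up for illustration; they are not the format dubbo or netty uses.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical message type, used only for this illustration. */
final class Greeting {
    final int id;
    final String text;

    Greeting(int id, String text) {
        this.id = id;
        this.text = text;
    }

    /** Client side: encode as [id:int][textLength:int][text:UTF-8 bytes]. */
    static byte[] serialize(Greeting g) {
        byte[] text = g.text.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(4 + 4 + text.length);
        buf.putInt(g.id).putInt(text.length).put(text);
        return buf.array();
    }

    /** Server side: decode the same layout; assumes wire holds exactly one whole message. */
    static Greeting deserialize(byte[] wire) {
        ByteBuffer buf = ByteBuffer.wrap(wire);
        int id = buf.getInt();
        byte[] text = new byte[buf.getInt()];
        buf.get(text);
        return new Greeting(id, new String(text, StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        byte[] wire = serialize(new Greeting(7, "hello"));
        Greeting back = deserialize(wire);
        System.out.println(back.id + " " + back.text + " in " + wire.length + " bytes");
    }
}
```

Note the assumption baked into `deserialize`: it is handed exactly one complete message. The rest of this article is about what happens when that assumption does not hold.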
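So it looks like nothing more than serialization and deserialization. If that were all, there would be nothing left to think about today.
What we have to ask is: does the data the client sends arrive at the server all at once? If so, life is easy, just read it. But if the payload is very large, can TCP/IP carry it in one go? No. TCP limits each segment to the MSS (maximum segment size), and anything larger has to be split across segments. Conversely, several small messages may show up in a single read. (This is the "sticky packet" and "split packet" problem, to use the jargon.)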
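The effect is easy to simulate without any networking. In the sketch below, `concat` models the boundary-free byte stream and `reads` cuts it into reads of a given size; the chunk sizes are arbitrary stand-ins for whatever the network and the receive buffer happen to deliver, not real MSS values.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class StreamChunks {
    /** Concatenates messages the way a byte stream carries them: with no boundaries. */
    static byte[] concat(String... messages) {
        StringBuilder all = new StringBuilder();
        for (String m : messages) {
            all.append(m);
        }
        return all.toString().getBytes(StandardCharsets.US_ASCII);
    }

    /** Cuts the stream into reads of at most chunkSize bytes each. */
    static List<String> reads(byte[] stream, int chunkSize) {
        List<String> out = new ArrayList<>();
        for (int from = 0; from < stream.length; from += chunkSize) {
            int to = Math.min(stream.length, from + chunkSize);
            out.add(new String(Arrays.copyOfRange(stream, from, to), StandardCharsets.US_ASCII));
        }
        return out;
    }

    public static void main(String[] args) {
        byte[] stream = concat("HELLO", "WORLD!", "BYE");
        System.out.println(reads(stream, 4));  // [HELL, OWOR, LD!B, YE]  (messages split)
        System.out.println(reads(stream, 64)); // [HELLOWORLD!BYE]        (messages stuck together)
    }
}
```

Neither output lines up with the three messages that were sent, which is why the receiver needs a framing rule of its own.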
So how does netty handle this?
And how does dubbo use netty to deal with split data?
First, let's look at how dubbo sets up netty (mainly adding an encoder, a decoder, and a handler):
// org.apache.dubbo.remoting.transport.netty4.NettyServer
@Override
protected void doOpen() throws Throwable {
bootstrap = new ServerBootstrap();

bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
new DefaultThreadFactory("NettyServerWorker", true));

final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
channels = nettyServerHandler.getChannels();

bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
.childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
// FIXME: should we use getTimeout()?
int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
.addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
.addLast("handler", nettyServerHandler);
}
});
// bind
ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
channelFuture.syncUninterruptibly();
channel = channelFuture.channel();
}
Using netty really is that simple: define your protocol (the codec) and your handler, and leave all the complicated low-level work to the framework.
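The decoder, encoder, and handler registered above live in a pipeline, which behaves like a doubly linked list with fixed head and tail nodes. The following is a rough JDK-only model of that idea; `MiniPipeline` and its `Handler` interface are hypothetical names for this sketch and are far simpler than netty's real `ChannelPipeline`.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class MiniPipeline {
    /** Hypothetical handler contract: process msg, then optionally forward it via next. */
    interface Handler {
        void channelRead(Object msg, Consumer<Object> next);
    }

    private static final class Node {
        final Handler handler;
        Node prev;
        Node next;

        Node(Handler handler) {
            this.handler = handler;
        }
    }

    // Sentinels: head just forwards, tail swallows whatever reaches the end.
    private final Node head = new Node((msg, next) -> next.accept(msg));
    private final Node tail = new Node((msg, next) -> { });

    MiniPipeline() {
        head.next = tail;
        tail.prev = head;
    }

    /** Links the new handler in just before the tail sentinel. */
    MiniPipeline addLast(Handler handler) {
        Node node = new Node(handler);
        Node prev = tail.prev;
        node.prev = prev;
        node.next = tail;
        prev.next = node;
        tail.prev = node;
        return this;
    }

    /** Starts an inbound event at the head of the chain. */
    void fireChannelRead(Object msg) {
        invoke(head, msg);
    }

    private void invoke(Node node, Object msg) {
        node.handler.channelRead(msg, forwarded -> {
            if (node.next != null) {
                invoke(node.next, forwarded);
            }
        });
    }

    /** A "decoder" turns bytes into a String; a "handler" consumes the decoded String. */
    static List<String> demo() {
        List<String> seen = new ArrayList<>();
        new MiniPipeline()
                .addLast((msg, next) -> next.accept(new String((byte[]) msg, StandardCharsets.UTF_8)))
                .addLast((msg, next) -> seen.add("handled:" + msg))
                .fireChannelRead("ping".getBytes(StandardCharsets.UTF_8));
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [handled:ping]
    }
}
```

The business handler only ever sees decoded objects; everything about raw bytes stays in the handler placed before it.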
Let's first see how netty listens for incoming network data (it binds the port through NIO):
// io.netty.channel.socket.nio.NioServerSocketChannel
// bind the server socket through the underlying NIO channel
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}

@Override
protected ServerSocketChannel javaChannel() {
return (ServerSocketChannel) super.javaChannel();
}
So writing your own NIO server or client may not be that hard, but there are so many failure cases to handle that you may well not get them all right.
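For comparison, this is roughly what the bare-JDK version of "bind, accept, read" looks like. It is a sketch under simplifying assumptions: one connection, one read, the client living in the same thread, and no handling of partial reads, errors, or back-pressure, which is exactly the part a framework takes off your hands. `TinyNioServer` and `serveOnce` are names invented for this example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

final class TinyNioServer {
    /**
     * Binds to an ephemeral loopback port, lets a client send a non-empty payload,
     * and returns whatever the first read on the accepted channel delivers.
     */
    static String serveOnce(String payload) {
        if (payload.isEmpty()) {
            throw new IllegalArgumentException("payload must not be empty");
        }
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0), 128); // port 0: the OS picks one
            server.configureBlocking(false);
            server.register(selector, SelectionKey.OP_ACCEPT);
            int port = ((InetSocketAddress) server.getLocalAddress()).getPort();

            try (SocketChannel client = SocketChannel.open(new InetSocketAddress("127.0.0.1", port))) {
                client.write(ByteBuffer.wrap(payload.getBytes(StandardCharsets.UTF_8)));
                return pollUntilRead(selector, server);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** A bare-bones event loop: select, then dispatch accept and read events. */
    private static String pollUntilRead(Selector selector, ServerSocketChannel server) throws IOException {
        for (;;) {
            selector.select();
            Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
            while (keys.hasNext()) {
                SelectionKey key = keys.next();
                keys.remove();
                if (key.isAcceptable()) {
                    SocketChannel accepted = server.accept();
                    if (accepted != null) {
                        accepted.configureBlocking(false);
                        accepted.register(selector, SelectionKey.OP_READ);
                    }
                } else if (key.isReadable()) {
                    try (SocketChannel ch = (SocketChannel) key.channel()) {
                        ByteBuffer buf = ByteBuffer.allocate(1024);
                        int n = ch.read(buf);
                        return n <= 0 ? "" : new String(buf.array(), 0, n, StandardCharsets.UTF_8);
                    }
                }
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(serveOnce("ping"));
    }
}
```

Even here, nothing guarantees that a single `read` returns the whole payload; a short message on loopback usually arrives in one piece, but the code cannot rely on it.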
netty's threading model is the reactor model, built around an event loop:
// io.netty.channel.nio.NioEventLoop
// the event loop polls for ready events
@Override
protected void run() {
for (;;) {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));

// 'wakenUp.compareAndSet(false, true)' is always evaluated
// before calling 'selector.wakeup()' to reduce the wake-up
// overhead. (Selector.wakeup() is an expensive operation.)
//
// However, there is a race condition in this approach.
// The race condition is triggered when 'wakenUp' is set to
// true too early.
//
// 'wakenUp' is set to true too early if:
// 1) Selector is waken up between 'wakenUp.set(false)' and
// 'selector.select(...)'. (BAD)
// 2) Selector is waken up between 'selector.select(...)' and
// 'if (wakenUp.get()) { ... }'. (OK)
//
// In the first case, 'wakenUp' is set to true and the
// following 'selector.select(...)' will wake up immediately.
// Until 'wakenUp' is set to false again in the next round,
// 'wakenUp.compareAndSet(false, true)' will fail, and therefore
// any attempt to wake up the Selector will fail, too, causing
// the following 'selector.select(...)' call to block
// unnecessarily.
//
// To fix this problem, we wake up the selector again if wakenUp
// is true immediately after selector.select(...).
// It is inefficient in that it wakes up the selector for both
// the first case (BAD - wake-up required) and the second case
// (OK - no wake-up required).

if (wakenUp.get()) {
selector.wakeup();
}
default:
// fallthrough
}

cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
// process the ready I/O events
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}

// dispatch the ready events
private void processSelectedKeys() {
if (selectedKeys != null) {
// netty's optimized selected-key set is in use
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}

private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
// null out entry in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.keys[i] = null;

final Object a = k.attachment();

if (a instanceof AbstractNioChannel) {
// ...
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}

if (needsToSelectAgain) {
// null out entries in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.reset(i + 1);

selectAgain();
i = -1;
}
}
}

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
// If the channel implementation throws an exception because there is no event loop, we ignore this
// because we are only trying to determine if ch is registered to this event loop and thus has authority
// to close ch.
return;
}
// Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
// and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
// still healthy and should not be closed.
// See https://github.com/netty/netty/issues/5125
if (eventLoop != this || eventLoop == null) {
return;
}
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
return;
}

try {
int readyOps = k.readyOps();
// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
// the NIO JDK channel implementation may throw a NotYetConnectedException.
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);

unsafe.finishConnect();
}

// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}

// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
// read (or accept): the channel's unsafe object loops over whatever is available
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}

// io.netty.channel.nio.AbstractNioMessageChannel
// the actual read loop; for a server channel, the "messages" read here are newly accepted connections
private final class NioMessageUnsafe extends AbstractNioUnsafe {

private final List<Object> readBuf = new ArrayList<Object>();

@Override
public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);

boolean closed = false;
Throwable exception = null;
try {
try {
// keep reading messages into readBuf
do {
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
// record how many messages this pass has read so far
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}

int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
// hand each message to the pipeline in turn
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();

if (exception != null) {
closed = closeOnReadError(exception);

pipeline.fireExceptionCaught(exception);
}

if (closed) {
inputShutdown = true;
if (isOpen()) {
close(voidPromise());
}
}
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
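}

// io.netty.channel.socket.nio.NioServerSocketChannel
// for the server channel, "reading a message" means accepting a connection and wrapping it in a NioSocketChannel
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = SocketUtils.accept(javaChannel());

try {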
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);

try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}

return 0;
}

// io.netty.channel.DefaultChannelPipeline
@Override
public final ChannelPipeline fireChannelRead(Object msg) {
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this;
}

// io.netty.channel.AbstractChannelHandlerContext
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
// already on the handler's event loop thread: invoke directly; otherwise submit the call to that executor
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
// invoke the inbound handler's channelRead
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRead(msg);
}
}

final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler {

private final Unsafe unsafe;

HeadContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, HEAD_NAME, false, true);
unsafe = pipeline.channel().unsafe();
setAddComplete();
}

@Override
public ChannelHandler handler() {
return this;
}

@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
// NOOP
}

@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
// NOOP
}

@Override
public void bind(
ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
throws Exception {
unsafe.bind(localAddress, promise);
}

@Override
public void connect(
ChannelHandlerContext ctx,
SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) throws Exception {
unsafe.connect(remoteAddress, localAddress, promise);
}

@Override
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
unsafe.disconnect(promise);
}

@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
unsafe.close(promise);
}

@Override
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
unsafe.deregister(promise);
}

@Override
public void read(ChannelHandlerContext ctx) {
unsafe.beginRead();
}

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
unsafe.write(msg, promise);
}

@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
unsafe.flush();
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.fireExceptionCaught(cause);
}

@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
invokeHandlerAddedIfNeeded();
ctx.fireChannelRegistered();
}

@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelUnregistered();

// Remove all handlers sequentially if channel is closed and unregistered.
if (!channel.isOpen()) {
destroy();
}
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();

readIfIsAutoRead();
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelInactive();
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.fireChannelRead(msg);
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelReadComplete();

readIfIsAutoRead();
}

private void readIfIsAutoRead() {
if (channel.config().isAutoRead()) {
channel.read();
}
}

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
ctx.fireUserEventTriggered(evt);
}

@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelWritabilityChanged();
}
}

// DefaultChannelPipeline
// io.netty.channel.AbstractChannelHandlerContext
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
invokeChannelRead(findContextInbound(), msg);
return this;
}

private AbstractChannelHandlerContext findContextInbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while (!ctx.inbound);
return ctx;
}

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}

// io.netty.bootstrap.ServerBootstrap$ServerBootstrapAcceptor
// msg is the newly accepted child channel: install the user's childHandler, then register it with the worker group
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;

child.pipeline().addLast(childHandler);

setChannelOptions(child, childOptions, logger);

for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}

try {
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
// DefaultChannelPipeline
@Override
public final ChannelPipeline addLast(ChannelHandler... handlers) {
return addLast(null, handlers);
}

@Override
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
if (handlers == null) {
throw new NullPointerException("handlers");
}

for (ChannelHandler h: handlers) {
if (h == null) {
break;
}
addLast(executor, null, h);
}

return this;
}

@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler);

newCtx = newContext(group, filterName(name, handler), handler);

// append to the tail of the pipeline
addLast0(newCtx);

// If the registered is false it means that the channel was not registered on an eventloop yet.
// In this case we add the context to the pipeline and add a task that will call
// ChannelHandler.handlerAdded(...) once the channel is registered.
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}

EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
newCtx.setAddPending();
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerAdded0(newCtx);
}
});
return this;
}
}
callHandlerAdded0(newCtx);
return this;
}

private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}

// NioEventLoopGroup
// io.netty.channel.MultithreadEventLoopGroup
@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);
}

private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;

PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}

@Override
public EventExecutor next() {
// round-robin: with a power-of-two length, "& (length - 1)" gives the same result as "% length"
return executors[idx.getAndIncrement() & executors.length - 1];
}
}

// io.netty.channel.SingleThreadEventLoop
@Override
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}

@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
// once registered, the channel's events are handled on this event loop's own thread
promise.channel().unsafe().register(this, promise);
return promise;
}

// io.netty.channel.AbstractChannel $ AbstractUnsafe
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}

AbstractChannel.this.eventLoop = eventLoop;

if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}

// io.netty.util.concurrent.SingleThreadEventExecutor
@Override
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}

boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task);
} else {
startThread();
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
}

if (!addTaskWakesUp && wakesUpForTask(task)) {
// wake up the selector so the event loop notices the new task
wakeup(inEventLoop);
}
}

private void startThread() {
if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
// first task submitted: start the event loop's thread; the caller's part of the work ends here
doStartThread();
}
}
}
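The pattern in `execute` and `startThread` (queue the task, then start the loop's single thread on first use, guarded by a compare-and-set) can be modelled in a few lines of plain Java. This is an illustrative sketch, not netty's implementation: `LazyEventLoop` is a made-up class, it has no shutdown logic, and it uses a blocking queue where netty uses a selector plus task queue.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

final class LazyEventLoop {
    private final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();
    private final AtomicBoolean started = new AtomicBoolean(false);
    private volatile Thread thread;

    boolean inEventLoop() {
        return Thread.currentThread() == thread;
    }

    /** Queues the task; the first caller from outside the loop also starts the loop thread. */
    void execute(Runnable task) {
        tasks.add(task);
        if (!inEventLoop() && started.compareAndSet(false, true)) {
            Thread t = new Thread(this::run, "lazy-event-loop");
            t.setDaemon(true); // no shutdown handling in this sketch
            t.start();
        }
    }

    /** The loop itself: one thread drains the queue, so tasks never run concurrently. */
    private void run() {
        thread = Thread.currentThread();
        try {
            for (;;) {
                tasks.take().run();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Submits three tasks from the calling thread and records where each one ran. */
    static List<String> demo() {
        LazyEventLoop loop = new LazyEventLoop();
        List<String> threadNames = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(3);
        for (int i = 0; i < 3; i++) {
            loop.execute(() -> {
                threadNames.add(Thread.currentThread().getName());
                done.countDown();
            });
        }
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(threadNames);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

All three tasks run on the same loop thread, no matter which thread submitted them. That is what lets handlers bound to one event loop avoid locking among themselves.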
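Starting the new thread that runs the event loop:
// the executor used here is a ThreadPerTaskExecutor (shown below),
// so this creates one new thread, which then runs the event loop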
private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}

boolean success = false;
updateLastExecutionTime();
try {
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
for (;;) {
int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this);
if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
break;
}
}

// Check if confirmShutdown() was called at the end of the loop.
if (success && gracefulShutdownStartTime == 0) {
logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
"before run() implementation terminates.");
}

try {
// Run all remaining tasks and shutdown hooks.
for (;;) {
if (confirmShutdown()) {
break;
}
}
} finally {
try {
cleanup();
} finally {
STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
threadLock.release();
if (!taskQueue.isEmpty()) {
logger.warn(
"An event executor terminated with " +
"non-empty task queue (" + taskQueue.size() + ')');
}

terminationFuture.setSuccess(null);
}
}
}
}
});
}

public final class ThreadPerTaskExecutor implements Executor {
private final ThreadFactory threadFactory;

public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
this.threadFactory = threadFactory;
}

@Override
public void execute(Runnable command) {
// every execute() call starts a brand-new thread
threadFactory.newThread(command).start();
}
}

// io.netty.channel.nio.NioEventLoop
@Override
protected void wakeup(boolean inEventLoop) {
if (!inEventLoop && wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
}

The actual parsing of the data is triggered by fireChannelRead:
// io.netty.channel.DefaultChannelPipeline
@Override
public final ChannelPipeline fireChannelRead(Object msg) {
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this;
}
// walk the inbound handler chain, calling channelRead() on each handler in turn
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRead(msg);
}
}

// HeadContext
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.fireChannelRead(msg);
}

// AbstractChannelHandlerContext
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
invokeChannelRead(findContextInbound(), msg);
return this;
}

// io.netty.handler.codec.ByteToMessageDecoder
// this class drives the parsing of the incoming bytes
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
CodecOutputList out = CodecOutputList.newInstance();
try {
ByteBuf data = (ByteBuf) msg;
first = cumulation == null;
// bytes may arrive over several reads: start a new cumulation or append to the existing one
if (first) {
cumulation = data;
} else {
cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
}
// run the decode loop
callDecode(ctx, cumulation, out);
} catch (DecoderException e) {
throw e;
} catch (Throwable t) {
throw new DecoderException(t);
} finally {
if (cumulation != null && !cumulation.isReadable()) {
numReads = 0;
cumulation.release();
cumulation = null;
} else if (++ numReads >= discardAfterReads) {
// We did enough reads already try to discard some bytes so we not risk to see a OOME.
// See https://github.com/netty/netty/issues/4275
numReads = 0;
discardSomeReadBytes();
}

int size = out.size();
decodeWasNull = !out.insertSinceRecycled();
// any decoded messages are passed on to the next inbound handler
fireChannelRead(ctx, out, size);
out.recycle();
}
} else {
ctx.fireChannelRead(msg);
}
}

/**
* Called once data should be decoded from the given {@link ByteBuf}. This method will call
* {@link #decode(ChannelHandlerContext, ByteBuf, List)} as long as decoding should take place.
*
* @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
* @param in the {@link ByteBuf} from which to read data
* @param out the {@link List} to which decoded messages should be added
*/
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
try {
// keep calling decode() as long as readable bytes remain
while (in.isReadable()) {
int outSize = out.size();

if (outSize > 0) {
fireChannelRead(ctx, out, outSize);
out.clear();

// Check if this handler was removed before continuing with decoding.
// If it was removed, it is not safe to continue to operate on the buffer.
//
// See:
// - https://github.com/netty/netty/issues/4635
if (ctx.isRemoved()) {
break;
}
outSize = 0;
}

int oldInputLength = in.readableBytes();
// call the subclass's decode(), which assembles complete messages;
// business processing is left to the handlers further down the pipeline
decode(ctx, in, out);

// Check if this handler was removed before continuing the loop.
// If it was removed, it is not safe to continue to operate on the buffer.
//
// See https://github.com/netty/netty/issues/1664
if (ctx.isRemoved()) {
break;
}

if (outSize == out.size()) {
if (oldInputLength == in.readableBytes()) {
break;
} else {
continue;
}
}

if (oldInputLength == in.readableBytes()) {
throw new DecoderException(
StringUtil.simpleClassName(getClass()) +
".decode() did not read anything but decoded a message.");
}

if (isSingleDecode()) {
break;
}
}
} catch (DecoderException e) {
throw e;
} catch (Throwable cause) {
throw new DecoderException(cause);
}
}

/**
* Get {@code numElements} out of the {@link CodecOutputList} and forward these through the pipeline.
*/
static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements) {
// fire channelRead once for every decoded element
for (int i = 0; i < numElements; i ++) {
ctx.fireChannelRead(msgs.getUnsafe(i));
}
}
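If we wrote the frame-assembly logic ourselves, it might look like this (it simply waits until all of a message's bytes have arrived, then passes the message on to the next handler):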
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
if (in.readableBytes() < 4) {
return;
}
in.markReaderIndex();
int dataLength = in.readInt();
// the frame is not complete yet: rewind and wait for the next call
if (in.readableBytes() < dataLength) {
in.resetReaderIndex();
return;
}
byte[] data = new byte[dataLength];
in.readBytes(data);

// target is assumed to be the message class this decoder was configured with
Object obj = JSON.parseObject(data, target);
out.add(obj);
}
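The same mark, read length, reset-if-incomplete logic can be exercised without netty. The sketch below swaps netty's `ByteBuf` for `java.nio.ByteBuffer` and returns plain strings instead of JSON objects; `LengthPrefixedDecoder`, `feed`, and `frame` are hypothetical names for this illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class LengthPrefixedDecoder {
    // Bytes received but not yet decoded; kept in write mode between calls.
    private ByteBuffer pending = ByteBuffer.allocate(64);

    /** Feeds one network read and returns every complete message it unlocked. */
    List<String> feed(byte[] chunk) {
        if (pending.remaining() < chunk.length) {
            int needed = pending.position() + chunk.length;
            ByteBuffer bigger = ByteBuffer.allocate(Math.max(pending.capacity() * 2, needed));
            pending.flip();
            bigger.put(pending);
            pending = bigger;
        }
        pending.put(chunk);

        pending.flip(); // switch to read mode
        List<String> out = new ArrayList<>();
        while (pending.remaining() >= 4) {
            pending.mark();
            int length = pending.getInt();
            if (length < 0) {
                throw new IllegalStateException("negative frame length: " + length);
            }
            if (pending.remaining() < length) {
                pending.reset(); // frame incomplete: rewind to the length field and wait
                break;
            }
            byte[] body = new byte[length];
            pending.get(body);
            out.add(new String(body, StandardCharsets.UTF_8));
        }
        pending.compact(); // keep leftovers, back to write mode
        return out;
    }

    /** Encoder counterpart: [length:int][body]. */
    static byte[] frame(String text) {
        byte[] body = text.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + body.length).putInt(body.length).put(body).array();
    }

    public static void main(String[] args) {
        LengthPrefixedDecoder decoder = new LengthPrefixedDecoder();
        byte[] hi = frame("hi");
        System.out.println(decoder.feed(java.util.Arrays.copyOfRange(hi, 0, 3)));         // []
        System.out.println(decoder.feed(java.util.Arrays.copyOfRange(hi, 3, hi.length))); // [hi]
    }
}
```

Feeding the bytes in any chunking yields the same messages, whether a frame is cut in the middle of its length field or two frames arrive in one chunk.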
When the inbound handler is invoked several times for one message, the pieces are stitched together by the cumulate method:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
CodecOutputList out = CodecOutputList.newInstance();
try {
ByteBuf data = (ByteBuf) msg;
first = cumulation == null;
if (first) {
cumulation = data;
} else {
// merge the new bytes into the cumulation
cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
}
callDecode(ctx, cumulation, out);
} catch (DecoderException e) {
throw e;
} catch (Throwable t) {
throw new DecoderException(t);
} finally {
if (cumulation != null && !cumulation.isReadable()) {
numReads = 0;
cumulation.release();
cumulation = null;
} else if (++ numReads >= discardAfterReads) {
// We did enough reads already try to discard some bytes so we not risk to see a OOME.
// See https://github.com/netty/netty/issues/4275
numReads = 0;
discardSomeReadBytes();
}

int size = out.size();
decodeWasNull = !out.insertSinceRecycled();
fireChannelRead(ctx, out, size);
out.recycle();
}
} else {
ctx.fireChannelRead(msg);
}
}

/**
* Cumulate {@link ByteBuf}s by merge them into one {@link ByteBuf}'s, using memory copies.
*/
public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
@Override
public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
final ByteBuf buffer;
if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
|| cumulation.refCnt() > 1 || cumulation.isReadOnly()) {
// Expand cumulation (by replace it) when either there is not more room in the buffer
// or if the refCnt is greater then 1 which may happen when the user use slice().retain() or
// duplicate().retain() or if its read-only.
//
// See:
// - https://github.com/netty/netty/issues/2327
// - https://github.com/netty/netty/issues/1764
buffer = expandCumulation(alloc, cumulation, in.readableBytes());
} else {
buffer = cumulation;
}
buffer.writeBytes(in);
in.release();
return buffer;
}
};
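Stripped of reference counting and read-only checks, the merge step boils down to "append if there is room, otherwise copy into a bigger buffer first". A simplified sketch on `java.nio.ByteBuffer` follows; `MergeCumulatorSketch` is an invented name, and growing to exactly the required size is a choice made for brevity here, not netty's allocation policy.

```java
import java.nio.ByteBuffer;

final class MergeCumulatorSketch {
    /**
     * Appends the readable bytes of in to cumulation and returns the buffer to keep using.
     * cumulation is expected in write mode (position = bytes accumulated so far).
     */
    static ByteBuffer cumulate(ByteBuffer cumulation, ByteBuffer in) {
        ByteBuffer target = cumulation;
        if (cumulation.remaining() < in.remaining()) {
            // Not enough room: replace the cumulation with a larger buffer and copy the old bytes over.
            target = ByteBuffer.allocate(cumulation.position() + in.remaining());
            cumulation.flip();
            target.put(cumulation);
        }
        target.put(in);
        return target;
    }

    public static void main(String[] args) {
        ByteBuffer cumulation = ByteBuffer.allocate(4);
        cumulation.put(new byte[] {1, 2, 3});
        ByteBuffer same = cumulate(cumulation, ByteBuffer.wrap(new byte[] {4}));
        ByteBuffer grown = cumulate(same, ByteBuffer.wrap(new byte[] {5, 6}));
        System.out.println((same == cumulation) + " " + grown.capacity());
    }
}
```

The caller must always continue with the returned buffer, since the original may have been replaced; the netty code does the same by assigning the result back to `cumulation`.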
Now let's see how dubbo assembles its packets (this is where NEED_MORE_INPUT comes in):
// decoder logic
// org.apache.dubbo.remoting.transport.netty4.NettyCodecAdapter
private class InternalDecoder extends ByteToMessageDecoder {

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out) throws Exception {

ChannelBuffer message = new NettyBackedChannelBuffer(input);

NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);

try {
// decode object.
do {
int saveReaderIndex = message.readerIndex();
Object msg = codec.decode(channel, message);
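// on NEED_MORE_INPUT the frame is not complete yet: restore the reader index and wait for the next callback
// (the decode call above is delegated through a chain of codecs)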
if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
message.readerIndex(saveReaderIndex);
break;
} else {
//is it possible to go here ?
if (saveReaderIndex == message.readerIndex()) {
throw new IOException("Decode without read data.");
}
if (msg != null) {
out.add(msg);
}
}
} while (message.readable());
} finally {
NettyChannel.removeChannelIfDisconnected(ctx.channel());
}
}
}

// org.apache.dubbo.rpc.protocol.dubbo.DubboCountCodec
@Override
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
int save = buffer.readerIndex();
MultiMessage result = MultiMessage.create();
do {
Object obj = codec.decode(channel, buffer);
if (Codec2.DecodeResult.NEED_MORE_INPUT == obj) {
buffer.readerIndex(save);
break;
} else {
result.addMessage(obj);
logMessageLength(obj, buffer.readerIndex() - save);
save = buffer.readerIndex();
}
} while (true);
if (result.isEmpty()) {
return Codec2.DecodeResult.NEED_MORE_INPUT;
}
if (result.size() == 1) {
return result.get(0);
}
return result;
}

// org.apache.dubbo.remoting.exchange.codec.ExchangeCodec
@Override
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
int readable = buffer.readableBytes();
// every packet starts with a header; once it is parsed, the packet's type and length are known
byte[] header = new byte[Math.min(readable, HEADER_LENGTH)];
buffer.readBytes(header);
return decode(channel, buffer, readable, header);
}

@Override
protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
// check magic number.
if (readable > 0 && header[0] != MAGIC_HIGH
|| readable > 1 && header[1] != MAGIC_LOW) {
int length = header.length;
if (header.length < readable) {
header = Bytes.copyOf(header, readable);
buffer.readBytes(header, length, readable - length);
}
for (int i = 1; i < header.length - 1; i++) {
if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {
buffer.readerIndex(buffer.readerIndex() - header.length + i);
header = Bytes.copyOf(header, i);
break;
}
}
return super.decode(channel, buffer, readable, header);
}
// check length.
if (readable < HEADER_LENGTH) {
return DecodeResult.NEED_MORE_INPUT;
}

// get data length.
int len = Bytes.bytes2int(header, 12);
checkPayload(channel, len);

// until the whole frame (header plus body) has arrived, return NEED_MORE_INPUT
int tt = len + HEADER_LENGTH;
if (readable < tt) {
return DecodeResult.NEED_MORE_INPUT;
}

// limit input stream.
ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);

try {
return decodeBody(channel, is, header);
} finally {
if (is.available() > 0) {
try {
if (logger.isWarnEnabled()) {
logger.warn("Skip input stream " + is.available());
}
StreamUtils.skipUnusedStream(is);
} catch (IOException e) {
logger.warn(e.getMessage(), e);
}
}
}
}

// org.apache.dubbo.remoting.telnet.codec.TelnetCodec
@SuppressWarnings("unchecked")
protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] message) throws IOException {
if (isClientSide(channel)) {
return toString(message, getCharset(channel));
}
checkPayload(channel, readable);
if (message == null || message.length == 0) {
return DecodeResult.NEED_MORE_INPUT;
}

if (message[message.length - 1] == '\b') { // Windows backspace echo
try {
boolean doublechar = message.length >= 3 && message[message.length - 3] < 0; // double byte char
channel.send(new String(doublechar ? new byte[]{32, 32, 8, 8} : new byte[]{32, 8}, getCharset(channel).name()));
} catch (RemotingException e) {
throw new IOException(StringUtils.toString(e));
}
return DecodeResult.NEED_MORE_INPUT;
}

for (Object command : EXIT) {
if (isEquals(message, (byte[]) command)) {
if (logger.isInfoEnabled()) {
logger.info(new Exception("Close channel " + channel + " on exit command: " + Arrays.toString((byte[]) command)));
}
channel.close();
return null;
}
}

boolean up = endsWith(message, UP);
boolean down = endsWith(message, DOWN);
if (up || down) {
LinkedList<String> history = (LinkedList<String>) channel.getAttribute(HISTORY_LIST_KEY);
if (CollectionUtils.isEmpty(history)) {
return DecodeResult.NEED_MORE_INPUT;
}
Integer index = (Integer) channel.getAttribute(HISTORY_INDEX_KEY);
Integer old = index;
if (index == null) {
index = history.size() - 1;
} else {
if (up) {
index = index - 1;
if (index < 0) {
index = history.size() - 1;
}
} else {
index = index + 1;
if (index > history.size() - 1) {
index = 0;
}
}
}
if (old == null || !old.equals(index)) {
channel.setAttribute(HISTORY_INDEX_KEY, index);
String value = history.get(index);
if (old != null && old >= 0 && old < history.size()) {
String ov = history.get(old);
StringBuilder buf = new StringBuilder();
for (int i = 0; i < ov.length(); i++) {
buf.append("\b");
}
for (int i = 0; i < ov.length(); i++) {
buf.append(" ");
}
for (int i = 0; i < ov.length(); i++) {
buf.append("\b");
}
value = buf.toString() + value;
}
try {
channel.send(value);
} catch (RemotingException e) {
throw new IOException(StringUtils.toString(e));
}
}
return DecodeResult.NEED_MORE_INPUT;
}
for (Object command : EXIT) {
if (isEquals(message, (byte[]) command)) {
if (logger.isInfoEnabled()) {
logger.info(new Exception("Close channel " + channel + " on exit command " + command));
}
channel.close();
return null;
}
}
byte[] enter = null;
for (Object command : ENTER) {
if (endsWith(message, (byte[]) command)) {
enter = (byte[]) command;
break;
}
}
if (enter == null) {
return DecodeResult.NEED_MORE_INPUT;
}
LinkedList<String> history = (LinkedList<String>) channel.getAttribute(HISTORY_LIST_KEY);
Integer index = (Integer) channel.getAttribute(HISTORY_INDEX_KEY);
channel.removeAttribute(HISTORY_INDEX_KEY);
if (CollectionUtils.isNotEmpty(history) && index != null && index >= 0 && index < history.size()) {
String value = history.get(index);
if (value != null) {
byte[] b1 = value.getBytes();
byte[] b2 = new byte[b1.length + message.length];
System.arraycopy(b1, 0, b2, 0, b1.length);
System.arraycopy(message, 0, b2, b1.length, message.length);
message = b2;
}
}
String result = toString(message, getCharset(channel));
if (result.trim().length() > 0) {
if (history == null) {
history = new LinkedList<String>();
channel.setAttribute(HISTORY_LIST_KEY, history);
}
if (history.isEmpty()) {
history.addLast(result);
} else if (!result.equals(history.getLast())) {
history.remove(result);
history.addLast(result);
if (history.size() > 10) {
history.removeFirst();
}
}
}
return result;
}
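So dubbo's split-packet handling also rests on netty: it uses the length declared in the header to decide whether a whole packet has arrived.
By the same token, the length solves the sticky-packet problem: the length in the header tells you exactly where one message ends, so packets that arrived glued together can be separated.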
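Both cases can be reproduced with a header-driven decoder in plain Java. The sketch below mirrors the shape of the code above: check the magic number, read the body length at offset 12, and return a `NEED_MORE_INPUT` sentinel when the frame is incomplete. The concrete values (a 16-byte header and the magic number `0xdabb`) do not appear in the excerpts above and are stated here as assumptions; the class and method names are invented for this example.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

final class HeaderFrameSketch {
    static final int HEADER_LENGTH = 16;           // assumed header size
    static final short MAGIC = (short) 0xdabb;     // assumed magic number
    static final Object NEED_MORE_INPUT = new Object();

    /** Decodes one frame body, or returns NEED_MORE_INPUT and leaves the position untouched. */
    static Object decodeOne(ByteBuffer buf) {
        if (buf.remaining() < HEADER_LENGTH) {
            return NEED_MORE_INPUT;
        }
        int start = buf.position();
        if (buf.getShort(start) != MAGIC) {
            throw new IllegalStateException("bad magic number");
        }
        int len = buf.getInt(start + 12); // body length lives at offset 12 of the header
        if (len < 0) {
            throw new IllegalStateException("negative body length: " + len);
        }
        if (buf.remaining() < HEADER_LENGTH + len) {
            return NEED_MORE_INPUT;
        }
        byte[] body = new byte[len];
        buf.position(start + HEADER_LENGTH);
        buf.get(body);
        return body;
    }

    /** Splits every complete frame off the front of buf; a trailing partial frame stays unread. */
    static List<byte[]> decodeAll(ByteBuffer buf) {
        List<byte[]> out = new ArrayList<>();
        for (;;) {
            Object result = decodeOne(buf);
            if (result == NEED_MORE_INPUT) {
                return out;
            }
            out.add((byte[]) result);
        }
    }

    /** Builds a frame: magic at offset 0, body length at offset 12, other header bytes zero. */
    static byte[] frame(byte[] body) {
        ByteBuffer b = ByteBuffer.allocate(HEADER_LENGTH + body.length);
        b.putShort(0, MAGIC);
        b.putInt(12, body.length);
        b.position(HEADER_LENGTH);
        b.put(body);
        return b.array();
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.wrap(frame(new byte[] {1, 2, 3}));
        System.out.println(decodeAll(buf).size() + " frame(s), " + buf.remaining() + " byte(s) left");
    }
}
```

Two complete frames followed by part of a third in one buffer yield two bodies and leave the partial frame in place for the next read. That covers sticky and split packets with the same few lines.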
netty actually ships several ready-made frame decoders: FixedLengthFrameDecoder, LineBasedFrameDecoder, DelimiterBasedFrameDecoder, and LengthFieldBasedFrameDecoder. The names say it all. Still, rolling your own is not hard, so why not?
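As a taste of the delimiter-based alternative to length fields, here is a small line splitter in plain Java. It is only similar in spirit to LineBasedFrameDecoder, not a copy of it: `LineFramer` is a made-up class, and it works on strings rather than raw bytes to keep the example short.

```java
import java.util.ArrayList;
import java.util.List;

final class LineFramer {
    // Characters received so far that do not yet end in a newline.
    private final StringBuilder pending = new StringBuilder();

    /** Feeds one chunk and returns every complete line it unlocked, without the line ending. */
    List<String> feed(String chunk) {
        pending.append(chunk);
        List<String> lines = new ArrayList<>();
        int newline;
        while ((newline = pending.indexOf("\n")) >= 0) {
            // Accept both "\n" and "\r\n" as the delimiter.
            int end = (newline > 0 && pending.charAt(newline - 1) == '\r') ? newline - 1 : newline;
            lines.add(pending.substring(0, end));
            pending.delete(0, newline + 1);
        }
        return lines;
    }

    public static void main(String[] args) {
        LineFramer framer = new LineFramer();
        System.out.println(framer.feed("GET a\r\nGE")); // [GET a]
        System.out.println(framer.feed("T b\n"));       // [GET b]
    }
}
```

Delimiters suit text protocols where the payload can never contain the delimiter itself; a length field avoids that restriction, which is presumably why binary protocols such as dubbo's prefer it.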
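That is how TCP data gets framed on top of netty. It is also a simple application-layer protocol in action, which gives us a more direct view of how such protocols process data.
Of course, deciding packet boundaries by length has some drawbacks:
1. A very large packet blocks the requests queued behind it on the same connection;
2. A very large packet occupies a lot of memory, because it must be buffered until it is complete;
3. Packets on one connection cannot be reordered or interleaved. (Does TCP allow out-of-order or mixed delivery? No: it hands bytes to the application strictly in order, so each frame must arrive in full before the next one can start.)
The protocol above does not address these cases. For requests that carry a lot of data, we can split the payload into smaller requests on the client side to relieve the pressure.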
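Client-side splitting can be as simple as cutting the payload into bounded parts and sending each as its own request. A minimal sketch, with `PayloadChunker` and the chunk size chosen purely for illustration; how the server correlates the parts (request ids, sequence numbers) is left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class PayloadChunker {
    /** Splits payload into parts of at most maxChunk bytes, preserving order. */
    static List<byte[]> split(byte[] payload, int maxChunk) {
        if (maxChunk <= 0) {
            throw new IllegalArgumentException("maxChunk must be positive");
        }
        List<byte[]> parts = new ArrayList<>();
        for (int from = 0; from < payload.length; from += maxChunk) {
            int to = Math.min(payload.length, from + maxChunk);
            parts.add(Arrays.copyOfRange(payload, from, to));
        }
        return parts;
    }

    /** Receiver side: concatenates the parts back in the order they were sent. */
    static byte[] join(List<byte[]> parts) {
        int total = 0;
        for (byte[] part : parts) {
            total += part.length;
        }
        byte[] all = new byte[total];
        int at = 0;
        for (byte[] part : parts) {
            System.arraycopy(part, 0, all, at, part.length);
            at += part.length;
        }
        return all;
    }

    public static void main(String[] args) {
        byte[] payload = new byte[10];
        List<byte[]> parts = split(payload, 4);
        System.out.println(parts.size() + " parts, rejoined to " + join(parts).length + " bytes");
    }
}
```

Each part then travels as an ordinary small frame, so no single request monopolizes the connection or the decoder's buffer for long.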
Ramblings: see through to the essence.