一、先来看一下客户端示例代码。
public class NettyClientTest {
public void connect(int port, String host) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();//与服务端不同,客户端只需要一个IO线程组 try {
Bootstrap b = new Bootstrap();
b.group(group)
.option(ChannelOption.TCP_NODELAY, true)//禁用nagel算法
.channel(NioSocketChannel.class)//设置channel类型为NioSocketChannel
.handler(new ChannelInitializer<SocketChannel>() {//为channel设置初始化Handler
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new EchoClientHandler());
}
});
ChannelFuture f = b.connect(host, port).sync();//等不等待连接结束
f.channel().closeFuture().sync();//同步等待关闭
}finally {
group.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception{
int port = 8082;
new NettyClientTest().connect(port,"127.0.0.1");
}
} class EchoClientHandler extends ChannelInboundHandlerAdapter{
private int count = 0;
static final String ECHO_REQ = "HI , MY NAME IS CHENYANG.$_"; @Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
for(int i = 0;i < 10;i++){
ctx.writeAndFlush(Unpooled.copiedBuffer(ECHO_REQ.getBytes()));
}
} @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("This is"+ ++count + "times receive server:[" + msg + "]");
ctx.writeAndFlush(Unpooled.copiedBuffer("hehe.$_".getBytes()));
ctx.fireChannelRead(msg);
} @Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
} @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
二、启动过程分析
由于客户端Bootstrap的配置过程和服务端ServerBootstrap配置过程原理相类似,此处不再单独讲解客户端的配置过程。接下来直接看客户端的connect过程。
三、connect过程分析
ChannelFuture f = b.connect(host, port).sync();
connect代码如下:
/**
 * Connects a {@link Channel} to the remote peer given by host and port.
 *
 * @param inetHost remote host
 * @param inetPort remote port
 * @return the future produced by the address-based {@code connect} overload
 */
public ChannelFuture connect(String inetHost, int inetPort) {
    final InetSocketAddress remoteAddress = new InetSocketAddress(inetHost, inetPort);
    return connect(remoteAddress);
}
继续深入
/**
 * Connects a {@link Channel} to the given remote address.
 *
 * @param remoteAddress address of the remote peer; must not be {@code null}
 * @return the future returned by {@code doConnect}
 * @throws NullPointerException if {@code remoteAddress} is {@code null}
 */
public ChannelFuture connect(SocketAddress remoteAddress) {
    if (remoteAddress == null) {
        throw new NullPointerException("remoteAddress");
    }

    // Validate the bootstrap configuration before any channel is created.
    validate();
    final SocketAddress local = localAddress();
    return doConnect(remoteAddress, local);
}
继续查看doConnect源码
/**
 * Creates and registers a channel, then starts the connect either right away or as soon as the
 * registration future completes.
 *
 * @param remoteAddress address to connect to
 * @param localAddress  local address to bind to, may be {@code null}
 * @return the failed registration future, or the promise that tracks the connect attempt
 */
private ChannelFuture doConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
    // Same as on the server side: initialize the channel and register it.
    final ChannelFuture regFuture = initAndRegister();
    // The channel that was just created.
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }

    final ChannelPromise promise = channel.newPromise();
    if (!regFuture.isDone()) {
        // Registration still pending: connect once it has completed.
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                doConnect0(regFuture, channel, remoteAddress, localAddress, promise);
            }
        });
        return promise;
    }

    // Registration already finished: connect now.
    doConnect0(regFuture, channel, remoteAddress, localAddress, promise);
    return promise;
}
看一下initAndRegister代码
/**
 * Creates a new channel through the configured factory, initializes it and registers it with the
 * event loop group.
 *
 * @return the registration future; a failed promise if initialization threw
 */
final ChannelFuture initAndRegister() {
    // Use the channel factory configured earlier; for this client it produces a NioSocketChannel.
    final Channel channel = channelFactory().newChannel();
    try {
        // Initialize the channel; this step differs between client and server.
        init(channel);
    } catch (Throwable t) {
        channel.unsafe().closeForcibly();
        // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
        return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
    }

    // Register the channel with the NioEventLoopGroup.
    ChannelFuture regFuture = group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }

    // If we are here and the promise is not failed, it's one of the following cases:
    // 1) If we attempted registration from the event loop, the registration has been completed at this point.
    //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
    // 2) If we attempted registration from the other thread, the registration request has been successfully
    //    added to the event loop's task queue for later execution.
    //    i.e. It's safe to attempt bind() or connect() now:
    //         because bind() or connect() will be executed *after* the scheduled registration task is executed
    //         because register(), bind(), and connect() are all bound to the same thread.

    return regFuture;
}
首先看一下针对客户端的init代码。
/**
 * Client-side channel initialization: installs the user handler and applies the configured
 * options and attributes to the new channel.
 *
 * @param channel the freshly created channel
 */
@Override
@SuppressWarnings("unchecked")
void init(Channel channel) throws Exception {
    ChannelPipeline p = channel.pipeline();
    // Install the handler supplied by the user, i.e. the initializer handler.
    p.addLast(handler());

    final Map<ChannelOption<?>, Object> options = options();
    synchronized (options) {
        for (Entry<ChannelOption<?>, Object> e: options.entrySet()) {
            try {
                // Apply the configured channel option.
                if (!channel.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
                    logger.warn("Unknown channel option: " + e);
                }
            } catch (Throwable t) {
                // A failing option is logged only; the remaining options are still applied.
                logger.warn("Failed to set a channel option: " + channel, t);
            }
        }
    }

    final Map<AttributeKey<?>, Object> attrs = attrs();
    synchronized (attrs) {
        for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
            // Apply the configured channel attribute.
            channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
        }
    }
}
接下来看register过程,这个和服务端是一样的。(ChannelFuture regFuture = group().register(channel);)
/**
 * Registers the channel with the next event loop picked from this group.
 *
 * @param channel the channel to register
 * @return the registration future
 */
@Override
public ChannelFuture register(Channel channel) {
    // next() selects the next NioEventLoop of the group.
    final ChannelFuture registration = next().register(channel);
    return registration;
}
/**
 * Registers the channel with this event loop, using a newly created promise.
 *
 * @param channel the channel to register
 * @return the future of the registration
 */
@Override
public ChannelFuture register(Channel channel) {
    final ChannelFuture registration = register(channel, new DefaultChannelPromise(channel, this));
    return registration;
}
/**
 * Registers the channel with this event loop and reports the outcome through {@code promise}.
 *
 * @param channel the channel to register; must not be {@code null}
 * @param promise completed with the outcome; must not be {@code null}
 * @return {@code promise}
 * @throws NullPointerException if either argument is {@code null}
 */
@Override
public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
    if (channel == null) {
        throw new NullPointerException("channel");
    }
    if (promise == null) {
        throw new NullPointerException("promise");
    }

    // The real registration work is carried out by the channel's Unsafe.
    final Channel.Unsafe unsafe = channel.unsafe();
    unsafe.register(this, promise);
    return promise;
}
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
} AbstractChannel.this.eventLoop = eventLoop;//设置该channel绑定的eventloop if (eventLoop.inEventLoop()) {//必须保证在eventloop线程中执行
register0(promise);//注册
} else {
try {
eventLoop.execute(new OneTimeTask() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
继续看register0代码
/**
 * Performs the registration: registers with the selector, marks the channel as registered,
 * completes the promise and fires the pipeline events. Any failure force-closes the channel and
 * fails the promise.
 *
 * @param promise the registration promise to complete
 */
private void register0(ChannelPromise promise) {
    try {
        // check if the channel is still open as it could be closed in the mean time when the register
        // call was outside of the eventLoop
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
        boolean firstRegistration = neverRegistered;
        doRegister(); // register with the selector
        neverRegistered = false;
        registered = true; // mark the channel as registered
        safeSetSuccess(promise); // report that registration succeeded
        pipeline.fireChannelRegistered(); // fire channelRegistered; this invokes channelRegistered on the initializer handler
        // Only fire a channelActive if the channel has never been registered. This prevents firing
        // multiple channel actives if the channel is deregistered and re-registered.
        if (firstRegistration && isActive()) { // channel is usable; for a client this means connect already succeeded
            pipeline.fireChannelActive(); // fire channelActive, which eventually registers read interest
        }
    } catch (Throwable t) {
        // Close the channel directly to avoid FD leak.
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}
看doRegister代码
/**
 * Registers the underlying JDK channel with the event loop's selector. When the first attempt
 * hits a cancelled key, one {@code selectNow()} is forced and the registration is retried; a
 * second failure is rethrown.
 */
@Override
protected void doRegister() throws Exception {
    boolean alreadySelected = false;
    while (true) {
        try {
            // Note: the interest set is 0 here, so no event is being listened for yet.
            selectionKey = javaChannel().register(eventLoop().selector, 0, this);
            return;
        } catch (CancelledKeyException e) {
            if (alreadySelected) {
                // We forced a select operation on the selector before but the SelectionKey is still cached
                // for whatever reason. JDK bug ?
                throw e;
            }
            // Force the Selector to select now as the "canceled" SelectionKey may still be
            // cached and not removed because no Select.select(..) operation was called yet.
            eventLoop().selectNow();
            alreadySelected = true;
        }
    }
}
initAndRegister执行完成之后,继续看doConnect0代码
/**
 * Schedules the connect on the channel's event loop. A failed registration fails the promise
 * instead of connecting.
 *
 * @param regFuture     the registration future of {@code channel}
 * @param channel       the channel to connect
 * @param remoteAddress address to connect to
 * @param localAddress  optional local address, {@code null} if none
 * @param promise       promise tracking the connect attempt
 */
private static void doConnect0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {

    // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
    // the pipeline in its channelRegistered() implementation.
    // Everything below runs on the event loop, not on the user's thread.
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (!regFuture.isSuccess()) {
                promise.setFailure(regFuture.cause());
                return;
            }

            // Perform the connect, with or without an explicit local address.
            if (localAddress != null) {
                channel.connect(remoteAddress, localAddress, promise);
            } else {
                channel.connect(remoteAddress, promise);
            }
            promise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
        }
    });
}
继续看connect代码,简单的调用了pipeline.connect
/**
 * Channel-level connect: hands the request to the channel's pipeline.
 *
 * @param remoteAddress address to connect to
 * @param promise       promise to notify
 * @return the future returned by the pipeline
 */
@Override
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
    final ChannelFuture result = pipeline.connect(remoteAddress, promise);
    return result;
}
从tail开始
/**
 * Pipeline-level connect: the request enters the pipeline at the tail context.
 *
 * @param remoteAddress address to connect to
 * @param promise       promise to notify
 * @return the future returned by the tail context
 */
@Override
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
    final ChannelFuture result = tail.connect(remoteAddress, promise);
    return result;
}
最终会调用到head.connect()
/**
 * Head handler's connect: forwards the request to {@code unsafe}.
 *
 * @param ctx           context of this handler (not used here)
 * @param remoteAddress address to connect to
 * @param localAddress  local address passed through to {@code unsafe}
 * @param promise       promise passed through to {@code unsafe}
 */
@Override
public void connect(
        ChannelHandlerContext ctx,
        SocketAddress remoteAddress, SocketAddress localAddress,
        ChannelPromise promise) throws Exception {
    unsafe.connect(remoteAddress, localAddress, promise);
}
/**
 * Starts the connect. If {@code doConnect} reports immediate success the promise is fulfilled
 * right away; otherwise the promise is stored, an optional connect timeout is scheduled and a
 * listener is added that cleans up when the promise gets cancelled.
 *
 * @param remoteAddress address to connect to
 * @param localAddress  local address handed to {@code doConnect}
 * @param promise       promise tracking this connect attempt
 */
@Override
public final void connect(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
    if (!promise.setUncancellable() || !ensureOpen(promise)) {
        return;
    }

    try {
        // Only one connect attempt may be pending at a time.
        if (connectPromise != null) {
            throw new IllegalStateException("connection attempt already made");
        }

        boolean wasActive = isActive();
        if (doConnect(remoteAddress, localAddress)) {
            fulfillConnectPromise(promise, wasActive); // complete the promise
        } else {
            connectPromise = promise;
            requestedRemoteAddress = remoteAddress;

            // Schedule connect timeout.
            int connectTimeoutMillis = config().getConnectTimeoutMillis(); // connect timeout support
            if (connectTimeoutMillis > 0) {
                connectTimeoutFuture = eventLoop().schedule(new OneTimeTask() {
                    @Override
                    public void run() {
                        ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
                        ConnectTimeoutException cause =
                                new ConnectTimeoutException("connection timed out: " + remoteAddress);
                        // Close only if this task is the one that failed the promise.
                        if (connectPromise != null && connectPromise.tryFailure(cause)) {
                            close(voidPromise());
                        }
                    }
                }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
            }

            promise.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    // On cancellation: stop the timeout task, forget the promise, close the channel.
                    if (future.isCancelled()) {
                        if (connectTimeoutFuture != null) {
                            connectTimeoutFuture.cancel(false);
                        }
                        connectPromise = null;
                        close(voidPromise());
                    }
                }
            });
        }
    } catch (Throwable t) {
        promise.tryFailure(annotateConnectException(t, remoteAddress));
        closeIfClosed();
    }
}
客户端的isActive()
/**
 * Client-side notion of "active": the JDK channel is open and connected.
 *
 * @return {@code true} if the underlying channel is both open and connected
 */
@Override
public boolean isActive() {
    final SocketChannel jdkChannel = javaChannel();
    if (!jdkChannel.isOpen()) {
        return false;
    }
    return jdkChannel.isConnected();
}
服务端的isActive()
/**
 * Server-side notion of "active": the underlying socket is bound.
 *
 * @return {@code true} if the socket of the JDK channel is bound
 */
@Override
public boolean isActive() {
    final boolean bound = javaChannel().socket().isBound();
    return bound;
}
看一下doConnect代码
/**
 * Binds to {@code localAddress} when one is given, then issues the connect on the JDK channel.
 * If the connect does not complete immediately, interest in {@code OP_CONNECT} is set on the
 * selection key. The channel is closed via {@code doClose()} when an exception escapes.
 *
 * @param remoteAddress address to connect to
 * @param localAddress  local address to bind first, or {@code null}
 * @return the result of the JDK {@code connect} call
 */
@Override
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
    if (localAddress != null) {
        javaChannel().socket().bind(localAddress);
    }

    boolean success = false;
    try {
        boolean connected = javaChannel().connect(remoteAddress); // the actual connect on the JDK channel
        if (!connected) {
            selectionKey().interestOps(SelectionKey.OP_CONNECT); // not connected yet: listen for OP_CONNECT
        }
        success = true;
        return connected;
    } finally {
        // success stays false only if an exception was thrown above.
        if (!success) {
            doClose();
        }
    }
}
private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
if (promise == null) {
// Closed via cancellation and the promise has been notified already.
return;
} // trySuccess() will return false if a user cancelled the connection attempt.
boolean promiseSet = promise.trySuccess(); // Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
// because what happened is what happened.
if (!wasActive && isActive()) {//如果connect成功
13 pipeline().fireChannelActive();//最终会注册read事件,细节如下
14 } // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
if (!promiseSet) {
close(voidPromise());
}
}
/**
 * Fires {@code channelActive} starting at the head context and, when auto-read is enabled,
 * requests a read on the channel afterwards.
 *
 * @return this pipeline
 */
@Override
public ChannelPipeline fireChannelActive() {
    head.fireChannelActive();

    final boolean autoRead = channel.config().isAutoRead();
    if (autoRead) {
        // pipeline.read() --> tail.read() --> ... --> head.read() --> unsafe.beginRead() --> doBeginRead() --> read interest
        channel.read();
    }

    return this;
}
四、看一下如何获取异步连接结果的
在NioEventLoop的循环中,可以看到如下代码:
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
    // Take OP_CONNECT out of the interest set first, otherwise Selector.select(..) will always
    // return without blocking. See https://github.com/netty/netty/issues/924
    k.interestOps(k.interestOps() & ~SelectionKey.OP_CONNECT);

    unsafe.finishConnect();
}
当发生OP_CONNECT事件时,最终会调用unsafe.finishConnect,代码如下
@Override
public final void finishConnect() {
// Note this method is invoked by the event loop only if the connection attempt was
// neither cancelled nor timed out. assert eventLoop().inEventLoop();//确保该操作是在eventLoop线程中的 try {
boolean wasActive = isActive();
doFinishConnect();
fulfillConnectPromise(connectPromise, wasActive);
} catch (Throwable t) {
fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
} finally {
// Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used
// See https://github.com/netty/netty/issues/1770
if (connectTimeoutFuture != null) {
connectTimeoutFuture.cancel(false);
}
connectPromise = null;
}
}
/**
 * Finishes the connect on the JDK channel.
 *
 * @throws Error if the JDK channel reports that the connect did not finish
 */
@Override
protected void doFinishConnect() throws Exception {
    // Ask the JDK SocketChannel for the connect result; true means the connection is established.
    final boolean connected = javaChannel().finishConnect();
    if (connected) {
        return;
    }
    throw new Error();
}
判断JDK的SocketChannel连接结果,返回true表示连接成功
/**
 * Decompiled listing of the JDK socket channel's {@code finishConnect()}; the synthetic names
 * ({@code var41}, {@code label525}, ...) and the numeric state constants come from the decompiler.
 *
 * As far as this listing shows: with {@code state == 2} the method returns {@code true}
 * immediately; any state other than 1 raises {@link NoConnectionPendingException}; otherwise
 * {@code checkConnect} is polled, and a positive result sets {@code state = 2}, records the local
 * address and returns {@code true}. Any other result returns {@code false}.
 * NOTE(review): the meaning of the status values -2 / -3 and of state 3 is not visible here;
 * they presumably map to the JDK's internal I/O status and channel-state constants - confirm
 * against the JDK sources.
 *
 * @return {@code true} if the channel is connected
 * @throws IOException if the channel is closed, or the connect check fails (the channel is
 *                     closed before rethrowing)
 */
public boolean finishConnect() throws IOException {
Object var1 = this.readLock;
synchronized(this.readLock) {
Object var2 = this.writeLock;
synchronized(this.writeLock) {
Object var3 = this.stateLock;
boolean var10000;
synchronized(this.stateLock) {
if(!this.isOpen()) {
throw new ClosedChannelException();
} if(this.state == 2) {
var10000 = true;
return var10000;
} if(this.state != 1) {
throw new NoConnectionPendingException();
}
} int var41 = 0; Object var4;
try {
label525: {
boolean var29 = false; boolean var6;
label506: {
try {
var29 = true;
this.begin();
synchronized(this.blockingLock()) {
label480: {
label494: {
Object var5 = this.stateLock;
synchronized(this.stateLock) {
if(!this.isOpen()) {
var6 = false;
break label494;
} this.readerThread = NativeThread.current();
} if(!this.isBlocking()) {
do {
var41 = checkConnect(this.fd, false, this.readyToConnect);
} while(var41 == -3 && this.isOpen());
} else {
do {
while(true) {
var41 = checkConnect(this.fd, true, this.readyToConnect);
if(var41 == 0) {
continue;
}
break;
}
} while(var41 == -3 && this.isOpen());
} var29 = false;
break label480;
} var29 = false;
break label506;
}
}
} finally {
if(var29) {
Object var13 = this.stateLock;
synchronized(this.stateLock) {
this.readerThread = 0L;
if(this.state == 3) {
this.kill();
var41 = 0;
}
} this.end(var41 > 0 || var41 == -2); assert IOStatus.check(var41); }
} var4 = this.stateLock;
synchronized(this.stateLock) {
this.readerThread = 0L;
if(this.state == 3) {
this.kill();
var41 = 0;
}
} this.end(var41 > 0 || var41 == -2); assert IOStatus.check(var41);
break label525;
} Object var7 = this.stateLock;
synchronized(this.stateLock) {
this.readerThread = 0L;
if(this.state == 3) {
this.kill();
var41 = 0;
}
} this.end(var41 > 0 || var41 == -2); assert IOStatus.check(var41); return var6;
}
} catch (IOException var38) {
this.close();
throw var38;
} if(var41 > 0) {
var4 = this.stateLock;
synchronized(this.stateLock) {
this.state = 2;
if(this.isOpen()) {
this.localAddress = Net.localAddress(this.fd);
}
} var10000 = true;
return var10000;
} else {
var10000 = false;
return var10000;
}
}
}
}
fulfillConnectPromise会触发连接激活(channelActive)事件
/**
 * Marks the connect promise as succeeded, fires {@code channelActive} if the channel has just
 * become active, and closes the channel when the promise could not be completed.
 *
 * @param promise   the connect promise; {@code null} means it has been notified already
 * @param wasActive whether the channel was active before the connect attempt
 */
private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
    if (promise == null) {
        // Closed via cancellation and the promise has been notified already.
        return;
    }

    // trySuccess() will return false if a user cancelled the connection attempt.
    final boolean notified = promise.trySuccess();

    // Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
    // because what happened is what happened.
    if (!wasActive && isActive()) {
        pipeline().fireChannelActive(); // same path as in the earlier listing of this method
    }

    if (notified) {
        return;
    }
    // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
    close(voidPromise());
}
五、write过程
由于在服务端启动过程中已经多次分析了channel的read执行过程,因此在这里单独分析一下channel的write过程。首先看一下Channel接口中关于write方法的定义:
/**
 * Request to write a message via this {@link Channel} through the {@link ChannelPipeline}.
 * This method will not request an actual flush, so be sure to call {@link #flush()}
 * once you want to request to flush all pending data to the actual transport.
 *
 * @param msg the message to write
 * @return the future associated with this write request
 */
ChannelFuture write(Object msg);
其在AbstractChannel中的实现为:
/**
 * Channel-level write: hands the message to the channel's pipeline.
 *
 * @param msg the message to write
 * @return the future returned by the pipeline
 */
@Override
public ChannelFuture write(Object msg) {
    final ChannelFuture result = pipeline.write(msg);
    return result;
}
继续深入
/**
 * Pipeline-level write: the message enters the pipeline at the tail context.
 *
 * @param msg the message to write
 * @return the future returned by the tail context
 */
@Override
public ChannelFuture write(Object msg) {
    final ChannelFuture result = tail.write(msg);
    return result;
}
事件进入pipeline之后,会从tail context开始向前传播(因为write是个outbound事件)
/**
 * Context-level write that creates a new promise for the operation.
 *
 * @param msg the message to write
 * @return the future of the write
 */
@Override
public ChannelFuture write(Object msg) {
    final ChannelFuture result = write(msg, newPromise());
    return result;
}
继续
/**
 * Writes {@code msg} without flushing. If the promise does not pass validation the message is
 * released and nothing is written.
 *
 * @param msg     the message to write; must not be {@code null}
 * @param promise the promise to notify
 * @return {@code promise}
 * @throws NullPointerException if {@code msg} is {@code null}
 */
@Override
public ChannelFuture write(final Object msg, final ChannelPromise promise) {
    if (msg == null) {
        throw new NullPointerException("msg");
    }

    if (!validatePromise(promise, true)) {
        ReferenceCountUtil.release(msg);
        // cancelled
        return promise;
    }
    // false: do not flush the buffer as part of this write.
    write(msg, false, promise);

    return promise;
}
继续
/**
 * Passes the write (and optional flush) to the next outbound context. When called on that
 * context's executor thread it is invoked directly; otherwise the pending-bytes counter is
 * updated and a task is submitted to the executor.
 *
 * @param msg     the message to write
 * @param flush   whether a flush should follow the write
 * @param promise the promise to notify
 */
private void write(Object msg, boolean flush, ChannelPromise promise) {
    final AbstractChannelHandlerContext next = findContextOutbound();
    final EventExecutor executor = next.executor();

    if (executor.inEventLoop()) {
        next.invokeWrite(msg, promise);
        if (flush) {
            next.invokeFlush();
        }
        return;
    }

    // Off the executor thread: account for the bytes now, run the write later as a task.
    final int size = channel.estimatorHandle().size(msg);
    if (size > 0) {
        final ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer();
        // Check for null as it may be set to null if the channel is closed already
        if (buffer != null) {
            buffer.incrementPendingOutboundBytes(size);
        }
    }

    final Runnable task = flush
            ? WriteAndFlushTask.newInstance(next, msg, size, promise)
            : WriteTask.newInstance(next, msg, size, promise);
    safeExecute(executor, task, promise, msg);
}
看一下findContextOutbound的实现
/**
 * Walks backwards through the {@code prev} links, starting from this context, and returns the
 * first context flagged as outbound.
 *
 * @return the nearest preceding outbound context
 */
private AbstractChannelHandlerContext findContextOutbound() {
    AbstractChannelHandlerContext candidate = this.prev;
    while (!candidate.outbound) {
        candidate = candidate.prev;
    }
    return candidate;
}
找到下一个OutBound类型的Context之后,会调用Context中的Handler
/**
 * Invokes {@code write} on this context's handler; anything thrown is reported through
 * {@code notifyOutboundHandlerException}.
 *
 * @param msg     the message to write
 * @param promise the promise to notify
 */
private void invokeWrite(Object msg, ChannelPromise promise) {
    try {
        final ChannelOutboundHandler outboundHandler = (ChannelOutboundHandler) handler();
        outboundHandler.write(this, msg, promise);
    } catch (Throwable t) {
        notifyOutboundHandlerException(t, promise);
    }
}
继续看handler的write实现
/**
 * Calls {@link ChannelHandlerContext#write(Object)} to forward
 * to the next {@link ChannelOutboundHandler} in the {@link ChannelPipeline}.
 *
 * Sub-classes may override this method to change behavior.
 *
 * @param ctx     the context of this handler
 * @param msg     the message being written
 * @param promise the promise to notify
 */
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    ctx.write(msg, promise);
}
可以看到,默认的实现是将事件继续沿pipeline向前传播,最终会传到head Context
/**
 * Head handler's write: hands the message to {@code unsafe}.
 *
 * @param ctx     the context of this handler (not used here)
 * @param msg     the message to write
 * @param promise the promise to notify
 */
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    unsafe.write(msg, promise);
}
unsafe会执行真正的IO操作
/**
 * Queues {@code msg} in the outbound buffer; nothing is sent by this call. If the buffer is
 * gone, or filtering/size estimation throws, the promise is failed and the message released.
 *
 * @param msg     the message to queue
 * @param promise the promise to notify
 */
@Override
public final void write(Object msg, ChannelPromise promise) {
    final ChannelOutboundBuffer buffer = this.outboundBuffer;
    if (buffer == null) {
        // If the outboundBuffer is null we know the channel was closed and so
        // need to fail the future right away. If it is not null the handling of the rest
        // will be done in flush0()
        // See https://github.com/netty/netty/issues/2362
        safeSetFailure(promise, CLOSED_CHANNEL_EXCEPTION);
        // release message now to prevent resource-leak
        ReferenceCountUtil.release(msg);
        return;
    }

    int size;
    try {
        msg = filterOutboundMessage(msg);
        // A negative estimate is treated as zero.
        size = Math.max(0, estimatorHandle().size(msg));
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        ReferenceCountUtil.release(msg);
        return;
    }

    buffer.addMessage(msg, size, promise);
}
可以看到,unsafe的write操作并不是真正地把数据发送出去,而是先缓存在出站缓冲区(ChannelOutboundBuffer)中。当channel调用flush时,最终会执行unsafe的flush
/**
 * Marks the queued messages as flushed and triggers the actual flush. Does nothing when the
 * outbound buffer is {@code null}.
 */
@Override
public final void flush() {
    final ChannelOutboundBuffer buffer = this.outboundBuffer;
    if (buffer != null) {
        buffer.addFlush();
        flush0();
    }
}
addFlush仅仅是对之前缓存的Message进行标记
/**
 * Add a flush to this {@link ChannelOutboundBuffer}: every message added so far is marked as
 * flushed so that it can be handled. Entries whose promise can no longer be made uncancellable
 * are cancelled and their bytes subtracted from the pending count.
 */
public void addFlush() {
    // There is no need to process all entries if there was already a flush before and no new messages
    // where added in the meantime.
    //
    // See https://github.com/netty/netty/issues/2577
    Entry current = unflushedEntry;
    if (current == null) {
        return;
    }

    if (flushedEntry == null) {
        // there is no flushedEntry yet, so start with the entry
        flushedEntry = current;
    }

    for (; current != null; current = current.next) {
        flushed ++;
        if (!current.promise.setUncancellable()) {
            // Was cancelled so make sure we free up memory and notify about the freed bytes
            int pending = current.cancel();
            decrementPendingOutboundBytes(pending, false, true);
        }
    }

    // All flushed so reset unflushedEntry
    unflushedEntry = null;
}
接下来看一下真正的flush操作
/**
 * Performs the actual flush of the outbound buffer. Re-entrant calls return immediately via the
 * {@code inFlush0} flag. On an inactive channel all flushed messages are failed; otherwise
 * {@code doWrite} is called and any error fails the flushed messages (and closes the channel
 * for an {@link IOException} when auto-close is enabled).
 */
protected void flush0() {
    if (inFlush0) {
        // Avoid re-entrance
        return;
    }

    final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null || outboundBuffer.isEmpty()) {
        return;
    }

    inFlush0 = true;

    // Mark all pending write requests as failure if the channel is inactive.
    if (!isActive()) {
        try {
            if (isOpen()) {
                outboundBuffer.failFlushed(NOT_YET_CONNECTED_EXCEPTION, true);
            } else {
                // Do not trigger channelWritabilityChanged because the channel is closed already.
                outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION, false);
            }
        } finally {
            inFlush0 = false;
        }
        return;
    }

    try {
        doWrite(outboundBuffer);
    } catch (Throwable t) {
        boolean close = t instanceof IOException && config().isAutoClose();
        // We do not want to trigger channelWritabilityChanged event if the channel is going to be closed.
        outboundBuffer.failFlushed(t, !close);
        if (close) {
            close(voidPromise());
        }
    } finally {
        // Always clear the re-entrancy guard.
        inFlush0 = false;
    }
}
doWrite执行真正的写操作
/**
 * Writes the flushed messages of {@code in} to the JDK socket channel using NIO buffers. Each
 * pass attempts up to {@code getWriteSpinCount()} writes; a write that returns 0 sets
 * {@code setOpWrite}, and an unfinished pass ends in {@code incompleteWrite(setOpWrite)}.
 * If no NIO buffers are available, the superclass implementation takes over.
 *
 * @param in the outbound buffer to drain
 */
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    for (;;) {
        int size = in.size();
        if (size == 0) {
            // All written so clear OP_WRITE
            clearOpWrite();
            break;
        }
        long writtenBytes = 0;
        boolean done = false;
        boolean setOpWrite = false;

        // Ensure the pending writes are made of ByteBufs only.
        ByteBuffer[] nioBuffers = in.nioBuffers();
        int nioBufferCnt = in.nioBufferCount();
        long expectedWrittenBytes = in.nioBufferSize();
        SocketChannel ch = javaChannel();

        // Always us nioBuffers() to workaround data-corruption.
        // See https://github.com/netty/netty/issues/2761
        switch (nioBufferCnt) {
            case 0:
                // We have something else beside ByteBuffers to write so fallback to normal writes.
                super.doWrite(in);
                return;
            case 1:
                // Only one ByteBuf so use non-gathering write
                ByteBuffer nioBuffer = nioBuffers[0];
                for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
                    final int localWrittenBytes = ch.write(nioBuffer);
                    if (localWrittenBytes == 0) {
                        // Nothing was accepted by the socket on this attempt.
                        setOpWrite = true;
                        break;
                    }
                    expectedWrittenBytes -= localWrittenBytes;
                    writtenBytes += localWrittenBytes;
                    if (expectedWrittenBytes == 0) {
                        done = true;
                        break;
                    }
                }
                break;
            default:
                // Several buffers: gathering write.
                for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
                    final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
                    if (localWrittenBytes == 0) {
                        setOpWrite = true;
                        break;
                    }
                    expectedWrittenBytes -= localWrittenBytes;
                    writtenBytes += localWrittenBytes;
                    if (expectedWrittenBytes == 0) {
                        done = true;
                        break;
                    }
                }
                break;
        }

        // Release the fully written buffers, and update the indexes of the partially written buffer.
        in.removeBytes(writtenBytes);

        if (!done) {
            // Did not write all buffers completely.
            incompleteWrite(setOpWrite);
            break;
        }
    }
}
/**
 * Removes {@code OP_WRITE} from the selection key's interest set, if the key is still valid and
 * the bit is currently set.
 */
protected final void clearOpWrite() {
    final SelectionKey key = selectionKey();
    // Check first if the key is still valid as it may be canceled as part of the deregistration
    // from the EventLoop
    // See https://github.com/netty/netty/issues/2104
    if (key.isValid()) {
        final int ops = key.interestOps();
        final boolean writeInterestSet = (ops & SelectionKey.OP_WRITE) != 0;
        if (writeInterestSet) {
            key.interestOps(ops & ~SelectionKey.OP_WRITE);
        }
    }
}
/**
 * Generic write loop over the current messages of {@code in}. {@code ByteBuf} and
 * {@code FileRegion} messages are written with up to {@code getWriteSpinCount()} attempts each;
 * a fully written message is removed, otherwise {@code incompleteWrite(setOpWrite)} is called
 * and the loop stops. Any other message type results in an {@link Error}.
 *
 * @param in the outbound buffer to drain
 */
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    // Read lazily from the config the first time a message needs it.
    int writeSpinCount = -1;

    for (;;) {
        Object msg = in.current();
        if (msg == null) {
            // Wrote all messages.
            clearOpWrite();
            break;
        }

        if (msg instanceof ByteBuf) {
            ByteBuf buf = (ByteBuf) msg;
            int readableBytes = buf.readableBytes();
            if (readableBytes == 0) {
                // Nothing to write for this buffer: drop it and move on.
                in.remove();
                continue;
            }

            boolean setOpWrite = false;
            boolean done = false;
            long flushedAmount = 0;
            if (writeSpinCount == -1) {
                writeSpinCount = config().getWriteSpinCount();
            }
            for (int i = writeSpinCount - 1; i >= 0; i --) {
                int localFlushedAmount = doWriteBytes(buf);
                if (localFlushedAmount == 0) {
                    setOpWrite = true;
                    break;
                }

                flushedAmount += localFlushedAmount;
                if (!buf.isReadable()) {
                    done = true;
                    break;
                }
            }

            in.progress(flushedAmount);

            if (done) {
                in.remove();
            } else {
                incompleteWrite(setOpWrite);
                break;
            }
        } else if (msg instanceof FileRegion) {
            FileRegion region = (FileRegion) msg;
            boolean done = region.transfered() >= region.count();
            boolean setOpWrite = false;

            if (!done) {
                long flushedAmount = 0;
                if (writeSpinCount == -1) {
                    writeSpinCount = config().getWriteSpinCount();
                }

                for (int i = writeSpinCount - 1; i >= 0; i--) {
                    long localFlushedAmount = doWriteFileRegion(region);
                    if (localFlushedAmount == 0) {
                        setOpWrite = true;
                        break;
                    }

                    flushedAmount += localFlushedAmount;
                    if (region.transfered() >= region.count()) {
                        done = true;
                        break;
                    }
                }

                in.progress(flushedAmount);
            }

            if (done) {
                in.remove();
            } else {
                incompleteWrite(setOpWrite);
                break;
            }
        } else {
            // Should not reach here.
            throw new Error();
        }
    }
}
/**
 * Transfers the readable bytes of {@code buf} to the JDK channel.
 *
 * @param buf the buffer to write from
 * @return the value returned by {@code buf.readBytes(...)}
 */
@Override
protected int doWriteBytes(ByteBuf buf) throws Exception {
    final int pendingBytes = buf.readableBytes();
    final int written = buf.readBytes(javaChannel(), pendingBytes);
    return written;
}
/**
 * Reports write progress for the current flushed entry. Only entries whose promise is a
 * {@link ChannelProgressivePromise} are updated and notified.
 *
 * @param amount number of bytes to add to the entry's progress
 */
public void progress(long amount) {
    final Entry current = flushedEntry;
    assert current != null;

    final ChannelPromise promise = current.promise;
    if (!(promise instanceof ChannelProgressivePromise)) {
        return;
    }

    current.progress += amount;
    ((ChannelProgressivePromise) promise).tryProgress(current.progress, current.total);
}
/**
 * Notifies the progressive listeners unless the update is invalid or the promise is already
 * done. A negative {@code total} is normalized to -1 and skips the upper-bound check.
 *
 * @param progress current progress; must not be negative
 * @param total    expected total, or a negative value
 * @return {@code true} if the listeners were notified, {@code false} if the update was rejected
 */
@Override
public boolean tryProgress(long progress, long total) {
    final boolean totalUnknown = total < 0;
    if (totalUnknown) {
        total = -1;
    }

    // Same checks, same order: negative progress, overshoot (known total only), already done.
    if (progress < 0 || (!totalUnknown && progress > total) || isDone()) {
        return false;
    }

    notifyProgressiveListeners(progress, total);
    return true;
}