AbstractChannel和AbstractUnsafe抽象类
io.netty.channel.AbstractChannel
从本章开始,会有大量的篇幅涉及到代码分析。为了能够清晰简洁的地说明代码的结构和功能,我会用代码注释+独立段落的方式加以呈现。 所以,为你能更好地理解代码,请不要忽略代码中黑体字注释。
AbstractChannel和AbstractUnsafe之间的关系
AbstractChannel实现了Channel接口,AbstractUnsafe实现了Unsafe。这两个类是抽象类,他们实现了Channel和Unsafe的绝大部分接口。在AbstractChannel的实现中,每个方法都会直接或间接调用Unsafe对应的同名方法。所有的inbound和outbound方法都是通过pipeline间接调用,其他的辅助方法直接使用unsafe实例调用。pipline和unsafe实例在AbstractChannel的构造方法创建:
protected AbstractChannel(Channel parent) {
this.parent = parent;
unsafe = newUnsafe(); //AbstractChannel没有实现这个方法
pipeline = newChannelPipeline(); // newChannelPipline的实现 return new DefaultChannelPipeline(this);
}
直接调用的例子:
@Override
public SocketAddress localAddres) {
SocketAddress localAddress = this.localAddress;
if (localAddress == null) {
try {
//这里直接调用了Unsafe的localAddress()方法
this.localAddress = localAddress = unsafe().localAddress();
} catch (Throwable t) {
// Sometimes fails on a closed socket in Windows.
return null;
}
}
return localAddress;
}
间接调用的例子
@Override
public ChannelFuture bind(SocketAddress localAddress) {
return pipeline.bind(localAddress); //通过pipline间接调用Unsafe的bind方法
}
关于pipline是怎样调用Unsafe方法的,会在后面的Pipline相关章节详细分析,这里只需记住。pipeline所有方法调用最终都会(如果没有改变ChannelContextHandler的默认实现)通过使用newUnsafe创建的Unsafe实例调用Unsafe的同名方法(如果有的话)。
netty给出这一对Abstract实现有两个目的:
- 进一步明确接口的语意。
- 简化Channel接口的实现。
下面来具体看一下AbstractUnsafe的主要方法实现。
AbstractUnsafe的重要实现
register实现
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
if (isRegistered()) { //检查是否已经注册, 避免重复只需注册动作。
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {//检查eventloop是否满足Channel的要求,由子类实现
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
//设置Channel的EventLoop实例
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) { //检查是否在当前线程中,如果是,直接调用
register0(promise);
} else {
//如果不是,把register0包装到runnable中放到eventloop中调用。
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
这个方法的实现为我们展示了netty使用I/O线程的一般套路
if(eventLoop.inEventLoop()){
doSomething();
}else{
eventLoop.execute(new Runnable(){
@Override
public void run() {
doSomething();
}
});
}
对于某个需要放到I/O线性中执行的方法,先检查当前线程是不是I/O线程,是就直接执行,不是就把它包装到Ruannable中放到eventLoop中执行。
register的功能总结一句话就是调用register0, 下面看看register0的实现。
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
//确保promise没有被取消同时Channel没有被关闭才能执行后面的动作
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
doRegister(); //执行真正的register操作,留改子类实现
neverRegistered = false;
registered = true; //设置Channel已经处于registed状态
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
//触发handlerAdded事件
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered(); //触发channelRegistered事件
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
if (isActive()) {
if (firstRegistration) {//确保Channel只有在第一次register 的时候被触发
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
//对于设置了autoRead的Channel执行beginRead();
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
register语义:
- 把channel和eventLoop绑定,eventLoop线程就是I/O线程。
- 确保真正的register操作在I/O线程中执行。
- 确保每个channel的register操作只执行一次。
- 真正的register操作执行成功后, 触发channelRegistered事件,如果channel此时仍处于active状态,触发channelActive事件,并确保这些事件只触发一次。
- 真正的register操作执行成功后, 如果channel此时仍处于active状态,并且channel的配置支持autoRead, 则执行beginRead操作,让eventLoop可以自动触发channel的read事件。
bind实现
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
assertEventLoop();
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
// See: https://github.com/netty/netty/issues/576
if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
localAddress instanceof InetSocketAddress &&
!((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
!PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
// Warn a user about the fact that a non-root user can't receive a
// broadcast packet on *nix if the socket is bound on non-wildcard address.
logger.warn(
"A non-root user can't receive a broadcast packet if the socket " +
"is not bound to a wildcard address; binding to a non-wildcard " +
"address (" + localAddress + ") anyway as requested.");
}
//先保存是否active的状态
boolean wasActive = isActive();
try {
doBind(localAddress); //调用doBind, 需要子类实现这个方法完成真正的bind操作
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
if (!wasActive && isActive()) {
//如果执行完doBind后从非active状态变成active装,则触发channelActive事件
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
safeSetSuccess(promise);
}
bind语义:
- 调用抽象方法doBind, 它需要子类实现。
- 如果channel的状态从非active变成active状态,则触发channelActive事件
disconnect实现
disconnect和bind的实现类型,不同的是他调用的是doDisconnect方法,这个方法同样是抽象方法需要子类实现。当channel的状态从非active变成active状态时,调用pipeline.fireChannelInactive()触发channelInactive事件。
close实现
@Override
public final void close(final ChannelPromise promise) {
assertEventLoop();
close(promise, CLOSE_CLOSED_CHANNEL_EXCEPTION, CLOSE_CLOSED_CHANNEL_EXCEPTION, false);
}
private void close(final ChannelPromise promise, final Throwable cause,
final ClosedChannelException closeCause, final boolean notify)
{
if (!promise.setUncancellable()) {
return;
}
if (closeInitiated) { //这段代码的作用就是防止多次执行close操作
if (closeFuture.isDone()) {
// Closed already.
safeSetSuccess(promise);
} else if (!(promise instanceof VoidChannelPromise)) { // Only needed if no VoidChannelPromise.
// This means close() was called before so we just register a listener and return
closeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
promise.setSuccess();
}
});
}
return;
}
closeInitiated = true;
final boolean wasActive = isActive();
final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
//把outboundBuffer置空,在这之后无法进行write或flush操作
this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer.
Executor closeExecutor = prepareToClose(); //这个方法默认实现是return null. 如果有些可以在子类中覆盖这个方法添加关闭前的准备代
//下面的if..else执行的是相同的操作,不同的是如果closeExecutor可以用,就在这个executor中执行,否则在当前线程总执行
if (closeExecutor != null) {
closeExecutor.execute(new Runnable() {
@Override
public void run() {
try {
// Execute the close.
doClose0(promise); //执行close操作
} finally {
// Call invokeLater so closeAndDeregister is executed in the EventLoop again!
// close完成之后的操作, 在eventLoop中执行
invokeLater(new Runnable() {
@Override
public void run() {
if (outboundBuffer != null) {
// Fail all the queued messages
//对outboundBuffer中的数据进行错误处理
outboundBuffer.failFlushed(cause, notify);
outboundBuffer.close(closeCause);
}
//执行deregister操作, 如果channel由active变成非active状态就触发channelInactive事件
fireChannelInactiveAndDeregister(wasActive);
}
});
}
}
});
} else {
try {
// Close the channel and fail the queued messages in all cases.
doClose0(promise);
} finally {
if (outboundBuffer != null) {
// Fail all the queued messages.
outboundBuffer.failFlushed(cause, notify);
outboundBuffer.close(closeCause);
}
}
if (inFlush0) {
//如果正在执行flush操作,把deregister操作放在eventLoop中执行
invokeLater(new Runnable() {
@Override
public void run() {
fireChannelInactiveAndDeregister(wasActive);
}
});
} else {
fireChannelInactiveAndDeregister(wasActive);
}
}
}
private void doClose0(ChannelPromise promise) {
try {
doClose(); //调用doClose执行真正的close操作,它是一个抽象方法,需要在子类中实现。
closeFuture.setClosed();
safeSetSuccess(promise);
} catch (Throwable t) {
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
close实现的代码虽然比较多,但做的事情比较简单:首先执行close操作,然后实现deregister操作,触发channelInactive事件。
在close的实现中,先调用assertEventLoop方法确保当前方法是在eventLoop中执行,然后多次使用invokeLater方法吧一系列操作放在放在Runnable中执行,这样做的目的是事为了保证接下来的操作一定在当前操作完成之后才会执行,这一点是有eventLoop来保证的,eventLoop执行Runnable的顺序和调用execute的顺序一致,相关实现会在后面eventLoop章节具体讨论。
deregister实现
@Override
public final void deregister(final ChannelPromise promise) {
assertEventLoop();
deregister(promise, false);
}
private void deregister(final ChannelPromise promise, final boolean fireChannelInactive) {
if (!promise.setUncancellable()) {
return;
}
if (!registered) { //避免多次执行deregister操作
safeSetSuccess(promise);
return;
}
// As a user may call deregister() from within any method while doing processing in the ChannelPipeline,
// we need to ensure we do the actual deregister operation later. This is needed as for example,
// we may be in the ByteToMessageDecoder.callDecode(...) method and so still try to do processing in
// the old EventLoop while the user already registered the Channel to a new EventLoop. Without delay,
// the deregister operation this could lead to have a handler invoked by different EventLoop and so
// threads.
//
// See:
// https://github.com/netty/netty/issues/4435
invokeLater(new Runnable() {
@Override
public void run() {
try {
doDeregister(); //执行真正的deregister操作,这方法默认没做任何事情,子类可以根据需要覆盖实现
} catch (Throwable t) {
logger.warn("Unexpected exception occurred while deregistering a channel.", t);
} finally {
if (fireChannelInactive) {
pipeline.fireChannelInactive(); // 触发channelInactive事件
}
// Some transports like local and AIO does not allow the deregistration of
// an open channel. Their doDeregister() calls close(). Consequently,
// close() calls deregister() again - no need to fire channelUnregistered, so check
// if it was registered.
if (registered) {
registered = false;
pipeline.fireChannelUnregistered(); //触发channelUnregistered事件
}
safeSetSuccess(promise);
}
}
});
}
语义:
- 调用doDeregister执行真正的deregister操作
- 根据参数可能需要触发channelInactive事件
- 触发channelUnregistered事件
write实现
@Override
public final void write(Object msg, ChannelPromise promise) {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
//如果outboundBuffer是null, 意味着这个channel已经被close掉了,需要使用promise返回错误,然后释放掉msg
// If the outboundBuffer is null we know the channel was closed and so
// need to fail the future right away. If it is not null the handling of the rest
// will be done in flush0()
// See https://github.com/netty/netty/issues/2362
safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
// release message now to prevent resource-leak
ReferenceCountUtil.release(msg);
return;
}
int size;
try {
msg = filterOutboundMessage(msg); //过滤msg, 默认实现中没有做任何操作,把msg原样返回, 资料可以根据需要覆盖实现。
size = pipeline.estimatorHandle().size(msg); //计算msg序列化之后的长度
if (size < 0) {
size = 0;
}
} catch (Throwable t) {
safeSetFailure(promise, t);
ReferenceCountUtil.release(msg);
return;
}
outboundBuffer.addMessage(msg, size, promise); //把msg放入outboundBuffer中
}
write的操作比较简单,他只是把消息放到outboundBuffer中,并没有做实际的写操作。
flush实现
@Override
public final void flush() {
assertEventLoop(); //确保在eventLoop中执行
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
return;
}
outboundBuffer.addFlush();
//如果outboundBuffer不是null才可以进入真正的write阶段
flush0();
}
protected void flush0() {
if (inFlush0) { //确保不被多个线程同时执行
// Avoid re-entrance
return;
}
final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null || outboundBuffer.isEmpty()) { //确保outboundBuffer有数据是才执行下面的步骤
return;
}
inFlush0 = true;
// Mark all pending write requests as failure if the channel is inactive.
if (!isActive()) { //如果channel不是active状态,返回错误
try {
if (isOpen()) {
outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true);
} else {
// Do not trigger channelWritabilityChanged because the channel is closed already.
outboundBuffer.failFlushed(FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
}
} finally {
inFlush0 = false;
}
return;
}
try {
doWrite(outboundBuffer); //执行真正的写操作,这是一个抽象方法,需要子类实现。
} catch (Throwable t) {
if (t instanceof IOException && config().isAutoClose()) {
/**
* Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of
* failing all flushed messages and also ensure the actual close of the underlying transport
* will happen before the promises are notified.
*
* This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()}
* may still return {@code true} even if the channel should be closed as result of the exception.
*/
//如是I/O异常,并且channel配置允许自动关闭,则关闭channel
close(voidPromise(), t, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
} else {
try {
shutdownOutput(voidPromise(), t); //关闭output通道,不允许执行write操作。
} catch (Throwable t2) {
close(voidPromise(), t2, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
}
}
} finally {
inFlush0 = false;
}
}
语义:
- 调用doWrite方法执行真正的写操作
- 如果写操作失败,调用close或者shutdownOutput进行善后。
至此,已经分析完了AbstractChannel和AbstractUnsafe的所有重要的实现,回头总结一下,这个类主要做了这么几件事:
1. 明确了AbstractChannel和AbstractUnsafe方法之间的调用关系,或通过unsafe实例直接调用,或通过pipleline间接调用。
2. 规定了Unsafe方法的执行线程,有些必须在eventLoop中执行,这样的方法第一行就调用assertEventLoop来确保当前方法是在eventLoop线性中,有些不需要一定在eventLoop中执行的则没有这个调用
3. 确保多线程多线程环境下的执行顺序,这一点通过把一系列操作包装成Runnable放入eventLoop中来保证,invokeLater方法就是一个典型的例子。
4. 定义了事件的触发条件,在前面的代码分析中,频繁地出现pipeline.fireXXX()的调用,这些调用就是在触发特定的事件,大部分情况下用户不要自己去触发事件。
5. 优化多线程环境下的数据同步性能,使用volatile减少synchronized和Lock的使用, 典型的用法如下所示:
private volatile ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this);
final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
......
return;
}
....
doWrite(outboundBuffer);
AbstractUnsafe的扩展点
前面说过,AbstractUnsafe做了很多事,但把临门一脚的工作交给子类完成,这样让子类的实现变得简单很多。AbstractUsafe把这些工作定义成形如doXXX的抽象方法或是没有干任何事的空方法。下面是这些方法的列表:
方法
|
说明
|
protected abstract SocketAddress localAddress0()
|
被localAddress调用,执行真正的获取本地地址的操作。
|
protected abstract SocketAddress remoteAddress0()
|
被remoteAddress调用,是真正的获取远程地址的操作。
|
protected abstract boolean isCompatible(EventLoop loop)
|
检查eventLoop是是否和这个Channel兼容。
|
protected void doRegister()
|
调用链register->register0->doRegister, 真正的注册操作。
|
protected abstract void doBind(SocketAddress localAddress)
|
被bind调用,执行真正绑定本地地址的操作。
|
protected abstract void doDisconnect()
|
被disconnect调用,执行真正的断开连接操作。
|
protected abstract void doClose()
|
被close掉,执行真正的关闭channel操作。
|
protected void doShutdownOutput()
|
被shutdownOutput调用,用来关闭output通道,使Channel不能write。它的的默认实现是调用doClose
|
protected void doDeregister()
|
被deregister调用,是真正的注销操作,虽然不是抽象方法,然而只有一个{}, 还是要等你来搞定。
|
protected abstract void doBeginRead()
|
调用链register->register0->beginRead->doBeginRead, 实现让eventLoop可以自动触发read事件。
|
protected abstract void doWrite(ChannelOutboundBuffer in)
|
调用链flush->flush0->doWrite, 执行真正的写操作。
|
protected Object filterOutboundMessage(Object msg)
|
被write调用,在消息被放到outboundBuffer之前对消息进行处理,默认啥事都没干,就是把你传进去的msg还给你。
|