「java」从websocket服务器的启动分析netty3.10源码

时间:2022-09-05 14:34:15

**

1.首先是创建bootstrap对象

**
// Configure the server.
ServerBootstrap bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(), Executors.newCachedThreadPool()));
创建bootstrap对象的时候先创建一个频道工厂ChannelFactory,它会初始化boss线程和多个worker线程。

public NioServerSocketChannelFactory(
Executor bossExecutor, int bossCount, WorkerPool workerPool) {
this(new NioServerBossPool(bossExecutor, bossCount, null), workerPool);
}

worker线程的初始化方法在NioWorkerPool的构造函数调用

NioServerBossPool的初始化方法会创建NioWorker对象,每个worker对象都会创建一个死锁检测worker线程,worker对象的数量由用户定义

NioServerBoss构造方法会调用AbstractNioSelector.openSelector(determiner)方法打开多路复用器,同时调用DeadLockProofWorker.start()创建一个死锁检测worker线程,在这个线程中会通过用户设置的执行器executor跑NioWorker对象(它是个runable)。

NioServerBossPool的构造方法最后调用waitForWorkerThreads()方法等待所有的worker线程初始化完毕,这是通过闭锁来进行判断的,闭锁的完成条件是NioWorker对象的父方法run()运行起来后调用startupLatch.countDown()方法。

boss线程的初始化和worker差不多

**

2.绑定本地地址

**

public Channel bind(final SocketAddress localAddress) {
//bind的异步操作结果对象(里面会创建一个新的频道)
ChannelFuture future = bindAsync(localAddress);

//阻塞到绑定结束 Wait for the future.
future.awaitUninterruptibly();
if (!future.isSuccess()) {
future.getChannel().close().awaitUninterruptibly();
throw new ChannelException("Failed to bind to: " + localAddress, future.getCause());
}

return future.getChannel();

}

public ChannelFuture bindAsync(final SocketAddress localAddress) {
if (localAddress == null) {
throw new NullPointerException(“localAddress”);
}
//本质是一个频道处理器(专用于绑定频道到指定的本地地址)
Binder binder = new Binder(localAddress);//绑定者
//获取用户设置的父频道处理器
ChannelHandler parentHandler = getParentHandler();
//新建一个默认管道作为boss管道
ChannelPipeline bossPipeline = pipeline();
//添加绑定者到boss管道中
bossPipeline.addLast(“binder”, binder);
//用户设置了父频道处理器就加到boss管道中
if (parentHandler != null) {
bossPipeline.addLast(“userHandler”, parentHandler);
}

//通过用户设置的频道工厂创建一个持有boss管道的频道
Channel channel = getFactory().newChannel(bossPipeline);

//结果future
final ChannelFuture bfuture = new DefaultChannelFuture(channel, false);
//添加回调以得到结果
binder.bindFuture.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
bfuture.setSuccess();
} else {
// Call close on bind failure
bfuture.getChannel().close();
bfuture.setFailure(future.getCause());
}
}
});
return bfuture;

}
在通过频道工厂NioServerSocketChannelFactory创建频道对象Channel的时候,NioServerSocketChannel的构造函数最后会调用fireChannelOpen(this)方法

public static void fireChannelOpen(Channel channel) {
// Notify the parent handler.
if (channel.getParent() != null) {
fireChildChannelStateChanged(channel.getParent(), channel);
}

channel.getPipeline().sendUpstream(
new UpstreamChannelStateEvent(
channel, ChannelState.OPEN, Boolean.TRUE));

}
可以看到在这里管道pipeline的sendUpstream()方法被调用,它会让管道持有的处理器handler对象顺着链表依次执行handleUpstream()方法。

而Binder是第一个注册的,所以它当然是第一个执行sendUpstream()方法(这是Binder父类SimpleChannelUpstreamHandler的方法)。最后会调用Binder的channelOpen()方法

private final class Binder extends SimpleChannelUpstreamHandler {

private final SocketAddress localAddress;
private final Map<String, Object> childOptions =
new HashMap<String, Object>();
private final DefaultChannelFuture bindFuture = new DefaultChannelFuture(null, false);
Binder(SocketAddress localAddress) {
this.localAddress = localAddress;
}

@Override
public void channelOpen(
ChannelHandlerContext ctx,
ChannelStateEvent evt) {

try {
evt.getChannel().getConfig().setPipelineFactory(getPipelineFactory());

// Split options into two categories: parent and child.
Map<String, Object> allOptions = getOptions();
Map<String, Object> parentOptions = new HashMap<String, Object>();
for (Entry<String, Object> e: allOptions.entrySet()) {
if (e.getKey().startsWith("child.")) {
childOptions.put(
e.getKey().substring(6),
e.getValue());
} else if (!"pipelineFactory".equals(e.getKey())) {
parentOptions.put(e.getKey(), e.getValue());
}
}

// Apply parent options.
evt.getChannel().getConfig().setOptions(parentOptions);
} finally {
//调用下一个handler的sendUpstream()方法
ctx.sendUpstream(evt);
}
//调用频道Channel的bind()方法绑定本地地址
evt.getChannel().bind(localAddress).addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
bindFuture.setSuccess();
} else {
bindFuture.setFailure(future.getCause());
}
}
});
}

可以看到方法的结尾会调用频道Channel的bind()方法绑定本地地址。最后他会调用ChannelPipelin的sendDownStream()方法。

public void sendDownstream(ChannelEvent e) {
DefaultChannelHandlerContext tail = getActualDownstreamContext(this.tail);
if (tail == null) {
try {
getSink().eventSunk(this, e);
return;
} catch (Throwable t) {
notifyHandlerException(e, t);
return;
}
}

sendDownstream(tail, e);

}

如果有DownstreamHandler,它会让管道持有的处理器handler对象顺着链表依次执行handleDownstream()方法。而没有的时候就是执行ChannelSink的eventSunk()方法。这个sink是ChannelFactory创建channel创建的NioServerSocketPipelineSink。

public void eventSunk(
ChannelPipeline pipeline, ChannelEvent e) throws Exception {
Channel channel = e.getChannel();
if (channel instanceof NioServerSocketChannel) {
handleServerSocket(e);
} else if (channel instanceof NioSocketChannel) {
handleAcceptedSocket(e);
}
}

接着调用handleServerSocket()方法处理绑定地址事件BOUND

private static void handleServerSocket(ChannelEvent e) {
if (!(e instanceof ChannelStateEvent)) {
return;
}

ChannelStateEvent event = (ChannelStateEvent) e;
NioServerSocketChannel channel =
(NioServerSocketChannel) event.getChannel();
ChannelFuture future = event.getFuture();
ChannelState state = event.getState();
Object value = event.getValue();

switch (state) {
case OPEN:
if (Boolean.FALSE.equals(value)) {
((NioServerBoss) channel.boss).close(channel, future);
}
break;
case BOUND:
if (value != null) {
((NioServerBoss) channel.boss).bind(channel, future, (SocketAddress) value);
} else {
((NioServerBoss) channel.boss).close(channel, future);
}
break;
default:
break;
}

}
最终调用到Boss的bind()方法,注册一个任务到boss对象的任务队列。这个任务会放到任务线程中进行处理。
void bind(final NioServerSocketChannel channel, final ChannelFuture future,
final SocketAddress localAddress) {
registerTask(new RegisterTask(channel, future, localAddress));
}
我们来看看这个任务做了什么

public void run() {
boolean bound = false;
boolean registered = false;
try {
channel.socket.socket().bind(localAddress, channel.getConfig().getBacklog());
bound = true;

    future.setSuccess();
fireChannelBound(channel, channel.getLocalAddress());
channel.socket.register(selector, SelectionKey.OP_ACCEPT, channel);

registered = true;
} catch (Throwable t) {
future.setFailure(t);
fireExceptionCaught(channel, t);
} finally {
if (!registered && bound) {
close(channel, future);
}
}

}
可以看到它是通过nio的SelectableChannel绑定本地端口,有兴趣可以去了解下jdk的nio是怎么实现的。

public void run() {
boolean bound = false;
boolean registered = false;
try {
//通过nio的SelectableChannel绑定本地端口
channel.socket.socket().bind(localAddress, channel.getConfig().getBacklog());
bound = true;

    future.setSuccess();
//绑定完本地地址后在发一个BOUND事件给上行处理器
fireChannelBound(channel, channel.getLocalAddress());
channel.socket.register(selector, SelectionKey.OP_ACCEPT, channel);

registered = true;
} catch (Throwable t) {
future.setFailure(t);
fireExceptionCaught(channel, t);
} finally {
if (!registered && bound) {
close(channel, future);
}
}

}
至此服务器基本启动完成