1 传统socket网络编程
1.1 实战
服务端:ServerBoot
package com.javaedge.netty.ch2;
/**
* @author JavaEdge
*/
public class ServerBoot {
private static final int PORT = 8000;
public static void main(String[] args) {
Server server = new Server(PORT);
server.start();
}
}
Server
package com.javaedge.netty.ch2;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
/**
* @author JavaEdge
*/
public class Server {
private ServerSocket serverSocket;
public Server(int port) {
try {
this.serverSocket = new ServerSocket(port);
System.out.println("Server starts success,端口:" + port);
} catch (IOException exception) {
System.out.println("Server starts failed");
}
}
public void start() {
new Thread(() -> doStart()).start();
}
private void doStart() {
while (true) {
try {
Socket client = serverSocket.accept();
new ClientHandler(client).start();
} catch (IOException e) {
System.out.println("Server failure");
}
}
}
}
ClientHandler
package com.javaedge.netty.ch2;
import java.io.IOException;
import java.io.InputStream;
import java.net.Socket;
/**
* @author JavaEdge
*/
public class ClientHandler {
public static final int MAX_DATA_LEN = 1024;
private final Socket socket;
public ClientHandler(Socket socket) {
this.socket = socket;
}
public void start() {
System.out.println("新客户端接入");
new Thread(() -> doStart()).start();
}
private void doStart() {
try {
InputStream inputStream = socket.getInputStream();
while (true) {
byte[] data = new byte[MAX_DATA_LEN];
int len;
while ((len = inputStream.read(data)) != -1) {
String message = new String(data, 0, len);
System.out.println("客户端传来消息: " + message);
socket.getOutputStream().write(data);
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
Client
package com.javaedge.netty.ch2;
import java.io.IOException;
import java.net.Socket;
/**
* @author JavaEdge
*/
public class Client {
private static final String HOST = "127.0.0.1";
private static final int PORT = 8000;
private static final int SLEEP_TIME = 5000;
public static void main(String[] args) throws IOException {
final Socket socket = new Socket(HOST, PORT);
new Thread(() -> {
System.out.println("客户端启动成功!");
while (true) {
try {
String message = "hello world";
System.out.println("客户端发送数据: " + message);
socket.getOutputStream().write(message.getBytes());
} catch (Exception e) {
System.out.println("写数据出错!");
}
sleep();
}
}).start();
}
private static void sleep() {
try {
Thread.sleep(SLEEP_TIME);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
先后启动 ServerBoot
、Client
,输出
Server starts success,端口:8000
新客户端接入
客户端传来消息: hello worldhello world
客户端传来消息: hello world
客户端传来消息: hello world
客户端传来消息: hello world
客户端传来消息: hello world
客户端启动成功!
客户端发送数据: hello world
客户端发送数据: hello world
客户端发送数据: hello world
1.2 传统HTTP服务器原理
- 创建一个
ServerSocket
- 监听并绑定一个端口一系列客户端来请求这个端口服务器使用Accept,获得一个来自客户端的Socket连接对象
- 启动一个新线程处理连接读Socket,
- 得到字节流解码协议
- 得到Http请求对象处理Http请求
- 得到一个结果
- 封装成一个HttpResponse对象编码协议
- 将结果序列化字节流写Socket,
- 将字节流发给客户端
1.3 C/S 交互流程
2 Netty版socket网络编程
3 Netty核心组件
3.1 NioEventLoop
3.1.1 简介
① EventLoop
一个 EventLoop 就是一个 eventexecutor:
package io.netty.channel;
import io.netty.util.concurrent.OrderedEventExecutor;
/**
* 一旦Channel注册了,将处理所有的 I/O 操作
* 一个 EventLoop 实例通常处理一个以上的Channel ,但这取决于执行的细节和内幕
*/
public interface EventLoop extends OrderedEventExecutor, EventLoopGroup {
@Override
EventLoopGroup parent();
}
NioEventLoopGroup
是一个处理 I/O 操作的多线程事件循环。Netty 为不同类型的传输提供各种 EventLoopGroup
实现。
示例代码中实现服务器端应用程序,将使用两个NioEventLoopGroup:
- boss,接受传入的连接。因为accept事件只需建立一次连接,连接是可以复用的,accept只接受一次
- work,在上司接受连接并登记到工作人员后,处理接受连接的流量。使用多少线程及如何映射到创建的通道,取决于 EventLoopGroup 实现,甚至可能通过构造函数配置
Netty的发动机:
Server端
Client端
while(true)
就对应一个 run 方法。
NioEventLoop#run
protected void run() {
int selectCnt = 0;
for (;;) {
try {
int strategy;
try {
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
if (ioRatio == 100) {
try {
if (strategy > 0) {
processSelectedKeys();
}
}
}
netty有不同I/O编程模型实现。以NIO为例,对IO事件的处理是在NioEventLoop里,事件的注册:
private void processSelectedKeys() {
if (selectedKeys != null) {
// 不用 JDK 的 selector.selectedKeys,性能更好(%1-2%),GC更少
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
不同事件调用unsafe的不同方法,Netty对底层socket的操作都通过
unsafe
- NioMessageUnsafe
NioServerSocketChannel
使用NioMessageUnsafe做socket操作
- NioByteUnsafe
NioSocketChannel
使用NioByteUnsafe做socket操作
处理每个连接:
② EventExecutorGroup
负责:
- 经由其使用
next()
方法,提供EventExecutor
- 处理自己的生命周期,并允许在全局模式中关闭它们
③ EventExecutor
特殊的EventExecutorGroup
,附带一些快捷方法,看是否有Thread
在事件循环执行。
④ EventLoopGroup
特殊的 EventExecutorGroup
,允许注册 Channel
,即事件循环期间可执行 channel 操作,得到处理,供以后选用。
3.2 Channel
- 以服务端的NioMessageUnsafe为例来看下read()方法的实现,对应是否有新连接进来的情况
直接把底层的 channel 封装成 NioSocketChannel
3.3 ByteBuf
3.4 Pipeline
netty 将其抽象成逻辑链,netty怎么把每个 pipeline 加入到客户端连接的呢?
AbstractChannel
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}
DefaultChannelPipeline
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
3.5 ChannelHandler