【Java】同步阻塞式(BIO)TCP通信

时间:2022-03-07 21:18:48

TCP BIO

背景

网络编程的基本模型是Clien/Server模型,也就是两个进程之间进行相互通信,其中服务端提供位置信息(绑定的IP地址和监听端口),客户端通过连接操作向服务端监听的地址发起连接请求,通过三次握手建立连接,如果连接建立成功,双方就可以通过socket进行通信。socket相当于通信的媒介,用来传输数据。

Java处理TCP的类主要有Socket和ServerSocket,基于传统同步阻塞模型(Blocking IO)开发中,ServerSocket负责绑定IP地址,启动监听端口;Socket负责发起连接操作。连接成功之后,双方通过输入和输出流进行同步阻塞式通信。

模型

下图是一个BIO的服务端通信模型:采用BIO通信模型的服务端,通常由一个独立的Acceptor线程负责监听客户端的连接,它接受到客户端连接请求后为每一个客户端创建一个新的线程进行链路处理,处理完之后通过流应答给客户端,线程销毁。

【Java】同步阻塞式(BIO)TCP通信

这种模型的缺点十分明显,当客户端大量请求到达时,由于线程笨重、占用资源大等特点,服务端无法承受巨大的开销,易导致服务宕机。

代码

首先是客户端代码,在代码中可以看到,客户端向127.0.0.1:20006的地址发送连接请求,当连接建立时,通过IO流输出字符串到socket中,并读取socket返回的信息。代码比较简单,就没写注释了_

Client.java:

public class Client {
public static final int PORT = 20006;
public static final String ADDR = "127.0.0.1";
public static void main(String[] args) {
Socket socket = null;
BufferedReader in = null;
PrintStream out = null;
try {
socket = new Socket(ADDR, PORT);
socket.setSoTimeout(2000);
in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
out = new PrintStream(socket.getOutputStream());
out.println("1:Hello server!");
out.println("2:Hello server!");
out.println("bye");
String line = in.readLine();
System.out.println("Receive from server: " + line);
} catch (UnknownHostException e) {
e.printStackTrace();
} catch (SocketTimeoutException e) {
System.out.println("server time out, exit!");
}
catch (IOException e) {
e.printStackTrace();
} finally {
if (in != null) {
try {
in.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (out != null) {
out.close();
}
if (socket != null) {
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}

服务端代码如下,

服务端类首先创建ServerSocket对象,绑定要监听的端口,进行监听循环,当有客户端请求达到时,会触发accept事件并返回一个客户端socket对象,这时,就有多种处理方法了,我在Server.java中用注释写到三种处理客户端socket的方式:

1.单线程处理,即服务端只用一个线程处理客户端请求,一次只能处理一个请求,其他请求来了就在后面阻塞着吧。代码如注释中。

2.多线程处理,为每一个客户端请求分配一个线程去处理,处理完线程销毁,处理线程是Handler.java。

3.线程池处理,这个方法跟第二种方法区别主要在于分配了固定数量的线程,当一个线程处理完一个客户端请求时不会销毁而是回到线程池中。线程池类是HandlerExecutor.java。

Server.java

public class Server {
public static final int PORT = 20006;
public static void main(String[] args) throws IOException {
ServerSocket server = null;
try {
server = new ServerSocket(PORT);
Socket client = null;
System.out.println("Server is listening on " + PORT);
while (true) {
client = server.accept();
//单线程
// BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream()));
// System.out.println(in.readLine());
// PrintWriter out = new PrintWriter(client.getOutputStream(), true);
// out.println("hello client");
// in.close();
// out.close(); //多线程
// new Thread(new Handler(client)).start(); //线程池
HandlerExecutor executor = new HandlerExecutor(10, 100);
executor.execute(new Handler(client));
// ExecutorService executor = Executors.newScheduledThreadPool(10);
// executor.submit(new Handler(client));
}
} finally {
if (server != null) {
System.out.println("server is closed!");
server.close();
}
}
}
}

处理类Handler.java

public class Handler implements Runnable {
private Socket socket;
public Handler (Socket socket) {
this.socket = socket;
}
@Override
public void run() {
try {
BufferedReader in = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
while (true) {
String s = in.readLine();
if (s == null || "bye".equals(s)) {
break;
}
System.out.println(s);
}
PrintWriter out = new PrintWriter(this.socket.getOutputStream(), true);
out.println("hello client");
in.close();
out.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}

线程池类:HandlerExecutor.java

public class HandlerExecutor {
private ExecutorService executor;
public HandlerExecutor(int maxPoolSize, int queueSize) {
executor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), maxPoolSize, 120L,
TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(queueSize));
}
public void execute(Runnable task) {
executor.submit(task);
}
}

参考文献

  • 《Netty权威指南》2.1章节