【Java TCP/IP Socket编程】----深入剖析----TCP数据传输中的死锁和性能

时间:2024-03-28 15:37:06

目录

 

死锁问题

数据传输性能

案例


--------笔记来自于书籍《Java TCP/IP Socket编程》

死锁问题

在TCP数据传输底层实现中(详细参见https://blog.csdn.net/lili13897741554/article/details/83104539)可能会出现死锁的情况,因此程序协议必须设计得非常小心,避免死锁的发生。以下情况可能导致死锁:

1.每个对等端都在阻塞等待其他端完成一些工作,例如:如果在连接建立后,客户端和服务器端都立即尝试接收数据,显然将导致死锁。

2.SendQ和RecvQ缓冲区的容量在具体实现时会收到一定的限制,如果与TCP的流向控制机制结合使用,有可能到产生死锁的情况。

1)详细说明

      虽然SendQ和RecvQ缓冲区容量使用的实际大小会动态地增大和收缩,还是需要一个硬性限制,防止行为异常的程序所控制的单独一个TCP连接将系统的内存全部耗尽,由于这些缓冲区的容量有限,它们可能被填满。一旦RecvQ已满,TCP流控制机制就会产生作用。它将阻止传输发送端主机的SendQ中的任何数据,直到接收者调用输入流的read()方法后腾出空间。(使用流控制机制的目的就是为了保证发送者不会传输太多数据,而超出了接收系统的处理能力。)发送程序可以持续地写出数据,直到SendQ队列被填满,然而,如果SendQ队列已满时调用out.write()方法,则将阻塞等待,直到有新的空间为止,也就是说直到一些字节传输到了接收到套接字的RecvQ队列中。如果此时RecvQ队列也已经被填满,所有操作都将停止,直到接收程序调用in.read()方法将一些字节传输到Delivered队列中。

    假设SendQ队列和RecvQ队列大小分别是SQS和RQS。将一个大小为n的字节数组传递给write()方法调用,其中n>SQS,直到有至少n-SQS字节传递到接收端主机的RecvQ对列后,该方法才会返回。如果n大小超过了(SQS+RQS),write()方法则将在接收程序从输入流中读取了n-(SQS+RQS)字节后才会返回。如果接收程序没有调用read()方法,大数量的send()调用则无法成功。特别是当连接的两端同时分别调用它们输出流的write()方法,而它们的缓冲区大小又大于SQS+RQS时将发生死锁,两个write操作都不能完成,两个程序都将永远保持阻塞状态。

2)具体实例

     主机A上的程序和主机B上的程序之间有一个连接,假设A和B上的SQS和RQS都是500字节,两个程序试图同时发送1500字节时的情况。主机A上的程序中前500字节已经传输到了另一端,另外500字节已经复制到了主机A的SendQ队列红,余下的500字节则无法发送(因此out.write()方法将无法返回)直到主机B上的RecvQ队列有空间空出来,然后主机B上程序也遇到同样的情况,因此两个程序的write()方法调用都永远无法完成。

    注意点:要仔细设计协议,以避免两个方向上传输大量数据时产生死锁。

【Java TCP/IP Socket编程】----深入剖析----TCP数据传输中的死锁和性能

解决方法:

1)方案之一,是在不同的线程中执行客户端的write()循环和read()循环。一个线程在客户端写完数据后调用该套接字的shutdownOutput()方法,另外一个线程从连接到服务器的输入流中反复读取服务器端反馈给客户端的信息,直到到达输入流的结尾(即服务器端关闭套接字)。如果一个线程阻塞了,另外一个线程仍然可以独立运行。

2)不使用多线程,使用NIO来解决(非阻塞Channel和Selector)

数据传输性能

    在TCP实现中,将用户数据复制到SendQ队列中不仅是因为可能重传数据,这还与性能有关。尤其是SendQ和RecvQ缓冲队列的大小,会对TCP连接的数据吞吐量产生影响。吞吐量是指用户数据字节从发送端发送到接收程序的频率。在要传输大量数据的程序中,我们希望最大化这个频率。在没有网络容量或其他限制的情况下,越大的缓冲区通常能够实现越高吞吐量。

    如果传输n字节的数据,使用大小为n的缓冲区调用一次write()方法,通常要比使用大小为1字节的缓冲区调用n次write()方法效率高很多。然而,如果调用writer()方法是使用了比SQS(SendQ队列大小)大很多的缓冲区,系统还需要将数据从用户地址转换为大小为SQS的块。即套接字底层实现先将SendQ队列缓冲区填满,等待TCP协议将数据转移出去,再重新填满SendQ队列缓冲区,再等待数据转移,反复进行。套接字底层实现每次都要等待数据从SendQ队列中移除,这就一系统消息的形式消耗形式(系统需要上下文切换)浪费一些时间。由此可知,调用write()方法时实际有效缓冲区大小受到SQS限制,同样read()方法也会受到RQS大小的限制。

    需要注意的是:只有当程序一次发送比缓冲区容量大很多的数据,并且要求程序的吞吐量时,可以考虑通过Socket的setSendBufferSize()和sendReceiveBufferSize()方法来改变发送和接收缓冲区的大小。

案例

    案例实现:客户端读取文件,然后将文件发送到服务器端,服务器端接收到未压缩的数据,将数据进行简单地压缩并返回给客户端。

1.客户端代码

public class CompressClient {
  private static final int BUFSIZE = 256;
  private static final String FILENAME = "D:\\java.txt";

  public static void main(String[] args) throws IOException {
    FileInputStream fileIn = new FileInputStream(FILENAME);
    FileOutputStream fileOut = new FileOutputStream(FILENAME + ".gz");
    Socket clientSocket = new Socket("127.0.0.1", 1234);
    InputStream in = clientSocket.getInputStream();
    sendBytes(clientSocket, fileIn);
    int bytesRead;
    byte[] buffer = new byte[BUFSIZE];
    while ((bytesRead = in.read(buffer)) != -1) {
      fileOut.write(buffer, 0, bytesRead);
      System.out.println("R");
    }
    fileIn.close();
    fileOut.close();
    in.close();
  }

  private static void sendBytes(Socket socket, InputStream fileIn) throws IOException {
    OutputStream out = socket.getOutputStream();
    int bytesRead = 0;
    byte[] buffer = new byte[BUFSIZE];
    while ((bytesRead = fileIn.read(buffer)) != -1) {
      out.write(buffer, 0, bytesRead);
      System.out.println("W");
    }
    socket.shutdownOutput();
  }
}

2.服务器端的代码

public class CompressServerExecutor {
  public static void main(String[] args) throws IOException {
    ServerSocket serverSocket = new ServerSocket(1234);
    ExecutorService service = Executors.newCachedThreadPool();
    Logger logger = Logger.getLogger("pratical");
    while(true) {
      Socket clientSocket = serverSocket.accept();
      service.submit(new CompressProtocol(clientSocket,logger));
    }
  }
}
public class CompressProtocol implements Runnable {
  private static final int BUFSIZE= 1024;
  private Socket clientSocket;
  private Logger logger;
  public CompressProtocol(Socket socket,Logger logger) {
    this.clientSocket=socket;
    this.logger=logger;
  }
  
  private static void handleCompressClient(Socket clientSocket,Logger logger) {
    try {
      InputStream in = clientSocket.getInputStream();
      GZIPOutputStream out = new GZIPOutputStream(clientSocket.getOutputStream());
      byte[] buffer = new byte[BUFSIZE];
      int bytesRead;
      while((bytesRead = in.read(buffer))!=-1) {
        out.write(buffer, 0, bytesRead);
        out.finish(); //刷新流
      }
      logger.info("client "+clientSocket.getRemoteSocketAddress()+" finished");
      clientSocket.close();
    } catch (IOException e) {
      logger.log(Level.WARNING,"Exception in echo Protocol",e);
    }
  }
  
  @Override
  public void run() {
    handleCompressClient(clientSocket,logger);
  }
}

分析可能出现的情况:

客户端和服务器端的SendQ队列和RecvQ队列中都有500字节的数据,而客户端发送了一个大小为10000字节(未压缩)的文件,同时假设对于这个文件,服务器读取1000字节并返回500字节,即压缩比2:1。当客户端发送了2000字节后,服务器端最终全部读取这些字节,并发回1000字节,此时客户端的RecvQ队列和服务器端的SendQ队列都将被填满。当客户端又发送了1000字节并被服务器全部读取后,服务器端后续的任何write操作尝试都将阻塞。当客户端又发送了另外1000字节后,客户端的SendQ队列和服务器端的RecvQ队列都将填满,后续客户端write操作将阻塞,从而形成死锁。

解决方案之一:在不同的线程里面执行客户端write()和read()循环。即将CompressClient.java中sendBytes()方法放到线程里面

public class CompressClient {
  private static final int BUFSIZE = 256;
  private static final String FILENAME = "D:\\java.txt";

  public static void main(String[] args) throws IOException {
    FileInputStream fileIn = new FileInputStream(FILENAME);
    FileOutputStream fileOut = new FileOutputStream(FILENAME + ".gz");
    Socket clientSocket = new Socket("127.0.0.1", 1234);
    InputStream in = clientSocket.getInputStream();
    //sendBytes(clientSocket, fileIn);
    Thread thread = new Thread() {
      public void run() {
        try {
          sendBytes(clientSocket,fileIn);
        }catch(IOException e) {
        }
      }
    };
    thread.start();
    int bytesRead;
    byte[] buffer = new byte[BUFSIZE];
    while ((bytesRead = in.read(buffer)) != -1) {
      fileOut.write(buffer, 0, bytesRead);
      System.out.println("R");
    }
    fileIn.close();
    fileOut.close();
    in.close();
  }

  private static void sendBytes(Socket socket, InputStream fileIn) throws IOException {
    OutputStream out = socket.getOutputStream();
    int bytesRead = 0;
    byte[] buffer = new byte[BUFSIZE];
    while ((bytesRead = fileIn.read(buffer)) != -1) {
      out.write(buffer, 0, bytesRead);
      System.out.println("W");
    }
    socket.shutdownOutput();
  }
}