TCP半关闭状态分析和skynet对半关闭状态的支持

时间:2022-12-26 11:52:50

一、背景

TCP四次挥手中的半关闭状态是否需要解决,依赖于使用场景,大多数场景不解决也不会有影响,但有些场景(特别是游戏服务器)还是需要关注这个半关闭状态的。
所谓半关闭,就是只关闭读端或写端;半关闭状态是接收到FIN包并返回ACK包时 到 发送FIN包前的状态。

对半关闭状态进行了解决的有JAVA的netty、skynet开源框架。

大多数网络连接程序在read=0时即调用close()关闭TCP连接;但是,在read=0到调用close()之间,可能还有很多数据需要发送(send),如果read=0时即调用close()那么对端就收不到这些数据了。

对TCP三次握手和四次挥手不了解的,可以查阅前面的文章《Linux网络设计之TCP网络协议栈》。

二、TCP四次挥手流程

需要四次挥手的原因:希望将FIN包控制权交给应用程序去处理,以便处理断开连接的善后工作,即对端可能还有数据要发送。
TCP半关闭状态分析和skynet对半关闭状态的支持
四次挥手流程:

  1. 服务器收到FIN包,自动回复ACK包,进入CLOSE_WAIT状态。
  2. 此时TCP协议栈会为FIN包插入一个文件描述符EOF到内核态网络读缓冲区。
  3. 应用程序通过read=0来感知这个FIN包。
  4. 如果此时应用程序有数据要发送,则发送数据后调用关闭连接操作(发送FIN包到对端)。
  5. 双端关闭都需要一个FIN和一个ACK流程。
Client Server 四次挥手 FIN 进入FIN_WAIT_1状态 ACK 进入CLOSE_WAIT状态 进入FIN_WAIT_2状态 发送数据 FIN 进入LAST_ACK状态 ACK 进入TIME_WAIT状态 完成断开 alt [close] Client Server

三、发送FIN包的场景

  1. 关闭读端:调用shutdown(fd,SHUT_RD)。
  2. 关闭写端:调用shutdown(fd,SHUT_WR);半关闭连接,仍旧接收数据,继而等待对方关闭。
  3. 关闭双端:调用close(fd) 或者shoutdown(fd,SHUT_RDWR)。
  4. 关闭进程:内核协议栈会自动发送FIN包。

如果客户端想进入半关闭状态,需要关闭写端,服务端会把自己的读端关闭,从而进入半关闭状态。

如果客户端是关闭读端或者把读写端都关闭了(调用close(fd) 或者shoutdown(fd,SHUT_RDWR))或者是关闭了进程,那么会进入快速关闭流程:

Client Server 四次挥手 FIN 进入FIN_WAIT_1状态 ACK 进入CLOSE_WAIT状态 进入FIN_WAIT_2状态 FIN 进入LAST_ACK状态 RST 进入CLOSE状态,完成断开 完成断开 alt [close] Client Server

接受到对端回复的FIN包之后回复RST包,直接进入CLOSE状态,而没有TIME_WAIT状态,这是解决有大量TIME_WAIT现象的常用方法

思考:四次挥手中,为什么存在TIME_WAIT状态?
因为ACK不会重传,防止没有LAST_ACK或LAST_ACK丢失,对端没有收到LAST_ACK而一直重发FIN包。一直重发已经不存在的socket,可能会对新建立的连接造成干扰。

四、skynet 网络封装支持半关闭状态

skynet 网络层支持半关闭状态。skynet 采用 reactor 网络编程模型;reactor 网络模型是一种异步事件模型。
TCP半关闭状态分析和skynet对半关闭状态的支持
通过 epoll_ctl 设置 struct epoll_event 中 data.ptr =(struct socket *)ud; 来完成 fd 与 actor绑定。skynet 通过 socket.start(fd, func) 来完成 actor 与 fd 的绑定。

4.1、连接的建立

客户端与服务端建立连接处理:建立成功的标识是 listenfd,有读事件触发。
服务端与第三方服务建立连接:建立成功的标识是 connect 返回的fd,有可写事件触发。

4.2、连接断开

skynet 网络层支持半关闭状态。

(1)读写端都关闭,epoll的事件EPOLLHUP 读写段都关闭:
(socket_epoll.h)

static int 
sp_wait(int efd, struct event *e, int max) {
	struct epoll_event ev[max];
	int n = epoll_wait(efd , ev, max, -1);
	int i;
	for (i=0;i<n;i++) {
		e[i].s = ev[i].data.ptr;
		unsigned flag = ev[i].events;
		e[i].write = (flag & EPOLLOUT) != 0;
		e[i].read = (flag & EPOLLIN) != 0;
		e[i].error = (flag & EPOLLERR) != 0;
		e[i].eof = (flag & EPOLLHUP) != 0;
	}

	return n;
}

(socket_server.c)

// 处理:直接关闭并向 actor 返回事件 SOCKET_CLOSE
if (e->eof) {
	// For epoll (at least), FIN packets are exchanged both ways.
	// See: https://*.com/questions/52976152/tcp-when-is-epollhup-generated
	int halfclose = halfclose_read(s);
	force_close(ss, s, &l, result);
	 // 如果前面因为关闭读端已经发送SOCKET_CLOSE,在这里避免重复 SOCKET_CLOSE
	if (!halfclose) {
		return SOCKET_CLOSE;
	}
}

(2)检测读端关闭:
(socket_server.c)

// return -1 (ignore) when error
static int
forward_message_tcp(struct socket_server *ss, struct socket *s, struct socket_lock *l, struct socket_message * result) {
	int sz = s->p.size;
	char * buffer = MALLOC(sz);
	int n = (int)read(s->fd, buffer, sz);
	if (n<0) {
		FREE(buffer);
		switch(errno) {
		case EINTR:
		case AGAIN_WOULDBLOCK:
			break;
		default:
			return report_error(s, result, strerror(errno));
		}
		return -1;
	}
	if (n==0) {
		FREE(buffer);
		if (s->closing) {	// 如果该连接的 socket 已经关闭
			// Rare case : if s->closing is true, reading event is disable, and SOCKET_CLOSE is raised.
			if (nomore_sending_data(s)) {
				force_close(ss,s,l,result);
			}
			return -1;
		}
		int t = ATOM_LOAD(&s->type);
		if (t == SOCKET_TYPE_HALFCLOSE_READ) {	// 如果已经处理过读端关闭
			// Rare case : Already shutdown read.
			return -1;
		}
		if (t == SOCKET_TYPE_HALFCLOSE_WRITE) {	// 如果之前已经处理过写端关闭,则直接 close
			// Remote shutdown read (write error) before.
			force_close(ss,s,l,result);
		} else {
			close_read(ss, s, result);
		}
		return SOCKET_CLOSE;
	}

	if (halfclose_read(s)) {
		// discard recv data (Rare case : if socket is HALFCLOSE_READ, reading event is disable.)
		FREE(buffer);
		return -1;
	}

	stat_read(ss,s,n);

	result->opaque = s->opaque;
	result->id = s->id;
	result->ud = n;
	result->data = buffer;

	if (n == sz) {
		s->p.size *= 2;
		return SOCKET_MORE;
	} else if (sz > MIN_READ_BUFFER && n*2 < sz) {
		s->p.size /= 2;
	}

	return SOCKET_DATA;
}

(3)检测写端关闭:write < 0 && errno == EPIPE,能检测对端读端关闭。
(socket_server.c)

static int
send_list_tcp(struct socket_server *ss, struct socket *s, struct wb_list *list, struct socket_lock *l, struct socket_message *result) {
	while (list->head) {
		struct write_buffer * tmp = list->head;
		for (;;) {
			ssize_t sz = write(s->fd, tmp->ptr, tmp->sz);
			if (sz < 0) {
				switch(errno) {
				case EINTR:
					continue;
				case AGAIN_WOULDBLOCK:
					return -1;
				}
				return close_write(ss, s, l, result);
			}
			stat_write(ss,s,(int)sz);
			s->wb_size -= sz;
			if (sz != tmp->sz) {
				tmp->ptr += sz;
				tmp->sz -= sz;
				return -1;
			}
			break;
		}
		list->head = tmp->next;
		write_buffer_free(ss,tmp);
	}
	list->tail = NULL;

	return -1;
}

4.3、消息到达

单线程读。读策略:

int n = read(fd, buf, sz);

sz 初始值为 64,根据从网络接收数据情况,动态调整 sz 的大小。
(socket_server.c)

// return -1 (ignore) when error
static int
forward_message_tcp(struct socket_server *ss, struct socket *s, struct socket_lock *l, struct socket_message * result) {
	int sz = s->p.size;
	char * buffer = MALLOC(sz);
	int n = (int)read(s->fd, buffer, sz);
	if (n<0) {
		FREE(buffer);
		switch(errno) {
		case EINTR:
		case AGAIN_WOULDBLOCK:
			break;
		default:
			return report_error(s, result, strerror(errno));
		}
		return -1;
	}
	if (n==0) {
		FREE(buffer);
		if (s->closing) {
			// Rare case : if s->closing is true, reading event is disable, and SOCKET_CLOSE is raised.
			if (nomore_sending_data(s)) {
				force_close(ss,s,l,result);
			}
			return -1;
		}
		int t = ATOM_LOAD(&s->type);
		if (t == SOCKET_TYPE_HALFCLOSE_READ) {
			// Rare case : Already shutdown read.
			return -1;
		}
		if (t == SOCKET_TYPE_HALFCLOSE_WRITE) {
			// Remote shutdown read (write error) before.
			force_close(ss,s,l,result);
		} else {
			close_read(ss, s, result);
		}
		return SOCKET_CLOSE;
	}

	if (halfclose_read(s)) {
		// discard recv data (Rare case : if socket is HALFCLOSE_READ, reading event is disable.)
		FREE(buffer);
		return -1;
	}

	stat_read(ss,s,n);

	result->opaque = s->opaque;
	result->id = s->id;
	result->ud = n;
	result->data = buffer;

	if (n == sz) {
		s->p.size *= 2;
		return SOCKET_MORE;
	} else if (sz > MIN_READ_BUFFER && n*2 < sz) {
		s->p.size /= 2;
	}

	return SOCKET_DATA;
}

4.4、消息发送完毕

多线程写。同一个 fd 可以在不同的 actor 中发送数据。skynet底层通过加锁确保数据正确发送到 socket 的写缓冲区。

int 
socket_server_send(struct socket_server *ss, struct socket_sendbuffer *buf) {
	int id = buf->id;
	struct socket * s = &ss->slot[HASH_ID(id)];
	if (socket_invalid(s, id) || s->closing) {
		free_buffer(ss, buf);
		return -1;
	}

	struct socket_lock l;
	socket_lock_init(s, &l);
	
	// 确保能在fd上写数据,连接可用状态,检测 socket 线程是否在对该fd操作
	if (can_direct_write(s,id) && socket_trylock(&l)) {
		// may be we can send directly, double check
		if (can_direct_write(s,id)) {	// 双检测
			// send directly
			struct send_object so;
			send_object_init_from_sendbuffer(ss, &so, buf);
			ssize_t n;
			if (s->protocol == PROTOCOL_TCP) {
				// 尝试在 work 线程直接写,如果n >0,则写成功
				n = write(s->fd, so.buffer, so.sz);
			} else {
				union sockaddr_all sa;
				socklen_t sasz = udp_socket_address(s, s->p.udp_address, &sa);
				if (sasz == 0) {
					skynet_error(NULL, "socket-server : set udp (%d) address first.", id);
					socket_unlock(&l);
					so.free_func((void *)buf->buffer);
					return -1;
				}
				n = sendto(s->fd, so.buffer, so.sz, 0, &sa.s, sasz);
			}
			if (n<0) {
				// ignore error, let socket thread try again
				 // 如果失败,不要在 work 线程中处理异常,所有网络异常在 socket 线程中处理
				n = 0;
			}
			stat_write(ss,s,n);
			if (n == so.sz) {
				// write done
				socket_unlock(&l);
				so.free_func((void *)buf->buffer);
				return 0;
			}
			// write failed, put buffer into s->dw_* , and let socket thread send it. see send_buffer()
			s->dw_buffer = clone_buffer(buf, &s->dw_size);
			s->dw_offset = n;

			socket_unlock(&l);

			struct request_package request;
			request.u.send.id = id;
			request.u.send.sz = 0;
			request.u.send.buffer = NULL;
			 // 如果写失败,可能写缓冲区满,或被中断打断,直接交由 socket 线程去重试。
			 // 这里通过 pipe 来与 socket 线程通信。
			// let socket thread enable write event
			send_request(ss, &request, 'W', sizeof(request.u.send));

			return 0;
		}
		socket_unlock(&l);
	}

	inc_sending_ref(s, id);

	struct request_package request;
	request.u.send.id = id;
	request.u.send.buffer = clone_buffer(buf, &request.u.send.sz);

	send_request(ss, &request, 'D', sizeof(request.u.send));
	return 0;
}

注意:在 work 线程中调用的。如果work线程写失败,可能写缓冲区满,或被中断打断,直接交由 socket 线程去重试。这里通过 pipe 来与 socket 线程通信。

五、测试skynet对半关闭的支持

main.lua

local skynet = require "skynet"
local socket = require "skynet.socket"

skynet.start(function ()
    local listenfd = socket.listen("0.0.0.0", 8888)
    socket.start(listenfd, function (clientfd, addr)
        print("receive a client:", clientfd, addr)
        socket.start(clientfd)
        while true do
            local data = socket.readline(clientfd, "\n")
            if not data then -- read = 0
                -- for i=1,10 do
                    socket.write(clientfd, "FINAL\n")
                -- end
                socket.close(clientfd)
                print("closed", clientfd)
                return
            end
            if data == "quit" then
                print("recv quit", clientfd)
                socket.close(clientfd)
                return
            else
                print("S recv:", data)
                socket.write(clientfd, "=> "..data.."\n")
            end
        end
    end)
    
end)

client.lua

package.cpath = "./skynet/luaclib/?.so"
package.path = "./skynet/lualib/?.lua;./?.lua"

if _VERSION ~= "Lua 5.4" then
	error "Use lua 5.4"
end

local socket = require "client.socket"

local fd = assert(socket.connect("127.0.0.1", 8888))

local function send_package(pack)
	socket.send(fd, pack .. "\n")
end

local function unpack_package(text)
	local size = #text
	if size < 1 then
		return nil, text
	end
    local pos = text:find("\n", 1, true)
	if not pos then
		return nil, text
	end

	return text:sub(1,pos-1), text:sub(pos+1)
end

local function recv_package(last)
	local result
	result, last = unpack_package(last)
	if result then
		return result, last
	end
	local r = socket.recv(fd)
	if not r then
		return nil, last
	end
	if r == "" then
		error "Server closed"
	end
	return unpack_package(last .. r)
end

local last = ""

local function dispatch_package()
	while true do
		local v
		v, last = recv_package(last)
		if not v then
			break
		end
        print(v)
	end
end

while true do
	dispatch_package()
	local cmd = socket.readstdin()
	if cmd then
		if cmd == "quit" then
			send_package("quit")
        elseif cmd == "shut_r" then
            -- send_package("shut_r")
            socket.shutdown(fd, "r")
            -- send_package("shut_r")
        elseif cmd == "shut_w" then
            -- send_package("shut_w")
            send_package("shut_w1")
            send_package("shut_w2")
            send_package("shut_w3")
            send_package("shut_w4")
            socket.shutdown(fd, "w")
		else
			send_package(cmd)
		end
	else
		socket.usleep(100)
	end
end

config

thread=4
logger=nil
harbor=0
start="main" -- 启动第一个服务
lua_path="./skynet/lualib/?.lua;".."./skynet/lualib/?/init.lua;".."./lualib/?.lua;"
luaservice="./skynet/service/?.lua;./app/?.lua"
lualoader="./skynet/lualib/loader.lua"
cpath="./skynet/cservice/?.so"
lua_cpath="./skynet/luaclib/?.so"

Makefile

SKYNET_PATH?=./skynet

all:
	cd $(SKYNET_PATH) && $(MAKE) PLAT='linux'

clean:
	cd $(SKYNET_PATH) && $(MAKE) clean

项目结构:

-- mytest
----+ skynet 
----| app
-------- main.lua
-------- client.lua
----+ config
----+ Makefile

编译和运行:

make
./skynet/skynet config

启动client.lua的方式:

./skynet/3rd/lua/lua ./app/client.lua

5.1、测试直接关闭进程

启动client.lua,然后CTL+C直接关闭进程。

./skynet/3rd/lua/lua ./app/client.lua
  1. 此时内核协议栈会收到一个FIN包,内核协议栈的读缓冲区插入EOF,并触发读事件。
  2. skynet开始读数据,读到EOF(即read()==0);开始关闭读事件和读端。
  3. 然后进入main.lua的while循环,尝试发送消息;但是消息是无法发送出去了,因为客户端进程已经关闭,资源已经被释放,协议栈会把其丢弃。
  4. 注意,读数据、写数据、业务逻辑都在线程池执行,socket网络线程只负责监听触发事件;socket网络线程和线程池通过pipeline消息交互,epoll可以监听这个消息,然后注册读、写事件。
socket_server.c
skynet_socket.c
skynet_start.c
close_read()
forward_message_tcp()
socket_server_poll()
forward_message()
skynet_socket_poll()
wakeup
thread_socket()

5.2、测试关闭读端

启动client.lua,输入shut_r命令。

$ ./skynet/3rd/lua/lua ./app/client.lua 
shut_r
  1. 关闭读端,会走RST流程,立即进入CLOSE状态,服务端会报出“Connection reset by peer”错误。
  2. 服务端关闭写端,但仍然能接受到客户端发送的数据。
  3. 接受完数据后才调用关闭读端,结束连接。
receive a client:       4       127.0.0.1:35384
S recv: shut_r
S recv: shut_r2
closed  4
[:00000008] socket: error on 4 Connection reset by peer

5.2、测试关闭写端

启动client.lua,输入shut_w命令。

$ ./skynet/3rd/lua/lua ./app/client.lua 
shut_w
  1. 关闭写端,服务器会关闭读端。
  2. 在完全关闭连接之前,服务器还可以发送数据到客户端。

server端:

receive a client:       5       127.0.0.1:35486
S recv: shut_w1
S recv: shut_w2
S recv: shut_w3
S recv: shut_w4
closed  5

client端:

shut_w
SHUTDOWN 3 1
=> shut_w1
=> shut_w2
=> shut_w3
=> shut_w4
FINAL

总结

ACK包是不会重发的,如果ACK包丢失了,那么发送端在一定时间内接受不到数据就会重发FIN包,一般三次FIN包都没有收到ACK就会直接进入close。

后言

本专栏知识点是通过<零声教育>的系统学习,进行梳理总结写下文章,对c/c++linux系统提升感兴趣的读者,可以点击链接查看详细的服务:C/C++服务器开发