Preface
The previous post touched on the thundering herd phenomenon. In this post I will record a series of questions about it rather rigorously, again following UNP's organization.
1. thundering herd
- When the program starts, N children are created, and all N call accept and all are put to sleep by the kernel (line 140, p. 458 of TCPv2). When the first client connection arrives, all N children are awakened.
- This is because all N have gone to sleep on the same "wait channel," the so_timeo member of the socket structure, because all N share the same listening descriptor, which points to the same socket structure. Even though all N are awakened, the first of the N to run will obtain the connection and the remaining N − 1 will all go back to sleep, because when each of the remaining N − 1 execute the statement on line 135 of p. 458 of TCPv2, the queue length will be 0 since the first child to run already took the connection.
- This is sometimes called the thundering herd problem because all N are awakened even though only one will obtain the connection. Nevertheless, the code works, with the performance side effect of waking up too many processes each time a connection is ready to be accepted.
The quote above comes from UNP and even gives a kernel-level explanation. Put briefly: when multiple processes are blocked in accept and a client requests a connection, every process blocked in that call is woken up, yet only one of them gets the connection. The rest is unnecessary, wasted process-scheduling time.
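To make the quoted scenario concrete, below is a minimal sketch of N prefork children all sleeping in accept on the same listening descriptor. It is not code from UNP; the file name, port, and child count are placeholders of my own, and error handling is omitted for brevity.
//prefork_accept_sketch.c -- hypothetical sketch, for illustration only
#include <netinet/in.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>
#define NCHILD 5                            /* arbitrary number of children */
int main(void){
    struct sockaddr_in servaddr;
    int listenfd = socket(AF_INET, SOCK_STREAM, 0);
    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons(12345);       /* arbitrary port */
    bind(listenfd, (struct sockaddr *) &servaddr, sizeof(servaddr));
    listen(listenfd, 10);
    for (int i = 0; i < NCHILD; i++)
        if (fork() == 0)                    /* child */
            for (;;){
                /* all NCHILD children sleep inside accept() on the same
                 * listening descriptor; on a kernel without wake-one
                 * semantics one connection wakes every child, but only
                 * one accept() actually returns the connection */
                int connfd = accept(listenfd, NULL, NULL);
                if (connfd >= 0)
                    close(connfd);          /* real per-connection work omitted */
            }
    for (;;)
        pause();                            /* parent just waits */
}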
2. Current status
Since Linux 2.6, the kernel itself has provided a solution, and accept no longer causes the thundering herd. For details, see the paper "accept() scalability on Linux". This post nevertheless continues to explore the issue, because when select, poll, or epoll is used to handle the listening sockfd, the thundering herd can still occur.
In particular, UNP uses rather old API interfaces, whereas most servers today use epoll, so this post follows UNP's approach but substitutes epoll for the select used in the original text.
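As a preview of the setup this post studies, here is a minimal sketch in which every forked child creates its own epoll instance, registers the inherited listening descriptor, and blocks in epoll_wait; this is where the group wake-up can still happen. The file name, port, and child count are placeholders, and error handling is omitted.
//epoll_herd_sketch.c -- hypothetical sketch of the setup studied below
#include <fcntl.h>
#include <netinet/in.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>
int main(void){
    struct sockaddr_in servaddr;
    int listenfd = socket(AF_INET, SOCK_STREAM, 0);
    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons(12345);       /* arbitrary port */
    bind(listenfd, (struct sockaddr *) &servaddr, sizeof(servaddr));
    listen(listenfd, 10);
    /* nonblocking, so children that lose the accept race see EAGAIN instead of blocking */
    fcntl(listenfd, F_SETFL, fcntl(listenfd, F_GETFL, 0) | O_NONBLOCK);
    for (int i = 0; i < 5; i++)
        if (fork() == 0){                   /* child */
            /* each child owns its own epoll instance, all watching the SAME listenfd */
            int efd = epoll_create1(0);
            struct epoll_event ev = { .events = EPOLLIN, .data.fd = listenfd };
            epoll_ctl(efd, EPOLL_CTL_ADD, listenfd, &ev);
            for (;;){
                struct epoll_event out;
                /* one incoming connection may wake epoll_wait in several
                 * children, yet only one of their accept() calls succeeds */
                if (epoll_wait(efd, &out, 1, -1) > 0){
                    int connfd = accept(listenfd, NULL, NULL);
                    if (connfd >= 0)
                        close(connfd);      /* real work omitted */
                }
            }
        }
    for (;;)
        pause();                            /* parent just waits */
}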
3. epoll recap
I already covered how to use epoll back in the APUE series, but without distinguishing the ET and LT modes, so this post studies them a bit further. The following quote is from the man page:
The epoll event distribution interface is able to behave both as edge-triggered (ET) and as level-triggered (LT). The difference between the two mechanisms can be described as follows. Suppose that this scenario happens:
Anyone who has studied microcontrollers will find the difference between edge triggering and level triggering easy to understand.
1. The file descriptor that represents the read side of a pipe (rfd) is registered on the epoll instance.
2. A pipe writer writes 2 kB of data on the write side of the pipe.
3. A call to epoll_wait(2) is done that will return rfd as a ready file descriptor.
4. The pipe reader reads 1 kB of data from rfd.
5. A call to epoll_wait(2) is done.
The man page explains the issue with the scenario above; it is very clear and easy to follow:
The edge-triggered case:
- If the rfd file descriptor has been added to the epoll interface using the EPOLLET (edge-triggered) flag, the call to epoll_wait(2) done in step 5 will probably hang despite the available data still present in the file input buffer; meanwhile the remote peer might be expecting a response based on the data it already sent. The reason for this is that edge-triggered mode delivers events only when changes occur on the monitored file descriptor. So, in step 5 the caller might end up waiting for some data that is already present inside the input buffer.
- In the above example, an event on rfd will be generated because of the write done in 2 and the event is consumed in 3. Since the read operation done in 4 does not consume the whole buffer data, the call to epoll_wait(2) done in step 5 might block indefinitely.
So this spells out the defining trait of edge-triggered mode and the cause of the problem: when epoll returned the first time, the buffer was not read clean.
- An application that employs the EPOLLET flag should use nonblocking file descriptors to avoid having a blocking read or write starve a task that is handling multiple file descriptors.
The suggested way to use epoll as an edge-triggered (EPOLLET) interface is as follows:
- with nonblocking file descriptors; and
With edge triggering, first set the monitored fd to nonblocking.
- by waiting for an event only after read(2) or write(2) return EAGAIN.
Read or write until the EAGAIN error appears before entering the next round of epoll calls; a sketch follows below.
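A minimal sketch of those two rules; the function names, buffer sizes, and array sizes are placeholders of my own, not an API defined by the man page.
//et_drain_sketch.c -- the "drain until EAGAIN" pattern for EPOLLET (sketch only)
#include <errno.h>
#include <fcntl.h>
#include <sys/epoll.h>
#include <unistd.h>
/* rule 1: the monitored fd must be nonblocking before it is added with EPOLLET */
static void add_et(int efd, int fd){
    fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK);
    struct epoll_event ev = { .events = EPOLLIN | EPOLLET, .data.fd = fd };
    epoll_ctl(efd, EPOLL_CTL_ADD, fd, &ev);
}
/* rule 2: after a wakeup, keep reading until read() fails with EAGAIN */
static void drain(int fd){
    char buf[4096];
    for (;;){
        ssize_t n = read(fd, buf, sizeof(buf));
        if (n > 0)
            continue;                       /* process buf[0..n-1] here, then keep draining */
        if (n == 0)
            break;                          /* peer closed the connection */
        if (errno == EAGAIN || errno == EWOULDBLOCK)
            break;                          /* kernel buffer empty: safe to epoll_wait again */
        break;                              /* real error: handle/close in real code */
    }
}
static void event_loop(int efd){
    struct epoll_event out[16];
    for (;;){
        int m = epoll_wait(efd, out, 16, -1);
        for (int i = 0; i < m; i++)
            if (out[i].events & EPOLLIN)
                drain(out[i].data.fd);      /* only return to epoll_wait once drained */
    }
}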
The level-triggered case:
The man page covers this in one stroke: it is the same as poll, i.e. the condition for epoll to return is simply that the read buffer still has data or the write buffer still has space.
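For contrast, a level-triggered loop (the default, without EPOLLET) can get away with a single, possibly partial read per wakeup; this sketch uses the same placeholder assumptions as the one above.
//lt_read_sketch.c -- level-triggered contrast: a partial read per wakeup is fine
#include <sys/epoll.h>
#include <unistd.h>
static void event_loop_lt(int efd){
    struct epoll_event out[16];
    char buf[1024];                         /* deliberately small: partial reads are OK here */
    for (;;){
        int m = epoll_wait(efd, out, 16, -1);
        for (int i = 0; i < m; i++)
            if (out[i].events & EPOLLIN){
                /* read only once; while data remains in the kernel buffer,
                 * the next epoll_wait reports this fd as readable again */
                ssize_t n = read(out[i].data.fd, buf, sizeof(buf));
                if (n <= 0)
                    continue;               /* EOF or error handling omitted */
                /* process buf[0..n-1]; any leftover data triggers another wakeup */
            }
    }
}
Note that the server below registers its descriptors with plain EPOLLIN, i.e. in level-triggered mode.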
4. Ways to avoid the thundering herd
4.1 Overview
UNP gives three ways to avoid the thundering herd: file locking, thread locking, and descriptor passing. Although they were presented for the accept function back then, I will now apply the technique to the epoll model. To avoid spending extra time, only one solution is implemented here: a thread lock applied across processes. At the end, a brief comparison with Nginx's implementation will close this post.
4.2 Inter-process mutex
As for mutexes shared between processes, I recorded their usage in detail long ago in "Multithreading: Mutex (2)", so the coding goes quickly now. The following program is adapted from my earlier "Network Programming: Client/Server Design, Prefork Server (3)":
//30_lock_preser.c
#include <sys/socket.h>
#include <sys/un.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <strings.h> /* bzero */
#include <arpa/inet.h>
#include <unistd.h>
#include <errno.h>
#include <err.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/wait.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <fcntl.h>
#include <sys/mman.h>
#include <pthread.h>
#include <sys/epoll.h>
#define KEY 0x1 /* key for first message queue */
#define LISTENQ 10
#define MAXLINE 20
#define MAXN 16384
#define handle_error(msg) \
do { perror(msg); exit(EXIT_FAILURE); } while (0)
#define handle_error_en(en, msg) \
do { errno = en; perror(msg); exit(EXIT_FAILURE); } while (0)
typedef struct {
long mtype;
char mtext[MAXLINE];
} Mymsg;
static int read_cnt;
static char *read_ptr;
static char read_buf[MAXLINE];
char flag;
ssize_t readline(int fd, void *vptr, size_t maxlen);
static ssize_t my_read(int fd, char *ptr) ;
ssize_t writen(int fd, const void *vptr, size_t n);
void pr_cpu_time(void);
pid_t child_make(int, long *, int listenfd);
void child_main(int,long *, int listenfd);
static pthread_mutex_t *mptr;
int main(int argc, char **argv){
int listenfd, connfd,ident,flags;
socklen_t clilen;
int nchildren;
in_port_t port;
pid_t *pids;
long * ptr;
Mymsg msg;
int i=0;
int on=1;
struct sockaddr_in cliaddr, servaddr;
pthread_mutexattr_t mattr;
int err;
if (argc != 4)
errx(1,"tcp_fork_server <addr> <port> <childnum>\n");
nchildren = atoi(argv[3]);
if((pids = calloc(nchildren, sizeof(pid_t))) == NULL)
handle_error("calloc");
port = atoi(argv[2]);
if((listenfd = socket(AF_INET, SOCK_STREAM, 0)) == -1)
handle_error("socket");
if(setsockopt(listenfd, SOL_SOCKET,SO_REUSEADDR, &on, sizeof(on)) == -1)
handle_error("setsockopt");
bzero(&servaddr, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(port);
if(inet_pton(AF_INET, argv[1], &servaddr.sin_addr) == -1)
handle_error("inet_pton");
//this anonymous shared mapping can only be used for communication among related (forked) processes
if((ptr = mmap(0, nchildren * sizeof(long), PROT_READ | PROT_WRITE,MAP_ANON | MAP_SHARED, -1, 0)) == MAP_FAILED)
handle_error("mmap");
if((mptr = mmap(0, nchildren * sizeof(pthread_mutex_t), PROT_READ | PROT_WRITE,MAP_ANON | MAP_SHARED, -1, 0)) == MAP_FAILED)
handle_error("mmap");
if((err = pthread_mutexattr_init(&mattr)) != 0)
handle_error_en(err,"pthread_mutexattr_init");
if((err = pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED)) != 0)
handle_error_en(err,"pthread_mutexattr_setpshared");
if((err = pthread_mutex_init(mptr, &mattr)) != 0)
handle_error_en(err,"pthread_mutex_init");
if((flags = fcntl(listenfd, F_GETFL, 0)) == -1)
handle_error("fcntl");
else
if(fcntl(listenfd, F_SETFL, flags | O_NONBLOCK) == -1)
handle_error("fcntl");
if(bind(listenfd, (struct sockaddr *) &servaddr, sizeof(servaddr)) == -1)
handle_error("bind");
if(listen(listenfd, LISTENQ) == -1)
handle_error("listen");
for (i = 0; i < nchildren; i++)
pids[i] = child_make(i,ptr,listenfd); /* parent returns */
for(;;){
//compared with the previous post's logic, the message-queue IPC check has to be done here
if(!flag){
if((ident=msgget(KEY,0660)) == -1 )
continue;
flag=1;
}
if(flag)
//on each iteration, check whether the client has sent a message to the server
if (msgrcv(ident,&msg,MAXLINE,1,IPC_NOWAIT) == -1){
if(errno != ENOMSG)
handle_error("msgrcv");
}
else{
for (i = 0; i < nchildren; i++)
kill(pids[i], SIGTERM);
while (wait(NULL) > 0);
if (errno != ECHILD)
errx(1,"wait error");
pr_cpu_time();
for(i =0;i<nchildren;i++)
printf("child %d connected number:%d\n",i,ptr[i]);
msg.mtype=2;
memcpy(msg.mtext,"done",5);
if (msgsnd(ident,&msg,MAXLINE,0) == -1 )
handle_error("msgrcv");
return 0;
}
}
}
void pr_cpu_time(void){
double user, sys;
struct rusage myusage, childusage;
if (getrusage(RUSAGE_SELF, &myusage) < 0)
handle_error("getrusage error");
if (getrusage(RUSAGE_CHILDREN, &childusage) < 0)
handle_error("getrusage error");
user = (double) myusage.ru_utime.tv_sec +myusage.ru_utime.tv_usec / 1000000.0;
user += (double) childusage.ru_utime.tv_sec +childusage.ru_utime.tv_usec / 1000000.0;
sys = (double) myusage.ru_stime.tv_sec + myusage.ru_stime.tv_usec / 1000000.0;
sys += (double) childusage.ru_stime.tv_sec + childusage.ru_stime.tv_usec / 1000000.0;
printf("\nuser time = %g, sys time = %g\n", user, sys);
}
pid_t child_make(int i, long * ptr, int listenfd){
pid_t pid;
if ((pid = fork()) <0)
handle_error("fork");
else if(pid > 0)
return (pid); /* parent */
else
child_main(i, ptr,listenfd); /* never returns */
}
void child_main(int j, long * ptr, int listenfd){
int connfd;
socklen_t clilen;
struct sockaddr *cliaddr;
int err,efd,m;
struct epoll_event even1,events[2];
int ntowrite;
ssize_t nread;
char line[MAXLINE], result[MAXN];
even1.events=EPOLLIN;
even1.data.fd=listenfd;
if((efd=epoll_create(2)) == -1)
handle_error("epoll_create");
if(epoll_ctl(efd,EPOLL_CTL_ADD,listenfd,&even1) == -1)
handle_error("epoll_ctl");
if((cliaddr = malloc(sizeof(struct sockaddr_in))) == NULL)
handle_error("malloc");
printf("child %ld starting\n", (long) getpid());
for ( ; ; ) {
if((err = pthread_mutex_lock(mptr)) != 0)
handle_error_en(err,"pthread_mutex_lock");
//debug
//printf("hold the mutex : %d child\n",j);
if((m=epoll_wait(efd,events,2,-1)) == -1)
handle_error("epoll_wait");
for(int i=0;i<m;i++)
if((events[i].events & EPOLLIN) == EPOLLIN){
if(events[i].data.fd == listenfd){
clilen = sizeof(struct sockaddr_in);
if((connfd = accept(listenfd, cliaddr, &clilen)) == -1 )
handle_error("accept");
even1.data.fd=connfd;
if(epoll_ctl(efd,EPOLL_CTL_DEL,listenfd,NULL) == -1)
handle_error("epoll_ctl");
if(epoll_ctl(efd,EPOLL_CTL_ADD,connfd,&even1) == -1)
handle_error("epoll_ctl");
break;
}
if(events[i].data.fd == connfd){
if((nread=readline(connfd, line, MAXLINE)) == -1)
handle_error("readline");
else if(nread == 0){
even1.data.fd=listenfd;
if(epoll_ctl(efd,EPOLL_CTL_DEL,connfd,NULL) == -1)
handle_error("epoll_ctl");
if(epoll_ctl(efd,EPOLL_CTL_ADD,listenfd,&even1) == -1)
handle_error("epoll_ctl");
//each child has its own long counter in the shared memory region
ptr[j]++;
if(close(connfd) == -1)
handle_error("close"); /* parent closes connected socket */
break;
}
ntowrite = atol(line);
if ((ntowrite <= 0) || (ntowrite > MAXN))
errx(1,"client request for %d bytes,max size is %d\n", ntowrite,MAXN);
if(writen(connfd, result, ntowrite) == -1)
handle_error("writen");
break;
}
}
if((err = pthread_mutex_unlock(mptr)) != 0)
handle_error_en(err,"pthread_mutex_unlock");
}
}
ssize_t writen(int fd, const void *vptr, size_t n){
size_t nleft;
ssize_t nwritten;
const char *ptr;
ptr = vptr;
nleft = n;
while (nleft > 0){
if ( (nwritten = write(fd, ptr, nleft)) <= 0){
if (nwritten < 0 && errno == EINTR)
nwritten = 0; /* and call write() again */
else
return (-1); /* error */
}
nleft -= nwritten;
ptr += nwritten;
}
return (n);
}
static ssize_t my_read(int fd, char *ptr){
if (read_cnt <= 0) {
again:
if ( (read_cnt = read(fd, read_buf, sizeof(read_buf))) < 0) {
if (errno == EINTR)
goto again;
return (-1);
} else if (read_cnt == 0)
return (0);
read_ptr = read_buf;
}
read_cnt--;
*ptr = *read_ptr++;
return (1);
}
ssize_t readline(int fd, void *vptr, size_t maxlen){
ssize_t n, rc;
char c, *ptr;
ptr = vptr;
for (n = 1; n < maxlen; n++) {
if ( (rc = my_read(fd, &c)) == 1) {
*ptr++ = c;
if (c == '\n')
break; /* newline is stored, like fgets() */
} else if (rc == 0) {
*ptr = 0;
return (n - 1); /* EOF, n - 1 bytes were read */
} else
return (-1); /* error, errno set by read() */
}
*ptr = 0; /* null terminate like fgets() */
return (n);
}
Results:
//testbench side
[root@localhost ~]# ./30_testbench 127.0.0.1 12345 5 100 2000
[root@localhost ~]# ./30_testbench 127.0.0.1 12345 5 100 2000
[root@localhost ~]# ./30_lock_preser 127.0.0.1 12345 5
child 1710 starting
child 1709 starting
child 1708 starting
child 1707 starting
child 1706 starting
user time = 1.77946, sys time = 3.42755
child 0 connected number:0
child 1 connected number:0
child 2 connected number:0
child 3 connected number:0
child 4 connected number:500
[root@localhost ~]# ./30_lock_preser 127.0.0.1 12345 5
child 1722 starting
child 1721 starting
child 1720 starting
child 1719 starting
child 1718 starting
user time = 0.930036, sys time = 1.72412
child 0 connected number:0
child 1 connected number:107
child 2 connected number:0
child 3 connected number:393
child 4 connected number:0
The server I have typed up now beats Tinyhttp in both code size and complexity. However, as you can see, the load is not balanced at all, which means the advantage of multiple processes cannot be exploited to the fullest. In particular, if all 500 connection requests were spread evenly over the 5 processes, the response should be faster, so I still need to optimize my server:
for ( ; ; ) {
//just a simple, hard-coded way to balance the load
if( ptr[j] >100)
continue;
if((err = pthread_mutex_lock(mptr)) != 0)
handle_error_en(err,"pthread_mutex_lock");
....
Result:
[root@localhost ~]# ./30_lock_preser 127.0.0.1 12345 5
child 2118 starting
child 2117 starting
child 2116 starting
child 2115 starting
child 2114 starting
user time = 1.50234, sys time = 2.46653
child 0 connected number:101
child 1 connected number:101
child 2 connected number:101
child 3 connected number:96
child 4 connected number:101
Although the load is now balanced, the running time does not seem to have improved. To keep this post from getting longer, the second part will compare Nginx's way of handling this.