epoll 惊群处理

时间:2021-04-28 23:00:11
 #include <sys/types.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <netdb.h>
#include <string.h>
#include <stdio.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdlib.h>
#include <errno.h>
#include <sys/wait.h>
#include <unistd.h>
#include <semaphore.h>
#include <sys/shm.h>
#define IP "127.0.0.1"
#define PORT 8888
#define PROCESS_NUM 4
#define MAXEVENTS 64 static int
create_and_bind ()
{
int fd = socket (PF_INET, SOCK_STREAM, );
struct sockaddr_in serveraddr;
serveraddr.sin_family = AF_INET;
inet_pton (AF_INET, IP, &serveraddr.sin_addr);
serveraddr.sin_port = htons (PORT);
bind (fd, (struct sockaddr *) &serveraddr, sizeof (serveraddr));
return fd;
} static int
make_socket_non_blocking (int sfd)
{
int flags, s;
flags = fcntl (sfd, F_GETFL, );
if (flags == -)
{
perror ("fcntl");
return -;
}
flags |= O_NONBLOCK;
s = fcntl (sfd, F_SETFL, flags);
if (s == -)
{
perror ("fcntl");
return -;
}
return ;
} void
worker (int sfd, int efd, struct epoll_event *events, int k, sem_t * sem)
{
/* The event loop */
struct epoll_event event;
// struct epoll_event *events;
efd = epoll_create (MAXEVENTS);
if (efd == -)
{
perror ("epoll_create");
abort ();
}
int epoll_lock = ;
while ()
{
int n, i;
int s;
event.data.fd = sfd;
event.events = EPOLLIN;
if ( == sem_trywait (sem))
{
//拿到锁的进程将listen 描述符加入epoll
if (!epoll_lock)
{
fprintf (stderr, "%d >>>get lock\n", k);
s = epoll_ctl (efd, EPOLL_CTL_ADD, sfd, &event);
if (s == -)
{
perror ("epoll_ctl");
abort ();
}
epoll_lock = ;
}
}
else
{
fprintf (stderr, "%d not lock\n", k);
//没有拿到锁的进程 将lisfd 从epoll 中去掉
if (epoll_lock)
{
fprintf (stderr, "worker %d return from epoll_wait!\n", k);
if (- == epoll_ctl (efd, EPOLL_CTL_DEL, sfd, &event))
{
if (errno == ENOENT)
{
fprintf (stderr, "EPOLL_CTL_DEL\n");
}
}
epoll_lock = ;
}
}
//epoll_ctl (efd, EPOLL_CTL_ADD, sfd, &event);
// fprintf(stderr, "ok\n");
//不能设置为-1 为了能让拿不到锁的进程再次拿到锁
n = epoll_wait (efd, events, MAXEVENTS, );
for (i = ; i < n; i++)
{
if (sfd == events[i].data.fd)
{
/* We have a notification on the listening socket, which means one or more incoming connections. */
struct sockaddr in_addr;
socklen_t in_len;
int infd;
char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];
in_len = sizeof in_addr;
while ((infd = accept (sfd, &in_addr, &in_len)) > )
{
fprintf(stderr, "get one\n");
close (infd);
}
}
}
if (epoll_lock)
{
//这里将锁释放
sem_post (sem);
epoll_lock = ;
epoll_ctl (efd, EPOLL_CTL_DEL, sfd, &event);
}
}
} int
main (int argc, char *argv[])
{
int shmid;
sem_t *acctl;
//建立共享内存
shmid = shmget (IPC_PRIVATE, sizeof (sem_t), );
acctl = (sem_t *) shmat (shmid, , );
//进程间信号量初始化 要用到上面的共享内存
sem_init (acctl, , );
int sfd, s;
int efd;
// struct epoll_event event;
// struct epoll_event *events;
sfd = create_and_bind ();
if (sfd == -)
{
abort ();
}
s = make_socket_non_blocking (sfd);
if (s == -)
{
abort ();
}
s = listen (sfd, SOMAXCONN);
if (s == -)
{
perror ("listen");
abort ();
}
efd = ;
int k;
for (k = ; k < PROCESS_NUM; k++)
{
printf ("Create worker %d\n", k + );
int pid = fork ();
if (pid == )
{
struct epoll_event *events;
events = calloc (MAXEVENTS, sizeof (struct epoll_event));
worker (sfd, efd, events, k, acctl);
break;
}
}
int status;
wait (&status);
close (sfd);
return EXIT_SUCCESS;
}
/*
* 这里处理惊群 用到了进程的锁(信号量, 共享内存), 根据试验的结果多个进程时accept接收客户端连接的效率并没有提高太多
* 但是处理其他可读可写(非监听描述符)时, 要比单个进程要快很多。
*/

  在早期的kernel中, 多线程或多进程调用accept就会出现如下情况, 当前多个进程阻塞在accept中, 此时有客户端连接时, 内核就会通知阻塞在accept的所有进程, 这时就会造成惊群现象, 也就是所有accept都会返回 但是只有一个能拿到有效的文件描述符, 其他进程最后都会返回无效描述符。但在linux kernel 版本2.6 以上时, accept惊群的问题已经解决, 大致方案就是选一个阻塞在accept的进程返回。

  但是在IO复用中, select/poll/epoll 还是存在这种现象,其原因就是这些阻塞函数造成了以上同样的问题。这里就给出了类似Nginx的解决方案, 给监听描述符竞争枷锁, 保证只有一个进程处理监听描述符。 这里还可以控制锁的频率,如果一个进程接受连接的数量达到一定数量就不再申请锁。  这里要注意的是epoll_create的位置要在fork之后的子进程中, 这是因为若在父进程中create 那么fork之后子进程保留这个描述符副本,epoll_create其实是内核中建立的文件系统 保存在内核中, 那么其子进程都会共用这个文件系统, 那么多任务的epoll_ctl就会出现问题。子进程中建立各自的epoll fd 就可以避免这种情况。