epoll_create, epoll_ctl和epoll_wait

时间:2024-11-14 13:33:27

参考代码

 #include <iostream>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdio.h>
#include <errno.h>
#include <stdlib.h>   // exit, atoi
#include <string.h>   // memset

using namespace std;

#define MAXLINE 5
#define OPEN_MAX 100
#define LISTENQ 20
#define SERV_PORT 5000
#define INFTIM 1000

/*
 * Put descriptor `sock` into non-blocking mode by OR-ing O_NONBLOCK into its
 * current file status flags. Terminates the process with status 1 if either
 * fcntl() call fails.
 */
void setnonblocking(int sock)
{
    int opts = fcntl(sock, F_GETFL);
    if (opts < 0)
    {
        perror("fcntl(sock,GETFL)");
        exit(1);
    }
    opts = opts | O_NONBLOCK;
    if (fcntl(sock, F_SETFL, opts) < 0)
    {
        perror("fcntl(sock,SETFL,opts)");
        exit(1);
    }
}

/*
 * Minimal epoll demo server.
 *
 * Usage: <prog> portnumber
 *
 * Listens on 127.0.0.1:<portnumber>, registers the listening socket and every
 * accepted connection with epoll (edge-triggered), and prints whatever each
 * client sends in chunks of at most MAXLINE bytes.
 *
 * NOTE(review): sockets are left blocking and only one accept()/read() is done
 * per edge-triggered notification, as in the original sample; data beyond
 * MAXLINE bytes stays queued until the next edge. Confirm this is the intended
 * demonstration before reusing the pattern.
 */
int main(int argc, char* argv[])
{
    int i, listenfd, connfd, sockfd, epfd, nfds, portnumber;
    ssize_t n = 0;
    char line[MAXLINE + 1];   // +1 so the received chunk can always be NUL-terminated
    socklen_t clilen;

    if (argc != 2 || (portnumber = atoi(argv[1])) < 0 || portnumber > 65535)
    {
        fprintf(stderr, "Usage:%s portnumber\a\n", argv[0]);
        return 1;
    }

    // ev is used to register interest; events[] receives the ready events.
    struct epoll_event ev, events[20];
    const int max_events = (int)(sizeof(events) / sizeof(events[0]));
    memset(&ev, 0, sizeof(ev));

    // Create the epoll instance (the size argument is only a hint).
    epfd = epoll_create(256);
    if (epfd < 0)
    {
        perror("epoll_create");
        return 1;
    }

    struct sockaddr_in clientaddr;
    struct sockaddr_in serveraddr;

    listenfd = socket(AF_INET, SOCK_STREAM, 0);
    if (listenfd < 0)
    {
        perror("socket");
        return 1;
    }
    // The original sample leaves the listening socket blocking:
    // setnonblocking(listenfd);

    memset(&serveraddr, 0, sizeof(serveraddr));
    serveraddr.sin_family = AF_INET;
    const char *local_addr = "127.0.0.1";
    inet_aton(local_addr, &(serveraddr.sin_addr));
    serveraddr.sin_port = htons(portnumber);

    if (bind(listenfd, (sockaddr *)&serveraddr, sizeof(serveraddr)) < 0)
    {
        perror("bind");
        return 1;
    }
    if (listen(listenfd, LISTENQ) < 0)
    {
        perror("listen");
        return 1;
    }

    // Register the listening socket for edge-triggered read readiness.
    ev.data.fd = listenfd;
    ev.events = EPOLLIN | EPOLLET;
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev) < 0)
    {
        perror("epoll_ctl(listenfd)");
        return 1;
    }

    for ( ; ; ) {
        // Wait up to 500 ms for events.
        nfds = epoll_wait(epfd, events, max_events, 500);

        // Handle every event that was reported (nfds <= 0 skips the loop).
        for (i = 0; i < nfds; ++i)
        {
            if (events[i].data.fd == listenfd)
            {
                // A new client connected to the listening port.
                clilen = sizeof(clientaddr);   // accept() reads this as the buffer size
                connfd = accept(listenfd, (sockaddr *)&clientaddr, &clilen);
                if (connfd < 0) {
                    perror("connfd<0");
                    exit(1);
                }
                // setnonblocking(connfd);
                char *str = inet_ntoa(clientaddr.sin_addr);
                cout << "accapt a connection from " << str << endl;

                // Watch the new connection for incoming data.
                ev.data.fd = connfd;
                ev.events = EPOLLIN | EPOLLET;
                epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &ev);
            }
            else if (events[i].events & EPOLLIN)
            {
                // An established connection has data to read.
                cout << "EPOLLIN" << endl;
                if ((sockfd = events[i].data.fd) < 0)
                    continue;

                n = read(sockfd, line, MAXLINE);
                if (n < 0) {
                    if (errno == ECONNRESET) {
                        close(sockfd);   // closing also removes it from the epoll set
                    } else {
                        std::cout << "readline error" << std::endl;
                    }
                    continue;            // nothing valid in `line` to print
                }
                if (n == 0) {
                    close(sockfd);       // peer closed the connection
                    continue;
                }

                line[n] = '\0';
                cout << "read " << line << endl;

                // The original sample prepares a switch to EPOLLOUT here but
                // leaves the epoll_ctl call disabled, so the write branch
                // below is not reached unless this is re-enabled:
                // ev.data.fd = sockfd;
                // ev.events = EPOLLOUT | EPOLLET;
                // epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd, &ev);
            }
            else if (events[i].events & EPOLLOUT)
            {
                // Socket is writable: echo the last chunk, then go back to reading.
                sockfd = events[i].data.fd;
                if (n > 0 && write(sockfd, line, n) < 0)
                    perror("write");
                ev.data.fd = sockfd;
                ev.events = EPOLLIN | EPOLLET;
                epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd, &ev);
            }
        }
    }
    return 0;
}
 //
// a simple echo server using epoll in linux
//
// 2009-11-05
// 2013-03-22:修改了几个问题,1是/n格式问题,2是去掉了原代码不小心加上的ET模式;
// 本来只是简单的示意程序,决定还是加上 recv/send时的buffer偏移
// by sparkling
//
#include <arpa/inet.h>
#include <errno.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <time.h>
#include <unistd.h>
#include <iostream>
using namespace std;
#define MAX_EVENTS 500
// Per-descriptor bookkeeping for the event loop: which callback to run,
// whether the fd is currently registered with epoll, and the echo buffer.
struct myevent_s
{
    int fd;                                            // descriptor this slot tracks
    void (*call_back)(int fd, int events, void *arg);  // handler invoked when fd is ready
    int events;                                        // epoll event mask currently registered
    void *arg;                                         // opaque argument passed to call_back
    int status;                                        // 1: in epoll wait list, 0 not in
    char buff[128];                                    // recv data buffer
    int len, s_offset;                                 // bytes stored in buff / bytes already sent
    long last_active;                                  // last active time (seconds, from time())
};
// set event
// Initialise slot `ev` for descriptor `fd`.
//   call_back - handler to run when the descriptor becomes ready
//   arg       - value handed back to call_back
// Resets the event mask, registration status, buffer contents, length and
// send offset to zero, and stamps last_active with the current time.
void EventSet(myevent_s *ev, int fd, void (*call_back)(int, int, void*), void *arg)
{
    ev->fd = fd;
    ev->call_back = call_back;
    ev->events = 0;
    ev->arg = arg;
    ev->status = 0;               // not registered with epoll yet
    bzero(ev->buff, sizeof(ev->buff));
    ev->s_offset = 0;
    ev->len = 0;
    ev->last_active = time(NULL);
}
// add/mod an event to epoll
// Register `ev->fd` with epoll instance `epollFd` for the mask `events`.
// Uses EPOLL_CTL_MOD when the slot is already marked as registered
// (status == 1), otherwise EPOLL_CTL_ADD and marks it registered.
// The outcome is logged to stdout; nothing is returned.
void EventAdd(int epollFd, int events, myevent_s *ev)
{
    struct epoll_event epv = {0, {0}};
    int op;
    epv.data.ptr = ev;                    // epoll_wait hands this pointer back
    epv.events = ev->events = events;
    if(ev->status == 1){
        op = EPOLL_CTL_MOD;
    }
    else{
        op = EPOLL_CTL_ADD;
        ev->status = 1;
    }
    if(epoll_ctl(epollFd, op, ev->fd, &epv) < 0)
        printf("Event Add failed[fd=%d], evnets[%d]\n", ev->fd, events);
    else
        printf("Event Add OK[fd=%d], op=%d, evnets[%0X]\n", ev->fd, op, events);
}
// delete an event from epoll
// Remove `ev->fd` from epoll instance `epollFd` and mark the slot free.
// Does nothing when the slot is not currently registered (status != 1).
void EventDel(int epollFd, myevent_s *ev)
{
    struct epoll_event epv = {0, {0}};
    if(ev->status != 1) return;
    epv.data.ptr = ev;
    ev->status = 0;
    epoll_ctl(epollFd, EPOLL_CTL_DEL, ev->fd, &epv);
}
// epoll instance shared by the event helpers and callbacks.
int g_epollFd;
// Connection slots; the extra last slot, g_Events[MAX_EVENTS], is used by the listen fd.
myevent_s g_Events[MAX_EVENTS+1];
// Callbacks defined further down; declared here so they can reference each other.
void RecvData(int fd, int events, void *arg);
void SendData(int fd, int events, void *arg);
// accept new connections from clients
// Callback for the listening socket: accept one pending connection, place it
// in the first free slot of g_Events, make it non-blocking and register it
// for EPOLLIN with RecvData as its handler.
//   fd     - listening descriptor
//   events - ready mask reported by epoll (unused)
//   arg    - slot pointer of the listening socket (unused)
// If accept fails, no slot is free, or fcntl fails, the problem is logged and
// the new descriptor (if any) is closed so it does not leak.
void AcceptConn(int fd, int events, void *arg)
{
    (void)events;
    (void)arg;

    struct sockaddr_in sin;
    socklen_t len = sizeof(struct sockaddr_in);
    int nfd, i;

    // accept
    if((nfd = accept(fd, (struct sockaddr*)&sin, &len)) == -1)
    {
        printf("%s: accept, %d", __func__, errno);
        return;
    }

    // find the first slot that is not registered with epoll
    for(i = 0; i < MAX_EVENTS; i++)
    {
        if(g_Events[i].status == 0)
        {
            break;
        }
    }
    if(i == MAX_EVENTS)
    {
        printf("%s:max connection limit[%d].", __func__, MAX_EVENTS);
        close(nfd);     // no slot will own this descriptor
        return;
    }

    // set nonblocking
    int iret = 0;
    if((iret = fcntl(nfd, F_SETFL, O_NONBLOCK)) < 0)
    {
        printf("%s: fcntl nonblocking failed:%d", __func__, iret);
        close(nfd);
        return;
    }

    // add a read event for receive data
    EventSet(&g_Events[i], nfd, RecvData, &g_Events[i]);
    EventAdd(g_epollFd, EPOLLIN, &g_Events[i]);

    printf("new conn[%s:%d][time:%ld], pos[%d]\n", inet_ntoa(sin.sin_addr),
           ntohs(sin.sin_port), g_Events[i].last_active, i);
}
// receive data
// Callback for a readable connection: read into the slot's buffer, print the
// data, then re-register the descriptor for EPOLLOUT so SendData echoes it.
//   fd     - connection descriptor
//   events - ready mask reported by epoll (unused)
//   arg    - pointer to the connection's myevent_s slot
// On orderly shutdown (recv == 0) or a hard error the descriptor is closed.
void RecvData(int fd, int events, void *arg)
{
    (void)events;
    struct myevent_s *ev = (struct myevent_s*)arg;
    int len;

    // receive data, leaving one byte for the terminating NUL
    len = recv(fd, ev->buff+ev->len, sizeof(ev->buff)-1-ev->len, 0);
    int saved_errno = errno;    // EventDel/close/printf below may overwrite errno
    EventDel(g_epollFd, ev);

    if(len > 0)
    {
        ev->len += len;
        ev->buff[ev->len] = '\0';
        printf("C[%d]:%s\n", fd, ev->buff);
        // change to send event; set the fields directly instead of calling
        // EventSet(), which would zero buff/len and leave nothing to echo
        ev->call_back = SendData;
        ev->arg = ev;
        ev->s_offset = 0;
        ev->last_active = time(NULL);
        EventAdd(g_epollFd, EPOLLOUT, ev);
    }
    else if(len == 0)
    {
        close(ev->fd);
        printf("[fd=%d] pos[%d], closed gracefully.\n", fd, (int)(ev-g_Events));
    }
    else if(saved_errno == EAGAIN || saved_errno == EWOULDBLOCK || saved_errno == EINTR)
    {
        // nothing to read right now on the non-blocking socket: keep waiting
        EventAdd(g_epollFd, EPOLLIN, ev);
    }
    else
    {
        close(ev->fd);
        printf("recv[fd=%d] error[%d]:%s\n", fd, saved_errno, strerror(saved_errno));
    }
}
// send data
// Callback for a writable connection: send the not-yet-sent part of the
// slot's buffer. Once everything has been sent the slot is reset and
// re-registered for EPOLLIN with RecvData as its handler.
//   fd     - connection descriptor
//   events - ready mask reported by epoll (unused)
//   arg    - pointer to the connection's myevent_s slot
// A send result of 0 or a hard error closes the connection.
void SendData(int fd, int events, void *arg)
{
    (void)events;
    struct myevent_s *ev = (struct myevent_s*)arg;
    int len;

    // send data, resuming at s_offset after a partial send
    len = send(fd, ev->buff + ev->s_offset, ev->len - ev->s_offset, 0);
    if(len > 0)
    {
        printf("send[fd=%d], [%d<->%d]%s\n", fd, len, ev->len, ev->buff);
        ev->s_offset += len;
        if(ev->s_offset == ev->len)
        {
            // change to receive event
            EventDel(g_epollFd, ev);
            EventSet(ev, fd, RecvData, ev);
            EventAdd(g_epollFd, EPOLLIN, ev);
        }
    }
    else if(len < 0 && (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR))
    {
        // socket not writable after all: stay registered for EPOLLOUT and retry later
        return;
    }
    else
    {
        int saved_errno = errno;    // capture before close()/epoll_ctl() can change it
        EventDel(g_epollFd, ev);    // deregister while the descriptor is still valid
        close(ev->fd);
        printf("send[fd=%d] error[%d]\n", fd, saved_errno);
    }
}
// Create a non-blocking TCP listening socket bound to INADDR_ANY:`port` and
// register it with `epollFd` using the reserved slot g_Events[MAX_EVENTS],
// with AcceptConn as its handler.
// Failures of socket/bind/listen are logged; in that case nothing is
// registered with epoll and the descriptor is closed.
void InitListenSocket(int epollFd, short port)
{
    int listenFd = socket(AF_INET, SOCK_STREAM, 0);
    if(listenFd < 0)
    {
        printf("%s: socket failed, errno[%d]\n", __func__, errno);
        return;
    }
    fcntl(listenFd, F_SETFL, O_NONBLOCK); // set non-blocking
    printf("server listen fd=%d\n", listenFd);

    // bind & listen
    sockaddr_in sin;
    bzero(&sin, sizeof(sin));
    sin.sin_family = AF_INET;
    sin.sin_addr.s_addr = INADDR_ANY;
    sin.sin_port = htons((unsigned short)port);
    if(bind(listenFd, (const sockaddr*)&sin, sizeof(sin)) < 0)
    {
        printf("%s: bind failed, errno[%d]\n", __func__, errno);
        close(listenFd);
        return;
    }
    if(listen(listenFd, 5) < 0)
    {
        printf("%s: listen failed, errno[%d]\n", __func__, errno);
        close(listenFd);
        return;
    }

    // add listen socket
    EventSet(&g_Events[MAX_EVENTS], listenFd, AcceptConn, &g_Events[MAX_EVENTS]);
    EventAdd(epollFd, EPOLLIN, &g_Events[MAX_EVENTS]);
}
// Entry point of the echo server.
// Usage: <prog> [port]   (port defaults to 12345)
// Creates the epoll instance, sets up the listening socket, then loops:
// each iteration scans up to 100 connection slots for 60-second inactivity
// and waits up to one second for epoll events, dispatching each to the
// callback stored in its slot.
// Returns 0 when the loop ends, 1 on invalid port or epoll_create failure.
int main(int argc, char **argv)
{
    unsigned short port = 12345; // default port
    if(argc == 2){
        char *end = NULL;
        long value = strtol(argv[1], &end, 10);
        if(end == argv[1] || *end != '\0' || value <= 0 || value > 65535)
        {
            printf("invalid port: %s\n", argv[1]);
            return 1;
        }
        port = (unsigned short)value;
    }
    // create epoll
    g_epollFd = epoll_create(MAX_EVENTS);
    if(g_epollFd < 0)
    {
        printf("create epoll failed.%d\n", g_epollFd);
        return 1;   // nothing below can work without an epoll instance
    }
    // create & bind listen socket, and add to epoll, set non-blocking
    InitListenSocket(g_epollFd, port);
    // event loop
    struct epoll_event events[MAX_EVENTS];
    printf("server running:port[%d]\n", port);
    int checkPos = 0;
    while(1){
        // a simple timeout check here, every time 100, better to use a mini-heap, and add timer event
        long now = time(NULL);
        for(int i = 0; i < 100; i++, checkPos++) // doesn't check listen fd
        {
            if(checkPos == MAX_EVENTS) checkPos = 0; // recycle
            if(g_Events[checkPos].status != 1) continue;
            long duration = now - g_Events[checkPos].last_active;
            if(duration >= 60) // 60s timeout
            {
                printf("[fd=%d] timeout[%ld--%ld].\n", g_Events[checkPos].fd, g_Events[checkPos].last_active, now);
                // deregister first so EPOLL_CTL_DEL sees a valid descriptor
                EventDel(g_epollFd, &g_Events[checkPos]);
                close(g_Events[checkPos].fd);
            }
        }
        // wait for events to happen
        int fds = epoll_wait(g_epollFd, events, MAX_EVENTS, 1000);
        if(fds < 0){
            if(errno == EINTR) continue;   // interrupted by a signal: just retry
            printf("epoll_wait error, exit\n");
            break;
        }
        for(int i = 0; i < fds; i++){
            myevent_s *ev = (struct myevent_s*)events[i].data.ptr;
            if((events[i].events&EPOLLIN)&&(ev->events&EPOLLIN)) // read event
            {
                ev->call_back(ev->fd, events[i].events, ev->arg);
            }
            // else-if: a slot is registered for exactly one direction at a time,
            // and the read callback may already have switched it to EPOLLOUT
            else if((events[i].events&EPOLLOUT)&&(ev->events&EPOLLOUT)) // write event
            {
                ev->call_back(ev->fd, events[i].events, ev->arg);
            }
        }
    }
    // free resource
    close(g_epollFd);
    return 0;
}

参考资料

epoll相关资料整理

epoll_create函数实现源码分析

epoll_create epoll_ctl epoll_wait close epoll和select的简单比较

epoll使用详解(精髓)

epoll源码分析

epoll_create函数实现源码分析

学习使用epoll