文章目录
- 1、完善Epoll简单服务器
- 2、打造统一的分开处理的体系
- 3、epoll工作模式
- 4、ET模式
- 5、继续完善,处理写事件
- 6、引入自定义协议,处理写事件
本篇基于上篇代码继续改进,很长。关于Reactor的说明在后一篇
1、完善Epoll简单服务器
上面的代码在处理读事件时,用的request数组是临时的,如果有数据没读完,那么下次再来到这里,就没有这些数据了。所以得让每一个fd都有自己的缓冲区。建立一个Connection类,然后有一个map结构,让这个类和每个fd建立映射。Start函数改一下,不管超时还是出错,就只处理数据,处理的部分交给HandlerEvent,改名成LoopOnce,也就是说,Start那里还是有循环,每次循环都去执行L函数,L函数用Wait提取一次,然后处理。
void Start()
{
//1、将listensock添加到epoll中,要先有epoll模型
bool r = epoller_.AddEvent(listensock_.Fd(), EPOLLIN);//只关心读事件
assert(r);//可以做别的判断
(void)r;
struct epoll_event revs_[gnum];
int timeout = 1000;
while(true)
{
LoopOnce(timeout);
}
}
void Accepter()
{
std::string clientip;
uint16_t clientport;
int sock = listensock_.Accept(&clientip, &clientport);
if (sock < 0) return ;
logMessage(Debug, "%s:%d 已经连上服务器了", clientip.c_str(), clientport);
// 还不能recv,即使有了连接但也不知道有没有数据
// 只有epoll知道具体情况,所以将sock添加到epoll中
bool r = epoller_.AddEvent(sock, EPOLLIN);
assert(r);
(void)r;
}
void Recver(int fd)
{
char request[1024];
ssize_t s = recv(fd, request, sizeof(request) - 1, 0);
if (s > 0)
{
request[s - 1] = 0; // 对打印格式
request[s - 2] = 0; // 做一下调整
std::string response = func_(request);
send(fd, response.c_str(), response.size(), 0);
}
else
{
if (s == 0)
logMessage(Info, "client quit ...");
else
logMessage(Warning, "recv error, client quit...");
close(fd);
// 将文件描述符移除
// 在处理异常的时候,fd必须合法才能被处理
epoller_.DelEvent(fd);
}
}
void LoopOnce(int timeout)
{
int n = epoller_.Wait(revs_, gnum, timeout);
for(int i = 0; i < n; i++)
{
int fd = revs_[i].data.fd;
uint32_t events = revs_[i].events;
logMessage(Debug, "当前正在处理%d上的%s", fd, (events&EPOLLIN) ? "EPOLLIN" : "OTHER");
if(events & EPOLLIN)//判断读事件就绪
{
if (fd == listensock_.Fd())
{
// 1、新连接到来
Accepter();
}
else
{
// 2、读事件
Recver(fd);
}
}
}
}
class Connection
{
public:
Connection(int fd): fd_(fd)
{}
~Connection()
{}
public:
int fd_;
std::string inbuffer_;
std::string outbuffer_;
};
std::unordered_map<int, Connection*> connections_;
把Start的初始化任务交给InitServer
void InitServer()
{
listensock_.Socket();
listensock_.Bind(port_);
listensock_.Listen();
epoller_.Create();
logMessage(Debug, "init server success");
//为listensock创建对应的connection对象
Connection* conn = new Connection(listensock_.Fd());
//将listensock和connection对象添加到connections_
connections_.insert(std::pair<int, Connection*>(listensock_.Fd(), conn));
//将listensock添加到epoll中
bool r = epoller_.AddEvent(listensock_.Fd(), EPOLLIN);
assert(r);
(void)r;
}
void Start()
{
struct epoll_event revs_[gnum];
int timeout = 1000;
while(true)
{
LoopOnce(timeout);
}
}
同样地,Accepter有添加到epoll的fd也要映射上自己的Connection类,Recver那里就可以也改一下了
void Accepter()
{
std::string clientip;
uint16_t clientport;
int sock = listensock_.Accept(&clientip, &clientport);
if (sock < 0) return ;
logMessage(Debug, "%s:%d 已经连上服务器了", clientip.c_str(), clientport);
// 还不能recv,即使有了连接但也不知道有没有数据
// 只有epoll知道具体情况,所以将sock添加到epoll中
Connection* conn = new Connection(sock);
connections_.insert(std::pair<int, Connection*>(sock, conn));
bool r = epoller_.AddEvent(sock, EPOLLIN);
assert(r);
(void)r;
}
void Recver(int fd)
{
char request[1024];
ssize_t s = recv(fd, request, sizeof(request) - 1, 0);
if (s > 0)
{
request[s - 1] = 0; // 对打印格式
request[s - 2] = 0; // 做一下调整
connections_[fd]->inbuffer_ += request;
std::string response = func_(request);
send(fd, response.c_str(), response.size(), 0);
}
else
{
if (s == 0)
logMessage(Info, "client quit ...");
else
logMessage(Warning, "recv error, client quit...");
close(fd);
// 将文件描述符移除
// 在处理异常的时候,fd必须合法才能被处理
epoller_.DelEvent(fd);
}
}
所有就绪的fd,不只包含我们关心的fd,都要有Connection类。Accepter那里,得到连接后,获取套接字,不直接读取,因为不知道是否有数据,就交给epoll,不过获取套接字后,每个套接字都需要正确读取自己的报文,所以Connection有了两个buffer。
所有就绪的fd,不仅要有Connection类,还要被epoll管理。但这样的代码并不高效,删除的时候要从epoll里删,还要从connections_里删,且代码也不够简洁。
封装并修改一下形式
class Connection
{
public:
Connection(const int& fd, const std::string& clientip, const uint16_t& clientport)
: fd_(fd), clientip_(clientip), clientport_(clientport)
{}
~Connection()
{}
public:
int fd_;
std::string inbuffer_;
std::string outbuffer_;
std::string clientip_;
uint16_t clientport_;
};
//...
void InitServer()
{
listensock_.Socket();
listensock_.Bind(port_);
listensock_.Listen();
epoller_.Create();
//为listensock创建对应的connection对象
//将listensock和connection对象添加到connections_
//将listensock添加到epoll中
AddConnection(listensock_.Fd(), EPOLLIN);
logMessage(Debug, "init server success");
}
void AddConnection(int fd, uint32_t events, std::string ip = "127.0.0.1", uint16_t port = gport)
{
//1、构建connection对象,交给connections_管理
Connection* conn = new Connection(fd, ip, port);
connections_.insert(std::pair<int, Connection*>(fd, conn));
//2、fd和events写到内核中
bool r = epoller_.AddEvent(fd, events);
assert(r);
(void)r;
logMessage(Debug, "AddConnection success, fd: %d, clientinfo: [%s:%d]", fd, ip.c_str(), port);
}
void Accepter()
{
std::string clientip;
uint16_t clientport;
int sock = listensock_.Accept(&clientip, &clientport);
if (sock < 0) return ;
logMessage(Debug, "%s:%d 已经连上服务器了", clientip.c_str(), clientport);
// 还不能recv,即使有了连接但也不知道有没有数据
// 只有epoll知道具体情况,所以将sock添加到epoll中
AddConnection(sock, EPOLLIN, clientip, clientport);
}
void Recver(int fd)
{
char request[1024];
ssize_t s = recv(fd, request, sizeof(request) - 1, 0);
if (s > 0)
{
request[s - 1] = 0; // 对打印格式
request[s - 2] = 0; // 做一下调整
connections_[fd]->inbuffer_ += request;
std::string response = func_(request);
send(fd, response.c_str(), response.size(), 0);
}
else
{
if (s == 0)
logMessage(Info, "client quit ...");
else
logMessage(Warning, "recv error
// 在处理异常的时候,fd必须合法才能被处理
epoller_.DelEvent(fd);
}
}
2、打造统一的分开处理的体系
现有的Accepter、Recver都是处理写事件的,LoopOnce那里可以加个读事件的判断,但相关的处理函数要怎么写?为了简便,这里再引入回调函数。
const static int gport = 8888;
class Connection;
using func_t = std::function<std::string (std::string)>;
using callback_t = std::function<void(Connection*)>;
class Connection
{
public:
Connection(const int& fd, const std::string& clientip, const uint16_t& clientport)
: fd_(fd), clientip_(clientip), clientport_(clientport)
{}
void Register(callback_t recver, callback_t sender, callback_t excepter)
{
recver_ = recver;
sender_ = sender;
excepter_ = excepter;
}
~Connection()
{}
public:
//IO信息
int fd_;
std::string inbuffer_;
std::string outbuffer_;
//IO处理
callback_t recver_;
callback_t sender_;
callback_t excepter_;
//用户信息
std::string clientip_;
uint16_t clientport_;
};
Register为注册方法,也就是要使用的方法。在AddConnection函数中,要判断一下,是我们关心的和不是我们关心的,都调用注册方法,但传的参数不一样。
void AddConnection(int fd, uint32_t events, std::string ip = "127.0.0.1", uint16_t port = gport)
{
//2、构建connection对象,交给connections_管理
Connection* conn = new Connection(fd, ip, port);
if(fd == listensock_.Fd())
{
conn->Register();
}
else
{
conn->Register();
}
connections_.insert(std::pair<int, Connection*>(fd, conn));
//3、fd和events写到内核中
bool r = epoller_.AddEvent(fd, events);
assert(r);
(void)r;
logMessage(Debug, "AddConnection success, fd: %d, clientinfo: [%s:%d]", fd, ip.c_str(), port);
}
Accepter那里,里面有AddConnection函数。当LoopOnce调用Accepter时,这个函数也要用回调函数,这样就是一个类的成员函数要调用另一个类的回调函数。
void Accepter(Connection* conn)
{
(void) conn;//先闲置不用
std::string clientip;
uint16_t clientport;
int sock = listensock_.Accept(&clientip, &clientport);
if (sock < 0) return ;
logMessage(Debug, "%s:%d 已经连上服务器了", clientip.c_str(), clientport);
// 还不能recv,即使有了连接但也不知道有没有数据
// 只有epoll知道具体情况,所以将sock添加到epoll中
AddConnection(sock, EPOLLIN, clientip, clientport);
}
AddConnection中,Regsiter三个参数都是callback_t类型的,我们可以这样写
if(fd == listensock_.Fd())
{
conn->Register(std::bind(&EpollServer::Accepter, this, std::placeholders::_1), nullptr, nullptr);
}
这样设置,当我们关心的套接字上有事件就绪时,读方法就绑定Accepter。是其它套接字的话
else
{
conn->Register(std::bind(&EpollServer::Recver, this, std::placeholders::_1),
std::bind(&EpollServer::Sender, this, std::placeholders::_1),
std::bind(&EpollServer::Excepter, this, std::placeholders::_1));
}
void AddConnection(int fd, uint32_t events, std::string ip = "127.0.0.1", uint16_t port = gport)
{
//2、构建connection对象,交给connections_管理
Connection* conn = new Connection(fd, ip, port);
if(fd == listensock_.Fd())
{
conn->Register(std::bind(&EpollServer::Accepter, this, std::placeholders::_1), nullptr, nullptr);
}
else
{
conn->Register(std::bind(