基于kbengine 0.4.2
MMOPG服务端是一种高品质的工程项目,品读开源的kbe是一种乐趣。本文档我带童鞋们一起领略一下。囿于我知识面和经验方面所限,文中所述之处难免有错误存在,还请读童鞋们睁大慧眼,如果你发现错误,可以 电邮至shawhen2012@hotmail.com。(因为我个人懒散或者时间仓促的关系,这个文档的排版有点小乱。。。)
其他牛逼哄哄的前言就不说了。
从理论上来讲,我们阅读一份源代码,首先应该基于现有的文档从整体上把握项目的架构之后再庖丁解牛一般地细分阅读,不过在我写这个文档的现在,我暂时没发现这样的文档,所以我就按照我自己的阅读顺序从而编排这个文档的内容。
从已有的文档可知(我得假设你已经大致看完了kbe官网的现有文档),kbe由几个组件共同协作,所以我们先看看组件们:
各个组件被设计为独立的app,使用网络通信进行协作。C++程序自然是从main函数开始。
看起来似乎所有的组件都有一个这样的宏(KBENGINE_MAIN)来包裹main函数
int KBENGINE_MAIN(int argc, char* argv[])
{
ENGINE_COMPONENT_INFO& info = g_kbeSrvConfig.getXXX();
return kbeMainT<XXX>(argc, argv, YYY, info.externalPorts_min,
info.externalPorts_max, info.externalInterface, 0, info.internalInterface);
}
这个宏展开是这样子:
kbeMain(int argc, char* argv[]); \
int main(int argc, char* argv[]) \
{ \
loadConfig(); \
g_componentID = genUUID64(); \
parseMainCommandArgs(argc, argv); \
char dumpname[MAX_BUF] = {0}; \
kbe_snprintf(dumpname, MAX_BUF, "%"PRAppID, g_componentID); \
KBEngine::exception::installCrashHandler(1, dumpname); \
int retcode = -1; \
THREAD_TRY_EXECUTION; \
retcode = kbeMain(argc, argv); \
THREAD_HANDLE_CRASH; \
return retcode; \
} \
稍微整理一下之后很像是这个样子:
int kbeMain(int argc, char* argv[]);
int main(int argc, char* argv[])
{
loadConfig();
g_componentID = genUUID64();
parseMainCommandArgs(argc, argv);
char dumpname[MAX_BUF] = {0};
kbe_snprintf(dumpname, MAX_BUF, "%"PRAppID, g_componentID);
KBEngine::exception::installCrashHandler(1, dumpname);
int retcode = -1;
THREAD_TRY_EXECUTION;
retcode = kbeMain(argc, argv);
THREAD_HANDLE_CRASH;
return (retcode);
}
int kbeMain(int argc, char* argv[])
{
ENGINE_COMPONENT_INFO& info = g_kbeSrvConfig.getXXX();
return kbeMainT<XXX>(argc, argv, YYY, info.externalPorts_min, info.externalPorts_max, info.externalInterface, 0, info.internalInterface);
}
嗯。。。基本可以理解为每个组件的main函数流程都是一样的,只是在特化kbeMainT时所给参数不一样。
我们跟着main函数的loadConfig进去看看(kbemain.h)
inline void loadConfig()
{
Resmgr::getSingleton().initialize();
// "../../res/server/kbengine_defs.xml"
g_kbeSrvConfig.loadConfig("server/kbengine_defs.xml");
// "../../../assets/res/server/kbengine.xml"
g_kbeSrvConfig.loadConfig("server/kbengine.xml");
}
在serverconfig.h中可以看到这样的代码:
#define g_kbeSrvConfig ServerConfig::getSingleton()
Resmgr和ServerConfig,这两个类都是被搞成单例了的(kbe的单例不算太严格的单例,线程安全,编译时阻断都无法满足,我们先不细究),
Resmgr是资源管理器,在resmgr.h/.cpp中声明和定义,在FixedMessages类(fixed_messages.h/.cpp)的构造函数中被new出来。(有点小隐晦,我是调试了一下才跟到的。。。)
过程是这样,在各个组件的xxx_interface.cpp中,有这样的代码:(摘自loginapp)
#include "loginapp_interface.h"
#define DEFINE_IN_INTERFACE
#define LOGINAPP
#include "loginapp_interface.h"
xxx_interface.h中,有这样的代码:
#if defined(DEFINE_IN_INTERFACE)
#undef KBE_LOGINAPP_INTERFACE_H
#endif
#ifndef KBE_LOGINAPP_INTERFACE_H
#define KBE_LOGINAPP_INTERFACE_H
大意可以理解为在xxx_interface.cpp中通过在包含xxx_interface.h前后定义DEFINE_IN_INTERFACE和LOGINAPP宏,使得xxx_interface.h被包含了两次(但产生的代码确实不同的),从而对xxx_interface.h内的一些变量实现了声明(第一次)和定义(第二次)。
在xxx_interface.h中有这样一句:
NETWORK_INTERFACE_DECLARE_BEGIN(LoginappInterface)
展开就是:
// 定义接口域名称
#ifndef DEFINE_IN_INTERFACE
#define NETWORK_INTERFACE_DECLARE_BEGIN(INAME) \
namespace INAME \
{ \
extern Network::MessageHandlers messageHandlers; \
#else
#define NETWORK_INTERFACE_DECLARE_BEGIN(INAME) \
namespace INAME \
{ \
Network::MessageHandlers messageHandlers; \
#endif
#define NETWORK_INTERFACE_DECLARE_END() }
在第一次包含xxx_interface.h的时候就是extern Network….这样的外部全局变量引用声明,第二次包含的时候就是Network::M…..这样的全局变量的定义了。
在Network::MessageHandles的构造函数(message_handler.cpp)中,有:
MessageHandlers::MessageHandlers():
msgHandlers_(),
msgID_(1),
exposedMessages_()
{
g_fm = Network::FixedMessages::getSingletonPtr();
if(g_fm == NULL)
g_fm = new Network::FixedMessages;
Network::FixedMessages::getSingleton().loadConfig("server/messages_fixed.xml");
messageHandlers().push_back(this);
}
至此,Network::FixedMessages类被有机会实例化,构造函数中有:
FixedMessages::FixedMessages():
_infomap(),
_loaded(false)
{
new Resmgr();
Resmgr::getSingleton().initialize();
}
ServerConfig在各个组件(比如loginapp在loginapp.cpp)的类的定义文件中实例化。
ServerConfig g_serverConfig;
KBE_SINGLETON_INIT(Loginapp);
我们再次回到loadConfig,里面的函数小跟一下就能读明白了,我们继续跟进主干流程。
下面的语句给组件生成一个随机id
g_componentID = genUUID64();
下面的语句解析主函数的参数:(比如设定指定的组件id,以及gus,我也还没了解到gus搞啥用的。。。不影响我们阅读整体流程,不细究)
parseMainCommandArgs(argc, argv);
下面的语句进行crash处理:(不影响我们阅读整体流程,不细究)
char dumpname[MAX_BUF] = {0}; \
kbe_snprintf(dumpname, MAX_BUF, "%"PRAppID, g_componentID); \
KBEngine::exception::installCrashHandler(1, dumpname);
下面的语句就是一个标准的main函数转向:
int retcode = -1; \
THREAD_TRY_EXECUTION; \
retcode = kbeMain(argc, argv); \
THREAD_HANDLE_CRASH; \
return retcode;
在kbemain.h中可以看到KBENGINE_MAIN针对不同的平台有不同的定义。。。其实就是非win32平台没有crash处理。
kbeMain是在各个组件的main.cpp中定义的:(摘自loginapp)
{
ENGINE_COMPONENT_INFO& info = g_kbeSrvConfig.getLoginApp();
return kbeMainT<Loginapp>(argc, argv, LOGINAPP_TYPE, info.externalPorts_min,
info.externalPorts_max, info.externalInterface, 0, info.internalInterface);
}
第一句就是获取到各个组件相关的信息,然后流程再次转向,到了kbeMainT函数。
kbeMainT是一个模板函数,根据各个组件的主类的不同,产生不同的代码(。。。我是不是有点过了)
前面的一些语句我们暂且不看(后面会需要看的),有设置环境变量的,设置密钥对的。可以看到kbeMainT的流程以各个组件的实例的run接口而终止。
大致看了几个组件的run函数,有的是调用ServerApp的run接口,有的是直接调用dispatcher的processXXX接口,总的来讲,就是依靠事件派发器来进行工作了。所以我们有必要去看看kbe的事件派发器了。
l loginapp组件的run接口:
bool Loginapp::run()
{
return ServerApp::run();
}
l machine组件的run接口:
bool Machine::run()
{
bool ret = true;
while(!this->dispatcher().hasBreakProcessing())
{
threadPool_.onMainThreadTick();
this->dispatcher().processOnce(false);
networkInterface().processChannels(&MachineInterface::messageHandlers);
KBEngine::sleep(100);
};
return ret;
}
l baseapp组件的run接口:
bool Baseapp::run()
{
return EntityApp<Base>::run();
}
实际上EntityApp也是继承自ServerApp
在kbeMainT中可以看到:
Network::EventDispatcher dispatcher;
这里构造了事件派发器,我们得看看它所谓的process到底干了些什么。
在event_dispatcher.cpp可以看到:
int EventDispatcher::processOnce(bool shouldIdle)
{
if(breakProcessing_ != EVENT_DISPATCHER_STATUS_BREAK_PROCESSING)
breakProcessing_ = EVENT_DISPATCHER_STATUS_RUNNING;
this->processTasks();
if(breakProcessing_ != EVENT_DISPATCHER_STATUS_BREAK_PROCESSING){
this->processTimers();
}
this->processStats();
if(breakProcessing_ != EVENT_DISPATCHER_STATUS_BREAK_PROCESSING){
return this->processNetwork(shouldIdle);
}
return 0;
}
这里我们有必要明白一些常识,对于从select,poll,epoll,iocp,kequeue的api,到boost的asio,ace,libevent,libev的库,这些网络i/o复用模型都是为了让我们监听nic上的事件,然后提交给相应的handler处理。他们除了工作方式导致的性能和编码方式有区别外,还有回调的时机的区别,iocp是对于资源的指定动作完成后回调,其他的unix族(kequeue是bsd的)接口都是资源对于指定动作准备好时回调。
所以我们要解读这个事件派发器得找到两个重点,如何产生事件,如何交付到应用处理。
EventDispatcher的processTasks可以稍微跟一下,发现它是处理所有“任务”的一个接口,但什么是任务,我们还不知道,不继续跟了。
processTimers, processStats也暂时不跟了。。。
我们跟到processNetwork里面去:
int EventDispatcher::processNetwork(bool shouldIdle)
{
double maxWait = shouldIdle ? this->calculateWait() : 0.0;
return pPoller_->processPendingEvents(maxWait);
}
这个pPoller是一个EventPoller的实例。EventPoller是一个抽象类(event_poller.h/.cpp),它目前有两个派生子类,SelectorPoller(poller_select.h/.cpp)和EpollPoller(poller_epoll.h/.cpp),顾名思义,它们分别是利用select和epoll系统api进行异步i/o工作的。(在win32上面如果使用iocp的话性能应该是可以和epoll匹敌的,不过由于iocp和epoll工作方式不一样,我估计这是kbe里面没有在win32上使用iocp的原因,如果要将这两个工作方式抽象为一种,工作量估计比kbe本身小不了多少,如果是我的话,我会直接使用asio或者libevent,不过kbe作者为啥没用,可能就像redis的作者的解释一样,虽然我觉得那是很操蛋的解释,使用现有库的好处是显而易见的,如果碰巧像我这种对asio或者libevent有经验的,那这里kbe的网络底层我可以一掠而过,我也可以把更多的精力放在这个项目本身要解决的问题上。继续发牢骚可能)
select和epoll的工作方式一致,所以我们任选一个阅读都行,我倾向于使用epoll。
int EpollPoller::processPendingEvents(double maxWait)
{
const int MAX_EVENTS = 10;
struct epoll_event events[ MAX_EVENTS ];
int maxWaitInMilliseconds = int(ceil(maxWait * 1000));
#if ENABLE_WATCHERS
g_idleProfile.start();
#else
uint64 startTime = timestamp();
#endif
KBEConcurrency::onStartMainThreadIdling();
int nfds = epoll_wait(epfd_, events, MAX_EVENTS, maxWaitInMilliseconds);
KBEConcurrency::onEndMainThreadIdling();
#if ENABLE_WATCHERS
g_idleProfile.stop();
spareTime_ += g_idleProfile.lastTime_;
#else
spareTime_ += timestamp() - startTime;
#endif
for (int i = 0; i < nfds; ++i)
{
if (events[i].events & (EPOLLERR|EPOLLHUP))
{
this->triggerError(events[i].data.fd);
}
else
{
if (events[i].events & EPOLLIN)
{
this->triggerRead(events[i].data.fd);
}
if (events[i].events & EPOLLOUT)
{
this->triggerWrite(events[i].data.fd);
}
}
}
return nfds;
}
大意就是对Poller内注册的文件描述符进行事件等待,然后对事件进行区分之后触发读(triggerRead)或者写(triggerWrite)的接口。我们跟一下这两个接口:
bool EventPoller::triggerRead(int fd)
{
FDReadHandlers::iterator iter = fdReadHandlers_.find(fd);
if (iter == fdReadHandlers_.end())
{
return false;
}
iter->second->handleInputNotification(fd);
return true;
}
//-------------------------------------------------------------------------------------
bool EventPoller::triggerWrite(int fd)
{
FDWriteHandlers::iterator iter = fdWriteHandlers_.find(fd);
if (iter == fdWriteHandlers_.end())
{
return false;
}
iter->second->handleOutputNotification(fd);
return true;
}
可以看到就是对注册的文件描述符查找相应的输入输出处理接口(这里也指导了我们下一步的阅读方向,找到注册文件描述符的地方)。至此我们找到了事件如何产生。(其实我一直不习惯poll流的模型,我比较喜欢iocp的模型,虽然理论上来讲poll会给予应用层更多的变化点。打个不太形象的比喻,你让poll或者iocp给你准备三辆车。你给poll说完,poll稍后会告诉你:老爷,车备好了,在车库里,但具体有几辆我也不清楚,您自个去看看。你给iocp三把钥匙,iocp稍后会告诉你:老爷,三辆车准备好了,就停在门外。)
为了找到注册文件描述符和事件处理接口的流程,我们再次回到kbeMainT。映入眼帘的是这行代码:
Network::NetworkInterface networkInterface(&dispatcher,
extlisteningPort_min, extlisteningPort_max, extlisteningInterface,
channelCommon.extReadBufferSize, channelCommon.extWriteBufferSize,
(intlisteningPort != -1) ? htons(intlisteningPort) : -1, intlisteningInterface,
channelCommon.intReadBufferSize, channelCommon.intWriteBufferSize);
Network::NetworkInterface的构造函数:
NetworkInterface::NetworkInterface(Network::EventDispatcher * pDispatcher,
int32 extlisteningPort_min, int32 extlisteningPort_max, const char * extlisteningInterface,
uint32 extrbuffer, uint32 extwbuffer,
int32 intlisteningPort, const char * intlisteningInterface,
uint32 intrbuffer, uint32 intwbuffer):
extEndpoint_(),
intEndpoint_(),
channelMap_(),
pDispatcher_(pDispatcher),
pExtensionData_(NULL),
pExtListenerReceiver_(NULL),
pIntListenerReceiver_(NULL),
pDelayedChannels_(new DelayedChannels()),
pChannelTimeOutHandler_(NULL),
pChannelDeregisterHandler_(NULL),
isExternal_(extlisteningPort_min != -1),
numExtChannels_(0)
{
if(isExternal())
{
pExtListenerReceiver_ = new ListenerReceiver(extEndpoint_, Channel::EXTERNAL, *this);
this->recreateListeningSocket("EXTERNAL", htons(extlisteningPort_min), htons(extlisteningPort_max),
extlisteningInterface, &extEndpoint_, pExtListenerReceiver_, extrbuffer, extwbuffer);
// 如果配置了对外端口范围, 如果范围过小这里extEndpoint_可能没有端口可用了
if(extlisteningPort_min != -1)
{
KBE_ASSERT(extEndpoint_.good() && "Channel::EXTERNAL: no available port, "
"please check for kbengine_defs.xml!\n");
}
}
if(intlisteningPort != -1)
{
pIntListenerReceiver_ = new ListenerReceiver(intEndpoint_, Channel::INTERNAL, *this);
this->recreateListeningSocket("INTERNAL", intlisteningPort, intlisteningPort,
intlisteningInterface, &intEndpoint_, pIntListenerReceiver_, intrbuffer, intwbuffer);
}
KBE_ASSERT(good() && "NetworkInterface::NetworkInterface: no available port, "
"please check for kbengine_defs.xml!\n");
pDelayedChannels_->init(this->dispatcher(), this);
}
在recreateListeningSocket接口的代码中:
bool NetworkInterface::recreateListeningSocket(const char* pEndPointName, uint16 listeningPort_min, uint16 listeningPort_max,
const char * listeningInterface, EndPoint* pEP, ListenerReceiver* pLR, uint32 rbuffer,
uint32 wbuffer)
{
KBE_ASSERT(listeningInterface && pEP && pLR);
if (pEP->good())
{
this->dispatcher().deregisterReadFileDescriptor( *pEP );
pEP->close();
}
Address address;
address.ip = 0;
address.port = 0;
pEP->socket(SOCK_STREAM);
if (!pEP->good())
{
ERROR_MSG(fmt::format("NetworkInterface::recreateListeningSocket({}): couldn't create a socket\n",
pEndPointName));
return false;
}
/*
pEP->setreuseaddr(true);
*/
this->dispatcher().registerReadFileDescriptor(*pEP, pLR);
找到了事件派发器注册文件描述符的地方,而注册的事件处理接口也就是这个ListenerReceiver(listener_receiver.h/.cpp)。
跟到ListenerReceiver的handleInputNotification接口:
int ListenerReceiver::handleInputNotification(int fd)
{
int tickcount = 0;
while(tickcount ++ < 256)
{
EndPoint* pNewEndPoint = endpoint_.accept();
if(pNewEndPoint == NULL){
if(tickcount == 1)
{
WARNING_MSG(fmt::format("PacketReceiver::handleInputNotification: accept endpoint({}) {}!\n",
fd, kbe_strerror()));
this->dispatcher().errorReporter().reportException(
REASON_GENERAL_NETWORK);
}
break;
}
else
{
Channel* pChannel = Network::Channel::ObjPool().createObject();
bool ret = pChannel->initialize(networkInterface_, pNewEndPoint, traits_);
if(!ret)
{
ERROR_MSG(fmt::format("ListenerReceiver::handleInputNotification: initialize({}) is failed!\n",
pChannel->c_str()));
pChannel->destroy();
Network::Channel::ObjPool().reclaimObject(pChannel);
return 0;
}
if(!networkInterface_.registerChannel(pChannel))
{
ERROR_MSG(fmt::format("ListenerReceiver::handleInputNotification: registerChannel({}) is failed!\n",
pChannel->c_str()));
pChannel->destroy();
Network::Channel::ObjPool().reclaimObject(pChannel);
}
}
}
return 0;
}
如果你手工撸过epoll就会知道在监听套接口上如果有可读事件,则代表着有新的连接进来,
通常我们就是使用accept来接收这个新连接,然后注册到epoll上,但是kbe在这个ListenerReceiver中不是这么做的,它只是关心监听套接口的可读事件,然后将新的连接封装到它所谓的一个Channel中去了。(kbe用一个EndPoint来表征一个socket终端)。所以我们再跟进这个Channel看看。
bool Channel::initialize(NetworkInterface & networkInterface,
const EndPoint * pEndPoint,
Traits traits,
ProtocolType pt,
PacketFilterPtr pFilter,
ChannelID id)
{
id_ = id;
protocoltype_ = pt;
traits_ = traits;
pFilter_ = pFilter;
pNetworkInterface_ = &networkInterface;
this->pEndPoint(pEndPoint);
KBE_ASSERT(pNetworkInterface_ != NULL);
KBE_ASSERT(pEndPoint_ != NULL);
if(protocoltype_ == PROTOCOL_TCP)
{
if(pPacketReceiver_)
{
if(pPacketReceiver_->type() == PacketReceiver::UDP_PACKET_RECEIVER)
{
SAFE_RELEASE(pPacketReceiver_);
pPacketReceiver_ = new TCPPacketReceiver(*pEndPoint_, *pNetworkInterface_);
}
}
else
{
pPacketReceiver_ = new TCPPacketReceiver(*pEndPoint_, *pNetworkInterface_);
}
KBE_ASSERT(pPacketReceiver_->type() == PacketReceiver::TCP_PACKET_RECEIVER);
// UDP不需要注册描述符
pNetworkInterface_->dispatcher().registerReadFileDescriptor(*pEndPoint_, pPacketReceiver_);
// 需要发送数据时再注册
// pPacketSender_ = new TCPPacketSender(*pEndPoint_, *pNetworkInterface_);
// pNetworkInterface_->dispatcher().registerWriteFileDescriptor(*pEndPoint_, pPacketSender_);
}
else
{
if(pPacketReceiver_)
{
if(pPacketReceiver_->type() == PacketReceiver::TCP_PACKET_RECEIVER)
{
SAFE_RELEASE(pPacketReceiver_);
pPacketReceiver_ = new UDPPacketReceiver(*pEndPoint_, *pNetworkInterface_);
}
}
else
{
pPacketReceiver_ = new UDPPacketReceiver(*pEndPoint_, *pNetworkInterface_);
}
KBE_ASSERT(pPacketReceiver_->type() == PacketReceiver::UDP_PACKET_RECEIVER);
}
pPacketReceiver_->pEndPoint(pEndPoint_);
if(pPacketSender_)
pPacketSender_->pEndPoint(pEndPoint_);
startInactivityDetection((traits_ == INTERNAL) ? g_channelInternalTimeout :
g_channelExternalTimeout,
(traits_ == INTERNAL) ? g_channelInternalTimeout / 2.f:
g_channelExternalTimeout / 2.f);
return true;
}
可以看到在这个initialize接口中,新的EndPoint还是注册到了事件派发器中,只是处理的方式变为了PacketReceiver(虽然实现了UDP和TCP两种PacketReceiver,不过目前似乎只有TCPPacketReceiver被用到了)。
看看PacketReceiver的handleInputNotification:
int PacketReceiver::handleInputNotification(int fd)
{
if (this->processRecv(/*expectingPacket:*/true))
{
while (this->processRecv(/*expectingPacket:*/false))
{
/* pass */;
}
}
return 0;
}
我们跟进processRecv(tcp_packet_receiver.cpp):
bool TCPPacketReceiver::processRecv(bool expectingPacket)
{
Channel* pChannel = getChannel();
KBE_ASSERT(pChannel != NULL);
if(pChannel->isCondemn())
{
return false;
}
TCPPacket* pReceiveWindow = TCPPacket::ObjPool().createObject();
int len = pReceiveWindow->recvFromEndPoint(*pEndpoint_);
if (len < 0)
{
TCPPacket::ObjPool().reclaimObject(pReceiveWindow);
PacketReceiver::RecvState rstate = this->checkSocketErrors(len, expectingPacket);
if(rstate == PacketReceiver::RECV_STATE_INTERRUPT)
{
onGetError(pChannel);
return false;
}
return rstate == PacketReceiver::RECV_STATE_CONTINUE;
}
else if(len == 0) // 客户端正常退出
{
TCPPacket::ObjPool().reclaimObject(pReceiveWindow);
onGetError(pChannel);
return false;
}
Reason ret = this->processPacket(pChannel, pReceiveWindow);
if(ret != REASON_SUCCESS)
this->dispatcher().errorReporter().reportException(ret, pEndpoint_->addr());
return true;
}
不难发现正常情况会调用processPacket,我们跟进(packet_receiver.cpp):
Reason PacketReceiver::processPacket(Channel* pChannel, Packet * pPacket)
{
if (pChannel != NULL)
{
pChannel->onPacketReceived(pPacket->length());
if (pChannel->pFilter())
{
return pChannel->pFilter()->recv(pChannel, *this, pPacket);
}
}
return this->processFilteredPacket(pChannel, pPacket);
}
在没有filter的情况,流程会转向processFilteredPacket(tcp_packet_receiver.cpp):
Reason TCPPacketReceiver::processFilteredPacket(Channel* pChannel, Packet * pPacket)
{
// 如果为None, 则可能是被过滤器过滤掉了(过滤器正在按照自己的规则组包解密)
if(pPacket)
{
pChannel->addReceiveWindow(pPacket);
}
return REASON_SUCCESS;
}
我们跟进addReceiveWindow(channel.cpp):
void Channel::addReceiveWindow(Packet* pPacket)
{
bufferedReceives_.push_back(pPacket);
uint32 size = (uint32)bufferedReceives_.size();
if(Network::g_receiveWindowMessagesOverflowCritical > 0 && size > Network::g_receiveWindowMessagesOverflowCritical)
{
if(this->isExternal())
{
if(Network::g_extReceiveWindowMessagesOverflow > 0 &&
size > Network::g_extReceiveWindowMessagesOverflow)
{
ERROR_MSG(fmt::format("Channel::addReceiveWindow[{:p}]: external channel({}), receive window has overflowed({} > {}), Try adjusting the kbengine_defs.xml->receiveWindowOverflow.\n",
(void*)this, this->c_str(), size, Network::g_extReceiveWindowMessagesOverflow));
this->condemn();
}
else
{
WARNING_MSG(fmt::format("Channel::addReceiveWindow[{:p}]: external channel({}), receive window has overflowed({} > {}).\n",
(void*)this, this->c_str(), size, Network::g_receiveWindowMessagesOverflowCritical));
}
}
else
{
if(Network::g_intReceiveWindowMessagesOverflow > 0 &&
size > Network::g_intReceiveWindowMessagesOverflow)
{
WARNING_MSG(fmt::format("Channel::addReceiveWindow[{:p}]: internal channel({}), receive window has overflowed({} > {}).\n",
(void*)this, this->c_str(), size, Network::g_intReceiveWindowMessagesOverflow));
}
}
}
}
可以看到,正常情况下,一个包(客户端与服务端的一个有效链接上的负载)接收之后先被放到Channel的bufferedReceives队列。于是我们还需要找到何处处理这个包。
我们可以在Channel的processPackets接口内找到处理bufferedReceives(channel.cpp):
void Channel::processPackets(KBEngine::Network::MessageHandlers* pMsgHandlers)
{
lastTickBytesReceived_ = 0;
lastTickBytesSent_ = 0;
if(pMsgHandlers_ != NULL)
{
pMsgHandlers = pMsgHandlers_;
}
if (this->isDestroyed())
{
ERROR_MSG(fmt::format("Channel::processPackets({}): channel[{:p}] is destroyed.\n",
this->c_str(), (void*)this));
return;
}
if(this->isCondemn())
{
ERROR_MSG(fmt::format("Channel::processPackets({}): channel[{:p}] is condemn.\n",
this->c_str(), (void*)this));
//this->destroy();
return;
}
if(pPacketReader_ == NULL)
{
handshake();
}
try
{
BufferedReceives::iterator packetIter = bufferedReceives_.begin();
for(; packetIter != bufferedReceives_.end(); ++packetIter)
{
Packet* pPacket = (*packetIter);
pPacketReader_->processMessages(pMsgHandlers, pPacket);
RECLAIM_PACKET(pPacket->isTCPPacket(), pPacket);
}
}catch(MemoryStreamException &)
{
Network::MessageHandler* pMsgHandler = pMsgHandlers->find(pPacketReader_->currMsgID());
WARNING_MSG(fmt::format("Channel::processPackets({}): packet invalid. currMsg=(name={}, id={}, len={}), currMsgLen={}\n",
this->c_str()
, (pMsgHandler == NULL ? "unknown" : pMsgHandler->name)
, pPacketReader_->currMsgID()
, (pMsgHandler == NULL ? -1 : pMsgHandler->msgLen)
, pPacketReader_->currMsgLen()));
pPacketReader_->currMsgID(0);
pPacketReader_->currMsgLen(0);
condemn();
}
bufferedReceives_.clear();
}
到这里我们又要弄清楚两个问题,这个接口何时被谁调用,调用又做了些什么,为了联通整个流程,我们还是先弄清楚这个接口在哪被谁调用。
通过vs的“Find All References”功能我顺利地找到了这个接口被调用的地方(network_interface.cpp):
void NetworkInterface::processChannels(KBEngine::Network::MessageHandlers* pMsgHandlers)
{
ChannelMap::iterator iter = channelMap_.begin();
for(; iter != channelMap_.end(); )
{
Network::Channel* pChannel = iter->second;
if(pChannel->isDestroyed())
{
++iter;
}
else if(pChannel->isCondemn())
{
++iter;
deregisterChannel(pChannel);
pChannel->destroy();
Network::Channel::ObjPool().reclaimObject(pChannel);
}
else
{
pChannel->processPackets(pMsgHandlers);
++iter;
}
}
}
同理,我们得找到processChannels被调用的地方。(几乎在每个继承自ServerApp的XXXApp的handleCheckStatusTick接口内都发现了类似下面的代码):(loginapp.cpp)
void Loginapp::handleCheckStatusTick()
{
threadPool_.onMainThreadTick();
networkInterface().processChannels(&LoginappInterface::messageHandlers);
pendingLoginMgr_.process();
pendingCreateMgr_.process();
}
在XXXApp的handleTimeout的接口内我们找到了下面的代码:(loginapp.cpp)
void Loginapp::handleTimeout(TimerHandle handle, void * arg)
{
switch (reinterpret_cast<uintptr>(arg))
{
case TIMEOUT_CHECK_STATUS:
this->handleCheckStatusTick();
return;
default:
break;
}
ServerApp::handleTimeout(handle, arg);
}
根据ServerApp的父类TimerHandler的handleTimeout接口顺利地找到了下面的代码:(timer.inl)
template <class TIME_STAMP>
void TimersT< TIME_STAMP >::Time::triggerTimer()
{
if (!this->isCancelled())
{
state_ = TIME_EXECUTING;
pHandler_->handleTimeout( TimerHandle( this ), pUserData_ );
if ((interval_ == 0) && !this->isCancelled())
{
this->cancel();
}
}
if (!this->isCancelled())
{
time_ += interval_;
state_ = TIME_PENDING;
}
}
找到调用triggerTimer的地方:(timer.inl)
template <class TIME_STAMP>
int TimersT< TIME_STAMP >::process(TimeStamp now)
{
int numFired = 0;
while ((!timeQueue_.empty()) && (
timeQueue_.top()->time() <= now ||
timeQueue_.top()->isCancelled()))
{
Time * pTime = pProcessingNode_ = timeQueue_.top();
timeQueue_.pop();
if (!pTime->isCancelled())
{
++numFired;
pTime->triggerTimer();
}
if (!pTime->isCancelled())
{
timeQueue_.push( pTime );
}
else
{
delete pTime;
KBE_ASSERT( numCancelled_ > 0 );
--numCancelled_;
}
}
pProcessingNode_ = NULL;
lastProcessTime_ = now;
return numFired;
}
找到调用TimerHandler的process接口的地方:(event_dispatcher.cpp,是不是感觉这个文件很熟悉。。。)
void EventDispatcher::processTimers()
{
numTimerCalls_ += pTimers_->process(timestamp());
}
最终终于找到了我们失散多年的整体流程:(event_dispatcher.cpp)
int EventDispatcher::processOnce(bool shouldIdle)
{
if(breakProcessing_ != EVENT_DISPATCHER_STATUS_BREAK_PROCESSING)
breakProcessing_ = EVENT_DISPATCHER_STATUS_RUNNING;
this->processTasks();
if(breakProcessing_ != EVENT_DISPATCHER_STATUS_BREAK_PROCESSING){
this->processTimers();
}
this->processStats();
if(breakProcessing_ != EVENT_DISPATCHER_STATUS_BREAK_PROCESSING){
return this->processNetwork(shouldIdle);
}
return 0;
}
其实上面的这个过程有点小艰辛,花了好几个小时才完工。。。其实看到Channel::addReceiveWindow就应该想到这里的,因为这里的bufferedReceives的push_back操作没有加锁(stl的vector是非线程安全的),所以他应该是和EventDispatcher::processNetwork接口在同一个线程的同步调用流程上,如此一来就只可能是EventDispatcher::processTasks,EventDispatcher::processTimers,EventDispatcher::processStats中的一个了。
我们接着回到之前的包处理的分支流程(就是Channel的processPackets接口)。
在packet_reader.cpp内我们可以找到PacketReader::processMesages接口(代码太长,不贴了)。最终我们可以看到消息对应的handler的处理接口调用:
pMsgHandler->handle(pChannel_, *pFragmentStream_);
建立消息对应的handler的映射在XXXapp_interface.h中(各种宏,看的头都大了,留到后面各个组件的单独消息分析中再说)
至此,我们可以得出一个kbengine底层的大致流程的轮廓。
其实kbe里面还有很多“*”可以拿出来细说,像是单例,定时器,内存池,线程池,以及用来通讯的序列化和反序列化(MemoryStream)机制。。。。。。
总得来讲,大体因为底层网络(select,epoll)的异步回调机制,使得整个流程都有点小乱的感觉。“*”有点多,不利于利用现有的知识。一个高性能的c/c++项目几乎总会用到一些语言的 奇淫技巧,不过对于分布式的项目,既然我们已经做好了不追求单点高性能的心理准备,就不应该再为了追求性能而损失可读性。在单点性能可接受的程度上提高代码的可读性,提高整体的水平扩展能力,才是分布式项目的正道。
1. machine
2. loginapp
3. dbmgr
4. caseapp
5. baseappmgr
6. cellapp
7. cellappmgr
脚本逻辑层:
各种工具: