本篇将介绍客户端与游戏逻辑服务器连接建立以后,mangosd如何接收、解析和处理客户端发过来的协议。本篇不再讨论mangosd与客户端的认证及建立最终RC4流加密的过程,想了解这部分内容请看该系列的第一篇。
一、acceptor socket的监听启动及注册
mangosd的main ()函数调用单例对象sMaster的Run ()函数,启动监听socket的代码如下:
1: int Master::Run()
2: {
3: ........
4:
5: ///- Launch the world listener socket
6: uint16 wsport = sWorld.getConfig (CONFIG_UINT32_PORT_WORLD);
7: std::string bind_ip = sConfig.GetStringDefault ("BindIP", "0.0.0.0");
8:
9: if (sWorldSocketMgr->StartNetwork (wsport, bind_ip) == -1)
10: {
11: sLog.outError ("Failed to start network");
12: Log::WaitBeforeContinueIfNeed();
13: World::StopNow(ERROR_EXIT_CODE);
14: // go down and shutdown the server
15: }
16:
17: sWorldSocketMgr->Wait ();
18:
19: ........
20: }
mangosd用WorldSocketMgr类来管理socket。StartNetwork ()会调用StartReactiveIO ()来启动监听socket,处理代码如下:
1: int WorldSocketMgr::StartReactiveIO (ACE_UINT16 port, const char* address)
2: {
3: ........
4:
5: //(1)
6: m_NetThreadsCount = static_cast<size_t> (num_threads + 1);
7: m_NetThreads = new ReactorRunnable[m_NetThreadsCount];
8:
9: // -1 means use default
10: m_SockOutKBuff = sConfig.GetIntDefault ("Network.OutKBuff", -1);
11: m_SockOutUBuff = sConfig.GetIntDefault ("Network.OutUBuff", 65536);
12: if ( m_SockOutUBuff <= 0 )
13: {
14: sLog.outError ("Network.OutUBuff is wrong in your config file");
15: return -1;
16: }
17:
18: //(2)
19: WorldSocket::Acceptor *acc = new WorldSocket::Acceptor;
20: m_Acceptor = acc;
21:
22: ACE_INET_Addr listen_addr (port, address);
23: if (acc->open (listen_addr, m_NetThreads[0].GetReactor (), ACE_NONBLOCK) == -1)
24: {
25: sLog.outError ("Failed to open acceptor ,check if the port is free");
26: return -1;
27: }
28:
29: //(3)
30: for (size_t i = 0; i < m_NetThreadsCount; ++i)
31: m_NetThreads[i].Start ();
32:
33: return 0;
34: }
(1)ReactorRunnable类继承了ACE_Task_Base,ACE_Task_Base 是ACE 中的任务或主动对象“处理结构”的基类。在ACE 中使用了此类来实现主动对象模式。所有希望成为“主动对象”的对象都必须从此类派生。可以把ACE_TASK 看作是更高级的、更为面向对象的线程类[1]。ACE_Task_Base调用时继承类必须重写svc方法,并且在使用时保证调用了activate ()方法。
(2)指定监听地址,端口并把Acceptor绑定到第一个线程的Reactor上。启动Acceptor开始监听网络IO。
(3)启动所有线程,每个线程上有一个单独的ACE_Reactor* m_Reactor;,这里的Reactor使用的是多线程的ACE_TP_Reactor。可以各自单独完成事件的多路复用。
二、线程体函数
线程体函数ReactorRunnable::svc () 如下:
1: virtual int svc ()
2: {
3: //(1)
4: WorldDatabase.ThreadStart ();
5:
6: SocketSet::iterator i, t;
7: while (!m_Reactor->reactor_event_loop_done ())
8: {
9: // dont be too smart to move this outside the loop
10: // the run_reactor_event_loop will modify interval
11: ACE_Time_Value interval (0, 10000);
12:
13: //(2)
14: if (m_Reactor->run_reactor_event_loop (interval) == -1)
15: break;
16:
17: //(3)
18: AddNewSockets ();
19:
20: for (i = m_Sockets.begin (); i != m_Sockets.end ();)
21: {
22: //(4)
23: if ((*i)->Update () == -1)
24: {
25: t = i;
26: ++i;
27: (*t)->CloseSocket ();
28: (*t)->RemoveReference ();
29: --m_Connections;
30: m_Sockets.erase (t);
31: }
32: else
33: ++i;
34: }
35: }
36:
37: WorldDatabase.ThreadEnd ();
38: DEBUG_LOG ("Network Thread Exitting");
39:
40: return 0;
41: }
(1)会调用mysql_thread_init ()函数,初始化与该线程相关的变量。
(2)run_reactor_event_loop ()函数为多路复用的等待函数,当注册的事件发生、运行超时或者出现错误时返回。
(3)AddNewSockets ()函数会将缓存在m_NewSockets里的新到达的socket添加到SocketSet m_Sockets;里,同时检查并处理m_Sockets;里已经closed的socket。
(4)循环每一个WorldSocket,调用其Update ()方法,这里只处理每个socket的handle_output,即每个在此线程上的写事件,向客户端发送数据。下一节详细介绍:
三、WorldSocket::Update () 方法
Update方法用于处理每个socket的输出:
1: int WorldSocket::Update (void)
2: {
3: if (closing_)
4: return -1;
5:
6: //(1)
7: if (m_OutActive || m_OutBuffer->length () == 0)
8: return 0;
9:
10: return handle_output (get_handle ());
11: }
(1)m_OutBuffer有数据时才会调用handle_output,handle_output ()用于处理输出,如果输出不能一次性做完,会调用schedule_wakeup_output ()再次激活write事件。当输出处理完毕后则调用cancel_wakeup_output ()取消激活write事件,使reactor恢复到正常的loop ()循环中。详细过程如下:
1: int WorldSocket::handle_output (ACE_HANDLE)
2: {
3: //(1)
4: ACE_GUARD_RETURN (LockType, Guard, m_OutBufferLock, -1);
5:
6: if (closing_)
7: return -1;
8:
9: const size_t send_len = m_OutBuffer->length ();
10: if (send_len == 0)
11: return cancel_wakeup_output (Guard);
12:
13: #ifdef MSG_NOSIGNAL
14: ssize_t n = peer ().send (m_OutBuffer->rd_ptr (), send_len, MSG_NOSIGNAL);
15: #else
16: ssize_t n = peer ().send (m_OutBuffer->rd_ptr (), send_len);
17: #endif // MSG_NOSIGNAL
18:
19: if (n == 0)
20: return -1;
21: else if (n == -1)
22: {
23: if (errno == EWOULDBLOCK || errno == EAGAIN) //----------(2)
24: return schedule_wakeup_output (Guard);
25:
26: return -1;
27: }
28: else if (n < (ssize_t)send_len) //now n > 0 //----------(3)
29: {
30: m_OutBuffer->rd_ptr (static_cast<size_t> (n));
31:
32: // move the data to the base of the buffer
33: m_OutBuffer->crunch ();
34:
35: return schedule_wakeup_output (Guard);
36: }
37: else //now n == send_len //----------(4)
38: {
39: m_OutBuffer->reset ();
40:
41: if (!iFlushPacketQueue ())
42: return cancel_wakeup_output (Guard);
43: else
44: return schedule_wakeup_output (Guard);
45: }
46:
47: ACE_NOTREACHED (return 0);
48: }
(1)对m_OutBuffer加锁。
(2)考虑信号打断的情况等,暂时不能写。
(3)只发送了部分数据则继续wakeup该线程对应的Reactor。
(4)检查m_OutBuffer数据发送完毕同时等待buffer(PacketQueueT m_PacketQueue;)里已经没有数据时,cancel wakeup让Reactor恢复正常。
四、socket到对应线程的指派
上一节内容可以看到线程内如何处理socket,及新到达的socket。但从第一节中可知只有第一个线程注册为acceptor线程,那么新连接到达时,是如何被指派到对应的“接待”线程的呢?
可以先看一下ACE_Acceptor的处理时序图:
图3.1 连接到达处理时序 [2]
上图可以看出,当连接到达时,acceptor会调用对应的SVC_HANDLER的open ()函数,在mangosd里就是acceptor对应的int WorldSocket::open (void *a),如下:
1: int WorldSocket::open (void *a)
2: {
3: ........
4:
5: // Hook for the manager.
6: if (sWorldSocketMgr->OnSocketOpen (this) == -1)
7: return -1;
8:
9: ........
10: }
OnSocketOpen方法:
1: int WorldSocketMgr::OnSocketOpen (WorldSocket* sock)
2: {
3: ........
4:
5: // we skip the Acceptor Thread
6: size_t min = 1;
7:
8: MANGOS_ASSERT (m_NetThreadsCount >= 1);
9:
10: //(1)
11: for (size_t i = 1; i < m_NetThreadsCount; ++i)
12: if (m_NetThreads[i].Connections () < m_NetThreads[min].Connections ())
13: min = i;
14:
15: return m_NetThreads[min].AddSocket (sock);
16: }
(1)将WorldSocket均衡的分配给每个线程。AddSocket ()将socket添加到m_NewSockets中做缓存,待该线程自行调用AddNewSockets ()添加到处理队列里。
总结:
mangosd对socket的处理因为使用了ACE,逻辑处理代码相对比较简单,写事件的异常处理主要涉及
(1)一次不能写完则不断的wakeup Reactor。
(2)信号中断等错误的判断。似乎这里并没有考虑全面(见附录)
(3)使用另一个buffer缓存,因写缓存m_OutBuffer满而带来的多出的数据。
References:
[1] http://blog.csdn.net/yecao_kinux/article/details/1546914
[2] http://postfiles12.naver.net/data41/2009/4/11/187/33_kbkim007.jpg?type=w3
附录:
不同平台不同版本的read/write、send/recv有些不同,挺郁闷的一件事……………
EWOULDBLOCK:基于 berkeley 实现,当客户端异常终止连接时
ECONNABORTED:基于 posix 实现,当客户端异常终止连接时
EPROTO:基于 SVR4 实现,当客户端异常终止连接时)以及 EINTR