详谈高性能UDP服务器的开发[转]

时间:2021-06-27 19:45:47
详谈高性能UDP服务器的开发

上一篇文章我详细介绍了如何开发一款高性能的TCP服务器的网络传输层.本章我将谈谈如何开发一个高性能的UDP服务器的网络层.UDP服务器的网络层开发相对与TCP服务器来说要容易和简单的多,UDP服务器的大致流程为创建一个socket然后将其绑定到完成端口上并投递一定数量的recv操作.当有数据到来时从完成队列中取出数据发送到接收队列中即可。
   测试结果如下:
     WindowsXP Professional,Intel Core Duo E4600 双核2.4G , 2G内存。同时30K个用户和该UDP服务器进行交互其CPU使用率为10%左右,内存占用7M左右。
   下面详细介绍该服务器的架构及流程:
下面将主要介绍UdpSer类, 该类主要用来管理UDP服务.其定义如下:
1     class DLLENTRY UdpSer
2     {
3     public:
4         UdpSer();
5         ~UdpSer();
6
7         /**//************************************************************************
8         * Desc : 初始化静态资源,在申请UDP实例对象之前应先调用该函数, 否则程序无法正常运行
9         ************************************************************************/
10         static void InitReource();
11
12         /**//************************************************************************
13         * Desc : 在释放UDP实例以后, 掉用该函数释放相关静态资源
14         ************************************************************************/
15         static void ReleaseReource();
16
17         //用指定本地地址和端口进行初始化
18         BOOL StartServer(const CHAR* szIp = "0.0.0.0", INT nPort = 0);
19
20         //从数据队列的头部获取一个接收数据, pCount不为null时返回队列的长度
21         UDP_RCV_DATA* GetRcvData(DWORD* pCount);
22
23         //向对端发送数据
24         BOOL SendData(const IP_ADDR& PeerAddr, const CHAR* szData, INT nLen);
25
26         /**//****************************************************
27         * Name : CloseServer()
28         * Desc : 关闭服务器
29         ****************************************************/
30         void CloseServer();
31
32     protected:
33         SOCKET m_hSock;
34         vector<UDP_RCV_DATA* > m_RcvDataQue;                 //接收数据队列
35         CRITICAL_SECTION m_RcvDataLock;                         //访问m_RcvDataQue的互斥锁
36         long volatile m_bThreadRun;                                 //是否允许后台线程继续运行
37         BOOL m_bSerRun;                                             //服务器是否正在运行
38
39         HANDLE *m_pThreads;                 //线程数组
40         HANDLE m_hCompletion;                     //完成端口句柄
41
42         void ReadCompletion(BOOL bSuccess, DWORD dwNumberOfBytesTransfered, LPOVERLAPPED lpOverlapped);
43
44         /**//****************************************************
45         * Name : WorkThread()
46         * Desc : I/O 后台管理线程
47         ****************************************************/
48         static UINT WINAPI WorkThread(LPVOID lpParam);
49     };
1. InitReource() 主要对相关的静态资源进行初始化.其实大致和TcpServer::InitReource()大致相同.在UdpSer实例使用之前必须调用该函数进行静态资源的初始化, 否则服务器无法正常使用.

 

2.ReleaseReource() 主要对相关静态资源进行释放.只有在应用程序结束时才能调用该函数进行静态资源的释放.

3. StartServer()
该函数的主要功能启动一个UDP服务.其大致流程为先创建服务器UDP socket, 将其绑定到完成端口上然后投递一定数量的recv操作以接收客户端的数据.其实现如下:
1     BOOL UdpSer::StartServer(const CHAR* szIp /**//* =   */, INT nPort /**//* = 0 */)
2     {
3         BOOL bRet = TRUE;
4         const int RECV_COUNT = 500;
5         WSABUF RcvBuf = { NULL, 0 };
6         DWORD dwBytes = 0;
7         DWORD dwFlag = 0;
8         INT nAddrLen = sizeof(IP_ADDR);
9         INT iErrCode = 0;
10
11         try
12         {
13             if (m_bSerRun)
14             {
15                 THROW_LINE;
16             }
17
18             m_bSerRun = TRUE;
19             m_hSock = WSASocket(AF_INET, SOCK_DGRAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
20             if (INVALID_SOCKET == m_hSock)
21             {
22                 THROW_LINE;
23             }
24             ULONG ul = 1;
25             ioctlsocket(m_hSock, FIONBIO, &ul);
26
27             //设置为地址重用,优点在于服务器关闭后可以立即启用
28             int nOpt = 1;
29             setsockopt(m_hSock, SOL_SOCKET, SO_REUSEADDR, (char*)&nOpt, sizeof(nOpt));
30
31             //关闭系统缓存,使用自己的缓存以防止数据的复制操作
32             INT nZero = 0;
33             setsockopt(m_hSock, SOL_SOCKET, SO_SNDBUF, (char*)&nZero, sizeof(nZero));
34             setsockopt(m_hSock, SOL_SOCKET, SO_RCVBUF, (CHAR*)&nZero, sizeof(nZero));
35
36             IP_ADDR addr(szIp, nPort);
37             if (SOCKET_ERROR == bind(m_hSock, (sockaddr*)&addr, sizeof(addr)))
38             {
39                 closesocket(m_hSock);
40                 THROW_LINE;
41             }
42
43             //将SOCKET绑定到完成端口上
44             CreateIoCompletionPort((HANDLE)m_hSock, m_hCompletion, 0, 0);
45
46             //投递读操作
47             for (int nIndex = 0; nIndex < RECV_COUNT; nIndex++)
48             {
49                 UDP_CONTEXT* pRcvContext = new UDP_CONTEXT();
50                 if (pRcvContext && pRcvContext->m_pBuf)
51                 {
52                     dwFlag = 0;
53                     dwBytes = 0;
54                     nAddrLen = sizeof(IP_ADDR);
55                     RcvBuf.buf = pRcvContext->m_pBuf;
56                     RcvBuf.len = UDP_CONTEXT::S_PAGE_SIZE;
57
58                     pRcvContext->m_hSock = m_hSock;
59                     pRcvContext->m_nOperation = OP_READ;            
60                     iErrCode = WSARecvFrom(pRcvContext->m_hSock, &RcvBuf, 1, &dwBytes, &dwFlag, (sockaddr*)(&pRcvContext->m_RemoteAddr)
61                         , &nAddrLen, &(pRcvContext->m_ol), NULL);
62                     if (SOCKET_ERROR == iErrCode && ERROR_IO_PENDING != WSAGetLastError())
63                     {
64                         delete pRcvContext;
65                         pRcvContext = NULL;
66                     }
67                 }
68                 else
69                 {
70                     delete pRcvContext;
71                 }
72             }
73         }
74         catch (const long &lErrLine)
75         {            
76             bRet = FALSE;
77             _TRACE("Exp : %s -- %ld ", __FILE__, lErrLine);            
78         }
79
80         return bRet;
81     }4. GetRcvData(), 从接收队列中取出一个数据包.
1     UDP_RCV_DATA *UdpSer::GetRcvData(DWORD* pCount)
2     {
3         UDP_RCV_DATA* pRcvData = NULL;
4
5         EnterCriticalSection(&m_RcvDataLock);
6         vector<UDP_RCV_DATA* >::iterator iterRcv = m_RcvDataQue.begin();
7         if (iterRcv != m_RcvDataQue.end())
8         {
9             pRcvData = *iterRcv;
10             m_RcvDataQue.erase(iterRcv);
11         }
12
13         if (pCount)
14         {
15             *pCount = (DWORD)(m_RcvDataQue.size());
16         }
17         LeaveCriticalSection(&m_RcvDataLock);
18
19         return pRcvData;
20     }
5. SendData() 发送指定长度的数据包.
1     BOOL UdpSer::SendData(const IP_ADDR& PeerAddr, const CHAR* szData, INT nLen)
2     {
3         BOOL bRet = TRUE;
4         try
5         {
6             if (nLen >= 1500)
7             {
8                 THROW_LINE;
9             }
10
11             UDP_CONTEXT* pSendContext = new UDP_CONTEXT();
12             if (pSendContext && pSendContext->m_pBuf)
13             {
14                 pSendContext->m_nOperation = OP_WRITE;
15                 pSendContext->m_RemoteAddr = PeerAddr;        
16
17                 memcpy(pSendContext->m_pBuf, szData, nLen);
18
19                 WSABUF SendBuf = { NULL, 0 };
20                 DWORD dwBytes = 0;
21                 SendBuf.buf = pSendContext->m_pBuf;
22                 SendBuf.len = nLen;
23
24                 INT iErrCode = WSASendTo(m_hSock, &SendBuf, 1, &dwBytes, 0, (sockaddr*)&PeerAddr, sizeof(PeerAddr), &(pSendContext->m_ol), NULL);
25                 if (SOCKET_ERROR == iErrCode && ERROR_IO_PENDING != WSAGetLastError())
26                 {
27                     delete pSendContext;
28                     THROW_LINE;
29                 }
30             }
31             else
32             {
33                 delete pSendContext;
34                 THROW_LINE;
35             }
36         }
37         catch (const long &lErrLine)
38         {
39             bRet = FALSE;
40             _TRACE("Exp : %s -- %ld ", __FILE__, lErrLine);            
41         }
42
43         return bRet;
44     }
6. CloseServer() 关闭服务
1     void UdpSer::CloseServer()
2     {
3         m_bSerRun = FALSE;
4         closesocket(m_hSock);
5     }
7. WorkThread() 在完成端口上工作的后台线程
1     UINT WINAPI UdpSer::WorkThread(LPVOID lpParam)
2     {
3         UdpSer *pThis = (UdpSer *)lpParam;
4         DWORD dwTrans = 0, dwKey = 0;
5         LPOVERLAPPED pOl = NULL;
6         UDP_CONTEXT *pContext = NULL;
7
8         while (TRUE)
9         {
10             BOOL bOk = GetQueuedCompletionStatus(pThis->m_hCompletion, &dwTrans, &dwKey, (LPOVERLAPPED *)&pOl, WSA_INFINITE);
11
12             pContext = CONTAINING_RECORD(pOl, UDP_CONTEXT, m_ol);
13             if (pContext)
14             {
15                 switch (pContext->m_nOperation)
16                 {
17                 case OP_READ:
18                     pThis->ReadCompletion(bOk, dwTrans, pOl);
19                     break;
20                 case OP_WRITE:
21                     delete pContext;
22                     pContext = NULL;
23                     break;
24                 }
25             }
26
27             if (FALSE == InterlockedExchangeAdd(&(pThis->m_bThreadRun), 0))
28             {
29                 break;
30             }
31         }
32
33         return 0;
34     }
8.ReadCompletion(), 接收操作完成后的回调函数
1     void UdpSer::ReadCompletion(BOOL bSuccess, DWORD dwNumberOfBytesTransfered, LPOVERLAPPED lpOverlapped)
2     {
3         UDP_CONTEXT* pRcvContext = CONTAINING_RECORD(lpOverlapped, UDP_CONTEXT, m_ol);
4         WSABUF RcvBuf = { NULL, 0 };
5         DWORD dwBytes = 0;
6         DWORD dwFlag = 0;
7         INT nAddrLen = sizeof(IP_ADDR);
8         INT iErrCode = 0;
9
10         if (TRUE == bSuccess && dwNumberOfBytesTransfered <= UDP_CONTEXT::S_PAGE_SIZE)
11         {
12#ifdef _XML_NET_
13             EnterCriticalSection(&m_RcvDataLock);
14
15             UDP_RCV_DATA* pRcvData = new UDP_RCV_DATA(pRcvContext->m_pBuf, dwNumberOfBytesTransfered, pRcvContext->m_RemoteAddr);
16             if (pRcvData && pRcvData->m_pData)
17             {
18                 m_RcvDataQue.push_back(pRcvData);
19             }    
20             else
21             {
22                 delete pRcvData;
23             }
24
25             LeaveCriticalSection(&m_RcvDataLock);
26#else
27             if (dwNumberOfBytesTransfered >= sizeof(PACKET_HEAD))
28             {
29                 EnterCriticalSection(&m_RcvDataLock);
30
31                 UDP_RCV_DATA* pRcvData = new UDP_RCV_DATA(pRcvContext->m_pBuf, dwNumberOfBytesTransfered, pRcvContext->m_RemoteAddr);
32                 if (pRcvData && pRcvData->m_pData)
33                 {
34                     m_RcvDataQue.push_back(pRcvData);
35                 }    
36                 else
37                 {
38                     delete pRcvData;
39                 }
40
41                 LeaveCriticalSection(&m_RcvDataLock);
42             }
43#endif
44
45             //投递下一个接收操作
46             RcvBuf.buf = pRcvContext->m_pBuf;
47             RcvBuf.len = UDP_CONTEXT::S_PAGE_SIZE;
48
49             iErrCode = WSARecvFrom(pRcvContext->m_hSock, &RcvBuf, 1, &dwBytes, &dwFlag, (sockaddr*)(&pRcvContext->m_RemoteAddr)
50                 , &nAddrLen, &(pRcvContext->m_ol), NULL);
51             if (SOCKET_ERROR == iErrCode && ERROR_IO_PENDING != WSAGetLastError())
52             {
53                 ATLTRACE("/r/n%s -- %ld dwNumberOfBytesTransfered = %ld, LAST_ERR = %ld"
54                     , __FILE__, __LINE__, dwNumberOfBytesTransfered, WSAGetLastError());
55                 delete pRcvContext;
56                 pRcvContext = NULL;
57             }
58         }
59         else
60         {
61             delete pRcvContext;
62         }
63     }