可伸缩的IO完成端口服务器模型
来源:CodeProject
翻译:Kevin Chen
Email:chcucl@sina.com
URL:http://www.codeproject.com/KB/IP/IOCP_how_to_cook.aspx
备注:此文是本人首次翻译,如果不精确或者难以理解的地方,请查阅原文章。
1、简介:
该文主要介绍三个主题:线程管理,内存管理,和客户端套接字处理结构。这里使用TCP套接字。
2、线程
通常,当开发服务器程序时,线程模型的使用可以被分为以下两种:
l 容易实现的,一个(服务)线程对应一个客户端连接/套接字
l 复杂一点的,使用线程池(如:固定大小线程池,基于超时的线程池,可变大小的线程池,等等)。
而第一个模型通常是最容易实现的一个,但是当服务于大量连接的时候不是一个最好的选择,有两点原因:
l 上下文切换,它是OS对相同或者不同进程的两个正在运行的进程或线程进行切换的处理,包括保存和载入运行线程/进程的上下文,在这种方式下好像每个个线程继续执行,但事实上,它们也许共享一个CPU。显然,上下文切换需要时间,并且这个时间基于操作系统的实现。在Windows操作系统中,切换大概需要十亿分之一秒。而在一些旧版本/分布式LINUX系统中,切换大概需要2毫秒,所以,假如创建了1000个线程,并且所有线程有相同的优先级,则有2毫秒乘以1000,约2秒耗费在上下文切换上,这是不可接受的。显然,需要一种不同的模型
l 通常,线程在“一个线程对应一个连接”的模型中,线程大部分时间什么也不做。比如:当执行套接字的阻塞I/O操作时线程会被挂起。创建大量的线程不过是浪费资源。
文章中所描述的IOCP框架所使用的线程模型是CSimpleThreadPool类。这个线程框架并没有什么特别的,我会提供一些我使用这种模型的原因:
l 它非常轻巧,非常有用而且容易使用。
l CSimpleThreadPool是一个固定大小的线程池。预先创建一定数量的线程将有助于减少运行时用于创建线程的时间,因为它们已经被创建。因为创建线程是需要时间的(操作系统必须创建一个系统对象,创建一个线程栈,等等)。所以这将获得更好的性能。
l 主要原因是,CSimpleThreadPool是一个包含一个优先级队列的线程池。下面详细描述其细节,IOCP队列不会对完成请求按优先级进行排序。所有的完成请求拥有相同的优先级。所以,可以写一个简单的程序,建立数千个连接,然后一下子全关闭它们,重复创建和关闭...。IOCP将在队列中记录大量的关闭请求,事实上,服务端会仅忙于处理关闭请求。这是一个拒绝服务攻击列子。IOCP没有使用优先级队列对完成请求进行排序是很糟糕的,但是,我猜想也许有什么技术原因导致没有使用优先级队列。然而,来自IOCP的完成请求会被传递到CSimpleThreadPool的队列,并且它们会被按照优先级排序。因此,关闭连接请求会有比其他完成请求更低的优先级。这是一种相对于前文所描述的攻击的保护策略
3、内存管理
和开发服务器程序相关的另一个问题是正确的内存管理,尽管非常熟悉的操作如new和delete(类似的,malloc和free,或者其它的低等级的WinSDK API)用起来非常繁琐,但问题是,当谈到这个程序运行一天24小时,每周7天,每年365天。
问题被称为内存碎片。这些操作(API)被设计为可以分配连续的内存块,但是如果程序频繁的执行new/delete,并且内存块大小不一,程序将以new返回NULL而结束(也许会时常发生),尽管程序在堆上有足够的空间。这种情况也许会经常发生,因为没有满足请求大小的连续的空余内存空间,换句话说,我们有内存碎片,或早或晚,服务器将失败。
有几种方案应对内存碎片问题:
l 实现垃圾回收,执行碎片整理。
l 自定义内存管理旨在防止潜在的内存碎片,对new/delete的重载,仅仅是为了是最终解决方案优雅而已。
l 使用new(malloc)操作分配一个指定大小的内存块,不过,内存仍然会有碎片,但是碎片是固定大小(或者是一个系数乘以固定大小)。这是一种非常容易实现的机制。然而,如果实际内存是从1KB到1MB,分配1MB的内存块,即使只需要1KB。这只不过是浪费内存。
l 预分配一块内存区域,在这种情况下,当可以估算服务一个连接所需要的内存大小,用单个连接所需内存大小值乘以允许的最大连接数(配置参数)。通过所获取的值,分配一块指定大小的连续内存并且在这块内存区域中执行操作。这是另一种非常容易使用机制,并且工作良好,前提是能够估算内存大小。
l 重用所使用的内存块。这种构思非常简单,一旦分配了一个内存块,当它不用时不要删除它,仅仅标记为未使用并且在下一次重用它。这也许看起来像内存泄露,但事实上,这仅仅是处理内存碎片的一种策略。
下面一个模板类,运用了以上所描述的“预分配内存区域”和“重用内存块”机制。
// A template class allowing re-using of the memory blocks.
template<class T>
class QueuedBlocks {
private:
QMutex m_qMutex;
set< T* > m_quBlocks;
vector< T* > m_allBlocks;
public:
QueuedBlocks(int nInitSize = 1):
m_qMutex(), m_quBlocks(), m_allBlocks() {...};
// get a free block from the queue, if one cannot be found
// then NULL is returned
T* GetFromQueue() {...};
// get a free block from the queue, if one cannot be found
// then a new one is created
T* Get() {...};
// Release the used block, place it
// back to the queue. For performance reason,
// we assume that the block was previously taken
// from the queue.
void Release(T* t) {...};
// Return all the blocks ever allocated.
vector<T*> *GetBlocks() {...};
~QueuedBlocks() {...};
};
这样,通过构造函数,我们预分配了一些需要的对象,并且:
l 如果仅仅需要“预分配内存区域”机制,仅仅使用GetFromQueue()和Release()函数,这样它将会“在已分配内存区域中”工作
l 如果需要“重用内存块”机制(同“预分配内存区域机制一起使用”),使用Get()和Release()函数,如果预分配的空间不够用,它将扩展预分配内存区域。
因为这是一个通用模板,它允许自定义一个不同类/结构的队列。这种实现的问题是,每个特定的类必须有一个默认的构造函数和一个 Clear() 函数。 Clear() 函数目的是在内存块将要被重用前执行内部清理,
另一个有用的模板类是:
// A template class used for providing free blocks as well
// as releasing unnecessary ones. Uses "QueuedBlocks" which
// allows reusing of the blocks.
template<class T>
class StaticBlocks {
private:
static QueuedBlocks<T> *blocks;
public:
static void Init(int nSize = 1) {...};
static T *Get() {...};
static void Release(T *b) {...};
};
它代表QueuedBlocks的一个适配器,但是所有函数都被定义为static,当需要在不同类*享内存块时这将非常有用。
4、关于IOCP的几个事实
如果你正在读这篇文章,你也许知道使用IOCP的优势。如果你不知道,那么...,IOCP是开发可伸缩的服务器程序最好的模型,它能仅使用少量线程处理数千个连接。这是如何实现的?通过使用一种称为“I/O完成端口”的系统对象,该系统对象使一个程序接收异步I/O的完成通知。所以,任何I/O操作都是异步(立即完成),程序(或线程)在I/O操作完成时收到通知,因此,程序(或者程序中的线程)有足够的时间去处理任何其它的操作(而不是等待I/O操作,这通常发生在“一个线程服务一个连接/套接字”模型上)。让我们概述下我们所知道的关于IOCP的事实。
首先,为了使套接字受益于IOCP,必须使用WinSock API的重叠I/O版本。
WSASend(...);
WSARecv(...);
第二点,一个套接字必须被设置为支持重叠API。
nSock = WSASocket(..., ..., ..., ..., ..., WSA_FLAG_OVERLAPPED);
同时,我们也需要创建一个IOCP句柄。
hIocp = CreateIoCompletionPort( INVALID_HANDLE_VALUE, NULL, 0, 0 );
然后,我们需要将这个套接字关联到IOCP上。
CreateIoCompletionPort( (HANDLE) nSock, hIocp, ..., ... );
现在,当在套接字上使用重叠I/O操作时,I/O操作必须被注册到IOCP的队列里。这个注册项仅仅在执行I/O操作时发生,否则,这个注册项不会被执行。尽管这个套接字已经和IOCP关联。当一个重叠I/O操作的完成时,将通过下面的调用从IOCP队列中返回(事实上,仅仅是一些操作的详细信息,但是,它有些技巧,后面将详细介绍):
GetQueuedCompletionStatus( hIocp, &dwBytesTransferred, ..., lpOverlapped, dwTimeout );
至此,我们知道一个套接字的I/O操作已经成功完成。根据这些事实,让我们开始一些支持这些机制的类的实现。
5、Client/Server 套接字
首先,让我们分析ClientSocket 这个模板类(完整的源代码,请查看附件资源)
// A class wrapping basic client socket's operations
template<class T>
class ClientSocket {
private:
SOCKET m_ClientSock;
volatile unsigned int m_nSession;
QMutex m_qMutex;
struct sockaddr_in m_psForeignAddIn;
volatile bool m_blnIsBusy;
T m_objAttachment;
protected:
public:
ClientSocket(): m_objAttachment(), m_qMutex() {...}
~ClientSocket() {...}
// Is the object assigned a socket.
bool IsBusy() {...};
// Method is used by 'QueuedBlocks' from
// "mem_manager.h".
void Clear() {...}
// Returns the socket associated with the object.
SOCKET GetSocket() {...}
// Returns the session of the current client socket.
// Technically, this is the counter of how may times
// current instance was pooled, but really, this is used
// to check if session of operation (MYOVERLAPPED)
// is == with the session of the socket.
unsigned int GetSession() {...}
// Internally used by this implementation. Don't call it.
void Lock() {...}
// Internally used by this implementation. Don't call it.
void UnLock() {...}
// Associate the object with a socket.
int Associate(SOCKET sSocket, struct sockaddr_in *psForeignAddIn) {...}
// Returns the attachment of the object.
// Use the attachment to associate the socket with any other
// additional information required.
T* GetAttachment() {...}
// Write data to the socket,
// uses overlapped style.
int WriteToSocket(char *pBuffer, DWORD buffSize) {...}
// Read data from the socket,
// uses overlapped style.
int ReadFromSocket(char *pBuffer, DWORD buffSize) {...}
// Closes the socket associated with this instance
// of the object and performs clean-up.
void CloseSocket() {...}
};
下面是一些有关这个类的几个要点:
l 设计这个类仅仅是为了与ServerSocket类一起工作(下面将详细叙述),单独使用时是没什么用的。
l 它封装了重叠I/O的 WinSock API。
l 这是一个通用的模板类可以支持“附件”(查阅GetAttachment()函数)。虽然ClientSocket的基础功能非常的普通,但是,附件允许扩展ClientSocket类的功能(例如:保存套接字的当前状态,包含供读写的数据缓存,等等)。对用作附件的结构体/类的唯一的限制是:必须有一个默认的构造函数和一个Clear()函数(用作执行内部清理)。还有一些其它的要求,将在稍后叙述。
l 这个类的设计可以让ServerSocket重复使用已经创建的实例。如我们稍后所见,ServerSocket包含一个QueuedBlocks<ClientSocket<T> >(ClientSocket必须实现Clear()),这就是为什么套接字句柄不是传给构造函数,而是关联到一个空闲(IsBusy())的实例(通过Associate()函数)。
其次,让我们分析ServerSocket 这个模板类(完整的源代码,请查看附件资源)
// A class for handling basic server socket operations.
template<class T>
class ServerSocket {
private:
// Server socket.
SOCKET m_ServSock;
// Pool of client sockets.
QueuedBlocks<ClientSocket<T> > m_SockPool;
protected:
public:
// Set the size of the socket pool to the maximum number of accepted
// client connections.
ServerSocket(unsigned int nPort, unsigned int nMaxClients,
bool blnCreateAsync = false,
bool blnBindLocal = true): m_SockPool(nMaxClients) {...}
// Returns a pointer to the ClientSocket instance, bind
// to the accepted client socket (incoming connection).
ClientSocket<T>* Accept() {...}
// Release the client socket, will try closing connection and
// will place it back to the pool.
void Release(ClientSocket<T> *sock) {...}
vector<ClientSocket<T> *> *GetPool() {...}
~ServerSocket() {...}
};
下面是一些有关这个类的几个要点:
l 这是一个模板类,原因仅仅是它操作了模板类ClientSocket(重复,重点在“附件”)。
l 它和固定数量的ClientSocket实例一起工作,这就是为什么Accept(),在内部,调用m_SockPool的GetFromQueue()函数(而不仅仅是Get()函数)。事实上,这模拟了服务器在同一时间可以服务的最大的活动连接数。最大活动连接数是通过nMaxClients这个参数传递给构造函数的。
l Accept()函数返回准备使用的从m_SockPool中获取的一个ClientSocket实例。这个实例被关联到已接收到的连接的套接字。Release()函数用来关闭套接字句柄(如果没有关闭),并且将这个ClientSocket实例返回给m_SockPool队列,为了在后面重复使用。
l GetPool()函数覆盖了m_SockPool的GetBlocks()函数。它简单的返回了在队列中所有的ClientSocket实例(全部的,包括哪些没有被套接字句柄的关联的实例)。这个函数在将队列传递给CTimeOutChecker的时候被ServerService调用。稍后详细叙述。
l 这个函数的构造函数,在内部,执行了创建服务端套接句柄所需要的所有操作,绑定套接字到一个指定端口(通过nPort传递给构造函数)和IP地址,并且将这个套接字设置为监听模式(详细请参考代码)。如果这些操作中有任何一个失败,将抛出一个异常。
6、MYOVERLAPPED结构体
OVERLAPPED 对WinSock API 的重叠I/O操作和IOCP来说是一个非常重要结构体。它被IOCP系统对象使用(不仅仅如此,但是,我将跳过一些不需要的细节),用于“跟踪”I/O操作的状态。但是,OVERLAPPED 本身太没用了。这就是为什么要扩展的原因。有一个小窍门,当然(完整的定义在socket_client.h中)。
// Overlapped structure used with this implementation of the IOCP.
// Only the "Overlapped" member is used in context with the
// standard IOCP API, anything else is just to help this particular
// implementation.
typedef struct {
// It is very important "Overlapped" to be the very first
// member of the structure because pointer pointing to
// "this" is == to the address of the first member.
// This makes usage of the MYOVERLAPPED equivalent
// to the OVERLAPPED in context of the standard IOCP API (just a trick).
OVERLAPPED Overlapped;
// A structure that points to the data packet and size of the data
// packet associated with current overlapped operation.
WSABUF DataBuf;
// Operation type associated with current overlapped operation.
IO_SOCK_MODES OperationType;
// Internally used. If nSession != ClientSocket::m_nSession
// that means that socket was closed (recently) and
// operation retrieved from IOCP queue or TaskPool queue
// is not relevant, to the current session of the socket.
volatile unsigned int nSession;
// Method is used by 'QueuedBlocks' from
// "mem_manager.h".
void Clear() {...}
} MYOVERLAPPED;
窍门在哪里?窍门就是OVERLAPPED 结构体必须是MYOVERLAPPED结构体的第一个成员。这样,MYOVERLAPPED的一个实例的内存地址就等于结构体的第一个成员的地址(在这种情况下是OVERLAPPED )。因此,传递( WSASend()/WSARecv() )或者接收( GetQueuedCompletionStatus())一个MYOVERLAPPED的指针(当然,通过正确的转换)是相同的,如果我们使用的是原有的OVERLAPPED 结构。
这仅仅是一个小窍门,有几点益处:
l 关于MYOVERLAPPED,我们总是能通过OperationType类型知道已经成功完成得操作的类型(例如:READ或者WRITE)。因为这样,我冒着风险将一个MYOVERLAPPED实例定义成“操作”(为简单起见)。
l 关于DataBuf成员,我们知道成功完成得操作的数据缓冲区的详细信息(参考ClientSocket的WriteToSocket()和ReadFromSocket() )。
l nSession成员是这篇文章所描述的IOCP框架的另一个窍门。ClientSocket有一个Session( 参考上文第5段 ),并且MYOVERLAPPED也有一个Session。ClientSocket的实例的Session在每次ServerSocket::Release()被调用时递增(这意味着,这个实例将被重用)。当调用WriteToSocket()或者ReadFromSocket()函数时,在内部,它们将当前ClientSocket的Session设置为用于操作的MYOVERLAPPED实例的Session。当通过GetQueuedCompletionStatus()接收到完成操作时,并且如果返回操作的Session不同于ClientSocket实例所添加的Session,则这个操作时无意义的,因为ClientSocket实例已经成功关闭或者,也许,被重用了,当这个操作获得IOCP队列响应。所以,在代码里,你会看到一些这样的检查。
并且,因为MYOVERLAPPED被不同的类共享,以下描述(如第3段所描述的)是很重要的。
// a pool of the MYOVERLAPPED structures
typedef StaticBlocks<MYOVERLAPPED> Overlapped;
QueuedBlocks<MYOVERLAPPED> *Overlapped::blocks = NULL;
7、IOCPSample 类
IOCPSample是这篇文章所描述的架构中的其中一个核心模板类(关于完整的代码,请参考附件资源中iocp.h文件)。
// A template class (wrapper) to handle basic operations
// related to IO Completion Ports (IOCP).
template<class T>
class IOCPSimple {
private:
HANDLE m_hIocp;
DWORD m_dwTimeout;
protected:
// Useful method to queue a socket (handler in fact) in the IOCP's
// queue, without requesting any 'physical' I/O operations.
BOOL PostQueuedCompletionStatus( ClientSocket<T> *sock,
MYOVERLAPPED *pMov ) {...}
public:
~IOCPSimple() {:}
IOCPSimple( DWORD dwTimeout = 0 ) {...}
// Method associates Socket handler with the handler
// of the IOCP. It is important to do this association
// first of all, once Socket is accepted and is valid.
// In fact, this method registers socket with the IOCP,
// so IOCP will monitor every I/O operation happening
// with the socket.
void AssociateSocket( ClientSocket<T> *sock ) {...}
// Queue in the IOCP's queue with the status 'pending'.
// This is a fictive status (not real). Method is useful in
// cases when packets/data buffers to send via socket are yet
// not ready (e.g. due some internal, time consuming, calculations)
// and rather than keeping socket somewhere, in separate structures,
// place it in the queue with status pending. It will be returned
// shortly with the "GetQueuedCompletionStatus" call anyway.
BOOL SetPendingMode( ClientSocket<T> *sock ) {...}
// Queue in the IOCP's queue with the status 'close'.
// This is a fictive status (not real). Useful only
// when close socket cases should be treated/handled
// within the routine, handling I/O completion statuses,
// and just to not spread the code across different
// modules/classes.
BOOL SetCloseMode( ClientSocket<T> *sock,
MYOVERLAPPED *pMov = NULL ) {...}
// Queue in the IOCP's queue with the status 'accept'.
// This is a fictive status (not real). When a new
// client connection is accepted by the server socket,
// method is used to acknowledge the routine, handling
// I/O completion statuses, of this. Same reason as per
// 'close' status, just to have a unique piece of code
// handling all the possible statuses.
BOOL SetAcceptMode( ClientSocket<T> *sock ) {...}
// This method is just a wrapper. For more details,
// see MSDN for "GetQueuedCompletionStatus". The only
// difference is, method is adjusted to handle
// 'ClientSocket<T>' (as registered with 'AssociateSocket(...)')
// rather than pointer to the 'DWORD'.
BOOL GetQueuedCompletionStatus( LPDWORD pdwBytesTransferred,
ClientSocket<T> **sock, MYOVERLAPPED **lpOverlapped ) {...}
};
因此,这个类仅仅是关于IOCP的API函数的简单封装,并且这是一个通用模板类,因为ClientSocket是一个模板类,关于IOCP逻辑的大部分细节,我们已经在第4节讨论过了,但是,仍然有些额外的东西要写。
下面函数第3个参数:
CreateIoCompletionPort( ..., ..., Key, ... );
预定是一个DWORD,在API中它被命名为Key,这个Key在I/O操作完成被返回到下面函数的第3个参数。
GetQueuedCompletionStatus( ..., ..., &Key, ..., ... );
Key可以是任何可以被转换为DWORD类型的类型(当然,使用类型转换)。例如:Key可是一个指向对象的指针。如果我们使用这个指针,作为一个Key,则ClientSocket实例的指针是相对于被执行的那个I/O操作?是的,这使得我们不仅仅知道了完成的操作,而且,ClientSocket的实例是是被应用的那个I/O操作。这里有另一个窍门,是IOCPSimple 的AssociateSocket() 和GetQueuedCompletionStatus()方法。
8、IOCPSample的SendPendingMode方法
我应该写一点关于IOCPSample模板类的SetPendingMode() 方法。它太有用的同时也太危险。考虑一种情况,当你的程序正在从外部资源成块的读数据,并且将它们发送给ClientSocket的一个实例。如果外部资源没有准备好返回下一块数据(因为任何原因),则没有任何数据被发送到ClientSocket,因此,没有任何I/O操作在IOCP中被查询(如第4节所描述的)。所以,ClientSocket的状态是未定义的,并且非常有可能“失去”这样的一个套接字。使用SetPendingMode()方法,而不是实现一个缓冲存储区去“保持这些套接字并且当有数据时再处理它们”。这个方法将套接字推送到IOCP的队列,并且,一小段时间后,这个套接字将会被ISockEvent的OnPending() 事件返回(更多详细信息见下章)。很有可能的是,在那个时候,外部资源的下一个数据块已经准备好被发送了( 如果没有,再次使用OnPending() )。
这是非常有用的,但是,注意SetPendingMode()仅仅是将套接字推送到IOCP的队列中,没有实际注册任何I/O操作。非常有可能的是,当用这种方式推送套接字到IOCP队列中的时候,这个套接字也许被远端关闭了。不幸的是,IOCP将不会处理这类“意外的”关闭直到和这个套接相关联的一个实际的I/O操作被执行。这种情况将被CTimeOutChecker 很好的“过滤”掉(更详细的信息见下一节,所以,不要轻视它),并且或早或晚(根据传递的超时值),这样的套接字将会被“拒绝”,通过ISockEvent的OnClose() 事件,只要超时值大于0(并且CTimeOutChecker 被使用)。提醒自己注意。
9、CTimeOutChecker类
这个模板类在server_server.h文件中被定义,有一个作用是检查每一个活动连接是否超时,当过去的时间,自从上次的I/O操作,大于传递给构造函数的nTimeOutValue时一个超时情况发生。
// A template class designed for verifying client sockets
// against time-out cases. Time-out case = if no I/O actions
// happen with a socket during a configured number of
// seconds. This implies that the attachment of the
// client socket must implement "GetTimeElapsed()"
// and "ResetTime(...)" methods.
template<class T>
class CTimeOutChecker: public IRunnable {
private:
unsigned int m_nTimeOutValue;
vector<ClientSocket<T> *> *m_arrSocketPool;
IOCPSimple<T> *m_hIocp;
protected:
// method checks sockets to detect time-out cases
virtual void run() {...}
public:
// Constructor of the template class. Requires:
// - a pointer to a collection of pointers to client sockets.
// This is the collection of sockets to be checked against
// time-out cases.
// - a pointer to the IO completion port object responsible
// for checking I/O events against passed collection of
// client sockets.
// - value of the time-out, in seconds.
CTimeOutChecker( vector<ClientSocket<T> *> *arrSocketPool,
IOCPSimple<T> *hIocp, unsigned int nTimeOutValue ) {...}
// Nothing to destruct as inside the template class
// we keep/use just pointers obtained from external
// sources.
~CTimeOutChecker() {};
};
这个类被ServerService 模板类(下面描述)使用,并且,同样的,它的run() 方法将被关联到ServerService的线程池 (CSimpleThreadPool)中的一个线程执行,ServerService 也将传递套接字池(关联到ServerService ,通过GetPool()方法)到CTimeOutChecker (的实例)。这样,CTimeOutChecker 知道哪些套接字需要检查。
如果一个超时情况发生,则一个相关的ClientSocket(实例)被传递到IOCP队列,状态是“关闭”。IOCP,早晚会从队列中返回它,并且会将它“传递”给ServerService 。ServerService 将创建一个“事件任务”,并且将它传递给CSimpleThreadPool。“事件任务”,将会被CSimpleThreadPool中的一个线程执行,会“启动”ISockEvent(实现)的OnClose()“事件”,并且会关闭套接字。这就是它如何工作的。
另外,CTimeOutChecker 需要“附件”实现两个额外的方法:GetTimeElapsed() 和ResetTime( bool ).。因此,从技术上来说,CTimeOutChecker 的行为被下面两个方法的实现所控制:
l 如果一个特定的ClientSocket(的附件)的GetTimeElapsed()返回0,则超时情况对ClientSocket不适用,这是一种使指定套接字超时检查无效的方法。
l 尽管ResetTime( bool )的实现可以是任意的(根据开发者而定,必须依据特征)。实际需求是:
Ø 调用ResetTime( true )必须强制后面的GetTimeElapsed()调用返回0。这使得在某一时刻(当需要时)对一个特定套接字的超时检测无效。例如:当写一个Web(HTTP)服务时,当从ClientSocket中等待完成HTTP请求时,有必要开启超时检查。一旦收到完成HTTP请求,有必要关闭相关ClientSocket的超时检查。
Ø 调用ResetTime( false )必须强制后面的GetTimeElapsed()调用返回自动上次调用ResetTime( true )后实际已过去的时间,单位是秒。也就是说,它应该记录当前时间,并且GetTimeElapsed()应该返回当前时间到所记录的时间之间的间隔,以秒为单位。它是由开发者调用ResetTime( false ),当ClientSocket::WriteToSocket()或者ClientSocket::ReadFromSocket()成功完成时(根据服务器的开发策略)。
CTimeOutChecker 的有用之处在哪呢?仅仅是关闭那些经过一段(配置的)时间“没有响应”的套接字,这样可以为其它即将进来的连接释放一些空间(注意ServerSocker 类的nMaxClient参数)。
但是,如果这个逻辑不需要,有一种方式可以使CTimeOutChecker (完全的)无效。仅仅是将ServerService 构造函数的timeout参数设置为0。
10、ISockEvent和ServerService类
最后,让我开始这个结构的核心。如果你没能记住以上所描述的全部细节也是可以的,因为它们全部被封装进ServerService 模板类的逻辑(原因是相同的,“附件”元素),可是,记住第8和第9节是重要的。所有的类在server_service.h文件中被定义。让我们从ServerService 开始:
// Well, finally, here is the template class joining all
// the stuff together. Considering the Aspect Oriented paradigm,
// this template class may be seen as an Aspect. The "individualizations"
// of this aspect are "ISockEvent<T>" and "T" itself. "T" is nothing
// else but attachment of the client socket (see ClientSocket<T> template
// class for more details). Implementing "ISockEvent<T>" and "T" will
// define the individual behaviour of this aspect.
// It is a composition of the IOCP, server socket, time-out checker
// and thread pool. Class implements the business logic that makes all
// these entities working together.
template<class T>
class ServerService: public IRunnable {
private:
ServerSocket<T> m_ServerSocket;
IOCPSimple<T> m_hIocp;
ISockEvent<T> *m_pSEvent;
CTimeOutChecker<T> *m_TChecker;
// thread pool which will execute the tasks
CSimpleThreadPool *m_ThPool;
// a pool of the CSockEventTask<T> objects
QueuedBlocks<CSockEventTask<T> > m_SockEventTaskPool;
protected:
// This method will be executed by a thread
// of the task pool.
virtual void run() {...}
public:
// Constructor or the class.
// pSEvent - pointer to an instance implementing
// ISockEvent<T>. This instance will be used
// as a client socket event handler.
// nPort - port number to bind server socket to.
// nMaxClients - the maximum number of accepted (concurrent)
// client connections. To be passed to
// the server socket and also will be used
// as the initial size for the pool of the
// CSockEventTask<T> objects.
// nNoThreads - indicative (the real one is computed,
// see below) number of the threads
// to be created by the thread pool.
// timeout - the value of the time-out, in seconds.
// Will be passed to the time-out checker.
// If time-out is zero, time-out checker
// will not be created.
// blnBindLocal - see ServerSocket<T> for more details.
// If "true" then server socket is bind
// to localhost ("127.0.0.1").
// If "false" then server socket is bind
// to INADDR_ANY ("0.0.0.0").
ServerService( ISockEvent<T> *pSEvent, unsigned int nPort,
unsigned int nMaxClients, unsigned int nNoThreads,
unsigned int timeout, bool blnBindLocal = true ):
m_ServerSocket( nPort, nMaxClients, true, blnBindLocal ),
m_hIocp( 200 ), m_SockEventTaskPool( nMaxClients )
{...}
virtual ~ServerService() {...}
// Start the threat pool (== all the
// threads in the pool).
void start() {...}
};
这就是ServerService ,没有任何窍门。关于代码的注释应该足够强调响应的逻辑了。所以,我将会集中提供一点关于ISockEvent 模板类的详细信息(实际上,模板接口),该模板类作为一个参数被传递给ServerService 构造函数。
ISockEvent 是一个包含纯虚函数的模板接口:
// A template interface showing how the client socket event
// handler should look like.
template<class T>
class ISockEvent {
public:
// Client socket ("pSocket") is about to be closed.
virtual void OnClose( ClientSocket<T> *pSocket,
MYOVERLAPPED *pOverlap,
ServerSocket<T> *pServerSocket,
IOCPSimple<T> *pHIocp
) = 0;
// Client socket ("pSocket") was just accepted by
// the server socket (new connection with a client
// is created).
virtual void OnAccept( ClientSocket<T> *pSocket,
MYOVERLAPPED *pOverlap,
ServerSocket<T> *pServerSocket,
IOCPSimple<T> *pHIocp
) = 0;
// Client socket ("pSocket") was returned from the IOCP
// queue with status _PENDING. For more details see
// "IOCPSimple<T>::SetPendingMode(...)". This method
// is invoked only if there was a call to
// "IOCPSimple<T>::SetPendingMode(...)", performed by a
// user code, internally "SetPendingMode(...)"
// is never called.
virtual void OnPending( ClientSocket<T> *pSocket,
MYOVERLAPPED *pOverlap,
ServerSocket<T> *pServerSocket,
IOCPSimple<T> *pHIocp
) = 0;
// Client socket ("pSocket") was returned from IOCP
// queue with status _READ. This means that overlapped
// reading operation, started previously with
// "ClientSocket<T>::ReadFromSocket(...)", was
// successfully finished.
// - "pOverlap->DataBuf" structure points to the data
// buffer and buffer's size that were passed to the
// "ClientSocket<T>::ReadFromSocket(...)".
// - "dwBytesTransferred" will indicate how many
// bytes were read.
virtual void OnReadFinalized( ClientSocket<T> *pSocket,
MYOVERLAPPED *pOverlap,
DWORD dwBytesTransferred,
ServerSocket<T> *pServerSocket,
IOCPSimple<T> *pHIocp
) = 0;
// Client socket ("pSocket") was returned from IOCP
// queue with status _WRITE. This means that overlapped
// writting operation, started previously with
// "ClientSocket<T>::WriteToSocket(...)", was
// successfully finished.
// - "pOverlap->DataBuf" structure points to the data
// buffer and buffer's size that were passed to the
// "ClientSocket<T>::WriteToSocket(...)".
// - "dwBytesTransferred" will indicate how many
// bytes were written.
virtual void OnWriteFinalized( ClientSocket<T> *pSocket,
MYOVERLAPPED *pOverlap,
DWORD dwBytesTransferred,
ServerSocket<T> *pServerSocket,
IOCPSimple<T> *pHIocp
) = 0;
};
这个接口的实现(当然还是有“附件”)将定义服务器程序。没有其它的需要被做的...真的!ISockEvent包含了用于跟踪和处理套接字的所有可能的状态所需要的全部方法。让我们来看看:
l 当一个新的连接被ServerSocket接收,ServerSevice将确保OnAccept()事件被调用。它将由开发决定如何处理这个事件。典型的,通过这个事件,你可能发起pSocket->ReadFromSocket()或者pSocket->WriteToSocket(),基于服务器开发的策略(服务器希望客户端发起会话或者反之亦然)。
l 如果一个套接字将要被关闭(远端关闭连接;CTimeOutChecker 发起这个操作,或者你在一个事件的代码中调用了pHIocp->SetCloseMode(pSocket)),然后OnClose()事件被调用。使用这个时间去纸箱任何需要的清理。如果在你没有或者忘记在OnClose()中关闭套接字也是可以的,因为在这个事件完成后,这个套接无论如何是会被关闭的。
l 如果,在一个事件的代码内,pHIocp->SetPendingMode(pSocket)被调用,是的,按上述第8节。确保你使用OnPending()处理了它。
l 如果,在一个事件的代码内(除了OnClose()),pSocket->ReadFromSocket()被调用(并且返回值不是一个错误值),OnReadFinalized()事件将会被调用,一旦“读”操作完成,但是不要指望传递的缓存会被完全填充到缓存的大小。避免在这期间(从ReadFromSocket() 到 OnReadFinalized())使用已传递的缓冲区。这是从IOCP API继承下来的要求,因此,我们在此不能做任何事情。
l 如果,在一个事件的代码内(除了OnClose()),pSocket->WriteToSocket()被调用(并且返回值不是一个错误值),OnWriteFinalized()事件将会被调用,一旦“写”操作完成。这意味着全部的缓存已经被发出去了。避免在这期间(从WriteToSocket() 到 OnWriteFinalized())使用已传递的缓冲区,原因如上。
最后,你不需要创建额外的线程,因为每个事件将被关联到ServerService的CSimpleThreadPool中的一个线程调用。如果你需要更多的线程,传递恰当的值给ServerService 的构造函数的nNoThreads 参数。并且,是的,确保每个事件的实现或早或晚的退出。我的意思是,努力避免关于事件的无限循环;否则,停止运行中停止ServerService 会有问题(尽管,如果在循环中检查CThread::currentThread().isInterrupted()仍然存在问题)
最后请那些计划在多核CPU上写服务器的人注意:如果环境中有N个CPU,则恰好CSimpleThreadPool中有N个线程执行ServerService::run()函数,为了获得更好的性能(如MSDN所推荐的那样)。确保你不要在一个事件内多次调用pSocket->ReadFromSocket() 或者 pSocket->WriteToSocket() (尽量设计程序使其只有正确的,没有错误的,祈祷吧)。这并不是说这个架构不能处理重复调用的情况。问题是,虽然发送/接受请求的顺序是保证能够符合调用顺序的(在IOCP队列中),但是报告操作完成的顺序不能够保证符合调用顺序,因为GetQueuedCompletionStatus()会被多线程调用。但是如果你不关心顺序是否正确,可以忽略此点。
11、一个简单模拟服务器的实现
作为一个“技术性验证测试”,让我们实现一个简单的模拟服务器,这个服务器从套接上读取7个字符,并且,一旦缓冲区被填充,将它们发送回去。完成得代码在资源中。
A.首先,让我们定义服务器的参数和缓冲区的大小。
#define BUFF_SIZE 8
#define MAX_CONNECTIONS 10
#define NO_THREADS 4
#define TIME_OUT 10
#define PORT 8080
B.现在,让我们定义“附件”
struct Attachment {
volatile time_t tmLastActionTime;
char sString<BUFF_SIZE>;
DWORD dwStringSize; // current string's size
Attachment() { Clear(); };
bool Commit( DWORD dwBytesTransferred ) {
DWORD dwSize = dwStringSize + dwBytesTransferred;
if ( dwBytesTransferred <= 0 ) return false;
if ( dwSize >= BUFF_SIZE ) return false;
dwStringSize = dwSize;
sString[dwStringSize] = 0;
return true;
};
// as requested by the API of the framework
void Clear() { memset(this, 0, sizeof(Attachment) ); };
// as requested by the API of the framework
void ResetTime( bool toZero ) {
if (toZero) tmLastActionTime = 0;
else {
time_t lLastActionTime;
time(&lLastActionTime);
tmLastActionTime = lLastActionTime;
}
};
// as requested by the API of the framework
long GetTimeElapsed() {
time_t tmCurrentTime;
if (0 == tmLastActionTime) return 0;
time(&tmCurrentTime);
return (tmCurrentTime - tmLastActionTime);
};
};
C.现在,我们必须实例化模板类
typedef ClientSocket<Attachment> MyCSocket;
typedef ServerSocket<Attachment> MySSocket;
typedef IOCPSimple<Attachment> MyIOCPSimple;
typedef ISockEvent<Attachment> MyISockEvent;
typedef ServerService<Attachment> MyServerService;
D.现在,我们实现ISockEvent<Attachment>:
class MyISockEventHandler: public MyISockEvent {
public:
MyISockEventHandler() {};
~MyISockEventHandler() {};
// empty method, not used
virtual void OnClose( MyCSocket *pSocket, MYOVERLAPPED *pOverlap,
MySSocket *pServerSocket, MyIOCPSimple *pHIocp ) {};
// empty method, not used
virtual void OnPending( MyCSocket *pSocket, MYOVERLAPPED *pOverlap,
MySSocket *pServerSocket, MyIOCPSimple *pHIocp ) {};
virtual void OnAccept( MyCSocket *pSocket, MYOVERLAPPED *pOverlap,
MySSocket *pServerSocket, MyIOCPSimple *pHIocp ) {
int nRet;
DWORD dwSize;
char *temp;
dwSize = BUFF_SIZE - 1;
temp = pSocket->GetAttachment()->sString;
// initiate the reading with OnAccept
nRet = pSocket->ReadFromSocket( temp, dwSize );
pSocket->GetAttachment()->ResetTime( false );
if ( nRet == SOCKET_ERROR ) {
pServerSocket->Release( pSocket );
}
};
virtual void OnReadFinalized( MyCSocket *pSocket,
MYOVERLAPPED *pOverlap, DWORD dwBytesTransferred,
MySSocket *pServerSocket, MyIOCPSimple *pHIocp ) {
int nRet;
DWORD dwSize, dwPos;
char *temp;
// finalize the filling of the buffer
pSocket->GetAttachment()->Commit( dwBytesTransferred );
dwSize = BUFF_SIZE - 1;
dwPos = pSocket->GetAttachment()->dwStringSize;
temp = pSocket->GetAttachment()->sString;
nRet = pSocket->ReadFromSocket( temp + dwPos, dwSize - dwPos );
pSocket->GetAttachment()->ResetTime( false );
if ( nRet == SOCKET_ERROR ) {
pServerSocket->Release( pSocket );
}
else if ( nRet == RECV_BUFFER_EMPTY ) {
// means that dwSize - dwPos == 0, so send the data
// back to the socket
nRet = pSocket->WriteToSocket( temp, dwSize );
if ( nRet == SOCKET_ERROR ) {
pServerSocket->Release( pSocket );
}
}
};
virtual void OnWriteFinalized( MyCSocket *pSocket,
MYOVERLAPPED *pOverlap, DWORD dwBytesTransferred,
MySSocket *pServerSocket, MyIOCPSimple *pHIocp ) {
// clean the attachment
pSocket->GetAttachment()->Clear();
// and once again
OnAccept(pSocket, NULL,pServerSocket, NULL);
};
};
E. 并且,最后
int main(int argc, char* argv[])
{
int nRet;
MyServerService *sService;
MyISockEventHandler *mSockHndl;
WSAData wsData;
nRet = WSAStartup(MAKEWORD(2,2),&wsData);
if ( nRet < 0 ) {
Log::LogMessage(L"Can't load winsock.dll./n");
return -1;
}
try {
Overlapped::Init( MAX_CONNECTIONS );
mSockHndl = new MyISockEventHandler();
sService = new MyServerService((MyISockEvent *) mSockHndl,
PORT, MAX_CONNECTIONS, NO_THREADS, TIME_OUT, false);
sService->start();
printf("hit <enter> to stop .../n");
while( !_kbhit() ) ::Sleep(100);
delete sService;
delete mSockHndl;
}
catch (const char *err) {
printf("%s/n", err);
}
catch (const wchar_t *err) {
wprintf(L"%ls/n", err);
}
WSACleanup();
return 0;
}
(本文从本人的另一个CSDN账户上搬移过来)