可伸缩的IO完成端口服务器模型(IOCP)

时间:2022-03-05 18:18:04

可伸缩的IO完成端口服务器模型

来源:CodeProject

翻译:Kevin Chen

Email:chcucl@sina.com

URL:http://www.codeproject.com/KB/IP/IOCP_how_to_cook.aspx

备注:此文是本人首次翻译,如果不精确或者难以理解的地方,请查阅原文章。

1、简介:

该文主要介绍三个主题:线程管理,内存管理,和客户端套接字处理结构。这里使用TCP套接字。

2、线程

通常,当开发服务器程序时,线程模型的使用可以被分为以下两种:

容易实现的,一个(服务)线程对应一个客户端连接/套接字

复杂一点的,使用线程池(如:固定大小线程池,基于超时的线程池,可变大小的线程池,等等)。

而第一个模型通常是最容易实现的一个,但是当服务于大量连接的时候不是一个最好的选择,有两点原因:

上下文切换,它是OS对相同或者不同进程的两个正在运行的进程或线程进行切换的处理,包括保存和载入运行线程/进程的上下文,在这种方式下好像每个个线程继续执行,但事实上,它们也许共享一个CPU。显然,上下文切换需要时间,并且这个时间基于操作系统的实现。在Windows操作系统中,切换大概需要十亿分之一秒。而在一些旧版本/分布式LINUX系统中,切换大概需要2毫秒,所以,假如创建了1000个线程,并且所有线程有相同的优先级,则有2毫秒乘以1000,约2秒耗费在上下文切换上,这是不可接受的。显然,需要一种不同的模型

通常,线程在“一个线程对应一个连接”的模型中,线程大部分时间什么也不做。比如:当执行套接字的阻塞I/O操作时线程会被挂起。创建大量的线程不过是浪费资源。

文章中所描述的IOCP框架所使用的线程模型是CSimpleThreadPool类。这个线程框架并没有什么特别的,我会提供一些我使用这种模型的原因:

它非常轻巧,非常有用而且容易使用。

CSimpleThreadPool是一个固定大小的线程池。预先创建一定数量的线程将有助于减少运行时用于创建线程的时间,因为它们已经被创建。因为创建线程是需要时间的(操作系统必须创建一个系统对象,创建一个线程栈,等等)。所以这将获得更好的性能。

主要原因是,CSimpleThreadPool是一个包含一个优先级队列的线程池。下面详细描述其细节,IOCP队列不会对完成请求按优先级进行排序。所有的完成请求拥有相同的优先级。所以,可以写一个简单的程序,建立数千个连接,然后一下子全关闭它们,重复创建和关闭...IOCP将在队列中记录大量的关闭请求,事实上,服务端会仅忙于处理关闭请求。这是一个拒绝服务攻击列子。IOCP没有使用优先级队列对完成请求进行排序是很糟糕的,但是,我猜想也许有什么技术原因导致没有使用优先级队列。然而,来自IOCP的完成请求会被传递到CSimpleThreadPool的队列,并且它们会被按照优先级排序。因此,关闭连接请求会有比其他完成请求更低的优先级。这是一种相对于前文所描述的攻击的保护策略

3、内存管理

和开发服务器程序相关的另一个问题是正确的内存管理,尽管非常熟悉的操作如newdelete(类似的,mallocfree,或者其它的低等级的WinSDK API)用起来非常繁琐,但问题是,当谈到这个程序运行一天24小时,每周7天,每年365天。

问题被称为内存碎片。这些操作(API)被设计为可以分配连续的内存块,但是如果程序频繁的执行new/delete,并且内存块大小不一,程序将以new返回NULL而结束(也许会时常发生),尽管程序在堆上有足够的空间。这种情况也许会经常发生,因为没有满足请求大小的连续的空余内存空间,换句话说,我们有内存碎片,或早或晚,服务器将失败。

有几种方案应对内存碎片问题:

实现垃圾回收,执行碎片整理。

自定义内存管理旨在防止潜在的内存碎片,对new/delete的重载,仅仅是为了是最终解决方案优雅而已。

使用newmalloc)操作分配一个指定大小的内存块,不过,内存仍然会有碎片,但是碎片是固定大小(或者是一个系数乘以固定大小)。这是一种非常容易实现的机制。然而,如果实际内存是从1KB1MB,分配1MB的内存块,即使只需要1KB。这只不过是浪费内存。

预分配一块内存区域,在这种情况下,当可以估算服务一个连接所需要的内存大小,用单个连接所需内存大小值乘以允许的最大连接数(配置参数)。通过所获取的值,分配一块指定大小的连续内存并且在这块内存区域中执行操作。这是另一种非常容易使用机制,并且工作良好,前提是能够估算内存大小。

重用所使用的内存块。这种构思非常简单,一旦分配了一个内存块,当它不用时不要删除它,仅仅标记为未使用并且在下一次重用它。这也许看起来像内存泄露,但事实上,这仅仅是处理内存碎片的一种策略。

下面一个模板类,运用了以上所描述的“预分配内存区域”和“重用内存块”机制。

// A template class allowing re-using of the memory blocks.

template<class T>

class QueuedBlocks {

private:

    QMutex       m_qMutex;

    set< T* >    m_quBlocks;

    vector< T* > m_allBlocks;

public:

    QueuedBlocks(int nInitSize = 1): 

        m_qMutex(), m_quBlocks(), m_allBlocks() {...};

    // get a free block from the queue, if one cannot be found

    // then NULL is returned

    T* GetFromQueue() {...};

    // get a free block from the queue, if one cannot be found

    // then a new one is created

    T* Get() {...};

    // Release the used block, place it

    // back to the queue. For performance reason,

    // we assume that the block was previously taken

    // from the queue.

    void Release(T* t) {...};

    // Return all the blocks ever allocated.

    vector<T*> *GetBlocks() {...};

    ~QueuedBlocks() {...};

};

这样,通过构造函数,我们预分配了一些需要的对象,并且:

如果仅仅需要“预分配内存区域”机制,仅仅使用GetFromQueue()Release()函数,这样它将会“在已分配内存区域中”工作

如果需要“重用内存块”机制(同“预分配内存区域机制一起使用”),使用Get()Release()函数,如果预分配的空间不够用,它将扩展预分配内存区域。

因为这是一个通用模板,它允许自定义一个不同类/结构的队列。这种实现的问题是,每个特定的类必须有一个默认的构造函数和一个 Clear()  函数。 Clear() 函数目的是在内存块将要被重用前执行内部清理,

另一个有用的模板类是:

// A template class used for providing free blocks as well

// as releasing unnecessary ones. Uses "QueuedBlocks" which

// allows reusing of the blocks.

template<class T>

class StaticBlocks {

private:

    static QueuedBlocks<T> *blocks;

public:

    static void Init(int nSize = 1) {...};

    static T *Get() {...};

    static void Release(T *b) {...};

};

它代表QueuedBlocks的一个适配器,但是所有函数都被定义为static,当需要在不同类*享内存块时这将非常有用。

4、关于IOCP的几个事实

如果你正在读这篇文章,你也许知道使用IOCP的优势。如果你不知道,那么...IOCP是开发可伸缩的服务器程序最好的模型,它能仅使用少量线程处理数千个连接。这是如何实现的?通过使用一种称为“I/O完成端口”的系统对象,该系统对象使一个程序接收异步I/O的完成通知。所以,任何I/O操作都是异步(立即完成),程序(或线程)在I/O操作完成时收到通知,因此,程序(或者程序中的线程)有足够的时间去处理任何其它的操作(而不是等待I/O操作,这通常发生在“一个线程服务一个连接/套接字”模型上)。让我们概述下我们所知道的关于IOCP的事实。

首先,为了使套接字受益于IOCP,必须使用WinSock  API的重叠I/O版本。

WSASend(...);

WSARecv(...);

第二点,一个套接字必须被设置为支持重叠API

nSock = WSASocket(..., ..., ..., ..., ..., WSA_FLAG_OVERLAPPED);

同时,我们也需要创建一个IOCP句柄。

hIocp = CreateIoCompletionPort( INVALID_HANDLE_VALUE, NULL, 00 );

然后,我们需要将这个套接字关联到IOCP上。

CreateIoCompletionPort( (HANDLE) nSock, hIocp, ..., ... );

现在,当在套接字上使用重叠I/O操作时,I/O操作必须被注册到IOCP的队列里。这个注册项仅仅在执行I/O操作时发生,否则,这个注册项不会被执行。尽管这个套接字已经和IOCP关联。当一个重叠I/O操作的完成时,将通过下面的调用从IOCP队列中返回(事实上,仅仅是一些操作的详细信息,但是,它有些技巧,后面将详细介绍):

GetQueuedCompletionStatus( hIocp, &dwBytesTransferred, ..., lpOverlapped, dwTimeout );

至此,我们知道一个套接字的I/O操作已经成功完成。根据这些事实,让我们开始一些支持这些机制的类的实现。

5、Client/Server 套接字

首先,让我们分析ClientSocket 这个模板类(完整的源代码,请查看附件资源)

// A class wrapping basic client socket's operations

template<class T>

class ClientSocket {

private:

    SOCKET                 m_ClientSock;

    volatile unsigned int  m_nSession;

    QMutex                 m_qMutex;

    struct sockaddr_in     m_psForeignAddIn;

    volatile bool          m_blnIsBusy;

    T                      m_objAttachment;

protected:

public:

    ClientSocket(): m_objAttachment(), m_qMutex() {...}

    ~ClientSocket() {...}

    // Is the object assigned a socket.

    bool IsBusy() {...};

    // Method is used by 'QueuedBlocks' from

    // "mem_manager.h".

    void Clear() {...}

    // Returns the socket associated with the object.

    SOCKET GetSocket() {...}

    // Returns the session of the current client socket.

    // Technically, this is the counter of how may times

    // current instance was pooled, but really, this is used

    // to check if session of operation (MYOVERLAPPED)

    // is == with the session of the socket.

    unsigned int GetSession() {...}

    // Internally used by this implementation. Don't call it.

    void Lock() {...}

    // Internally used by this implementation. Don't call it.

    void UnLock() {...}

    // Associate the object with a socket.

    int Associate(SOCKET sSocket, struct sockaddr_in *psForeignAddIn) {...}

    // Returns the attachment of the object.

    // Use the attachment to associate the socket with any other

    // additional information required.

    T* GetAttachment() {...}

    // Write data to the socket,

    // uses overlapped style.

    int WriteToSocket(char *pBuffer, DWORD buffSize) {...}

    // Read data from the socket,

    // uses overlapped style.

    int ReadFromSocket(char *pBuffer, DWORD buffSize) {...}

    // Closes the socket associated with this instance

    // of the object and performs clean-up.

    void CloseSocket() {...}

};

下面是一些有关这个类的几个要点:

设计这个类仅仅是为了与ServerSocket类一起工作(下面将详细叙述),单独使用时是没什么用的。

它封装了重叠I/O的 WinSock  API

这是一个通用的模板类可以支持“附件”(查阅GetAttachment()函数)。虽然ClientSocket的基础功能非常的普通,但是,附件允许扩展ClientSocket类的功能(例如:保存套接字的当前状态,包含供读写的数据缓存,等等)。对用作附件的结构体/类的唯一的限制是:必须有一个默认的构造函数和一个Clear()函数(用作执行内部清理)。还有一些其它的要求,将在稍后叙述。

这个类的设计可以让ServerSocket重复使用已经创建的实例。如我们稍后所见,ServerSocket包含一个QueuedBlocks<ClientSocket<T> >ClientSocket必须实现Clear()),这就是为什么套接字句柄不是传给构造函数,而是关联到一个空闲(IsBusy())的实例(通过Associate()函数)。

其次,让我们分析ServerSocket 这个模板类(完整的源代码,请查看附件资源)

// A class for handling basic server socket operations.

template<class T>

class ServerSocket {

private:

    // Server socket.

    SOCKET    m_ServSock;

    // Pool of client sockets.

    QueuedBlocks<ClientSocket<T> > m_SockPool;

protected:

public:

    // Set the size of the socket pool to the maximum number of accepted

    // client connections.

    ServerSocket(unsigned int nPort, unsigned int nMaxClients,

              bool blnCreateAsync = false

              bool blnBindLocal = true): m_SockPool(nMaxClients) {...}

    // Returns a pointer to the ClientSocket instance, bind

    // to the accepted client socket (incoming connection).

    ClientSocket<T>* Accept() {...}

    // Release the client socket, will try closing connection and

    // will place it back to the pool.

    void Release(ClientSocket<T> *sock) {...}

    vector<ClientSocket<T> *> *GetPool() {...}

    ~ServerSocket() {...}

};

下面是一些有关这个类的几个要点:

这是一个模板类,原因仅仅是它操作了模板类ClientSocket(重复,重点在“附件”)。

它和固定数量的ClientSocket实例一起工作,这就是为什么Accept(),在内部,调用m_SockPoolGetFromQueue()函数(而不仅仅是Get()函数)。事实上,这模拟了服务器在同一时间可以服务的最大的活动连接数。最大活动连接数是通过nMaxClients这个参数传递给构造函数的。

Accept()函数返回准备使用的从m_SockPool中获取的一个ClientSocket实例。这个实例被关联到已接收到的连接的套接字Release()函数用来关闭套接字句柄(如果没有关闭),并且将这个ClientSocket实例返回给m_SockPool队列,为了在后面重复使用。

GetPool()函数覆盖了m_SockPoolGetBlocks()函数。它简单的返回了在队列中所有的ClientSocket实例(全部的,包括哪些没有被套接字句柄的关联的实例)。这个函数在将队列传递给CTimeOutChecker的时候被ServerService调用。稍后详细叙述。

这个函数的构造函数,在内部,执行了创建服务端套接句柄所需要的所有操作,绑定套接字到一个指定端口(通过nPort传递给构造函数)和IP地址,并且将这个套接字设置为监听模式(详细请参考代码)。如果这些操作中有任何一个失败,将抛出一个异常。

6、MYOVERLAPPED结构体

OVERLAPPED WinSock API 的重叠I/O操作和IOCP来说是一个非常重要结构体。它被IOCP系统对象使用(不仅仅如此,但是,我将跳过一些不需要的细节),用于“跟踪”I/O操作的状态。但是,OVERLAPPED 本身太没用了。这就是为什么要扩展的原因。有一个小窍门,当然(完整的定义在socket_client.h中)。

// Overlapped structure used with this implementation of the IOCP.

// Only the "Overlapped" member is used in context with the

// standard IOCP API, anything else is just to help this particular

// implementation.

typedef struct {

    // It is very important "Overlapped" to be the very first

    // member of the structure because pointer pointing to

    // "this" is == to the address of the first member.

    // This makes usage of the MYOVERLAPPED equivalent

    // to the OVERLAPPED in context of the standard IOCP API (just a trick).

    OVERLAPPED     Overlapped;

    // A structure that points to the data packet and size of the data

    // packet associated with current overlapped operation.

    WSABUF         DataBuf;

    // Operation type associated with current overlapped operation.

    IO_SOCK_MODES  OperationType;

    // Internally used. If nSession != ClientSocket::m_nSession

    // that means that socket was closed (recently) and

    // operation retrieved from IOCP queue or TaskPool queue

    // is not relevant, to the current session of the socket.

    volatile unsigned int    nSession;

    // Method is used by 'QueuedBlocks' from

    // "mem_manager.h".

    void Clear() {...}

MYOVERLAPPED;

窍门在哪里?窍门就是OVERLAPPED 结构体必须是MYOVERLAPPED结构体的第一个成员。这样,MYOVERLAPPED的一个实例的内存地址就等于结构体的第一个成员的地址(在这种情况下是OVERLAPPED )。因此,传递( WSASend()/WSARecv() )或者接收( GetQueuedCompletionStatus())一个MYOVERLAPPED的指针(当然,通过正确的转换)是相同的,如果我们使用的是原有的OVERLAPPED 结构。

这仅仅是一个小窍门,有几点益处:

关于MYOVERLAPPED,我们总是能通过OperationType类型知道已经成功完成得操作的类型(例如:READ或者WRITE)。因为这样,我冒着风险将一个MYOVERLAPPED实例定义成“操作”(为简单起见)。

关于DataBuf成员,我们知道成功完成得操作的数据缓冲区的详细信息(参考ClientSocketWriteToSocket()ReadFromSocket() )。

nSession成员是这篇文章所描述的IOCP框架的另一个窍门。ClientSocket有一个Session( 参考上文第5段 ),并且MYOVERLAPPED也有一个SessionClientSocket的实例的Session在每次ServerSocket::Release()被调用时递增(这意味着,这个实例将被重用)。当调用WriteToSocket()或者ReadFromSocket()函数时,在内部,它们将当前ClientSocketSession设置为用于操作的MYOVERLAPPED实例的Session。当通过GetQueuedCompletionStatus()接收到完成操作时,并且如果返回操作的Session不同于ClientSocket实例所添加的Session,则这个操作时无意义的,因为ClientSocket实例已经成功关闭或者,也许,被重用了,当这个操作获得IOCP队列响应。所以,在代码里,你会看到一些这样的检查。

并且,因为MYOVERLAPPED被不同的类共享,以下描述(如第3段所描述的)是很重要的。

// a pool of the MYOVERLAPPED structures

typedef StaticBlocks<MYOVERLAPPED> Overlapped;

QueuedBlocks<MYOVERLAPPED> *Overlapped::blocks = NULL;

7、IOCPSample 

IOCPSample是这篇文章所描述的架构中的其中一个核心模板类(关于完整的代码,请参考附件资源中iocp.h文件)。

// A template class (wrapper) to handle basic operations

// related to IO Completion Ports (IOCP).

template<class T>

class IOCPSimple {

private:

    HANDLE  m_hIocp;

    DWORD   m_dwTimeout;

protected:

    // Useful method to queue a socket (handler in fact) in the IOCP's

    // queue, without requesting any 'physical' I/O operations.

    BOOL PostQueuedCompletionStatus( ClientSocket<T> *sock, 

             MYOVERLAPPED *pMov ) {...}

public:

    ~IOCPSimple() {:}

    IOCPSimple( DWORD dwTimeout = 0 ) {...}

    // Method associates Socket handler with the handler

    // of the IOCP. It is important to do this association

    // first of all, once Socket is accepted and is valid.

    // In fact, this method registers socket with the IOCP,

    // so IOCP will monitor every I/O operation happening

    // with the socket.

    void AssociateSocket( ClientSocket<T> *sock ) {...}

    // Queue in the IOCP's queue with the status 'pending'.

    // This is a fictive status (not real). Method is useful in

    // cases when packets/data buffers to send via socket are yet

    // not ready (e.g. due some internal, time consuming, calculations)

    // and rather than keeping socket somewhere, in separate structures,

    // place it in the queue with status pending. It will be returned

    // shortly with the "GetQueuedCompletionStatus" call anyway.

    BOOL SetPendingMode( ClientSocket<T> *sock ) {...}

    // Queue in the IOCP's queue with the status 'close'.

    // This is a fictive status (not real). Useful only

    // when close socket cases should be treated/handled

    // within the routine, handling I/O completion statuses,

    // and just to not spread the code across different

    // modules/classes.

    BOOL SetCloseMode( ClientSocket<T> *sock, 

             MYOVERLAPPED *pMov = NULL ) {...}

    // Queue in the IOCP's queue with the status 'accept'.

    // This is a fictive status (not real). When a new

    // client connection is accepted by the server socket,

    // method is used to acknowledge the routine, handling

    // I/O completion statuses, of this. Same reason as per

    // 'close' status, just to have a unique piece of code

    // handling all the possible statuses.

    BOOL SetAcceptMode( ClientSocket<T> *sock ) {...}

    // This method is just a wrapper. For more details,

    // see MSDN for "GetQueuedCompletionStatus". The only

    // difference is, method is adjusted to handle

    // 'ClientSocket<T>' (as registered with 'AssociateSocket(...)')

    // rather than pointer to the 'DWORD'.

    BOOL GetQueuedCompletionStatus( LPDWORD pdwBytesTransferred,

        ClientSocket<T> **sock, MYOVERLAPPED **lpOverlapped ) {...}

};

因此,这个类仅仅是关于IOCPAPI函数的简单封装,并且这是一个通用模板类,因为ClientSocket是一个模板类,关于IOCP逻辑的大部分细节,我们已经在第4节讨论过了,但是,仍然有些额外的东西要写。

下面函数第3个参数:

CreateIoCompletionPort( ..., ..., Key, ... );

预定是一个DWORD,在API中它被命名为Key,这个KeyI/O操作完成被返回到下面函数的第3个参数。

GetQueuedCompletionStatus( ..., ..., &Key, ..., ... );

Key可以是任何可以被转换为DWORD类型的类型(当然,使用类型转换)。例如:Key可是一个指向对象的指针。如果我们使用这个指针,作为一个Key,则ClientSocket实例的指针是相对于被执行的那个I/O操作?是的,这使得我们不仅仅知道了完成的操作,而且,ClientSocket的实例是是被应用的那个I/O操作。这里有另一个窍门,是IOCPSimple AssociateSocket() GetQueuedCompletionStatus()方法。

8、IOCPSampleSendPendingMode方法

我应该写一点关于IOCPSample模板类的SetPendingMode() 方法。它太有用的同时也太危险。考虑一种情况,当你的程序正在从外部资源成块的读数据,并且将它们发送给ClientSocket的一个实例。如果外部资源没有准备好返回下一块数据(因为任何原因),则没有任何数据被发送到ClientSocket,因此,没有任何I/O操作在IOCP中被查询(如第4节所描述的)。所以,ClientSocket的状态是未定义的,并且非常有可能“失去”这样的一个套接字。使用SetPendingMode()方法,而不是实现一个缓冲存储区去“保持这些套接字并且当有数据时再处理它们”。这个方法将套接字推送到IOCP的队列,并且,一小段时间后,这个套接字将会被ISockEventOnPending() 事件返回(更多详细信息见下章)。很有可能的是,在那个时候,外部资源的下一个数据块已经准备好被发送了( 如果没有,再次使用OnPending() )。

这是非常有用的,但是,注意SetPendingMode()仅仅是将套接字推送到IOCP的队列中,没有实际注册任何I/O操作。非常有可能的是,当用这种方式推送套接字到IOCP队列中的时候,这个套接字也许被远端关闭了。不幸的是,IOCP将不会处理这类“意外的”关闭直到和这个套接相关联的一个实际的I/O操作被执行。这种情况将被CTimeOutChecker 很好的“过滤”掉(更详细的信息见下一节,所以,不要轻视它),并且或早或晚(根据传递的超时值),这样的套接字将会被“拒绝”,通过ISockEventOnClose() 事件,只要超时值大于0(并且CTimeOutChecker 被使用)。提醒自己注意。

9、CTimeOutChecker

这个模板类在server_server.h文件中被定义,有一个作用是检查每一个活动连接是否超时,当过去的时间,自从上次的I/O操作,大于传递给构造函数的nTimeOutValue时一个超时情况发生。

// A template class designed for verifying client sockets

// against time-out cases. Time-out case = if no I/O actions

// happen with a socket during a configured number of

// seconds. This implies that the attachment of the

// client socket must implement "GetTimeElapsed()"

// and "ResetTime(...)" methods.

template<class T>

class CTimeOutChecker: public IRunnable {

private:

    unsigned int               m_nTimeOutValue;

    vector<ClientSocket<T> *>  *m_arrSocketPool;

    IOCPSimple<T>              *m_hIocp;

protected:

    // method checks sockets to detect time-out cases

    virtual void run() {...}

public:

    // Constructor of the template class. Requires:

    // - a pointer to a collection of pointers to client sockets.

    //   This is the collection of sockets to be checked against

    //   time-out cases.

    // - a pointer to the IO completion port object responsible

    //   for checking I/O events against passed collection of

    //   client sockets.

    // - value of the time-out, in seconds.

    CTimeOutChecker( vector<ClientSocket<T> *> *arrSocketPool, 

                     IOCPSimple<T> *hIocp, unsigned int nTimeOutValue ) {...}

    // Nothing to destruct as inside the template class

    // we keep/use just pointers obtained from external

    // sources.

    ~CTimeOutChecker() {};

};

这个类被ServerService 模板类(下面描述)使用,并且,同样的,它的run() 方法将被关联到ServerService的线程池 CSimpleThreadPool)中的一个线程执行,ServerService 也将传递套接字池(关联到ServerService ,通过GetPool()方法)到CTimeOutChecker (的实例)。这样,CTimeOutChecker 知道哪些套接字需要检查。

如果一个超时情况发生,则一个相关的ClientSocket(实例)被传递到IOCP队列,状态是“关闭”。IOCP,早晚会从队列中返回它,并且会将它“传递”给ServerService ServerService 将创建一个“事件任务”,并且将它传递给CSimpleThreadPool。“事件任务”,将会被CSimpleThreadPool中的一个线程执行,会“启动”ISockEvent(实现)的OnClose()“事件”,并且会关闭套接字。这就是它如何工作的。

另外,CTimeOutChecker 需要“附件”实现两个额外的方法:GetTimeElapsed() ResetTime( bool ).。因此,从技术上来说,CTimeOutChecker 的行为被下面两个方法的实现所控制:

如果一个特定的ClientSocket(的附件)的GetTimeElapsed()返回0,则超时情况对ClientSocket不适用,这是一种使指定套接字超时检查无效的方法。

尽管ResetTime( bool )的实现可以是任意的(根据开发者而定,必须依据特征)。实际需求是:

Ø     调用ResetTime( true )必须强制后面的GetTimeElapsed()调用返回0。这使得在某一时刻(当需要时)对一个特定套接字的超时检测无效。例如:当写一个WebHTTP)服务时,当从ClientSocket中等待完成HTTP请求时,有必要开启超时检查。一旦收到完成HTTP请求,有必要关闭相关ClientSocket的超时检查。

Ø      调用ResetTime( false )必须强制后面的GetTimeElapsed()调用返回自动上次调用ResetTime( true )后实际已过去的时间,单位是秒。也就是说,它应该记录当前时间,并且GetTimeElapsed()应该返回当前时间到所记录的时间之间的间隔,以秒为单位。它是由开发者调用ResetTime( false ),当ClientSocket::WriteToSocket()或者ClientSocket::ReadFromSocket()成功完成时(根据服务器的开发策略)。

CTimeOutChecker 的有用之处在哪呢?仅仅是关闭那些经过一段(配置的)时间“没有响应”的套接字,这样可以为其它即将进来的连接释放一些空间(注意ServerSocker 类的nMaxClient参数)。

但是,如果这个逻辑不需要,有一种方式可以使CTimeOutChecker (完全的)无效。仅仅是将ServerService 构造函数的timeout参数设置为0

10、ISockEventServerService

最后,让我开始这个结构的核心。如果你没能记住以上所描述的全部细节也是可以的,因为它们全部被封装进ServerService 模板类的逻辑(原因是相同的,“附件”元素),可是,记住第8和第9节是重要的。所有的类在server_service.h文件中被定义。让我们从ServerService 开始:

// Well, finally, here is the template class joining all

// the stuff together. Considering the Aspect Oriented paradigm,

// this template class may be seen as an Aspect. The "individualizations"

// of this aspect are "ISockEvent<T>" and "T" itself. "T" is nothing

// else but attachment of the client socket (see ClientSocket<T> template

// class for more details). Implementing "ISockEvent<T>" and "T" will

// define the individual behaviour of this aspect.

// It is a composition of the IOCP, server socket, time-out checker

// and thread pool. Class implements the business logic that makes all

// these entities working together.

template<class T>

class ServerService: public IRunnable {

private:

    ServerSocket<T>      m_ServerSocket;

    IOCPSimple<T>        m_hIocp;

    ISockEvent<T>        *m_pSEvent;

    CTimeOutChecker<T>   *m_TChecker;

    // thread pool which will execute the tasks

    CSimpleThreadPool    *m_ThPool;

    // a pool of the CSockEventTask<T> objects

    QueuedBlocks<CSockEventTask<T> > m_SockEventTaskPool;

protected:

    // This method will be executed by a thread

    // of the task pool.

    virtual void run() {...}

public:

    // Constructor or the class.

    // pSEvent   - pointer to an instance implementing

    //                ISockEvent<T>. This instance will be used

    //                as a client socket event handler.

    // nPort        - port number to bind server socket to.

    // nMaxClients  - the maximum number of accepted (concurrent)

    //                client connections. To be passed to

    //                the server socket and also will be used

    //                as the initial size for the pool of the

    //                CSockEventTask<T> objects.

    // nNoThreads   - indicative (the real one is computed,

    //                see below) number of the threads

    //                to be created by the thread pool.

    // timeout      - the value of the time-out, in seconds.

    //                Will be passed to the time-out checker.

    //                If time-out is zero, time-out checker

    //                will not be created.

    // blnBindLocal - see ServerSocket<T> for more details.

    //                If "true" then server socket is bind

    //                to localhost ("127.0.0.1").

    //                If "false" then server socket is bind

    //                to INADDR_ANY ("0.0.0.0").

    ServerService( ISockEvent<T> *pSEvent, unsigned int nPort,

                unsigned int nMaxClients, unsigned int nNoThreads, 

                unsigned int timeout, bool blnBindLocal = true ): 

                    m_ServerSocket( nPort, nMaxClients, true, blnBindLocal ), 

                    m_hIocp( 200 ), m_SockEventTaskPool( nMaxClients ) 

    {...}

    virtual ~ServerService() {...}

    // Start the threat pool (== all the

    // threads in the pool).

    void start() {...}

};

这就是ServerService ,没有任何窍门。关于代码的注释应该足够强调响应的逻辑了。所以,我将会集中提供一点关于ISockEvent 模板类的详细信息(实际上,模板接口),该模板类作为一个参数被传递给ServerService 构造函数。

ISockEvent 是一个包含纯虚函数的模板接口:

// A template interface showing how the client socket event

// handler should look like.

template<class T>

class ISockEvent {

public:

    // Client socket ("pSocket") is about to be closed.

    virtual void OnClose( ClientSocket<T> *pSocket, 

            MYOVERLAPPED *pOverlap,

            ServerSocket<T> *pServerSocket,

            IOCPSimple<T> *pHIocp

        ) = 0;

    // Client socket ("pSocket") was just accepted by

    // the server socket (new connection with a client

    // is created).

    virtual void OnAccept( ClientSocket<T> *pSocket, 

            MYOVERLAPPED *pOverlap,

            ServerSocket<T> *pServerSocket,

            IOCPSimple<T> *pHIocp

        ) = 0;

    // Client socket ("pSocket") was returned from the IOCP

    // queue with status _PENDING. For more details see

    // "IOCPSimple<T>::SetPendingMode(...)". This method

    // is invoked only if there was a call to

    // "IOCPSimple<T>::SetPendingMode(...)", performed by a

    // user code, internally "SetPendingMode(...)"

    // is never called.

    virtual void OnPending( ClientSocket<T> *pSocket, 

            MYOVERLAPPED *pOverlap,

            ServerSocket<T> *pServerSocket,

            IOCPSimple<T> *pHIocp

        ) = 0;

    // Client socket ("pSocket") was returned from IOCP

    // queue with status _READ. This means that overlapped

    // reading operation, started previously with

    // "ClientSocket<T>::ReadFromSocket(...)", was

    // successfully finished.

    // - "pOverlap->DataBuf" structure points to the data

    //   buffer and buffer's size that were passed to the

    //   "ClientSocket<T>::ReadFromSocket(...)".

    // - "dwBytesTransferred" will indicate how many

    //   bytes were read.

    virtual void OnReadFinalized( ClientSocket<T> *pSocket, 

            MYOVERLAPPED *pOverlap,

            DWORD dwBytesTransferred,

            ServerSocket<T> *pServerSocket,

            IOCPSimple<T> *pHIocp

        ) = 0;

    // Client socket ("pSocket") was returned from IOCP

    // queue with status _WRITE. This means that overlapped

    // writting operation, started previously with

    // "ClientSocket<T>::WriteToSocket(...)", was

    // successfully finished.

    // - "pOverlap->DataBuf" structure points to the data

    //   buffer and buffer's size that were passed to the

    //   "ClientSocket<T>::WriteToSocket(...)".

    // - "dwBytesTransferred" will indicate how many

    //   bytes were written.

    virtual void OnWriteFinalized( ClientSocket<T> *pSocket, 

            MYOVERLAPPED *pOverlap,

            DWORD dwBytesTransferred,

            ServerSocket<T> *pServerSocket,

            IOCPSimple<T> *pHIocp

        ) = 0;

};

这个接口的实现(当然还是有“附件”)将定义服务器程序。没有其它的需要被做的...真的!ISockEvent包含了用于跟踪和处理套接字的所有可能的状态所需要的全部方法。让我们来看看:

当一个新的连接被ServerSocket接收,ServerSevice将确保OnAccept()事件被调用。它将由开发决定如何处理这个事件。典型的,通过这个事件,你可能发起pSocket->ReadFromSocket()或者pSocket->WriteToSocket(),基于服务器开发的策略(服务器希望客户端发起会话或者反之亦然)。

如果一个套接字将要被关闭(远端关闭连接;CTimeOutChecker 发起这个操作,或者你在一个事件的代码中调用了pHIocp->SetCloseMode(pSocket)),然后OnClose()事件被调用。使用这个时间去纸箱任何需要的清理。如果在你没有或者忘记在OnClose()中关闭套接字也是可以的,因为在这个事件完成后,这个套接无论如何是会被关闭的。

如果,在一个事件的代码内,pHIocp->SetPendingMode(pSocket)被调用,是的,按上述第8节。确保你使用OnPending()处理了它。

如果,在一个事件的代码内(除了OnClose()),pSocket->ReadFromSocket()被调用(并且返回值不是一个错误值),OnReadFinalized()事件将会被调用,一旦“读”操作完成,但是不要指望传递的缓存会被完全填充到缓存的大小。避免在这期间(从ReadFromSocket()  OnReadFinalized())使用已传递的缓冲区。这是从IOCP API继承下来的要求,因此,我们在此不能做任何事情。

如果,在一个事件的代码内(除了OnClose()),pSocket->WriteToSocket()被调用(并且返回值不是一个错误值),OnWriteFinalized()事件将会被调用,一旦“写”操作完成。这意味着全部的缓存已经被发出去了。避免在这期间(从WriteToSocket()  OnWriteFinalized())使用已传递的缓冲区,原因如上。

最后,你不需要创建额外的线程,因为每个事件将被关联到ServerServiceCSimpleThreadPool中的一个线程调用。如果你需要更多的线程,传递恰当的值给ServerService 的构造函数的nNoThreads 参数。并且,是的,确保每个事件的实现或早或晚的退出。我的意思是,努力避免关于事件的无限循环;否则,停止运行中停止ServerService 会有问题(尽管,如果在循环中检查CThread::currentThread().isInterrupted()仍然存在问题

最后请那些计划在多核CPU上写服务器的人注意:如果环境中有NCPU,则恰好CSimpleThreadPoolN个线程执行ServerService::run()函数,为了获得更好的性能(如MSDN所推荐的那样)。确保你不要在一个事件内多次调用pSocket->ReadFromSocket() 或者 pSocket->WriteToSocket() (尽量设计程序使其只有正确的,没有错误的,祈祷吧)。这并不是说这个架构不能处理重复调用的情况。问题是,虽然发送/接受请求的顺序是保证能够符合调用顺序的(在IOCP队列中),但是报告操作完成的顺序不能够保证符合调用顺序,因为GetQueuedCompletionStatus()会被多线程调用。但是如果你不关心顺序是否正确,可以忽略此点。

11、一个简单模拟服务器的实现

作为一个“技术性验证测试”,让我们实现一个简单的模拟服务器,这个服务器从套接上读取7个字符,并且,一旦缓冲区被填充,将它们发送回去。完成得代码在资源中。

A.首先,让我们定义服务器的参数和缓冲区的大小。

#define BUFF_SIZE       8

#define MAX_CONNECTIONS 10

#define NO_THREADS      4

#define TIME_OUT        10

#define PORT            8080

B.现在,让我们定义“附件”

struct Attachment {

    volatile time_t    tmLastActionTime;

    char            sString<BUFF_SIZE>;

    DWORD           dwStringSize; // current string's size

    Attachment() { Clear(); };

    bool Commit( DWORD dwBytesTransferred ) {

        DWORD dwSize = dwStringSize + dwBytesTransferred;

        if ( dwBytesTransferred <0 ) return false;

        if ( dwSize >= BUFF_SIZE ) return false;

        dwStringSize = dwSize;

        sString[dwStringSize] = 0;

        return true;

    };

    // as requested by the API of the framework

    void Clear() { memset(this0sizeof(Attachment) ); };

    // as requested by the API of the framework

    void ResetTime( bool toZero ) { 

        if (toZero) tmLastActionTime = 0;

        else {

            time_t    lLastActionTime;

            time(&lLastActionTime); 

            tmLastActionTime = lLastActionTime;

        }

    };

    // as requested by the API of the framework

    long GetTimeElapsed() {

        time_t tmCurrentTime;

        if (0 == tmLastActionTime) return 0;

        time(&tmCurrentTime);

        return (tmCurrentTime - tmLastActionTime);

    };

};

C.现在,我们必须实例化模板类

typedef ClientSocket<Attachment> MyCSocket;

typedef ServerSocket<Attachment> MySSocket;

typedef IOCPSimple<Attachment> MyIOCPSimple;

typedef ISockEvent<Attachment> MyISockEvent;

typedef ServerService<Attachment> MyServerService;

D.现在,我们实现ISockEvent<Attachment>:

class MyISockEventHandler: public MyISockEvent {

public:

    MyISockEventHandler() {};

    ~MyISockEventHandler() {};

    // empty method, not used

    virtual void OnClose( MyCSocket *pSocket, MYOVERLAPPED *pOverlap, 

            MySSocket *pServerSocket, MyIOCPSimple *pHIocp ) {};

    // empty method, not used

    virtual void OnPending( MyCSocket *pSocket, MYOVERLAPPED *pOverlap, 

            MySSocket *pServerSocket, MyIOCPSimple *pHIocp ) {};

    virtual void OnAccept( MyCSocket *pSocket, MYOVERLAPPED *pOverlap, 

        MySSocket *pServerSocket, MyIOCPSimple *pHIocp ) {

        int nRet;

        DWORD dwSize;

        char *temp;

        dwSize = BUFF_SIZE - 1;

        temp = pSocket->GetAttachment()->sString;

        // initiate the reading with OnAccept

        nRet = pSocket->ReadFromSocket( temp, dwSize );

        pSocket->GetAttachment()->ResetTime( false );

        if ( nRet == SOCKET_ERROR ) {

            pServerSocket->Release( pSocket );

        }

    };

    virtual void OnReadFinalized( MyCSocket *pSocket, 

                MYOVERLAPPED *pOverlap, DWORD dwBytesTransferred, 

                MySSocket *pServerSocket, MyIOCPSimple *pHIocp ) {

        int nRet;

        DWORD dwSize, dwPos;

        char *temp;

        // finalize the filling of the buffer

        pSocket->GetAttachment()->Commit( dwBytesTransferred );

        dwSize = BUFF_SIZE - 1;

        dwPos = pSocket->GetAttachment()->dwStringSize;

        temp = pSocket->GetAttachment()->sString;

        nRet = pSocket->ReadFromSocket(    temp + dwPos, dwSize - dwPos );

        pSocket->GetAttachment()->ResetTime( false );

        if ( nRet == SOCKET_ERROR ) {

            pServerSocket->Release( pSocket );

        }

        else if ( nRet == RECV_BUFFER_EMPTY ) {

            // means that dwSize - dwPos == 0, so send the data

            // back to the socket

            nRet = pSocket->WriteToSocket( temp, dwSize );

            if ( nRet == SOCKET_ERROR ) {

                pServerSocket->Release( pSocket );

            }

        }

    };

    virtual void OnWriteFinalized( MyCSocket *pSocket, 

                MYOVERLAPPED *pOverlap, DWORD dwBytesTransferred, 

                MySSocket *pServerSocket, MyIOCPSimple *pHIocp ) {

        // clean the attachment

        pSocket->GetAttachment()->Clear();

        // and once again

        OnAccept(pSocket, NULL,pServerSocket, NULL);

    };

};

E. 并且,最后

int main(int argc, char* argv[])

{

    int    nRet;

    MyServerService *sService;

    MyISockEventHandler *mSockHndl;

    WSAData    wsData;

    nRet = WSAStartup(MAKEWORD(2,2),&wsData);

    if ( nRet < 0 ) {

        Log::LogMessage(L"Can't load winsock.dll./n");

        return -1;

    }

    try {

        Overlapped::Init( MAX_CONNECTIONS );

        mSockHndl = new MyISockEventHandler();

        sService = new MyServerService((MyISockEvent *) mSockHndl,

                   PORT, MAX_CONNECTIONS, NO_THREADS, TIME_OUT, false);

        sService->start();

        printf("hit <enter> to stop .../n");

        while( !_kbhit() ) ::Sleep(100);

        delete sService;

        delete mSockHndl;

    }

    catch (const char *err) {

        printf("%s/n", err);

    }

    catch (const wchar_t *err) {

        wprintf(L"%ls/n", err);

    }

    WSACleanup();

    return 0;

}

 

(本文从本人的另一个CSDN账户上搬移过来)