先前整的完成端口服务器在进行客户端之间的通信时有些bug.在通过网上和人交流后,采用了逻辑线程、消息队列来进行服务器中转操作。大体上只是用了这个理念,自己夏琢磨这写的。真正的消息队列和逻辑线程可能认识得还是冰山一角。
客户端:
头文件:
#pragma once
#include <iostream>
#include <windows.h>
#include < Ws2bth.h >
using namespace std;
#pragma comment(lib, "ws2_32.lib")
#define MAX_CONCURRENT 2000 // 并发线程最大数
#define IP "192.168.1.185" // 服务器端IP地址
#define PORT 123456 // 服务器端口号
class CSOCKET
{
public:
// 加载Winsock库
static BOOL WSAInit();
// 客户端去连接时的操作函数
static BOOL ConnectA(DWORD threadID);
static BOOL ConnectB(DWORD threadID);
// 线程函数
static unsigned int __stdcall _ThreadFuncA();
static unsigned int __stdcall _ThreadFuncB();
// 清理工作
static BOOL Clean(SOCKET sockClient);
};
cpp文件
#include <iostream>服务器端:
#include <windows.h>
#include < Ws2bth.h >
#include"client5.h"
char * left(const char * str, int n)
{
if (n < 0)
n = 0;
char * p = new char[n + 1];
int i;
for (i = 0; i < n && str[i]; i++)
p[i] = str[i]; // copy characters
while (i <= n)
p[i++] = '\0'; // set rest of string to '\0'
return p;
}
int SendDataToSocketServer(SOCKET socketL, const char * cSendData)
{
Sleep(10);
int Flag = send(socketL, cSendData, strlen(cSendData), 0);
if (Flag == SOCKET_ERROR)
{
return 444;//如果len大于s的发送缓冲区的长度,该函数返回SOCKET_ERROR;如果send在等待协议传送数据时网络断开的话那么send函数也返回SOCKET_ERROR
}
else
{
return Flag;//返回实际copy到发送缓冲区的的字节数
}
}
int RecvDataToSocketClient(SOCKET socketL, char * cSendData)
{
if (recv(socketL, cSendData, strlen(cSendData), 0) == SOCKET_ERROR)
{
return FALSE;
}
return TRUE;
}
/**********************************************************************
Description: :客户端执行发,收的逻辑程序
Input : 客户端所用的线程ID
***********************************************************************/
//发送PIN码的客户端
int DataIntegrationA(SOCKET socketL, DWORD threadID)
{
int k = GetTickCount();
int oldK = k;//确定循环发送体的起始时间
int currK = k;
int Sendflag;
//定义并初始化客户端A要发送给服务器的PIN码
char Sendmessage[100] = { 0 };
sprintf(Sendmessage, "%s%d%s", "1 I'm<TID:", threadID, ">");
//定义并初始化客户端A要收到的数据
char FromSever[100] = { 0 };
//在一段时间内time持续发送、接收
while (currK < (oldK + 3000))
{
Sleep(1000);
//发送
Sendflag = SendDataToSocketServer(socketL, Sendmessage);
//发送失败,直接结束返回
if (Sendflag == 444)
{
//找发送失败原因
printf("\n<线程 %d>客户端发送已经被服务器切断 %d\n", threadID, GetLastError());
return 1;
}
//成功则继续下来的收
else
{
//接收
cout << "[" << threadID << "]" << "发给中转站:" << Sendmessage << endl;
}
//接收
memset(FromSever, 0, 100);
int RecvFlag = recv(socketL, FromSever, 100, 0);
//接收失败,直接结束返回
if (RecvFlag == 0)
{
printf("\n<线程 %d>客户端连接已经被服务器切断,误码 %d\n", threadID, GetLastError());
return 1;
}
else if (RecvFlag < 0)
{
printf("\n<线程 %d>客户端接收时出现网络错误, 误码:%d\n", threadID, GetLastError());
return 1;
}
//成功则继续
else
{
cout << "[" << threadID << "]" << "收到中转站:" << FromSever << endl;
}
//更新当前运行时间,循环体执行判断
currK = GetTickCount();
}
//
//(业务逻辑结束)再发送,是为了释放资源
Sendflag = SendDataToSocketServer(socketL, Sendmessage);
//发送失败,直接结束返回
if (Sendflag == 444)
{
//找发送失败原因
printf("\n<线程 %d>客户端发送已经被服务器切断 %d\n", threadID, GetLastError());
return 1;
}
//成功则继续下来的收
else
{
//接收
cout << "[" << threadID << "]" << "发给中转站:" << Sendmessage << endl;
}
return 0;
}
//接收PIN码客户端B
int DataIntegrationB(SOCKET socketL, DWORD threadID)
{
int k = GetTickCount();
int oldK = k;//确定循环发送体的起始时间
int currK = k;
int Sendflag;
//定义并初始化客户端A要发送给服务器的PIN码
char Sendmessage[100] = { 0 };
sprintf(Sendmessage, "%s%d%s", "2 I'm<TID:", threadID, ">");
//定义并初始化客户端A要收到的数据
char FromSever[100] = { 0 };
//在一段时间内time持续接收发送
while (currK < (oldK + 3000))
{
Sleep(1000);
//收
memset(FromSever, 0, 100);
int RecvFlag = recv(socketL, FromSever, 100, 0);
if (RecvFlag == 0)
{
printf("\n<线程 %d>客户端连接已经被服务器切断,误码 %d\n", threadID, GetLastError());
return 1;
}
else if (RecvFlag < 0)
{
printf("\n<线程 %d>客户端接收时出现网络错误, 误码:%d\n", threadID, GetLastError());
return 1;
}
else
{
cout << "[" << threadID << "]" << "收到中转站:" << FromSever << endl;
}
//发送
Sendflag = SendDataToSocketServer(socketL, Sendmessage);
//发送失败
if (Sendflag == 444)
{
//找发送失败原因
printf("\n<线程 %d>客户端发送已经被服务器切断 %d\n", threadID, GetLastError());
return 1;
}
else
{
//接收
cout << "[" << threadID << "]" << "发给中转站:" << Sendmessage << endl;
}
//更新当前运行时间,循环体执行判断
currK = GetTickCount();
}
return 0;
}
/***********************************
Description:加载Winsock库
************************************/
BOOL CSOCKET::WSAInit()
{
WSADATA wsaData;
int nRet = WSAStartup(MAKEWORD(2, 2), &wsaData);
if (nRet != 0)
{
return FALSE;
}
return TRUE;
}
/********************************************
Description : 连接服务器端并发送数据
InPut : sockClient - SOCKET
wStr - 日志字符串
Return : TRUE - 执行成功
FALSE - 连接或发送失败
*********************************************/
BOOL CSOCKET::ConnectA(DWORD threadID)
{
struct sockaddr_in ServerAddress;
struct hostent *Server; //包含主机名字和地址信息的hostent结构指针
SOCKET sockClient = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
BOOL bOptval = TRUE;
setsockopt(sockClient, SOL_SOCKET, SO_KEEPALIVE, (char*)&bOptval, sizeof(bOptval));
// 生成地址信息
Server = gethostbyname(IP);
ZeroMemory((char *)&ServerAddress, sizeof(ServerAddress));
ServerAddress.sin_family = AF_INET;
CopyMemory((char *)&ServerAddress.sin_addr.s_addr, (char *)Server->h_addr, Server->h_length);
ServerAddress.sin_port = htons(PORT);
int nRet = 0;
nRet = connect(sockClient, (SOCKADDR*)&ServerAddress, sizeof(ServerAddress));
if (nRet == SOCKET_ERROR)
{
cout << "初始连接Server失败。" << endl;
closesocket(sockClient);
return FALSE;
}
else
{
DataIntegrationA(sockClient, threadID);
//时间走完(或者自己未通过验证)关闭套接字
closesocket(sockClient);
return TRUE;
}
}
BOOL CSOCKET::ConnectB(DWORD threadID)
{
struct sockaddr_in ServerAddress;
struct hostent *Server; //包含主机名字和地址信息的hostent结构指针
SOCKET sockClient = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
BOOL bOptval = TRUE;
setsockopt(sockClient, SOL_SOCKET, SO_KEEPALIVE, (char*)&bOptval, sizeof(bOptval));
// 生成地址信息
Server = gethostbyname(IP);
ZeroMemory((char *)&ServerAddress, sizeof(ServerAddress));
ServerAddress.sin_family = AF_INET;
CopyMemory((char *)&ServerAddress.sin_addr.s_addr, (char *)Server->h_addr, Server->h_length);
ServerAddress.sin_port = htons(PORT);
int nRet = 0;
nRet = connect(sockClient, (SOCKADDR*)&ServerAddress, sizeof(ServerAddress));
if (nRet == SOCKET_ERROR)
{
cout << "初始连接Server失败。" << endl;
closesocket(sockClient);
return FALSE;
}
else
{
DataIntegrationB(sockClient, threadID);
//发完关闭套接字
closesocket(sockClient);
return TRUE;
}
}
/********************************************
Description : 多线程函数
InPut : mystruct - 结构体
*********************************************/
unsigned int CSOCKET::_ThreadFuncA()
{
DWORD TID = GetCurrentThreadId();//获得当前线程Id
Sleep(500);
//全局变量计算多个线程
cout << "(客户端)线程 " << TID << "开启" << endl;
ConnectA(TID);
cout << "(客户端)线程 " << TID << "关闭" << endl;
return 1;
}
unsigned int CSOCKET::_ThreadFuncB()
{
DWORD TID = GetCurrentThreadId();//获得当前线程Id
Sleep(1500);
cout << "(客户端)线程 " << TID << "开启" << endl;
ConnectB(TID);
cout << "(客户端)线程 " << TID << "关闭" << endl;
return 1;
}
/********************************************
Description : 清理工作
InPut : sockClient - SOCKET
*********************************************/
BOOL CSOCKET::Clean(SOCKET sockClient)
{
closesocket(sockClient);
return TRUE;
}
/*
* Synchronically waiting for all objects signaled.
* - handles : An array of object handles to wait.
* - count : The count of handles.
* returns : Same as WaitForMultipleObjects.
*/
DWORD SyncWaitForMultipleObjs(HANDLE * handles, int count)
{
int waitingThreadsCount = count;
int index = 0;
DWORD res = 0;
while (waitingThreadsCount >= MAXIMUM_WAIT_OBJECTS)
{
res = WaitForMultipleObjects(MAXIMUM_WAIT_OBJECTS, &handles[index], TRUE, INFINITE);
if (res == WAIT_TIMEOUT || res == WAIT_FAILED)
{
puts("1. Wait Failed.");
return res;
}
waitingThreadsCount -= MAXIMUM_WAIT_OBJECTS;
index += MAXIMUM_WAIT_OBJECTS;
}
if (waitingThreadsCount > 0)
{
res = WaitForMultipleObjects(waitingThreadsCount, &handles[index], TRUE, INFINITE);
if (res == WAIT_TIMEOUT || res == WAIT_FAILED)
{
puts("2. Wait Failed.");
}
}
return res;
}
#include <Afx.h>
#include <Windows.h>
#include <Winsock2.h>
#pragma comment(lib, "WS2_32.lib")
#include<stdio.h>
#include <mswsock.h> //微软扩展的类库
#include<vector>
#include<queue>
#include<iostream>
using namespace std;
///////////////////////////////////////////////////
//全局变量和数据结构声明部分
#define DATA_BUFSIZE 100
#define READ 0
#define WRITE 1
#define ACCEPT 2
queue<char*> myqueue; //使用前需定义一个消息队列queue变量
CRITICAL_SECTION m_thread;//互斥量
DWORD g_count = 0;
volatile long IDnum = 0;
//单IO结构
typedef struct _io_operation_data
{
OVERLAPPED overlapped;
WSABUF databuf;
CHAR buffer[DATA_BUFSIZE];
DWORD len;
SOCKET sock;
BYTE type;//请求操作类型(连入,发送,接收)
int IOnum = 0; //每个新连入的客户端分配IO数据结构ID
}IO_OPERATION_DATA, *LP_IO_OPERATION_DATA;
//完成键
typedef struct _completion_key
{
SOCKET sock;
char sIP[100]; //本机测试,IP都是127.0.0.1,没啥意思,实际写时候这个值填的是端口号
BOOL first; //表示是不是客户端连接时发的第一条消息(第一条则定为true,否则定为flase
char MsgBuff[100] = { 0 };//不停的更新在这个socket的数据
int clientID = 0;//每个新连入的客户端分配完成键ID
BOOL islive;
}COMPLETION_KEY, *LP_COMPLETION_KEY;
//所有的IO请求操作
typedef struct _eachIODate
{
//socket m_socket;
vector<LP_IO_OPERATION_DATA> SocketEachIODate;//放着这个socket上的每次IO请求数据
}EachIODate, *LP_EachIODate;
EachIODate m_EachIOData;
//所有连入客户端信息句柄
vector<LP_COMPLETION_KEY> m_arrayClientFlag;// 结构体指针数组,存放指向每个连入客户端Socket的相关信息(地址等)
//完成端口句柄
HANDLE g_hComPort = NULL;
//主进程运行标志
BOOL g_bRun = FALSE;
//监听套接字,其实也不一定要是全局的。用于接收到客户端连接后继续发起等待下一个客户端连接操作。
SOCKET g_sListen;
///////////////////////////////////////////////////
//函数声明部分
LPFN_ACCEPTEX lpfnAcceptEx = NULL; //AcceptEx函数指针
LPFN_GETACCEPTEXSOCKADDRS lpfnGetAcceptExSockaddrs; //加载GetAcceptExSockaddrs函数指针
//投递接收连接操作
BOOL AcceptClient(SOCKET sListen);
//投递接收数据操作
BOOL RecvFunc(COMPLETION_KEY *pComKey);
BOOL SendFunc(COMPLETION_KEY *pComKey, char* msgBuff);
//处理IO结果
BOOL ProcessIO(IO_OPERATION_DATA *pIOdata, COMPLETION_KEY *pComKey);
//逻辑队列消息处理线程
DWORD WINAPI LogicProThread(LPVOID pParam);
//服务线程
DWORD WINAPI ServerWorkerThread(LPVOID pParam);
bool IsSocketAlive(SOCKET s);
char* left(const char * str, int n);
void _AddToContextList(LP_COMPLETION_KEY pHandleData);
void _AddToEachIODataList(LP_IO_OPERATION_DATA pHandleIO);
void _DelToContextList(LP_COMPLETION_KEY pHandleData);
void _DelToIOtList(SOCKET msocket);
///////////////////////////////////////////////////
//函数体部分
//主函数入口
int main(int argc, char* argv[])
{
g_bRun = TRUE;
InitializeCriticalSection(&m_thread);
//建立第一个完成端口
g_hComPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
if (g_hComPort == NULL)
{
printf("Create completionport error! %d\n", WSAGetLastError());
return 0;
}
//创建逻辑线程
HANDLE LogichThread;
DWORD dwLogicThreadID;
LogichThread = CreateThread(NULL, 0, LogicProThread, NULL, 0, &dwLogicThreadID);
CloseHandle(LogichThread);
//创建服务线程
SYSTEM_INFO sysInfor;
GetSystemInfo(&sysInfor);
int i = 0;
for (i = 0; i < sysInfor.dwNumberOfProcessors * 2; i++)
{
HANDLE hThread;
DWORD dwThreadID;
hThread = CreateThread(NULL, 0, ServerWorkerThread, g_hComPort, 0, &dwThreadID);
CloseHandle(hThread);
}
//加载套接字库
WSADATA wsData;
if (0 != WSAStartup(MAKEWORD(2, 2), &wsData))
{
printf("加载套接字库失败! %d\n", WSAGetLastError());
g_bRun = FALSE;
return 0;
}
//先创建一个套接字用于监听
SOCKET sListen = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
g_sListen = sListen;
LP_COMPLETION_KEY pComKey; //完成键(将来存放监听套接字的地址信息和值)
pComKey = (LP_COMPLETION_KEY)GlobalAlloc(GPTR, sizeof(COMPLETION_KEY));//开辟内存
pComKey->sock = sListen;
//将监听套接字与完成端口绑定(并把完成键传入进来)
CreateIoCompletionPort((HANDLE)sListen, g_hComPort, (DWORD)pComKey, 0);
//填充服务器Ip和端口地址
SOCKADDR_IN serAdd;
serAdd.sin_addr.S_un.S_addr = htonl(INADDR_ANY);
serAdd.sin_family = AF_INET;
serAdd.sin_port = htons(123456);
//将地址和完成端口进行绑定
if (SOCKET_ERROR == bind(sListen, (SOCKADDR*)&serAdd, sizeof(SOCKADDR)))
{
printf("端口和地址绑定失败\n");
return 0;
}
// 开始进行监听完成端口
if (SOCKET_ERROR == listen(sListen, SOMAXCONN))
{
goto STOP_SERVER;
}
//使用WSAIoctl获取AcceptEx函数指针
if (true)
{
DWORD dwbytes = 0;
//Accept function GUID
GUID guidAcceptEx = WSAID_ACCEPTEX;
if (SOCKET_ERROR == WSAIoctl(
sListen,
SIO_GET_EXTENSION_FUNCTION_POINTER,//将进行的操作的控制代码。
&guidAcceptEx,
sizeof(guidAcceptEx),
&lpfnAcceptEx,
sizeof(lpfnAcceptEx),
&dwbytes,
NULL, //WSAOVERLAPPED结构的地址
NULL//一个指向操作结束后调用的例程指针
)
)
{
printf("WSAIoctl 未能获取AcceptEx函数指针\n"); //百度百科,有关该函数的所有返回值都有!
}
// 获取GetAcceptExSockAddrs函数指针,也是同理
GUID guidGetAcceptExSockaddrs = WSAID_GETACCEPTEXSOCKADDRS;
if (SOCKET_ERROR == WSAIoctl(
sListen,
SIO_GET_EXTENSION_FUNCTION_POINTER,
&guidGetAcceptExSockaddrs,
sizeof(guidGetAcceptExSockaddrs),
&lpfnGetAcceptExSockaddrs,
sizeof(lpfnGetAcceptExSockaddrs),
&dwbytes,
NULL,
NULL
)
)
{
printf("WSAIoctl 未能获取GuidGetAcceptExSockAddrs函数指针\n");
}
}
//第一次就一次性投递多个AcceptEx异步请求,发起接收客户端的异步请求
for (i = 0; i < 1; i++)
{
AcceptClient(sListen);
}
//不让主线程退出
while (g_bRun)
{
Sleep(1000);
}
STOP_SERVER:
closesocket(sListen);
g_bRun = FALSE;
DeleteCriticalSection(&m_thread);
WSACleanup();
return 0;
}
//逻辑线程函数入口
DWORD WINAPI LogicProThread(LPVOID pParam)
{
char Megbuff[100] = { 0 };
BOOL run = TRUE;
while (run)
{
//先检查一下所有socket是否还活着
vector<LP_COMPLETION_KEY>::iterator t;
for (t = m_arrayClientFlag.begin(); t != m_arrayClientFlag.end();)
{
if (FALSE == IsSocketAlive((*t)->sock) || FALSE == ((*t)->islive))
{
_DelToIOtList((*t)->sock);
closesocket((*t)->sock);
GlobalFree((*t));
t = m_arrayClientFlag.erase(t);
}
else
{
t++;
}
}
//当消息队列有消息时,消息数据包里的东西分类处理
while (!myqueue.empty())
{
memset(Megbuff, 0, 100);
strcpy(Megbuff, myqueue.front());
//如果信息来源是1号客户端
if (strcmp(left(Megbuff, 1), "1") == 0)
{
vector<LP_COMPLETION_KEY>::iterator it1;
for (it1 = m_arrayClientFlag.begin(); it1 != m_arrayClientFlag.end();)
{
if ((*it1)->islive == FALSE)
{
_DelToIOtList((*it1)->sock);
closesocket((*it1)->sock);
GlobalFree((*it1));
it1 = m_arrayClientFlag.erase(it1);
}
else
{
//判断是否找到指向了客户端
if ((*it1)->clientID == 2)
{
SendFunc((*it1), Megbuff);
}
it1++;
}
}
}
//如果信息来源是2号客户端
else if (strcmp(left(Megbuff, 1), "2") == 0) //如果信息来源是1号客户端
{
vector<LP_COMPLETION_KEY>::iterator it2;
for (it2 = m_arrayClientFlag.begin(); it2 != m_arrayClientFlag.end();)
{
if ((*it2)->islive == FALSE)
{
_DelToIOtList((*it2)->sock);
closesocket((*it2)->sock);
GlobalFree((*it2));
it2 = m_arrayClientFlag.erase(it2);
}
else
{
//判断是否找到指向了客户端
if ((*it2)->clientID == 1)
{
SendFunc((*it2), Megbuff);
}
it2++;
}
}
}
myqueue.pop();
}
Sleep(10);
}
return 0;
}
//服务线程函数入口
DWORD WINAPI ServerWorkerThread(LPVOID pParam)
{
HANDLE completionPort = (HANDLE)pParam;
DWORD dwIoSize;
BOOL bRet;
COMPLETION_KEY* pComKey; //新建完成键指针变量
LP_IO_OPERATION_DATA lpIOoperData; //新建单I/O数据指针变量
//检查完成端口请求队列,是否有网络请求到来
while (g_bRun)
{
//将所用到的变量、指针初始化
bRet = FALSE;
dwIoSize = -1;
pComKey = NULL;
lpIOoperData = NULL;
//下函数从完成端口取出一个成功I/O操作的完成包,返回值为非0
bRet = GetQueuedCompletionStatus(
g_hComPort,
&dwIoSize,
(LPDWORD)&pComKey,
(LPOVERLAPPED*)&lpIOoperData,
INFINITE
);
// 判断是否出现了错误
if (bRet == FALSE)
{
if (NULL == lpIOoperData) //函数则不会在lpNumberOfBytes and lpCompletionKey所指向的参数中存储信息
{
continue;
}
else
{ //当lpIOoperData !=NULL 不为空并且函数从完成端口出列一个失败I/O操作的完成包,返回值为0。
//函数在指向lpNumberOfBytes, lpCompletionKey, lpOverlapped的参数指针中存储相关信息
DWORD dwErr = GetLastError();
if (pComKey == NULL)
{
printf("此时链接的socket为空\n");
continue;
}
else
{
//链接超时
if (WAIT_TIMEOUT == dwErr)
{
// 确认客户端是否还活着...因为如果客户端网络异常断开(例如客户端崩溃或者拔掉网线等)的时候,服务器端是无法收到客户端断开的通知的
if (!IsSocketAlive(pComKey->sock))
{
//若该socket已经失效
printf("一个客户端的socket已经异常断开(非正常结束)\n");
CancelIo((HANDLE)pComKey->sock);
pComKey->islive = FALSE;
continue;
}
else
{
continue;
}
}
//未知错误
else
{
printf("完成端口遇到未知错误 :%d!\n", dwErr);
CancelIo((HANDLE)pComKey->sock);
pComKey->islive = FALSE;
continue;
}
}
}
}
//从完成端口取出一个成功I O操作的完成包
else
{
// 判断是否有客户端断开了
if (0 == dwIoSize && (READ == lpIOoperData->type || WRITE == lpIOoperData->type))
{
printf("[端口:%s] 客户自己断开了连接!\n", pComKey->sIP);
//当关闭套接字时,如果此时系统还有未完成的异步操作,
//调用CancelIo函数取消等待执行的异步操作,如果函数调用成功,返回TRUE,所有在此套接字上等待的异步操作都被成功的取消。
CancelIo((HANDLE)pComKey->sock);
pComKey->islive = FALSE;
continue;
}
//正常接收到客户端发的包,处理IO端口的请求
else
{
ProcessIO(lpIOoperData, pComKey);
}
}
}
return 0;
}
BOOL ProcessIO(IO_OPERATION_DATA *pIOoperData, COMPLETION_KEY *pComKey)
{
//1.服务器要从客户端收PIN(系统已经执行过数据处理了)
if (pIOoperData->type == READ)
{
EnterCriticalSection(&m_thread);
ZeroMemory(pComKey->MsgBuff, sizeof(pComKey->MsgBuff));
strcpy(pComKey->MsgBuff, pIOoperData->databuf.buf);//将要发送的数据复制到1号客户端的pComKey->MsgBuff中
myqueue.push(pComKey->MsgBuff);
LeaveCriticalSection(&m_thread);
}
else if (pIOoperData->type == WRITE)
{
ZeroMemory(pComKey->MsgBuff, sizeof(pComKey->MsgBuff));
strcpy(pComKey->MsgBuff, pIOoperData->databuf.buf);//将从服务器得到的数据包复制到2号的pComKey->MsgBuff中,继续发送给2号客户端
RecvFunc(pComKey);
}
else if (pIOoperData->type == ACCEPT)
{
//accept 创建的 socket 会自动继承监听 socket 的属性, AcceptEx 却不会.
//因此如果有必要, 在 AcceptEx 成功接受了一个客户端的连接之后, 我们必须调用:
//设置socket的一些属性比如超时等,不调用setsockopt,也不会有什么问题
setsockopt(
pIOoperData->sock,//将要被设置或者获取选项的套接字。
SOL_SOCKET, //选项所在的协议层(此处是套接字层)
SO_UPDATE_ACCEPT_CONTEXT,//需要访问的选项名
(char*)&(pComKey->sock),
sizeof(pComKey->sock)
);
//为新接入的客户新建一个完成键结构,来存放此时接入的客户端socket信息,之前的完成键是属于完成端口的。
LP_COMPLETION_KEY pClientComKey = (LP_COMPLETION_KEY)GlobalAlloc(GPTR, sizeof(COMPLETION_KEY));
pClientComKey->sock = pIOoperData->sock;
//定义客户端地址,服务器本机地址(备用)
SOCKADDR_IN *addrClient = NULL, *addrLocal = NULL;
int nClientLen = sizeof(SOCKADDR_IN), nLocalLen = sizeof(SOCKADDR_IN);
//使用GetAcceptExSockaddrs函数 获得具体的各个地址参数.(这函数没有返回值)
lpfnGetAcceptExSockaddrs(
pIOoperData->buffer,//指向传递给AcceptEx函数接收客户第一块数据的缓冲区
0,//lpoutputBuffer缓冲区的大小,必须和传递给AccpetEx函数的一致
sizeof(SOCKADDR_IN) + 16,//为本地地址预留的空间大小,必须和传递给AccpetEx函数一致
sizeof(SOCKADDR_IN) + 16,//为远程地址预留的空间大小,必须和传递给AccpetEx函数一致
(LPSOCKADDR*)&addrLocal,//用来返回连接的本地地址
&nLocalLen,//用来返回本地地址的长度
(LPSOCKADDR*)&addrClient,//用来返回远程地址
&nClientLen//用来返回远程地址的长度
);
ZeroMemory(pClientComKey->sIP, 100);
sprintf(pClientComKey->sIP, "%s+%d", inet_ntoa(addrClient->sin_addr), addrClient->sin_port); //cliAdd.sin_port ;
printf("客户端接入:[%s]\n", pClientComKey->sIP);
//将新接入的客户端设置一些标志
pClientComKey->first = TRUE;//此时标记为true(表明接下来收的是客户端发的第一条消息)
pClientComKey->islive = TRUE;//此时标记为TRUE(标志着该客户端在连接)
int MyNum = InterlockedIncrement(&IDnum);
pClientComKey->clientID = MyNum;//为新连入的客户端分配完成键序号
pIOoperData->IOnum = MyNum;//为新连入的客户端分配IO数据结构体序号
//将新连入的socket绑定到完成端口
CreateIoCompletionPort((HANDLE)pClientComKey->sock, g_hComPort, (DWORD)pClientComKey, 0); //将监听到的套接字关联到完成端口
//将新接入客户端对应开辟好的完成键、IO结构的指针存放至列表 //并将该新连入的客户端相关信息存到链接列表数组中
_AddToContextList(pClientComKey);
//当新客户连入服务器后,就开始根据自己的程序逻辑进行投递接收请求RecvFunc,或者投递发送请求SendFunc //在新连入的socket投递第一个WSARecv请求
if (1 == pClientComKey->clientID)
{
RecvFunc(pClientComKey);
}
else if (2 == pClientComKey->clientID)
{
EnterCriticalSection(&m_thread);
vector<LP_COMPLETION_KEY>::iterator it1;
for (it1 = m_arrayClientFlag.begin(); it1 != m_arrayClientFlag.end();)
{
//判断是否找到指向了等待PIN码的客户端1,并且1还未断开
if (((*it1)->sock != pClientComKey->sock) && ((*it1)->islive == TRUE))
{
//找到后,向2号套接字上投递1号套接字上的数据PIN
SendFunc(pClientComKey, (*it1)->MsgBuff);
}
it1++;
}
LeaveCriticalSection(&m_thread);
}
RecvFunc(pClientComKey);
AcceptClient(g_sListen);
}
return TRUE;
}
BOOL AcceptClient(SOCKET sListen)
{
DWORD dwBytes;
LP_IO_OPERATION_DATA pIO;
pIO = (LP_IO_OPERATION_DATA)GlobalAlloc(GPTR, sizeof(IO_OPERATION_DATA));
pIO->databuf.buf = pIO->buffer;
pIO->databuf.len = pIO->len = DATA_BUFSIZE;
pIO->type = ACCEPT;
//先创建一个套接字(相比accept有点就在此,accept是接收到连接才创建出来套接字,浪费时间. 这里先准备一个,用于接收连接)
pIO->sock = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
if (INVALID_SOCKET == pIO->sock)
{
printf("创建用于AcceptEX的Socket失败!错误代码: %d", WSAGetLastError());
return false;
}
//将新建立的IO网络操作请求所开辟的结构体存入列表数组
_AddToEachIODataList(pIO);
//调用AcceptEx函数,地址长度需要在原有的上面加上16个字节
//向服务线程投递一个接收连接的的请求
BOOL rc = lpfnAcceptEx(
sListen,//一参本地监听Socket
pIO->sock,//二参为即将到来的客人准备好的Socket
pIO->buffer,// 三参接收缓冲区: 存客人发来的第一份数据、存Client远端地址地址包括IP和端口,
0, //四参定三参数据区长度,0表只连不接收、连接到来->请求完成,否则连接到来+任意长数据到来->请求完成
sizeof(SOCKADDR_IN) + 16,//
sizeof(SOCKADDR_IN) + 16,
&dwBytes,
&(pIO->overlapped)
);
if (FALSE == rc)
{
if (WSAGetLastError() != ERROR_IO_PENDING)
{
printf("投递 AcceptEx 请求失败,错误代码%d", WSAGetLastError());
return false;
}
}
return true;
}
BOOL RecvFunc(COMPLETION_KEY *pComKey)
{
DWORD flags = 0;
DWORD recvBytes = 0;
LP_IO_OPERATION_DATA pIOoperData;
pIOoperData = (LP_IO_OPERATION_DATA)GlobalAlloc(GPTR, sizeof(IO_OPERATION_DATA));
pIOoperData->databuf.buf = pIOoperData->buffer;
pIOoperData->databuf.len = pIOoperData->len = DATA_BUFSIZE;
pIOoperData->type = READ;
pIOoperData->sock = pComKey->sock;
//将新建立的IO网络操作请求所开辟的结构体存入列表数组
_AddToEachIODataList(pIOoperData);
ZeroMemory(pIOoperData->buffer, sizeof(pIOoperData->buffer));
if (SOCKET_ERROR == WSARecv(pComKey->sock, &pIOoperData->databuf, 1, &recvBytes, &flags, &pIOoperData->overlapped, NULL))
{
if (ERROR_IO_PENDING != WSAGetLastError())
{
printf("向客户端投递重叠接收失败! %d\n", GetLastError());
return FALSE;
}
}
return TRUE;
}
BOOL SendFunc(COMPLETION_KEY *pComKey, char* msgBuff)
{
DWORD flags = 0;
DWORD SendBytes = 0;
LP_IO_OPERATION_DATA pIOoperData;
pIOoperData = (LP_IO_OPERATION_DATA)GlobalAlloc(GPTR, sizeof(IO_OPERATION_DATA));
pIOoperData->databuf.buf = pIOoperData->buffer;
pIOoperData->databuf.len = pIOoperData->len = DATA_BUFSIZE;
pIOoperData->type = WRITE;
pIOoperData->sock = pComKey->sock;
//将新建立的IO网络操作请求所开辟的结构体存入列表数组
_AddToEachIODataList(pIOoperData);
strcpy(pIOoperData->databuf.buf, msgBuff);
pIOoperData->databuf.len = 100;
if (SOCKET_ERROR == WSASend(pComKey->sock, &pIOoperData->databuf, 1, &SendBytes, flags, &pIOoperData->overlapped, NULL))
{
if (ERROR_IO_PENDING != WSAGetLastError())
{
printf("投递发送重叠失败! 错误码%d\n", GetLastError());
return FALSE;
}
}
return TRUE;
}
// 将客户端的相关信息指针(完成键、IO结构体)存储到连接列表数组,方便后来的查找,端对端通信
void _AddToContextList(LP_COMPLETION_KEY pHandleData)
{
EnterCriticalSection(&m_thread);
m_arrayClientFlag.push_back(pHandleData);
LeaveCriticalSection(&m_thread);
}
void _AddToEachIODataList(LP_IO_OPERATION_DATA pHandleIO)
{
EnterCriticalSection(&m_thread);
m_EachIOData.SocketEachIODate.push_back(pHandleIO);
LeaveCriticalSection(&m_thread);
}
// 有客户端退出时,列表数组某个客户端的相关信息指针删掉,
void _DelToContextList(LP_COMPLETION_KEY pHandleData)
{
//vetor类型 http://blog.csdn.net/duan19920101/article/details/50717748
EnterCriticalSection(&m_thread);
vector<LP_COMPLETION_KEY>::iterator it;
for (it = m_arrayClientFlag.begin(); it != m_arrayClientFlag.end();)
{
//判断是否找到指向了那个退出客户端的指针
if ((*it)->clientID == pHandleData->clientID)
{
GlobalFree(pHandleData); //该函数是释放指定的全局内存块
printf(" 开辟的内存已经释放\n");
it = m_arrayClientFlag.erase(it);
printf("<=列表中客户端完成键信息已经被清除\n");
}
else
{
it++;
}
}
LeaveCriticalSection(&m_thread);
}
void _DelToIOtList(SOCKET m_SOCKET)
{
//vetor类型 http://blog.csdn.net/duan19920101/article/details/50717748
EnterCriticalSection(&m_thread);
int IOnum = 0;
vector<LP_IO_OPERATION_DATA>::iterator it;
for (it = m_EachIOData.SocketEachIODate.begin(); it != m_EachIOData.SocketEachIODate.end();)
{
//判断是否找到指向了那个退出客户端的指针
if ((*it)->sock == m_SOCKET)
{
GlobalFree((*it));
it = m_EachIOData.SocketEachIODate.erase(it);
printf("%d<=列表中属于该socket上的单IO结构已经被清除\n", ++IOnum);
}
else
{
it++;
}
}
LeaveCriticalSection(&m_thread);
}
bool IsSocketAlive(SOCKET s)
{
int nByteSent = send(s, "", 0, 0);
if (-1 == nByteSent)
return false;
return true;
}
char * left(const char * str, int n)
{
if (n < 0)
n = 0;
char * p = new char[n + 1];
int i;
for (i = 0; i < n && str[i]; i++)
p[i] = str[i]; // copy characters
while (i <= n)
p[i++] = '\0'; // set rest of string to '\0'
return p;
}