我按理解整了个基于select模式的单进程多路复用并发服务器,并写了个简单的测试程序测了下,虽然离实用还差得远,但用来练习select够用了。
至于如何实现的细节,代码注释算比较清楚,就不多弄了。
一。服务器部份
单进程并发服务器代码:
/*************************************************
Author: xiongchuanliang
Description: I/O复用(异步阻塞)模式_单进程+select模式服务器
编译命令:
Linux:
g++ -g -o tcpserverasynselect2 tcpserverasynselect2.cpp -m64 -I./common
./tcpserverasynselect2
**************************************************/
// 客户端代码
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "initsock.h"
#include "common.h"
#include <time.h>
//客户端Socket信息结构体
typedef struct _client_sock{
int fd;//客户端socket描述符
struct sockaddr_in addr; //客户端地址信息结构体
time_t lastseconds;//可依这个计算空闲时间,空闲太长的连接可以关闭。
} client_sock;
CInitSock initSock;
//#define IDLE_MAXTIME xxx //最长空闲时长 DEMO忽略
//#define SELECT_MAXWAITTIME xxxxx
#define NET_TIMEOUT 5000 //发送超时时限 5s
int main(int argc, char* argv[])
{
//fd_set 是通过bit位来存放文件描述符,可通过sizeof(fd_set) * 8
//来得可支持的最大文件描述符数,但受系统限制,基本达不到
fd_set readset;//select()函数 readset
int nSelectMaxfd = 0;//select() maxfdp参数
int nSelectRet = 0;//select() 返回值
//int nCheckTimeval = 5;//轮询间隔
SOCKET sListen,sClient,recvSockfd;
client_sock arrClientSock[FD_SETSIZE]; //存放需要select()监控的fd.
int arrClientSockConnAmt = 0;//实际监控fd数
socklen_t nAddrlen = sizeof(struct sockaddr_in);
time_t tCurrSysTime;
char recvData[MAXDATASIZE]={0};
int i = 0 ;
//创建套接字
sListen = socket(AF_INET,SOCK_STREAM,IPPROTO_TCP);
if(sListen == INVALID_SOCKET)
{
PrintError("socket() failed.\n");
exit(EXIT_FAILURE);
}
//bind() 地址可立即重用
int nResAddr = 1;
setsockopt( sListen, SOL_SOCKET, SO_REUSEADDR, (const char*)&nResAddr, sizeof(nResAddr) );
int nNetTimeout = NET_TIMEOUT;
//设置发送超时时限
setsockopt(sListen,SOL_SOCKET,SO_SNDTIMEO,(char *)&nNetTimeout,sizeof(int) );
//设置接收超时时限
setsockopt(sListen,SOL_SOCKET,SO_RCVTIMEO,(char *)&nNetTimeout,sizeof(int));
//绑定本地IP和端口到套接字
struct sockaddr_in server_addr;
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(SERVPORT); //大于1024且小于65535
server_addr.sin_addr.s_addr = INADDR_ANY;
bzero(&(server_addr.sin_zero),8);
if(bind(sListen,(struct sockaddr *)&server_addr,sizeof(struct sockaddr)) == SOCKET_ERROR)
{
PrintError("bind() failed.");
exit(EXIT_FAILURE);
}
//开始监听
// listen(套接字,监听队列中允许保持的尚未处理的最大连接数量)
// listen仅用在支持连接的套接字上,如SOCK_STREAM类型的套接字
// 如果连接数超过BACKLOG,客户端将收到WSAECONNREFUSED错误
if(listen(sListen, BACKLOG) == SOCKET_ERROR) //FD_SETSIZE
{
PrintError("sListen() failed.");
exit(EXIT_FAILURE);
}
//初始化
for(int i=0;i<FD_SETSIZE;i++){
arrClientSock[i].fd = -1;
}
nSelectMaxfd = sListen; //设置select()函数maxfdp参数
//循环接收数据
while(true)
{
struct sockaddr_in remoteAddr;
tCurrSysTime = time(NULL); //系统当前时间
//重建fd_set集合
FD_ZERO(&readset); //每次循环须重新初始化,否则select不能检测描述符变化
//将数组中的fd清理并赋给readset
arrClientSockConnAmt = 0;
FD_SET(sListen,&readset); //将socket描述符加入检测集合
nSelectMaxfd = sListen; //设置select()函数maxfdp参数
for(i=0;i< FD_SETSIZE;i++)
{
if(arrClientSock[i].fd > 0) //从描述符数组中找到一个还没用的保存进去
{
//对于空闲时间太长的,可能客户端已非常规的断开如断网,停电之类,将其关闭并从数组中删除,DEMO省略
/*if( tCurrSysTime - arrClientSock[i].lastseconds > IDLE_MAXTIME)
{
close(arrClientSock[i].fd);
arrClientSock[i].fd = -1;
arrClientSock[i].lastseconds = 0;
memset(&arrClientSock[i].addr,0,sizeof(struct sockaddr_in));
}else{*/
FD_SET(arrClientSock[i].fd,&readset);
arrClientSockConnAmt ++;
//maxfdp
if( arrClientSock[i].fd > nSelectMaxfd){
nSelectMaxfd = arrClientSock[i].fd ;
}
//}
} // end if > 0
}
//调用select
//超时则返回0,否则返回发生事件的文件描述符的个数
nSelectRet = select(nSelectMaxfd+1,&readset,NULL,NULL,NULL); //设置为阻塞状态
//struct sockaddr_in remoteAddr;
//struct timeval timeout={nCheckTimeval,0}; //阻塞式select, 超时时间. timeval{一个是秒数,另一个是毫秒数}
//nSelectRet = select(nSelectMaxfd+1,&readset,NULL,NULL,&timeout); //设置select在超时时间内阻塞
if( FD_ISSET(sListen,&readset) )
{
printf("select() 返回值 = %d. \n",nSelectRet );
printf("accept() 连接客户端.\n");
//调用accept,连接一个客户端
sClient = accept(sListen,(struct sockaddr *)&remoteAddr,(socklen_t *)&nAddrlen);
if( sClient <= 0) // == INVALID_SOCKET) //-1
{
PrintError("accept() failed.");
continue;
}
//描述符数组已满
if( arrClientSockConnAmt + 1 > FD_SETSIZE )
{
printf("ERROR: 等待连接的客户端太多!超出处理能力。\n");
continue;
}
//将连接上的客户端放入数组,
//后续可以再写个for,检查已正常close的并把空闲太长的close掉,
//把arrClientSockConnAmt设为实际值,并注意设置nSelectMaxfd的值
for(i=0;i< FD_SETSIZE;i++)
{
if(arrClientSock[i].fd < 0) //从描述符数组中找到一个还没用的保存进去
{
arrClientSock[i].fd = sClient;
arrClientSock[i].addr = remoteAddr;
arrClientSock[i].lastseconds = time(NULL);
printf("连接上的客户端IP = %s. \n",inet_ntoa(arrClientSock[i].addr.sin_addr) );
arrClientSockConnAmt ++;
//maxfdp
if( sClient > nSelectMaxfd){
nSelectMaxfd = sClient;
}
break;
}
}
//如果select()检测到多个文件描述符并发时,则继续while,生成新的socket放入数组
nSelectRet -= 1;
if(nSelectRet <= 0){
continue;//如果没有新客户端连接,则继续循环
}
} //end if( FD_ISSET(sListen,&readset) )
//把select()函数返回的有发生事件的Socket描述符保存完后,统一在这做响应处理
for(i = 0;i<arrClientSockConnAmt; i++)
{
//如果客户端描述符小于0,则没有连接
if( arrClientSock[i].fd < 0){
continue;
}
recvSockfd = arrClientSock[i].fd;
if( FD_ISSET(recvSockfd,&readset) ) //检查可读
{
//接收数据
memset(recvData,0,sizeof(recvData)); //重新清空缓冲区
printf("recv() fd[%d].\n",i);
int recvbytes = recv(recvSockfd, recvData, MAXDATASIZE, 0);
if( recvbytes == 0)
{
printf("recv() no data!\n");
close(recvSockfd);
FD_CLR(recvSockfd,&readset);
arrClientSock[i].fd=-1;
arrClientSockConnAmt --;
printf("close() \n");
}else if( recvbytes < 0){
PrintError("recv() failed");
close(recvSockfd);
FD_CLR(recvSockfd,&readset);
arrClientSock[i].fd=-1;
arrClientSockConnAmt --;
printf("close() \n");
;
//exit(EXIT_FAILURE); //刷屏
}else if(recvbytes > 0){
recvData[recvbytes]='\0';
printf("收到信息:%s\n",recvData);
//发送数据到客户端
char sendData[500] ={0};
strcpy(sendData,"Hello client!\n");
strcat(sendData,recvData);
send(recvSockfd, sendData, strlen(sendData), 0);
//更新一下fd最后响应时间
arrClientSock[i].lastseconds = time(NULL);
//如果没有新客户端连接,则break for
if( (--nSelectRet) <= 0){
break;
}
} //end if recv
} //end if( FD_ISSET(recvSockfd,&readset) )
} //end for
} //end while(1)
//关闭监听套接字
close(sListen);
exit(EXIT_SUCCESS);
}
二。测试 部份
用于测试的代码:
/*************************************************
Author: xiongchuanliang
Description: 通过在不同机器或会话窗口运行测试程序,生成多个线程连接Socket服务器来完成测试
编译命令:
Linux:
g++ -o testthread2 testthread2.cpp -m64 -I./common -lpthread
./testthread2
**************************************************/
// 客户端代码
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "initsock.h"
#include "common.h"
#include <pthread.h>
#include <string.h>
#include <sys/stat.h>
//指定要连接的服务器ip
#define SERVIP"127.0.0.1"
#define MAX_THREAD 50 //生成线程数
CInitSock initSock;
void *TestSocket(void *p); //连接服务器
int main(int argc, char* argv[])
{
pthread_t tpid[MAX_THREAD];
for(int i=0;i< MAX_THREAD - 1;i++)
{
if( pthread_create(&tpid[i],NULL,&TestSocket,&i) != 0 )
{
fprintf(stderr,"Create Thread[%d] Error:%s\n",i,strerror(errno));
exit(EXIT_FAILURE);
}
//pthread_join(tpid[i],NULL);
}
sleep(10);
exit(EXIT_SUCCESS);
}
void *TestSocket(void *p)
{
int ti = *((int *)p);
pid_t pid;
pid = getpid();
pthread_t tid;
tid = pthread_self();
time_t ttm = time(NULL);
char testMsg[100] = {0};
snprintf(testMsg,100,"thread id=%lu pid=%u ttm=%d \n",tid, (unsigned int)pid,ttm);
//建立套接字
SOCKET sclient = socket(AF_INET,SOCK_STREAM,IPPROTO_TCP);
if(sclient == INVALID_SOCKET)
{
PrintError("invalid() failed");
exit(EXIT_FAILURE);
}
//指定要连接的服务器地址和端口
struct sockaddr_in server_addr;
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(SERVPORT);
server_addr.sin_addr.s_addr =inet_addr(SERVIP);
memset(&(server_addr.sin_zero),0,8);
//将套接字连接上服务器
if( connect(sclient,(struct sockaddr *)&server_addr,sizeof(struct sockaddr) ) == SOCKET_ERROR)
{
PrintError("connect() failed");
exit(EXIT_FAILURE);
}
//发送数据到服务端
send(sclient,testMsg,strlen(testMsg),0);
//接收返回的数据
char recvData[MAXDATASIZE] = {0};
int recvbytes = recv(sclient,recvData,MAXDATASIZE,0);
if( recvbytes == 0)
{
printf("thread id=%lu recv() no data!\n",tid);
}else if( recvbytes < 0)
{
PrintError("recv() failed");
exit(EXIT_FAILURE);
}else if( recvbytes > 0)
{
recvData[recvbytes]='\0';
printf("thread id=%lu tm=%d \n服务端返回信息:%s\n",tid,time(NULL),recvData);
}
close(sclient);
return NULL;
}
测试效果图:
代码中写到的头文件请看: 网络编程(1)跨平台的Socket同步阻塞工作模式例子
MAIL: xcl_168@aliyun.com
BLOG: http://blog.csdn.net/xcl168