今天继续学习socket编程,期待的APEC会议终于在京召开了,听说昨晚鸟巢那灯火通明,遍地礼花,有点08年奥运会的架势,有种冲动想去瞅见一下习大大的真容,"伟大的祖国,我爱你~~~",话不多说,进入学习正题:
这节会初步接触一下并发的一些知识,
用select实现的并发服务器,能达到的并发数,受两方面限制
①、一个进程能打开的最大文件描述符限制。这可以通过调整内核参数。
那最大文件描述符是多少呢?可以用以下命令查看出来:
其实这个数是可以进行调整的,但是前提得是root用户才有权限调整,如下:
此时再更改:
以上是通过命令调整进程支持的最大描述符个数,那用代码能改么?当然也能,下面来看下如何编写:
获取资源的限制可以通过getrlimit()函数,查看man帮助:
其中第一个参数我们需要设置它:
于是乎编写一个测试程序先来获得最大描述符的个数:
编译运行:
下面来调整一下它的大小:
nofile_limit.c:
#include <signal.h>
#include <sys/wait.h> #include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h> #define ERR_EXIT(m) \
do \
{ \
perror(m); \
exit(EXIT_FAILURE); \
} while() int main(void)
{
struct rlimit rl;
if (getrlimit(RLIMIT_NOFILE, &rl) < )
ERR_EXIT("getrlimit"); printf("%d\n", (int)rl.rlim_max); rl.rlim_cur = 2048;
rl.rlim_max = 2048;
if (setrlimit(RLIMIT_NOFILE, &rl) < 0)//调整描述符限制
ERR_EXIT("setrlimit"); if (getrlimit(RLIMIT_NOFILE, &rl) < 0)//再次打印修改后的大小
ERR_EXIT("getrlimit"); printf("%d\n", (int)rl.rlim_max);
return ;
}
编译运行:
那么下面先将切至root用户,然后再执行该程序:
但是,它只能改变当前进程的大小,如果用ulimit -n查看,看到的大小是父进程的,还是没有变:
关于调整内核增大描述符个数,是很容易改变select并发数的,但是还有另外一个因素,就不那么容易了,因为得编译内核才行,如下:
关于最大限制,下面用程序来验证下,先编写一个简单客户端,不断循环向服务端连接,看最终连接成功的个数,不加其它逻辑:
conntest.c:
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <signal.h> #include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h> #define ERR_EXIT(m) \
do \
{ \
perror(m); \
exit(EXIT_FAILURE); \
} while() int main(void)
{
int count = ;//用来记数,当客户端与服务端连接成功则加1
while()
{
int sock;
if ((sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < )
{
ERR_EXIT("socket");
} struct sockaddr_in servaddr;
memset(&servaddr, , sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons();
servaddr.sin_addr.s_addr = inet_addr("127.0.0.1"); if (connect(sock, (struct sockaddr*)&servaddr, sizeof(servaddr)) < )
ERR_EXIT("connect"); struct sockaddr_in localaddr;
socklen_t addrlen = sizeof(localaddr);
if (getsockname(sock, (struct sockaddr*)&localaddr, &addrlen) < )
ERR_EXIT("getsockname"); printf("ip=%s port=%d\n", inet_ntoa(localaddr.sin_addr), ntohs(localaddr.sin_port));
printf("count = %d\n", ++count);//将个数打印出来 } return ;
}
服务端还是用之前的,代码如下:
echosrv.c:
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <signal.h>
#include <sys/wait.h> #include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h> #define ERR_EXIT(m) \
do \
{ \
perror(m); \
exit(EXIT_FAILURE); \
} while() ssize_t readn(int fd, void *buf, size_t count)
{
size_t nleft = count;
ssize_t nread;
char *bufp = (char*)buf; while (nleft > )
{
if ((nread = read(fd, bufp, nleft)) < )
{
if (errno == EINTR)
continue;
return -;
}
else if (nread == )
return count - nleft; bufp += nread;
nleft -= nread;
} return count;
} ssize_t writen(int fd, const void *buf, size_t count)
{
size_t nleft = count;
ssize_t nwritten;
char *bufp = (char*)buf; while (nleft > )
{
if ((nwritten = write(fd, bufp, nleft)) < )
{
if (errno == EINTR)
continue;
return -;
}
else if (nwritten == )
continue; bufp += nwritten;
nleft -= nwritten;
} return count;
} ssize_t recv_peek(int sockfd, void *buf, size_t len)
{
while ()
{
int ret = recv(sockfd, buf, len, MSG_PEEK);
if (ret == - && errno == EINTR)
continue;
return ret;
}
} ssize_t readline(int sockfd, void *buf, size_t maxline)
{
int ret;
int nread;
char *bufp = buf;
int nleft = maxline;
while ()
{
ret = recv_peek(sockfd, bufp, nleft);
if (ret < )
return ret;
else if (ret == )
return ret; nread = ret;
int i;
for (i=; i<nread; i++)
{
if (bufp[i] == '\n')
{
ret = readn(sockfd, bufp, i+);
if (ret != i+)
exit(EXIT_FAILURE); return ret;
}
} if (nread > nleft)
exit(EXIT_FAILURE); nleft -= nread;
ret = readn(sockfd, bufp, nread);
if (ret != nread)
exit(EXIT_FAILURE); bufp += nread;
} return -;
} void echo_srv(int conn)
{
char recvbuf[];
while ()
{
memset(recvbuf, , sizeof(recvbuf));
int ret = readline(conn, recvbuf, );
if (ret == -)
ERR_EXIT("readline");
if (ret == )
{
printf("client close\n");
break;
} fputs(recvbuf, stdout);
writen(conn, recvbuf, strlen(recvbuf));
}
} void handle_sigchld(int sig)
{
/* wait(NULL);*/
while (waitpid(-, NULL, WNOHANG) > )
;
} void handle_sigpipe(int sig)
{
printf("recv a sig=%d\n", sig);
} int main(void)
{
signal(SIGPIPE, handle_sigpipe);
signal(SIGCHLD, handle_sigchld);
int listenfd;
if ((listenfd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < )
ERR_EXIT("socket"); struct sockaddr_in servaddr;
memset(&servaddr, , sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons();
servaddr.sin_addr.s_addr = htonl(INADDR_ANY); int on = ;
if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < )
ERR_EXIT("setsockopt"); if (bind(listenfd, (struct sockaddr*)&servaddr, sizeof(servaddr)) < )
ERR_EXIT("bind");
if (listen(listenfd, SOMAXCONN) < )
ERR_EXIT("listen"); struct sockaddr_in peeraddr;
socklen_t peerlen;
int conn; int i;
int client[FD_SETSIZE];
int maxi = ; for (i=; i<FD_SETSIZE; i++)
client[i] = -; int nready;
int maxfd = listenfd;
fd_set rset;
fd_set allset;
FD_ZERO(&rset);
FD_ZERO(&allset);
FD_SET(listenfd, &allset);
while ()
{
rset = allset;
nready = select(maxfd+, &rset, NULL, NULL, NULL);
if (nready == -)
{
if (errno == EINTR)
continue; ERR_EXIT("select");
}
if (nready == )
continue; if (FD_ISSET(listenfd, &rset))
{
peerlen = sizeof(peeraddr);
conn = accept(listenfd, (struct sockaddr*)&peeraddr, &peerlen);
if (conn == -)
ERR_EXIT("accept"); for (i=; i<FD_SETSIZE; i++)
{
if (client[i] < )
{
client[i] = conn;
if (i > maxi)
maxi = i;
break;
}
} if (i == FD_SETSIZE)
{
fprintf(stderr, "too many clients\n");
exit(EXIT_FAILURE);
}
printf("ip=%s port=%d\n", inet_ntoa(peeraddr.sin_addr), ntohs(peeraddr.sin_port));
FD_SET(conn, &allset);
if (conn > maxfd)
maxfd = conn; if (--nready <= )
continue; } for (i=; i<=maxi; i++)
{
conn = client[i];
if (conn == -)
continue; if (FD_ISSET(conn, &rset))
{
char recvbuf[] = {};
int ret = readline(conn, recvbuf, );
if (ret == -)
ERR_EXIT("readline");
if (ret == )
{
printf("client close\n");
FD_CLR(conn, &allset);
client[i] = -;
close(conn);
} fputs(recvbuf, stdout);
writen(conn, recvbuf, strlen(recvbuf)); if (--nready <= )
break; }
}
}
return ;
}
编译运行:
其中异常是报在这一段代码:
为了看到服务端的连接数,也打印一下个数:
echosrv.c:
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <signal.h>
#include <sys/wait.h> #include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h> #define ERR_EXIT(m) \
do \
{ \
perror(m); \
exit(EXIT_FAILURE); \
} while() ssize_t readn(int fd, void *buf, size_t count)
{
size_t nleft = count;
ssize_t nread;
char *bufp = (char*)buf; while (nleft > )
{
if ((nread = read(fd, bufp, nleft)) < )
{
if (errno == EINTR)
continue;
return -;
}
else if (nread == )
return count - nleft; bufp += nread;
nleft -= nread;
} return count;
} ssize_t writen(int fd, const void *buf, size_t count)
{
size_t nleft = count;
ssize_t nwritten;
char *bufp = (char*)buf; while (nleft > )
{
if ((nwritten = write(fd, bufp, nleft)) < )
{
if (errno == EINTR)
continue;
return -;
}
else if (nwritten == )
continue; bufp += nwritten;
nleft -= nwritten;
} return count;
} ssize_t recv_peek(int sockfd, void *buf, size_t len)
{
while ()
{
int ret = recv(sockfd, buf, len, MSG_PEEK);
if (ret == - && errno == EINTR)
continue;
return ret;
}
} ssize_t readline(int sockfd, void *buf, size_t maxline)
{
int ret;
int nread;
char *bufp = buf;
int nleft = maxline;
while ()
{
ret = recv_peek(sockfd, bufp, nleft);
if (ret < )
return ret;
else if (ret == )
return ret; nread = ret;
int i;
for (i=; i<nread; i++)
{
if (bufp[i] == '\n')
{
ret = readn(sockfd, bufp, i+);
if (ret != i+)
exit(EXIT_FAILURE); return ret;
}
} if (nread > nleft)
exit(EXIT_FAILURE); nleft -= nread;
ret = readn(sockfd, bufp, nread);
if (ret != nread)
exit(EXIT_FAILURE); bufp += nread;
} return -;
} void echo_srv(int conn)
{
char recvbuf[];
while ()
{
memset(recvbuf, , sizeof(recvbuf));
int ret = readline(conn, recvbuf, );
if (ret == -)
ERR_EXIT("readline");
if (ret == )
{
printf("client close\n");
break;
} fputs(recvbuf, stdout);
writen(conn, recvbuf, strlen(recvbuf));
}
} void handle_sigchld(int sig)
{
/* wait(NULL);*/
while (waitpid(-, NULL, WNOHANG) > )
;
} void handle_sigpipe(int sig)
{
printf("recv a sig=%d\n", sig);
} int main(void)
{
int count = 0;//客户端连接个数累加器
signal(SIGPIPE, handle_sigpipe);
signal(SIGCHLD, handle_sigchld);
int listenfd;
if ((listenfd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < )
ERR_EXIT("socket"); struct sockaddr_in servaddr;
memset(&servaddr, , sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons();
servaddr.sin_addr.s_addr = htonl(INADDR_ANY); int on = ;
if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < )
ERR_EXIT("setsockopt"); if (bind(listenfd, (struct sockaddr*)&servaddr, sizeof(servaddr)) < )
ERR_EXIT("bind");
if (listen(listenfd, SOMAXCONN) < )
ERR_EXIT("listen"); struct sockaddr_in peeraddr;
socklen_t peerlen;
int conn; int i;
int client[FD_SETSIZE];
int maxi = ; for (i=; i<FD_SETSIZE; i++)
client[i] = -; int nready;
int maxfd = listenfd;
fd_set rset;
fd_set allset;
FD_ZERO(&rset);
FD_ZERO(&allset);
FD_SET(listenfd, &allset);
while ()
{
rset = allset;
nready = select(maxfd+, &rset, NULL, NULL, NULL);
if (nready == -)
{
if (errno == EINTR)
continue; ERR_EXIT("select");
}
if (nready == )
continue; if (FD_ISSET(listenfd, &rset))
{
peerlen = sizeof(peeraddr);
conn = accept(listenfd, (struct sockaddr*)&peeraddr, &peerlen);
if (conn == -)
ERR_EXIT("accept"); for (i=; i<FD_SETSIZE; i++)
{
if (client[i] < )
{
client[i] = conn;
if (i > maxi)
maxi = i;
break;
}
} if (i == FD_SETSIZE)
{
fprintf(stderr, "too many clients\n");
exit(EXIT_FAILURE);
}
printf("ip=%s port=%d\n", inet_ntoa(peeraddr.sin_addr), ntohs(peeraddr.sin_port));
printf("count = %d\n", ++count);//当客户端连接成功,打印一下个数
FD_SET(conn, &allset);
if (conn > maxfd)
maxfd = conn; if (--nready <= )
continue; } for (i=; i<=maxi; i++)
{
conn = client[i];
if (conn == -)
continue; if (FD_ISSET(conn, &rset))
{
char recvbuf[] = {};
int ret = readline(conn, recvbuf, );
if (ret == -)
ERR_EXIT("readline");
if (ret == )
{
printf("client close\n");
FD_CLR(conn, &allset);
client[i] = -;
close(conn);
} fputs(recvbuf, stdout);
writen(conn, recvbuf, strlen(recvbuf)); if (--nready <= )
break; }
}
}
return ;
}
编译运行:
下面来解释一下为啥服务端会收到了很多client close:
由于客户端有关闭,所以服务端打印的count=1021个就不准了,下面来解决一下客户端关闭的问题,其很简单的,就是在发送1022个连接时,先休眠一段时间,然后再让进程退出既可,具体如下:
编译运行:
②、select中的fd_set集合容量的限制(FD_SETSIZE,该宏默认是1024) ,这需要重新编译内核。
这是第二个受限的地方,从代码中来解释:
下面要学习一个函数,它没有FD_SETSIZE的限制,如下:
通过man帮助查看下,struct pollfd结构体的结构:
那结构体中的events都可以取哪些值呢,如下:
对比select函数如下:
下面用poll函数来改装我们的代码,主要是改装服务端,来突然FD_SETSIZE大小的限制,下面会一点点对比原来的做法进行改装:
修改如下:
替换如下:
下面当事件产生了,则需要判断一下是哪些文件描述符产生了事件,那poll方式是如何判断的呢?具体如下:
替换如下:
然后接受客户端的连接,并加保存起来,修改如下:
替换如下:
接下来,处理已连接套接口事件,具体修改如下:
替换如下:
接下来编译运行一下:
需要包含头文件,如下:
再次编译运行:
可见改用poll函数之后的程序运行一切正常,下面再来测试并发数:
下面就来调整描述符最大个数为2048,注意:ulimit调整的是当前进程,所以要想服务端与客户端都突破1024这个限制,都需要切至root用户进行更改,切记切记~
这时再运行看下这时的连接数是否能突破1024呢?
所以,这就是poll针对select的改进,下面贴出服务端,客户端的代码:
echosrv.c:
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <signal.h>
#include <sys/wait.h>
#include <poll.h> #include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h> #define ERR_EXIT(m) \
do \
{ \
perror(m); \
exit(EXIT_FAILURE); \
} while() ssize_t readn(int fd, void *buf, size_t count)
{
size_t nleft = count;
ssize_t nread;
char *bufp = (char*)buf; while (nleft > )
{
if ((nread = read(fd, bufp, nleft)) < )
{
if (errno == EINTR)
continue;
return -;
}
else if (nread == )
return count - nleft; bufp += nread;
nleft -= nread;
} return count;
} ssize_t writen(int fd, const void *buf, size_t count)
{
size_t nleft = count;
ssize_t nwritten;
char *bufp = (char*)buf; while (nleft > )
{
if ((nwritten = write(fd, bufp, nleft)) < )
{
if (errno == EINTR)
continue;
return -;
}
else if (nwritten == )
continue; bufp += nwritten;
nleft -= nwritten;
} return count;
} ssize_t recv_peek(int sockfd, void *buf, size_t len)
{
while ()
{
int ret = recv(sockfd, buf, len, MSG_PEEK);
if (ret == - && errno == EINTR)
continue;
return ret;
}
} ssize_t readline(int sockfd, void *buf, size_t maxline)
{
int ret;
int nread;
char *bufp = buf;
int nleft = maxline;
while ()
{
ret = recv_peek(sockfd, bufp, nleft);
if (ret < )
return ret;
else if (ret == )
return ret; nread = ret;
int i;
for (i=; i<nread; i++)
{
if (bufp[i] == '\n')
{
ret = readn(sockfd, bufp, i+);
if (ret != i+)
exit(EXIT_FAILURE); return ret;
}
} if (nread > nleft)
exit(EXIT_FAILURE); nleft -= nread;
ret = readn(sockfd, bufp, nread);
if (ret != nread)
exit(EXIT_FAILURE); bufp += nread;
} return -;
} void echo_srv(int conn)
{
char recvbuf[];
while ()
{
memset(recvbuf, , sizeof(recvbuf));
int ret = readline(conn, recvbuf, );
if (ret == -)
ERR_EXIT("readline");
if (ret == )
{
printf("client close\n");
break;
} fputs(recvbuf, stdout);
writen(conn, recvbuf, strlen(recvbuf));
}
} void handle_sigchld(int sig)
{
/* wait(NULL);*/
while (waitpid(-, NULL, WNOHANG) > )
;
} void handle_sigpipe(int sig)
{
printf("recv a sig=%d\n", sig);
} int main(void)
{
int count = ;
signal(SIGPIPE, handle_sigpipe);
signal(SIGCHLD, handle_sigchld);
int listenfd;
if ((listenfd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < )
ERR_EXIT("socket"); struct sockaddr_in servaddr;
memset(&servaddr, , sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons();
servaddr.sin_addr.s_addr = htonl(INADDR_ANY); int on = ;
if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < )
ERR_EXIT("setsockopt"); if (bind(listenfd, (struct sockaddr*)&servaddr, sizeof(servaddr)) < )
ERR_EXIT("bind");
if (listen(listenfd, SOMAXCONN) < )
ERR_EXIT("listen"); struct sockaddr_in peeraddr;
socklen_t peerlen;
int conn; int i;
struct pollfd client[];
int maxi = ; for (i=; i<; i++)
client[i].fd = -; int nready;
client[].fd = listenfd;//第一个事件是监听套接字
client[].events = POLLIN;//代表可读事件 while ()
{
nready = poll(client, maxi+, -);
if (nready == -)
{
if (errno == EINTR)
continue; ERR_EXIT("select");
}
if (nready == )
continue; if (client[].revents & POLLIN)
{
peerlen = sizeof(peeraddr);
conn = accept(listenfd, (struct sockaddr*)&peeraddr, &peerlen);
if (conn == -)
ERR_EXIT("accept"); for (i=; i<; i++)
{
if (client[i].fd < )
{
client[i].fd = conn;
if (i > maxi)
maxi = i;
break;
}
} if (i == )
{
fprintf(stderr, "too many clients\n");
exit(EXIT_FAILURE);
}
printf("ip=%s port=%d\n", inet_ntoa(peeraddr.sin_addr), ntohs(peeraddr.sin_port));
printf("count = %d\n", ++count); client[i].events = POLLIN; if (--nready <= )
continue; } for (i=; i<=maxi; i++)
{
conn = client[i].fd;
if (conn == -)
continue; if (client[i].events & POLLIN)
{
char recvbuf[] = {};
int ret = readline(conn, recvbuf, );
if (ret == -)
ERR_EXIT("readline");
if (ret == )
{
printf("client close\n");
client[i].fd = -;
close(conn);
} fputs(recvbuf, stdout);
writen(conn, recvbuf, strlen(recvbuf)); if (--nready <= )
break; }
}
}
return ;
}
contest.c:
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <signal.h> #include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h> #define ERR_EXIT(m) \
do \
{ \
perror(m); \
exit(EXIT_FAILURE); \
} while() int main(void)
{
int count = ;//用来记数,当客户端与服务端连接成功则加1
while()
{
int sock;
if ((sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < )
{
sleep();
ERR_EXIT("socket");
} struct sockaddr_in servaddr;
memset(&servaddr, , sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons();
servaddr.sin_addr.s_addr = inet_addr("127.0.0.1"); if (connect(sock, (struct sockaddr*)&servaddr, sizeof(servaddr)) < )
ERR_EXIT("connect"); struct sockaddr_in localaddr;
socklen_t addrlen = sizeof(localaddr);
if (getsockname(sock, (struct sockaddr*)&localaddr, &addrlen) < )
ERR_EXIT("getsockname"); printf("ip=%s port=%d\n", inet_ntoa(localaddr.sin_addr), ntohs(localaddr.sin_port));
printf("count = %d\n", ++count);//将个数打印出来 } return ;
}
好了,今天内容学到这,下节继续~