一、大致思路
如图所示,为线程池的基本框架
首先定义服务器端的数据类型
struct data{
pid_t pid; //进程pid
int fd; //管道id
short busy; //进程是否忙碌 0非忙碌 1忙碌
}
父进程流程
- 初始化管道,创建子进程,每个子进程等待父进程发送任务
- 初始化socket,bind端口,listen
- 初始化epoll句柄,注册sfd,注册子进程对应的每一条管道在父进程的哪一端
- while(1){
开始epoll_wait;
if(如果sfd可读){ //有客户端请求
new_fd=accept();
分配一个空闲的进程
把new_fd传递给这个进程
把这个进程标记为忙碌
}
if(管道在父进程的一端可读){ //子进程已经完成传输任务
就把该管道对应的子进程标识为非忙碌
}
子进程流程
while(1)
{
1、等待接收父进程发过来的new_fd
2、发送文件给客户端
3、通知父进程非忙碌
}
举一个那种先买单后吃饭,服务员懒散不主动的小店的例子。父进程相当于收银台的老板,是个管理者,只负责给客户买单和给厨房和服务员(子进程们,既进程池)分配任务;客户付钱买单后,服务员和厨师就服务客户(传输数据)。客户走后,服务员没活干了,就跟老板汇报一声,等老板再分配。
难点:
1、epoll监控那些事件?
2、父进程如何传递描述符给子进程?
3、内存管理?
4、如何传递一个文件,双方协商好什么时候开始发送,什么时候发送完?
5、当发送大文件时,可能会出错。socket的缓冲区只有64K。极有可能在网络中传输的速度不匹配。怎么办?
6、如何看两个文件完全相同 ?
二、难点分析
1.epoll监控事件分析
epoll代码放在父进程执行,子进程只需做发送数据的任务即可。
首先,服务器端的sfd套接字一定要监控是否可读。如果可读,则说明有客户端发来连接请求。(老板要负责先给顾客先点单买账)
接着,要监控父进程这一端的管道是否可读,用于在传输任务结束后,子进程向父进程发送终结信号。(老板要负责给空闲的服务员安排任务)
所以如图所示,红点均为epoll所监控事件。
2.父进程传递描述符给子进程
如果你这么想:父进程打开一个文件 fd,通过管道把fd给子进程,那就大错特错了。文件描述符表面上是一个非负整数,但是实际上是一个索引值,指向内核为每个进程所维护的该进程打开文件的记录表。所以,父进程和子进程各有一个记录表,两个记录表都不一样,传一个整数给子进程,怎么可能有作用?
那么为什么不先创建fd,然后再fork,这样fork出来的子进程就同主进程的一样了,这样就相当于已经把fd传过来了。这种方法也是不可取的,因为实在应用的时候,一个服务器启动多少个进程或线程都已经定好了。需要让子进程处于待命状态,这样效率才最佳。所以要先创建子进程。
所以怎么才能把一个进程描述符的控制信息,传递给另一个进程?
这里用到了socketpair,sendmsg,recvmsg三个函数。
第1步,初始化socketpair类型描述符
int fds[2];
socketpair(AF_LOCAL,SOCK_STREAM,0,fds);
必须要用这个管道。
socketpair创建了一对无名的套接字描述符,只能用于父子进程之间。描述符存储于一个二元数组,例如fds[2]。这对套接字可以进行全双工通信,每一个描述符既可以读也可以写。
注意:向fds[0]中写入,就只能从fds[1]中读取,无法从fds[0]读取;也可以在fds[1]中写入,然后只能从fds[0]中读取,无法从fds[1]读取;但是,若没有在0端写入,而从1端读取,则1端的读取操作会阻塞,即使在1端写入,也不能从1读取,仍然阻塞。
第3步:sendmsg接口发送描述符
#include <sys/socket.h>
ssize_t sendmsg(int sockfd, const struct msghdr *msg, int flags);
与第3步的recvmsg函数是一对,可以用于传输内核控制信息,故可以用这一对函数实现父子进程共享同一文件的描述符。
sockfd即sockpair初始化的描述符fds[0]或fds[1]。
Sendmsg关键是初始化msghdr结构体。
struct msghdr {
void *msg_name; //没用,memset为0即可
socklen_t msg_namelen; //没用,memset为0即可
struct iovec *msg_iov; //用户态信息结构体指针。这个机构体跟一次可以写多个buf有关。虽然不传内容,但是因为有**的问题,所以必须要写点东西。
size_t msg_iovlen; //用户态信息结构体数组成员个数。
void *msg_control; //内核控制信息结构体指针,即下面的cmsghdr结构体地址。告诉它我们要传递内核的信息。
size_t msg_controllen; //内核控制信息结构体长度,cmsghdr结构体的长度。这个长度只能是字节,因为前面是void * 类型,没有类型,所以不可能是个数.
int msg_flags; //没用,0
};
更多的sendmsg成员解析或百度或翻课件。
*msg_control就是下面这个结构体的指针:
struct cmsghdr{ //这是一个变长结构体。
socklen_t cmsg_len; //变长结构体的长度。
int cmsg_level; //填SOL_SOCKET即可。
int cmsg_type; //填SCM_RIGHTS即可。
//followed by unsigned char cmsg_data[]; //接下来是真正的控制信息数据,放什么什么都行。在本文中,我们存放fd.
};
因为前三个成员变量的长度是一定的,所以可以通过偏移找到最后一个成员的首地址,也可以通过接口SMSG_DATA()来得到最后一个成员的首地址。
我们用以下代码给struct cmsghdr的控制信息区域赋值:
*(int*)CMSG_DATA(cmsg)=fd; //相当于*(int*)((char*)cnsg+12)=fd;
cmsg_len成员怎么赋值?
变长结构体长度的计算就是前三个成员变量的长度(3*4)加上最后一个成员变量的长度。本文中最后一个成员变量中传递fd,所以最后一个成员变量的长度为4。所以cmsg_len等于16。除了自己计算的方法,还可以通过接口:CMSG_LEN()来得到长度,参数为最后一个成员变量的长度。即:
cmsg->cmsg_len = CMSG_LEN(sizeof(int));
下图为struct msghdr 和struct cmsghdr的关系图:
第3步: recvmsg接收文件描述符
#include <sys/socket.h>
ssize_t recvmsg(int sockfd, struct msghdr * msg, int flag);
接收的msghdr结构体初始化和sendmsg几乎完全一致,多了一句:
*fd=*(int*)CMSG_DATA(cmsg);
3、内存管理:ARC(引用计数) GC(自动回收)
引用计数:一个结构体或对象,若有一个进程使用它,使用他一次引用计数就加1。只有当为0时才回收。
C、C++使用引用计数,java使用GC。
当把new_fd发送给了子进程之后,new_fd的引用计数为2。
可以当把new_fd发给了子进程之后,就关闭new_fd。这样引用计数就变回了1。当子进程给客户端发送完数据,再关闭对应的new_fd。
4、如何传递一个文件,双方协商好什么时候开始发送,什么时候发送完?
首先定义数据传输的数据结构:
typedef struct{ //小火车,只有车长才知道后面有多少个字节。这是一个简单的协议。这就是上层协议。
int len; //小火车:接下来我要发多少。
char buf[1000]; //存放数据。
}train;
服务端
1、发送文件名给客户端,发送的t.len=strlen(filename),再strcpy(t.buf,filename)
2、打开文件
while(读取的字符数>0)
{
然后读取文件内容1000,发送文件内容1000给客户端
}
3、发送结束
接受端
1、接收文件名,并以这个名字新建一个文件,并打开每次先读4个字节
recv(sfd,buf,len);
2、while(1)接收1000,并write到对应文件
3、接收到结束len=0;
要注意文件内容读入t.buf中,文件长度为t.len=sizeof(t.buf),绝不能用strlen,这是因为文件名是字符串,可以用strlen;而文件内容不一定全是字符,很有可能有很多0。
5、当发送大文件时,可能会出错。socket的缓冲区只有64K。极有可能在网络中传输的速度不匹配。比如,发送1000字节内容,因为网络原因,导致客户端只收到200字节。但是火车头告诉他有1000字节的内容,于是写入文件的1000字节数据中,只有前200是有效的,后800字节全是0。
所以,就要重写send和recv函数。每一次发送或接受,要记录下返回值,既真正发送或接受的字符数。当接受数等于要发生的总数据长度时,说明传输完成。
void sendn(int sfd,char* buf,int len)
{
int total=0;//已发送的字节数
int ret;
while(total<len)
{
ret=send(sfd,buf+total,len-total,0);
total=total+ret;
}
}
void recvn(int sfd,char* buf,int len)
{
int total=0;
int ret;
while(total<len)
{
ret=recv(sfd,buf+total,len-total,0);
total=total+ret;
}
}
6、如何看两个文件完全相同 ?
md5查看内容是否一样。闪电算法:做一致性校验。shell命令md5sum file
三、代码
服务端
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <sys/sem.h>
#include <strings.h>
#include <time.h>
#include <sys/msg.h>
#include <signal.h>
#include <sys/time.h>
#include <pthread.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
#include <sys/uio.h>
#include <sys/epoll.h>
#define FILENAME "textfile.pdf"
#define PROCESSNUM 10
#define IP "192.168.59.129"
#define PORT 8888
typedef struct
{
pid_t pid;
int pfd;//子进程管道的对端,即父进程所持有的管道的那一端
short busy;//标识进程是否忙碌
}Data,*pData;
typedef struct{
int len; //buf中数据的长度
char buf[1000];
}container;
void make_child(pData p,int pro_num);
void child_handle(int pfd);
void send_fd(int,int);
void recv_fd(int,int*);
void trans_file(int);
void sendn(int,char*,int);
int main()
{
int ret;
int process_num=PROCESSNUM;
pData p=(pData)malloc(sizeof(Data)*process_num);
bzero(p,sizeof(Data)*process_num);
make_child(p,process_num);
//socket
int sfd=socket(AF_INET,SOCK_STREAM,0);
if(sfd==-1)
{
perror("sfd");
return -1;
}
printf("sfd=%d\n",sfd);
//setsocket,设置套接字端口可复用
int resue=1;
ret=setsockopt(sfd,SOL_SOCKET,SO_REUSEADDR,&resue,sizeof(int));
if(ret==-1)
{
perror("setsockopt");
return -1;
}
//bind
struct sockaddr_in ser;
bzero(&ser,sizeof(ser));
ser.sin_family=AF_INET;
ser.sin_port=htons(PORT);
ser.sin_addr.s_addr=inet_addr(IP);
ret=bind(sfd,(struct sockaddr*)&ser,sizeof(struct sockaddr));
if(ret==-1)
{
perror("bind");
return -1;
}
//epoll监听两种流的可读事件:sfd的可读事件和管道在父进程的那一端的可读事件。前者表示有客户端发来请求,后者表示子进程发来回应,表示传输任务已经完成
int epfd=epoll_create(1);
struct epoll_event event,*evs;
evs=(struct epoll_event*)calloc(process_num+1,sizeof(struct epoll_event));
event.events=EPOLLIN;
event.data.fd=sfd;
epoll_ctl(epfd,EPOLL_CTL_ADD,sfd,&event);
for(int i=0;i<process_num;i++)
{
event.data.fd=p[i].pfd;
event.events=EPOLLIN;
epoll_ctl(epfd,EPOLL_CTL_ADD,p[i].pfd,&event);
}
//listen
listen(sfd,process_num);
//some varible
int new_fd;
int nrecv; //可读的描述符个数
char flag;
while(1)
{
nrecv=epoll_wait(epfd,evs,process_num+1,-1);
for(int i=0;i<nrecv;i++)
{
//如果sfd可读,即有客户端发来信息
if(evs[i].data.fd==sfd)
{
new_fd=accept(sfd,NULL,NULL);
for(int j=0;j<process_num;j++)
{
if(p[j].busy==0)
{
send_fd(p[j].pfd,new_fd);
p[j].busy=1;
printf("No.%d child is busy now\n",p[j].pid);
close(new_fd);
break;
}
}
}
for(int j=0;j<process_num;j++)
{
//如果管道在父进程的那一端可读,即子进程向父进程写数据,说明任务已完成,该子进程已空闲
if(p[j].pfd==evs[i].data.fd)
{
read(p[j].pfd,&flag,sizeof(flag));
p[j].busy=0;
printf("No.%d child is not busy now\n",p[j].pid);
}
}
}
}
return 0;
}
void child_handle(int pfd)
{
int new_fd;
char flag=0;
while(1)
{
recv_fd(pfd,&new_fd);
printf("I will send file new_fd=%d\n",new_fd);
trans_file(new_fd);
write(pfd,&flag,sizeof(flag));//通知父进程我完成了文件发送
}
}
//创建子进程
void make_child(pData p,int process_num)
{
int fds[2];
pid_t pid;
for(int i=0;i<process_num;i++)
{
//socketpair创建的一对无名管道fds[2]每一个描述符既可以读也可以写。这个在同一
//个进程中也可以进行通信,向sv[0]中写入,就可以从sv[1]中读取(只能从sv[1]
//中读取),可以在sv[1]中写入,然后从sv[0]中读取;但是,若没有在0端写入,
//而从1端读取,则1端的读取操作会阻塞,即使在1端写入,也不能从1读取,仍然阻塞;
socketpair(AF_LOCAL,SOCK_STREAM,0,fds);
pid=fork();
if (pid== 0)
{
close(fds[1]);
child_handle(fds[0]);//fds[0]端分给子进程
}
close(fds[0]);
p[i].pid=pid;
p[i].pfd=fds[1];//fds[1]端分给父进程
p[i].busy=0;
printf("p[%d].pfd=%d\n",i,p[i].pfd);
}
}
//父进程把文件描述符传递给子进程,即传递控制信息
void send_fd(int pfd,int fd)
{
//set struct cmsghdr
struct cmsghdr *cmsg;
int len=CMSG_LEN(sizeof(int));//len为int长度加上结构体的头长度
cmsg=(struct cmsghdr*)malloc(len);
bzero(cmsg,len);
cmsg->cmsg_len=len;
cmsg->cmsg_level=SOL_SOCKET;
cmsg->cmsg_type=SCM_RIGHTS;
*(int*)CMSG_DATA(cmsg)=fd;
//set struct msghdr
struct msghdr msg;
bzero(&msg,sizeof(msg));
char buf1[10]="lover~";
char buf2[10]="fucker~";
struct iovec iov[2];
iov[0].iov_base=buf1;
iov[0].iov_len=6;
iov[1].iov_base=buf2;
iov[1].iov_len=7;
msg.msg_iov=iov;
msg.msg_iovlen=2;
msg.msg_control=cmsg;
msg.msg_controllen=len;
//sendmsg
int ret;
ret=sendmsg(pfd,&msg,0);
if(ret==-1) {
perror("sendmsg");
return;
}
printf("father has sent fd to son\n");
}
//子进程接受父进程发来的文件描述符
void recv_fd(int pfd,int *fd)
{
//set struct cmsghdr
struct cmsghdr *cmsg;
int len=CMSG_LEN(sizeof(int));//len为int长度加上结构体的头长度
cmsg=(struct cmsghdr*)malloc(len);
bzero(cmsg,len);
cmsg->cmsg_len=len;
cmsg->cmsg_level=SOL_SOCKET;
cmsg->cmsg_type=SCM_RIGHTS;
//set struct msghdr
struct msghdr msg;
bzero(&msg,sizeof(msg));
char buf1[10]="lover~";
char buf2[10]="fucker~";
struct iovec iov[2];
iov[0].iov_base=buf1;
iov[0].iov_len=6;
iov[1].iov_base=buf2;
iov[1].iov_len=7;
msg.msg_iov=iov;
msg.msg_iovlen=2;
msg.msg_control=cmsg;
msg.msg_controllen=len;
//recvmsg
int ret;
ret=recvmsg(pfd,&msg,0);
if(-1==ret)
{
perror("recvmsg");
return;
}
*fd=*(int*)CMSG_DATA(cmsg);
printf("son has received the fd from father\n");
}
void sig(int signum)
{
printf("%d is coming\n",signum);
}
void trans_file(int new_fd)
{
signal(SIGPIPE,sig);
container t;
//先发文件名
strcpy(t.buf,FILENAME);
t.len=strlen(t.buf);
sendn(new_fd,(char*)&t,4+t.len);//发送文件名火车给对端,4是train中int型数据len的长度
//再发文件内容
int fd=open(t.buf,O_RDONLY);
if(fd==-1)
{
perror("open");
return;
}
while(bzero(&t,sizeof(t)),(t.len=read(fd,t.buf, sizeof(t.buf)))>0)//逗号表达式的结果就是最后一个表达式的结果
{
sendn(new_fd,(char*)&t,4+t.len);
}
//文件读完且发送完后,发送一个len成员为0且buf为空的数据,表示文件发送完毕
t.len=0;
sendn(new_fd,(char*)&t,4);
close(new_fd);
}
//发送端和接收端速度不匹配,可能发送了1000,但是接收端只收到200,
//于是1000字节中只有前200是数据,剩下都是0.为了匹配速率,要重写send和recv函数
void sendn(int sfd,char* buf,int len)
{
int total=0;//已发送的字节数
int ret;
while(total<len)
{
ret=send(sfd,buf+total,len-total,0);
total=total+ret;
}
}
void recvn(int sfd,char* buf,int len)
{
int total=0;
int ret;
while(total<len)
{
ret=recv(sfd,buf+total,len-total,0);
total=total+ret;
}
}
客户端
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <sys/sem.h>
#include <strings.h>
#include <time.h>
#include <sys/msg.h>
#include <signal.h>
#include <sys/time.h>
#include <pthread.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
#include <sys/uio.h>
#include <sys/epoll.h>
#define FILENAME "file"
#define PROCESSNUM 10
#define IP "192.168.59.129"
#define PORT 8888
void recvn(int,char*,int);
int main()
{
int ret;//保留函数的返回值,用以判断函数是否正确执行
char buf[1000]={0};//内存缓存区
int len;//每一次读取的数据长度
int fd;//保存传输文件的描述符
char filename[50]={0};
//socket
int sfd=socket(AF_INET,SOCK_STREAM,0);
if(sfd==-1)
{
perror("socket");
return -1;
}
//connect
struct sockaddr_in ser;
bzero(&ser,sizeof(sockaddr_in));
ser.sin_family=AF_INET;
ser.sin_port=htons(PORT);
ser.sin_addr.s_addr=inet_addr(IP);
ret=connect(sfd,(struct sockaddr*)&ser,sizeof(struct sockaddr));
if(ret==-1)
{
perror("connect");
return -1;
}
//传递文件名信息,新建并打开一个同名文件
recvn(sfd,(char*)&len,sizeof(int));//由container数据类型所决定的,首先传递的是数据长度,这里的数据长度是文件名长度
recvn(sfd,buf,len);//然后传递的是文件名
strcpy(filename,buf);
fd=open(buf,O_RDWR|O_CREAT,0666);
if(fd<0)
{
perror("open");
return -1;
}
while(1)
{
recvn(sfd,(char*)&len,sizeof(int));//先接4个字节,4是container中int型数据len的长度
if(len>0)
{
bzero(buf,sizeof(buf));
recvn(sfd,buf,len);
write(fd,buf,len);
printf("Downloading %s\n",filename);
}else{
break;
}
}
close(fd);
close(sfd);
}
//发送端和接收端速度不匹配,可能发送了1000,但是接收端只收到200,
//于是1000字节中只有前200是数据,剩下都是0.为了匹配速率,要重写send和recv函数
void sendn(int sfd,char* buf,int len)
{
int total=0;//已发送的字节数
int ret;
while(total<len)
{
ret=send(sfd,buf+total,len-total,0);
total=total+ret;
}
}
void recvn(int sfd,char* buf,int len)
{
int total=0;
int ret;
while(total<len)
{
ret=recv(sfd,buf+total,len-total,0);
total=total+ret;
}
}