使用epoll和线程池实现具有简单的数据传输功能的FTP文件服务器

时间:2024-04-07 13:16:53

一、大致思路

使用epoll和线程池实现具有简单的数据传输功能的FTP文件服务器
如图所示,为线程池的基本框架

首先定义服务器端的数据类型

struct data{
     pid_t pid;			//进程pid
     int fd;		   	//管道id
     short busy;		//进程是否忙碌  0非忙碌 1忙碌
}

进程流程

  1. 初始化管道,创建子进程,每个子进程等待父进程发送任务
  2. 初始化socket,bind端口,listen
  3. 初始化epoll句柄,注册sfd,注册子进程对应的每一条管道在父进程的哪一端
  4. while(1){
    开始epoll_wait;
    if(如果sfd可读){ //有客户端请求
        new_fd=accept();
        分配一个空闲的进程
        把new_fd传递给这个进程
        把这个进程标记为忙碌
      }
      if(管道在父进程的一端可读){ //子进程已经完成传输任务
        就把该管道对应的子进程标识为非忙碌
      }

进程流程
   while(1)
   {
     1、等待接收父进程发过来的new_fd
     2、发送文件给客户端
     3、通知父进程非忙碌
   }

举一个那种先买单后吃饭,服务员懒散不主动的小店的例子。父进程相当于收银台的老板,是个管理者,只负责给客户买单和给厨房和服务员(子进程们,既进程池)分配任务;客户付钱买单后,服务员和厨师就服务客户(传输数据)。客户走后,服务员没活干了,就跟老板汇报一声,等老板再分配。

难点:
1、epoll监控那些事件?
2、父进程如何传递描述符给子进程?
3、内存管理?
4、如何传递一个文件,双方协商好什么时候开始发送,什么时候发送完?
5、当发送大文件时,可能会出错。socket的缓冲区只有64K。极有可能在网络中传输的速度不匹配。怎么办?
6、如何看两个文件完全相同 ?

二、难点分析

1.epoll监控事件分析
epoll代码放在父进程执行,子进程只需做发送数据的任务即可。
首先,服务器端的sfd套接字一定要监控是否可。如果可读,则说明有客户端发来连接请求。(老板要负责先给顾客先点单买账)
接着,要监控父进程这一端的管道是否可,用于在传输任务结束后,子进程向父进程发送终结信号。(老板要负责给空闲的服务员安排任务)
所以如图所示,红点均为epoll所监控事件

2.父进程传递描述符给子进程
如果你这么想:父进程打开一个文件 fd,通过管道把fd给子进程,那就大错特错了。文件描述符表面上是一个非负整数,但是实际上是一个索引值,指向内核为每个进程所维护的该进程打开文件的记录表。所以,父进程和子进程各有一个记录表,两个记录表都不一样,传一个整数给子进程,怎么可能有作用?

那么为什么不先创建fd,然后再fork,这样fork出来的子进程就同主进程的一样了,这样就相当于已经把fd传过来了。这种方法也是不可取的,因为实在应用的时候,一个服务器启动多少个进程或线程都已经定好了。需要让子进程处于待命状态,这样效率才最佳。所以要先创建子进程。

所以怎么才能把一个进程描述符的控制信息,传递给另一个进程?
这里用到了socketpair,sendmsg,recvmsg三个函数。

第1步,初始化socketpair类型描述符

 int fds[2]; 
 socketpair(AF_LOCAL,SOCK_STREAM,0,fds);   

必须要用这个管道。
socketpair创建了一对无名的套接字描述符,只能用于父子进程之间。描述符存储于一个二元数组,例如fds[2]。这对套接字可以进行全双工通信,每一个描述符既可以读也可以写。
注意:向fds[0]中写入,就只能从fds[1]中读取,无法从fds[0]读取;也可以在fds[1]中写入,然后只能从fds[0]中读取,无法从fds[1]读取;但是,若没有在0端写入,而从1端读取,则1端的读取操作会阻塞,即使在1端写入,也不能从1读取,仍然阻塞。

第3步:sendmsg接口发送描述符

#include <sys/socket.h>
ssize_t sendmsg(int sockfd, const struct msghdr *msg, int flags);

与第3步的recvmsg函数是一对,可以用于传输内核控制信息,故可以用这一对函数实现父子进程共享同一文件的描述符。
sockfd即sockpair初始化的描述符fds[0]或fds[1]。

Sendmsg关键是初始化msghdr结构体。

struct msghdr {               
  void          *msg_name;          //没用,memset为0即可
  socklen_t     msg_namelen;        //没用,memset为0即可
  struct iovec  *msg_iov;           //用户态信息结构体指针。这个机构体跟一次可以写多个buf有关。虽然不传内容,但是因为有**的问题,所以必须要写点东西。                   
  size_t        msg_iovlen;         //用户态信息结构体数组成员个数。
  void          *msg_control;       //内核控制信息结构体指针,即下面的cmsghdr结构体地址。告诉它我们要传递内核的信息。         
  size_t        msg_controllen;     //内核控制信息结构体长度,cmsghdr结构体的长度。这个长度只能是字节,因为前面是void * 类型,没有类型,所以不可能是个数.
  int           msg_flags;          //没用,0
 }; 

更多的sendmsg成员解析或百度或翻课件。
*msg_control就是下面这个结构体的指针:

struct cmsghdr{     						//这是一个变长结构体。
  socklen_t   cmsg_len;                     //变长结构体的长度。
  int         cmsg_level;                   //填SOL_SOCKET即可。                 
  int         cmsg_type;                    //填SCM_RIGHTS即可。
  //followed by unsigned char cmsg_data[];  //接下来是真正的控制信息数据,放什么什么都行。在本文中,我们存放fd.          
 }; 

因为前三个成员变量的长度是一定的,所以可以通过偏移找到最后一个成员的首地址,也可以通过接口SMSG_DATA()来得到最后一个成员的首地址。

我们用以下代码给struct cmsghdr的控制信息区域赋值:

*(int*)CMSG_DATA(cmsg)=fd;		//相当于*(int*)((char*)cnsg+12)=fd;

cmsg_len成员怎么赋值?
变长结构体长度的计算就是前三个成员变量的长度(3*4)加上最后一个成员变量的长度。本文中最后一个成员变量中传递fd,所以最后一个成员变量的长度为4。所以cmsg_len等于16。除了自己计算的方法,还可以通过接口:CMSG_LEN()来得到长度,参数为最后一个成员变量的长度。即:

cmsg->cmsg_len = CMSG_LEN(sizeof(int));

下图为struct msghdr 和struct cmsghdr的关系图:
使用epoll和线程池实现具有简单的数据传输功能的FTP文件服务器

第3步: recvmsg接收文件描述符

#include <sys/socket.h>
ssize_t recvmsg(int sockfd, struct msghdr * msg, int flag);

接收的msghdr结构体初始化和sendmsg几乎完全一致,多了一句:

*fd=*(int*)CMSG_DATA(cmsg);

3、内存管理:ARC(引用计数) GC(自动回收)
引用计数:一个结构体或对象,若有一个进程使用它,使用他一次引用计数就加1。只有当为0时才回收。
C、C++使用引用计数,java使用GC。
当把new_fd发送给了子进程之后,new_fd的引用计数为2。
可以当把new_fd发给了子进程之后,就关闭new_fd。这样引用计数就变回了1。当子进程给客户端发送完数据,再关闭对应的new_fd。

4、如何传递一个文件,双方协商好什么时候开始发送,什么时候发送完?
首先定义数据传输的数据结构:

typedef struct{                //小火车,只有车长才知道后面有多少个字节。这是一个简单的协议。这就是上层协议。
  int len;                    //小火车:接下来我要发多少。
  char buf[1000];            //存放数据。
}train;

服务端
1、发送文件名给客户端,发送的t.len=strlen(filename),再strcpy(t.buf,filename)
2、打开文件
  while(读取的字符数>0)
  {
    然后读取文件内容1000,发送文件内容1000给客户端
  }
3、发送结束

接受端
1、接收文件名,并以这个名字新建一个文件,并打开每次先读4个字节
recv(sfd,buf,len);
2、while(1)接收1000,并write到对应文件
3、接收到结束len=0;

要注意文件内容读入t.buf中,文件长度为t.len=sizeof(t.buf),绝不能用strlen,这是因为文件名是字符串,可以用strlen;而文件内容不一定全是字符,很有可能有很多0。

5、当发送大文件时,可能会出错。socket的缓冲区只有64K。极有可能在网络中传输的速度不匹配。比如,发送1000字节内容,因为网络原因,导致客户端只收到200字节。但是火车头告诉他有1000字节的内容,于是写入文件的1000字节数据中,只有前200是有效的,后800字节全是0。
所以,就要重写send和recv函数。每一次发送或接受,要记录下返回值,既真正发送或接受的字符数。当接受数等于要发生的总数据长度时,说明传输完成。

void sendn(int sfd,char* buf,int len)
{
    int total=0;//已发送的字节数
    int ret;
    while(total<len)
    {
        ret=send(sfd,buf+total,len-total,0);
        total=total+ret;
    }
}


void recvn(int sfd,char* buf,int len)
{
    int total=0;
    int ret;
    while(total<len)
    {
        ret=recv(sfd,buf+total,len-total,0);
        total=total+ret;
    }
}

6、如何看两个文件完全相同 ?
md5查看内容是否一样。闪电算法:做一致性校验。shell命令md5sum file

三、代码
服务端

#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <sys/sem.h>
#include <strings.h>
#include <time.h>
#include <sys/msg.h>
#include <signal.h>
#include <sys/time.h>
#include <pthread.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
#include <sys/uio.h>
#include <sys/epoll.h>

#define FILENAME "textfile.pdf"
#define PROCESSNUM 10
#define IP "192.168.59.129"
#define PORT 8888

typedef struct
{
    pid_t pid;
    int pfd;//子进程管道的对端,即父进程所持有的管道的那一端
    short busy;//标识进程是否忙碌
}Data,*pData;

typedef struct{
    int len;    //buf中数据的长度
    char buf[1000];
}container;



void make_child(pData p,int pro_num);
void child_handle(int pfd);
void send_fd(int,int);
void recv_fd(int,int*);
void trans_file(int);
void sendn(int,char*,int);


int main()
{
    int ret;
    int process_num=PROCESSNUM;
    pData p=(pData)malloc(sizeof(Data)*process_num);
    bzero(p,sizeof(Data)*process_num);
    make_child(p,process_num);

    //socket
    int sfd=socket(AF_INET,SOCK_STREAM,0);
    if(sfd==-1)
    {
        perror("sfd");
        return -1;
    }
    printf("sfd=%d\n",sfd);

    //setsocket,设置套接字端口可复用
    int resue=1;
    ret=setsockopt(sfd,SOL_SOCKET,SO_REUSEADDR,&resue,sizeof(int));
    if(ret==-1)
    {
        perror("setsockopt");
        return -1;
    }

    //bind
    struct sockaddr_in ser;
    bzero(&ser,sizeof(ser));
    ser.sin_family=AF_INET;
    ser.sin_port=htons(PORT);
    ser.sin_addr.s_addr=inet_addr(IP);
    ret=bind(sfd,(struct sockaddr*)&ser,sizeof(struct sockaddr));
    if(ret==-1)
    {
        perror("bind");
        return -1;
    }

    //epoll监听两种流的可读事件:sfd的可读事件和管道在父进程的那一端的可读事件。前者表示有客户端发来请求,后者表示子进程发来回应,表示传输任务已经完成
    int epfd=epoll_create(1);
    struct epoll_event event,*evs;
    evs=(struct epoll_event*)calloc(process_num+1,sizeof(struct epoll_event));
    event.events=EPOLLIN;
    event.data.fd=sfd;
    epoll_ctl(epfd,EPOLL_CTL_ADD,sfd,&event);
    for(int i=0;i<process_num;i++)
    {
        event.data.fd=p[i].pfd;
        event.events=EPOLLIN;
        epoll_ctl(epfd,EPOLL_CTL_ADD,p[i].pfd,&event);
    }


    //listen
    listen(sfd,process_num);

    //some varible
    int new_fd;
    int nrecv;  //可读的描述符个数

    char flag;

    while(1)
    {
        nrecv=epoll_wait(epfd,evs,process_num+1,-1);
        for(int i=0;i<nrecv;i++)
        {
            //如果sfd可读,即有客户端发来信息
            if(evs[i].data.fd==sfd)
            {
                new_fd=accept(sfd,NULL,NULL);
                for(int j=0;j<process_num;j++)
                {
                    if(p[j].busy==0)
                    {
                        send_fd(p[j].pfd,new_fd);
                        p[j].busy=1;
                        printf("No.%d child is busy now\n",p[j].pid);
                        close(new_fd);
                        break;
                    }
                }
            }

            for(int j=0;j<process_num;j++)
            {
                //如果管道在父进程的那一端可读,即子进程向父进程写数据,说明任务已完成,该子进程已空闲
                if(p[j].pfd==evs[i].data.fd)
                {
                    read(p[j].pfd,&flag,sizeof(flag));
                    p[j].busy=0;
                    printf("No.%d child is not busy now\n",p[j].pid);
                }
            }
        }
    }
    return 0;
}

void child_handle(int pfd)
{
    int new_fd;
    char flag=0;
    while(1)
    {
        recv_fd(pfd,&new_fd);
        printf("I will send file new_fd=%d\n",new_fd);
        trans_file(new_fd);
        write(pfd,&flag,sizeof(flag));//通知父进程我完成了文件发送
    }
}

//创建子进程
void make_child(pData p,int process_num)
{
    int fds[2];
    pid_t pid;
    for(int i=0;i<process_num;i++)
    {
        //socketpair创建的一对无名管道fds[2]每一个描述符既可以读也可以写。这个在同一
        //个进程中也可以进行通信,向sv[0]中写入,就可以从sv[1]中读取(只能从sv[1]
        //中读取),可以在sv[1]中写入,然后从sv[0]中读取;但是,若没有在0端写入,
        //而从1端读取,则1端的读取操作会阻塞,即使在1端写入,也不能从1读取,仍然阻塞;
        socketpair(AF_LOCAL,SOCK_STREAM,0,fds);
        pid=fork();
        if (pid== 0)
        {
            close(fds[1]);
            child_handle(fds[0]);//fds[0]端分给子进程
        }
        close(fds[0]);
        p[i].pid=pid;
        p[i].pfd=fds[1];//fds[1]端分给父进程
        p[i].busy=0;
        printf("p[%d].pfd=%d\n",i,p[i].pfd);
    }
}


//父进程把文件描述符传递给子进程,即传递控制信息
void send_fd(int pfd,int fd)
{
    //set struct cmsghdr
    struct cmsghdr *cmsg;
    int len=CMSG_LEN(sizeof(int));//len为int长度加上结构体的头长度
    cmsg=(struct cmsghdr*)malloc(len);
    bzero(cmsg,len);
    cmsg->cmsg_len=len;
    cmsg->cmsg_level=SOL_SOCKET;
    cmsg->cmsg_type=SCM_RIGHTS;
    *(int*)CMSG_DATA(cmsg)=fd;

    //set struct msghdr
    struct msghdr msg;
    bzero(&msg,sizeof(msg));
    char buf1[10]="lover~";
    char buf2[10]="fucker~";
    struct iovec iov[2];
    iov[0].iov_base=buf1;
    iov[0].iov_len=6;
    iov[1].iov_base=buf2;
    iov[1].iov_len=7;
    msg.msg_iov=iov;
    msg.msg_iovlen=2;
    msg.msg_control=cmsg;
    msg.msg_controllen=len;

    //sendmsg
    int ret;
    ret=sendmsg(pfd,&msg,0);
    if(ret==-1) {
        perror("sendmsg");
        return;
    }
    printf("father has sent fd to son\n");
}

//子进程接受父进程发来的文件描述符
void recv_fd(int pfd,int *fd)
{
    //set struct cmsghdr
    struct cmsghdr *cmsg;
    int len=CMSG_LEN(sizeof(int));//len为int长度加上结构体的头长度
    cmsg=(struct cmsghdr*)malloc(len);
    bzero(cmsg,len);
    cmsg->cmsg_len=len;
    cmsg->cmsg_level=SOL_SOCKET;
    cmsg->cmsg_type=SCM_RIGHTS;

    //set struct msghdr
    struct msghdr msg;
    bzero(&msg,sizeof(msg));
    char buf1[10]="lover~";
    char buf2[10]="fucker~";
    struct iovec iov[2];
    iov[0].iov_base=buf1;
    iov[0].iov_len=6;
    iov[1].iov_base=buf2;
    iov[1].iov_len=7;
    msg.msg_iov=iov;
    msg.msg_iovlen=2;
    msg.msg_control=cmsg;
    msg.msg_controllen=len;

    //recvmsg
    int ret;
    ret=recvmsg(pfd,&msg,0);
    if(-1==ret)
    {
        perror("recvmsg");
        return;
    }
    *fd=*(int*)CMSG_DATA(cmsg);
    printf("son has received the fd from father\n");
}



void sig(int signum)
{
    printf("%d is coming\n",signum);
}
void trans_file(int new_fd)
{
    signal(SIGPIPE,sig);
    container t;
    
    //先发文件名
    strcpy(t.buf,FILENAME);
    t.len=strlen(t.buf);
    sendn(new_fd,(char*)&t,4+t.len);//发送文件名火车给对端,4是train中int型数据len的长度

    //再发文件内容
    int fd=open(t.buf,O_RDONLY);
    if(fd==-1)
    {
        perror("open");
        return;
    }
    while(bzero(&t,sizeof(t)),(t.len=read(fd,t.buf, sizeof(t.buf)))>0)//逗号表达式的结果就是最后一个表达式的结果
    {
        sendn(new_fd,(char*)&t,4+t.len);
    }

    //文件读完且发送完后,发送一个len成员为0且buf为空的数据,表示文件发送完毕
    t.len=0;
    sendn(new_fd,(char*)&t,4);
    close(new_fd);

}


//发送端和接收端速度不匹配,可能发送了1000,但是接收端只收到200,
//于是1000字节中只有前200是数据,剩下都是0.为了匹配速率,要重写send和recv函数
void sendn(int sfd,char* buf,int len)
{
    int total=0;//已发送的字节数
    int ret;
    while(total<len)
    {
        ret=send(sfd,buf+total,len-total,0);
        total=total+ret;
    }
}


void recvn(int sfd,char* buf,int len)
{
    int total=0;
    int ret;
    while(total<len)
    {
        ret=recv(sfd,buf+total,len-total,0);
        total=total+ret;
    }
}

客户端

#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <sys/sem.h>
#include <strings.h>
#include <time.h>
#include <sys/msg.h>
#include <signal.h>
#include <sys/time.h>
#include <pthread.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
#include <sys/uio.h>
#include <sys/epoll.h>

#define FILENAME "file"
#define PROCESSNUM 10
#define IP "192.168.59.129"
#define PORT 8888

void recvn(int,char*,int);



int main()
{
    int ret;//保留函数的返回值,用以判断函数是否正确执行
    char buf[1000]={0};//内存缓存区
    int len;//每一次读取的数据长度
    int fd;//保存传输文件的描述符
    char filename[50]={0};
    //socket
    int sfd=socket(AF_INET,SOCK_STREAM,0);
    if(sfd==-1)
    {
        perror("socket");
        return -1;
    }

    //connect
    struct sockaddr_in ser;
    bzero(&ser,sizeof(sockaddr_in));
    ser.sin_family=AF_INET;
    ser.sin_port=htons(PORT);
    ser.sin_addr.s_addr=inet_addr(IP);
    ret=connect(sfd,(struct sockaddr*)&ser,sizeof(struct sockaddr));
    if(ret==-1)
    {
        perror("connect");
        return -1;
    }

    //传递文件名信息,新建并打开一个同名文件
    recvn(sfd,(char*)&len,sizeof(int));//由container数据类型所决定的,首先传递的是数据长度,这里的数据长度是文件名长度
    recvn(sfd,buf,len);//然后传递的是文件名
    strcpy(filename,buf);
    fd=open(buf,O_RDWR|O_CREAT,0666);
    if(fd<0)
    {
        perror("open");
        return  -1;
    }

    while(1)
    {
        recvn(sfd,(char*)&len,sizeof(int));//先接4个字节,4是container中int型数据len的长度
        if(len>0)
        {
            bzero(buf,sizeof(buf));
            recvn(sfd,buf,len);
            write(fd,buf,len);
            printf("Downloading %s\n",filename);
        }else{
            break;
        }
    }
    close(fd);
    close(sfd);
}

//发送端和接收端速度不匹配,可能发送了1000,但是接收端只收到200,
//于是1000字节中只有前200是数据,剩下都是0.为了匹配速率,要重写send和recv函数
void sendn(int sfd,char* buf,int len)
{
    int total=0;//已发送的字节数
    int ret;
    while(total<len)
    {
        ret=send(sfd,buf+total,len-total,0);
        total=total+ret;
    }
}


void recvn(int sfd,char* buf,int len)
{
    int total=0;
    int ret;
    while(total<len)
    {
        ret=recv(sfd,buf+total,len-total,0);
        total=total+ret;
    }
}