Linux网络编程之多进程模型编程与一个使用进程池实现的CGI服务器

时间:2022-02-26 00:10:22

NO.1

一:什么是多进程模型

多进程模型是服务器在接收到大量高并发客户端访问时,通过创建多个子进程来与客户端进行通信。单进程阻塞在read()系统调用的时候,会导致服务器无法响应到其他的连接请求。这时可以通过fork()函数创建出多个子进程来处理业务,而主进程继续循环accept()其他客户连接,子进程实施具体的通信细节。

二:fork函数详解

NAME
       fork - create a child process       

SYNOPSIS
       #include <unistd.h>
 
       pid_t fork(void);
DESCRIPTION
       fork()  creates  a  new  process by duplicating(复制) the calling process.  The new process, referred to as the child, is an exact(准确的)
       duplicate of the calling process, referred to as the parent, except for the following points:          

       *  The child has its own unique process ID, and this PID does not match the ID of any existing process group (setpgid(2)).

       *  The child's parent process ID is the same as the parent's process ID.

       *  The child does not inherit(继承) its parent's memory locks (mlock(2), mlockall(2)).

       *  Process resource utilizations(利用) (getrusage(2)) and CPU time counters (times(2)) are reset to zero in the child.

       *  The child's set of pending(挂起) signals is initially empty (sigpending(2)).

       *  The child does not inherit semaphore(信号量) adjustments from its parent (semop(2)).

       *  The child does not inherit record(记录) locks from its parent (fcntl(2)).

       *  The child does not inherit timers from its parent (setitimer(2), alarm(2), timer_create(2)).

       ......

RETURN VALUE
       On success, the PID of the child process is returned in the parent, and 0 is returned in  the  child.   On  failure,  -1  is
       returned in the parent, no child process is created, and errno is set appropriately(适当的).                       

ERRORS
       EAGAIN fork() cannot allocate sufficient(足够的) memory to copy the parent's page tables and allocate a task structure for the child.

       EAGAIN It was not possible to create a new process because the caller's RLIMIT_NPROC(用户可拥有的最大进程数) resource  limit  was  encountered(达到、触及).  

              To exceed this limit, the process must have either the CAP_SYS_ADMIN or the CAP_SYS_RESOURCE capability.

       ENOMEM fork() failed to allocate the necessary kernel structures because memory is tight(紧张、不足).

       ENOSYS fork() is not supported on this platform(平台) (for example, hardware without a Memory-Management Unit).

所以由上可以总结出我们使用fork的“套路”:

pid_t pid;      // pid_t is a signed integer typedef (from <sys/types.h>), not unsigned: fork() reports failure as -1

pid = fork();   // called once, returns twice: once in the parent and once in the new child
if (pid == -1) {                 // fork failed: no child was created and errno says why
	ERR_EXIT("fork error");      // perror() appends ": <reason>\n" itself, so no '\n' in the prefix
} else if (pid == 0) {           // fork returned 0: this code runs in the child
	close(listenfd);             // the child only talks to this client, so drop its copy of the listening socket
	do_service(connfd);          // handle the whole conversation with the client
	exit(EXIT_SUCCESS);          // the child's work is done; exit so it never falls back into the accept loop
} else {                         // fork returned the child's PID: this code runs in the parent
	close(connfd);               // the child owns the connection now; the parent goes back to accept()
}

三:服务器代码示例

下面是一个简单的回射(echo)服务器代码:服务器把客户端发来的数据原样发回给客户端。

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#include <signal.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/types.h>

/* ERR_EXIT(msg): report the current errno via perror(msg), then terminate
 * with EXIT_FAILURE. The do { } while (0) wrapper makes the macro act as a
 * single statement, so it is safe in an unbraced if/else. */
#define ERR_EXIT(msg)            \
	do {                         \
		perror(msg);             \
		exit(EXIT_FAILURE);      \
	} while (0)

/*
 * Echo everything received on connfd back to the peer, copying each chunk to
 * stdout, until the peer closes the connection (read() returns 0) or a
 * non-recoverable read/write error occurs.
 *
 * Exactly the bytes returned by read() are echoed. The buffer is never treated
 * as a C string, so a full 1024-byte chunk (which has no terminating '\0') and
 * data containing '\0' bytes are both handled correctly.
 */
void do_service(int connfd)
{
	char recvbuf[1024];

	for (;;) {
		ssize_t nread = read(connfd, recvbuf, sizeof(recvbuf));
		if (nread == 0) {
			fprintf(stdout, "client close\n");
			break;
		}
		if (nread < 0) {
			if (errno == EINTR)          /* interrupted by a signal: just retry */
				continue;
			perror("read");              /* real error: stop serving this client */
			break;
		}

		fwrite(recvbuf, 1, (size_t)nread, stdout);

		/* write() may accept fewer bytes than requested; keep going until all are sent. */
		size_t sent = 0;
		while (sent < (size_t)nread) {
			ssize_t nwritten = write(connfd, recvbuf + sent, (size_t)nread - sent);
			if (nwritten < 0) {
				if (errno == EINTR)
					continue;
				perror("write");
				return;
			}
			sent += (size_t)nwritten;
		}
	}
}

/*
 * Multi-process echo server on TCP port 8888. The parent loops on accept() and
 * forks one child per connection. The child runs do_service() and exits, while
 * the parent closes its copy of the connection and keeps accepting.
 */
int main(void)
{
	struct sockaddr_in servaddr;
	int listenfd;

	if ((listenfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0)     /* TCP listening socket */
		ERR_EXIT("sockfd err");

	memset(&servaddr, 0, sizeof(servaddr));
	servaddr.sin_family = AF_INET;                  /* IPv4 address family (AF_INET and PF_INET have the same value) */
	servaddr.sin_port = htons(8888);                /* port 8888, in network byte order */
	servaddr.sin_addr.s_addr = htonl(INADDR_ANY);   /* any local address */

	int on = 1;
	/* SO_REUSEADDR lets a restarted server bind the port again immediately,
	 * even while connections from the previous run are still in TIME_WAIT. */
	if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0)
		ERR_EXIT("setsockopt err");

	if (bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr)) < 0)
		ERR_EXIT("bind err");

	if (listen(listenfd, SOMAXCONN) < 0)
		ERR_EXIT("listen err");

	/* The parent never wait()s for its children; ignoring SIGCHLD makes the
	 * kernel reap them automatically, so finished children do not linger as zombies. */
	if (signal(SIGCHLD, SIG_IGN) == SIG_ERR)
		ERR_EXIT("signal err");

	for (;;) {
		struct sockaddr_in peeraddr;
		socklen_t peerlen = sizeof(peeraddr);   /* value-result argument: reset before every accept() */
		int connfd = accept(listenfd, (struct sockaddr *)&peeraddr, &peerlen);
		if (connfd < 0) {
			if (errno == EINTR)                 /* interrupted by a signal: not an error */
				continue;
			ERR_EXIT("accept err");
		}

		printf("client%s:port%d connect.\n", inet_ntoa(peeraddr.sin_addr), ntohs(peeraddr.sin_port));

		/* Flush first so buffered output is not duplicated into the child. */
		fflush(stdout);
		pid_t pid = fork();
		if (pid == -1)
			ERR_EXIT("fork");
		if (pid == 0) {             /* child: serve this client only */
			close(listenfd);
			do_service(connfd);
			exit(EXIT_SUCCESS);
		}
		close(connfd);              /* parent: the child owns the connection */
	}

	return 0;
}

四:客户端代码示例

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/types.h>

/* ERR_EXIT(msg): report the current errno via perror(msg), then terminate
 * with EXIT_FAILURE. The do { } while (0) wrapper makes the macro act as a
 * single statement, so it is safe in an unbraced if/else. */
#define ERR_EXIT(msg)            \
	do {                         \
		perror(msg);             \
		exit(EXIT_FAILURE);      \
	} while (0)

/*
 * Echo client. Connects to 127.0.0.1:8888, sends each line read from stdin and
 * prints what the server sends back. Stops at EOF on stdin, when the server
 * closes the connection, or on an I/O error.
 */
int main(void)
{
	struct sockaddr_in servaddr;
	int sockfd;

	if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
		ERR_EXIT("socket err");

	memset(&servaddr, 0, sizeof(servaddr));
	servaddr.sin_family = AF_INET;
	servaddr.sin_port = htons(8888);
	servaddr.sin_addr.s_addr = inet_addr("127.0.0.1");

	if (connect(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr)) < 0)
		ERR_EXIT("connect err");

	char sendbuf[1024] = {0};
	char recvbuf[1024] = {0};

	while (fgets(sendbuf, sizeof(sendbuf), stdin) != NULL) {
		size_t len = strlen(sendbuf);
		if (write(sockfd, sendbuf, len) != (ssize_t)len) {
			perror("write err");
			break;
		}

		/* Read at most size-1 bytes so there is always room for the '\0' fputs() needs. */
		ssize_t n = read(sockfd, recvbuf, sizeof(recvbuf) - 1);
		if (n == 0) {               /* server closed the connection */
			printf("server close\n");
			break;
		}
		if (n < 0) {
			perror("read err");
			break;
		}
		recvbuf[n] = '\0';
		fputs(recvbuf, stdout);
	}

	close(sockfd);

	return 0;
}

以上程序均经过测试。


NO.2

下面是一个进程池的实现:
#ifndef _PROCESS_POOL_H
#define _PROCESS_POOL_H

#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <signal.h>
#include <sys/wait.h>
#include <sys/stat.h>

// Bookkeeping for one child process in the pool: the child's PID and the
// socketpair that the parent and that child use to communicate.
struct process {
    pid_t pid_;        // child PID as returned by fork() in the parent
    int   pipefd_[2];  // socketpair ends: the parent keeps [0], the child keeps [1]
};

//进程池类,将它定义为模板类是为了代码复用。其模板参数是处理逻辑任务的类
template <typename T>
class process_pool {
private:
    process_pool(int listenfd, int process_number = 8); 
    static process_pool<T>* instance_;
public:
    //单例模式,以保证程序最多创建一个process_pool实例,这是程序正确处理信号的必要条件
    static process_pool<T>* create(int listenfd, int process_number = 8) {
 static process_pool<T>* create(int listenfd, int process_number = 8) {
        if(instance_ == NULL) {
            instance_ = new process_pool<T>(listenfd, process_number);
        }
        return instance_;
    }
    void run();
private:
    void setup_sig_pipe();
    void run_parent();
    void run_child();
private:
    //进程池语序的最大子进程数量
    static const int MAX_PROCESS_NUMBER = 16;
    //每个子进程最多能处理的客户数量
    static const int USER_PER_PROCESS = 65536;
    //epoll 最多能处理的事件数
    static const int MAX_EVENT_NUMBER = 10000;
    //进程池的进程总数
    int process_number_;
    //子进程在池中的序号,从0开始
    int index_;
    //每个进程都有一个epollfd
    int epollfd_;
    //监听套接字
    int listenfd_;
    //子进程通过stop_来决定是否停止运行
    int stop_;
    //保存所有子进程的描述信息
    process* sub_process_;
};

template <typename T>
process_pool<T>* process_pool<T>::instance_ = NULL;

// Signal pipe, a socketpair used to unify signals with the other event
// sources: sig_handler writes the signal number into end [1], and the epoll
// event loop reads it from end [0] like any other readable fd.
static int sig_pipefd[2];

// Put fd into non-blocking mode.
// Returns the file status flags fd had before the call (so a caller can
// restore them), or -1 if the flags could not be read or written (errno is set).
static int setnonblocking(int fd)
{
    int old_option = fcntl(fd, F_GETFL);
    if (old_option == -1) {
        // Don't OR O_NONBLOCK into -1 and write an all-bits-set flag word back.
        return -1;
    }
    if (fcntl(fd, F_SETFL, old_option | O_NONBLOCK) == -1) {
        return -1;
    }
    return old_option;
}

// Register fd with the epoll instance `epollfd` for edge-triggered read events
// (EPOLLIN | EPOLLET), then switch fd to non-blocking mode. epoll(7) recommends
// non-blocking fds with EPOLLET so a reader can drain the fd until EAGAIN.
static void addfd(int epollfd, int fd)
{
    epoll_event event;
    // data is a union and only .fd is set below; zero the whole struct so no
    // uninitialized bytes are handed to the kernel.
    memset(&event, 0, sizeof(event));
    event.data.fd = fd;
    event.events = EPOLLIN | EPOLLET;
    if (epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event) == -1) {
        perror("epoll_ctl");   // report instead of silently losing the fd's events
    }
    setnonblocking(fd);
}

// Drop every event registered for fd from the epoll instance `epollfd`, then
// close fd.
static void removefd(int epollfd, int fd)
{
    (void)epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, NULL);   // result ignored: fd is closed regardless
    close(fd);
}

// Signal handler shared by every forwarded signal. It writes the signal number
// as a single byte into the signal pipe so the epoll loop can handle the signal
// synchronously. errno is saved and restored because the handler may interrupt
// code that is about to inspect errno.
static void sig_handler(int sig)
{
    int save_errno = errno;
    // Send the signal number itself as one byte. Sending the first byte of an
    // int would transmit the most significant byte (0) on big-endian hosts.
    char msg = static_cast<char>(sig);
    send(sig_pipefd[1], &msg, 1, 0);
    errno = save_errno;
}
// Install `handler` for signal `sig`. All signals are blocked while the handler
// runs (sa_mask is full). When restart is true, system calls interrupted by the
// signal are restarted automatically (SA_RESTART).
static void addsig(int sig, void(handler)(int), bool restart = true)
{
    struct sigaction sa;
    memset(&sa, '\0', sizeof(sa));
    sa.sa_handler = handler;
    if (restart) {
        sa.sa_flags |= SA_RESTART;
    }
    sigfillset(&sa.sa_mask);
    // Call sigaction() outside assert(): under NDEBUG the assert expression is
    // not evaluated, which would silently skip installing the handler.
    int ret = sigaction(sig, &sa, NULL);
    assert(ret != -1);
    (void)ret;   // avoid an unused-variable warning when NDEBUG is defined
}

// Pool constructor. listenfd must be created before the pool, so that every
// forked child inherits it. process_number is the number of child processes
// (1..MAX_PROCESS_NUMBER). On return the parent has index_ == -1, and each
// child has index_ set to its own slot in sub_process_.
template <typename T>
process_pool<T>::process_pool(int listenfd, int process_number)
    // Initializers are listed in member declaration order.
    : process_number_(process_number), index_(-1), epollfd_(-1),
      listenfd_(listenfd), stop_(false), sub_process_(NULL)
{
    assert((process_number > 0) && (process_number <= MAX_PROCESS_NUMBER));

    sub_process_ = new process[process_number_];

    // Fork process_number children, each connected to the parent by a socketpair.
    for (int i = 0; i < process_number_; ++i) {
        // Explicit checks (not assert) so failures are still caught under NDEBUG.
        if (socketpair(PF_UNIX, SOCK_STREAM, 0, sub_process_[i].pipefd_) == -1) {
            perror("socketpair");
            exit(EXIT_FAILURE);
        }

        sub_process_[i].pid_ = fork();
        if (sub_process_[i].pid_ < 0) {
            // Without this check a failed fork (-1) would take the child branch
            // and the parent would believe it is child i.
            perror("fork");
            exit(EXIT_FAILURE);
        }

        if (sub_process_[i].pid_ > 0) {
            // Parent: keep end [0] for talking to child i.
            close(sub_process_[i].pipefd_[1]);
            continue;
        }

        // Child i: keep end [1]. Close the parent's end of our own pair, and the
        // parent's ends for the siblings forked before us, which we inherited
        // but never use.
        close(sub_process_[i].pipefd_[0]);
        for (int j = 0; j < i; ++j) {
            close(sub_process_[j].pipefd_[0]);
        }
        index_ = i;
        // Essential: the child must leave the loop, otherwise it would go on
        // forking children of its own.
        break;
    }
}

// Create this process's epoll instance and the signal pipe (unified event
// source): signals are turned into bytes on sig_pipefd, so the epoll loop
// handles them like ordinary readable events. This socketpair is separate
// from the parent/child communication pipes.
template <typename T>
void process_pool<T>::setup_sig_pipe()
{
    epollfd_ = epoll_create(5);
    assert(epollfd_ != -1);

    int rc = socketpair(PF_UNIX, SOCK_STREAM, 0, sig_pipefd);
    assert(rc != -1);

    // The handler writes to end [1], so make it non-blocking so the write in
    // the handler cannot block. The event loop watches end [0].
    setnonblocking(sig_pipefd[1]);
    addfd(epollfd_, sig_pipefd[0]);

    // SIGCHLD, SIGTERM and SIGINT are forwarded into the pipe; SIGPIPE is ignored.
    const int forwarded[] = { SIGCHLD, SIGTERM, SIGINT };
    for (size_t k = 0; k < sizeof(forwarded) / sizeof(forwarded[0]); ++k) {
        addsig(forwarded[k], sig_handler);
    }
    addsig(SIGPIPE, SIG_IGN);
}

// Entry point after construction. index_ is -1 only in the parent (each child
// gets its slot number in the constructor), so it selects which loop to run.
template <typename T>
void process_pool<T>::run()
{
    if (index_ != -1) {
        run_child();
        return;
    }
    run_parent();
}

// Child event loop. It waits on three kinds of fd:
//  - the pipe from the parent: each int received means "a new connection is
//    pending, accept it";
//  - the signal pipe;
//  - the client sockets this child has accepted. Their events are dispatched
//    to users[fd], a T object indexed directly by the connection fd.
template <typename T>
void process_pool<T>::run_child()
{
    setup_sig_pipe();

    // This child's end of the parent/child socketpair, selected by index_.
    int pipefd = sub_process_[index_].pipefd_[1];
    // The parent notifies us of new connections through pipefd.
    addfd(epollfd_, pipefd);

    epoll_event events[MAX_EVENT_NUMBER];

    // One logic object per possible fd; larger fds are refused below.
    T* users = new T[USER_PER_PROCESS];

    while (!stop_) {
        int number = epoll_wait(epollfd_, events, MAX_EVENT_NUMBER, -1);
        if ((number < 0) && (errno != EINTR)) {
            printf("epoll failure\n");
            break;
        }

        for (int i = 0; i < number; ++i) {
            int sockfd = events[i].data.fd;
            if ((sockfd == pipefd) && (events[i].events & EPOLLIN)) {
                // pipefd is edge-triggered and non-blocking, so drain every
                // pending notification and accept one connection per
                // notification. Stop on EAGAIN (nothing left), EOF or error.
                // Previously an EAGAIN result fell through to accept().
                int client = 0;
                while (recv(sockfd, (char*)&client, sizeof(client), 0) > 0) {
                    struct sockaddr_in client_address;
                    socklen_t len = sizeof(client_address);
                    int connfd = accept(listenfd_, (struct sockaddr*)&client_address, &len);
                    if (connfd < 0) {
                        printf("errno is: %d\n", errno);
                        continue;
                    }
                    if (connfd >= USER_PER_PROCESS) {
                        // users[] has no slot for this fd; indexing it would overflow.
                        printf("too many connections, refusing fd %d\n", connfd);
                        close(connfd);
                        continue;
                    }
                    addfd(epollfd_, connfd);

                    // T must provide init(); the connection fd indexes its logic object directly.
                    users[connfd].init(epollfd_, connfd, client_address);
                }
            }
            // Signals forwarded through the signal pipe.
            else if ((sockfd == sig_pipefd[0]) && (events[i].events & EPOLLIN)) {
                char signals[1024];
                ssize_t ret;
                // Edge-triggered as well: read until the pipe is empty.
                while ((ret = recv(sig_pipefd[0], signals, sizeof(signals), 0)) > 0) {
                    for (ssize_t k = 0; k < ret; ++k) {
                        switch (signals[k]) {
                        case SIGCHLD: {
                            // Reap every exited child (e.g. processes forked by T::process()).
                            int stat;
                            while (waitpid(-1, &stat, WNOHANG) > 0) {
                            }
                            break;
                        }
                        case SIGTERM:
                        case SIGINT:
                            stop_ = true;
                            break;
                        default:
                            break;
                        }
                    }
                }
            }
            // Any other readable fd is a client socket: let its logic object handle the request.
            else if (events[i].events & EPOLLIN) {
                users[sockfd].process();
            }
        }
    }

    delete [] users;
    users = NULL;
    close(pipefd);
    close(epollfd_);
}

// Parent event loop. It watches listenfd and the signal pipe but never accepts
// connections itself. For each readiness event on listenfd it picks a live
// child in round-robin order and writes one int to that child's pipe; the
// child then calls accept(). The loop ends when every child has exited.
template <typename T>
void process_pool<T>::run_parent()
{
    setup_sig_pipe();

    addfd(epollfd_, listenfd_);

    epoll_event events[MAX_EVENT_NUMBER];
    int sub_process_counter = 0;   // slot where the next round-robin search starts

    while (!stop_) {
        int number = epoll_wait(epollfd_, events, MAX_EVENT_NUMBER, -1);
        if ((number < 0) && (errno != EINTR)) {
            printf("epoll failure\n");
            break;
        }

        for (int i = 0; i < number; ++i) {
            int sockfd = events[i].data.fd;
            if (sockfd == listenfd_) {
                // Round robin: starting at sub_process_counter, find a live child.
                int target = sub_process_counter;
                do {
                    if (sub_process_[target].pid_ != -1) {
                        break;
                    }
                    target = (target + 1) % process_number_;
                } while (target != sub_process_counter);

                if (sub_process_[target].pid_ == -1) {
                    // Went all the way around without finding a live child:
                    // every child has exited, so shut the server down.
                    stop_ = true;
                    break;
                }
                sub_process_counter = (target + 1) % process_number_;
                int new_conn = 1;   // the value is irrelevant; the arrival of data is the notification
                if (send(sub_process_[target].pipefd_[0], (char*)&new_conn, sizeof(new_conn), 0) < 0) {
                    perror("send to child");
                    continue;
                }
                printf("send request to child: %d\n", target);
            }
            else if ((sockfd == sig_pipefd[0]) && (events[i].events & EPOLLIN)) {
                char signals[1024];
                ssize_t ret;
                // The signal pipe is edge-triggered: read until it is empty.
                while ((ret = recv(sig_pipefd[0], signals, sizeof(signals), 0)) > 0) {
                    for (ssize_t k = 0; k < ret; ++k) {
                        switch (signals[k]) {
                        case SIGCHLD: {
                            pid_t pid;
                            int stat;
                            while ((pid = waitpid(-1, &stat, WNOHANG)) > 0) {
                                for (int c = 0; c < process_number_; ++c) {
                                    // Child c exited: close our end of its pipe and mark the slot dead (-1).
                                    if (sub_process_[c].pid_ == pid) {
                                        printf("child %d join\n", c);
                                        close(sub_process_[c].pipefd_[0]);
                                        sub_process_[c].pid_ = -1;
                                    }
                                }
                            }
                            // Once no child is left alive, the parent stops too.
                            stop_ = true;
                            for (int c = 0; c < process_number_; ++c) {
                                if (sub_process_[c].pid_ != -1) {
                                    stop_ = false;
                                }
                            }
                            break;
                        }
                        case SIGTERM:
                        case SIGINT: {
                            // Terminate every live child. The loop then ends via
                            // SIGCHLD once all of them have been reaped. (Sending a
                            // special message over each child's pipe would be a
                            // gentler way to ask them to stop.)
                            printf("kill all the child now\n");
                            for (int c = 0; c < process_number_; ++c) {
                                pid_t pid = sub_process_[c].pid_;
                                if (pid != -1) {
                                    kill(pid, SIGTERM);
                                }
                            }
                            break;
                        }
                        default:
                            break;
                        }
                    }
                }
            }
        }
    }
    close(epollfd_);
}

#endif

服务器主程序:
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <signal.h>
#include <sys/wait.h>
#include <sys/stat.h>

#include "process_pool.h"

// Per-connection logic for the CGI server, used as the process pool's T.
// Protocol: the client sends the path of a program terminated by "\r\n". The
// server runs that program with its stdout connected to the client socket,
// then closes its own copy of the connection.
//
// SECURITY NOTE(review): the program path comes straight from the network and
// is executed as-is, so any client can run any executable this server can
// reach (e.g. "/bin/sh"). Restrict it (allow-list, no '/', fixed directory)
// before exposing this beyond a demo.
class cgi_conn {
public:
    cgi_conn() : sockfd_(-1), read_idx_(-1) {
        memset(buf_, 0, sizeof(buf_));
    }
public:
    // Bind this object to a newly accepted connection. Objects are reused when
    // an fd number is reused, so the read buffer is cleared here as well.
    void init(int epollfd, int sockfd, const sockaddr_in& client_addr) {
        epollfd_ = epollfd;
        sockfd_ = sockfd;
        address_ = client_addr;
        read_idx_ = 0;
        memset(buf_, 0, sizeof(buf_));
    }

    // Called when sockfd_ is readable. Reads until a complete "\r\n"-terminated
    // request is buffered (or no more data is available right now), then forks
    // a child that runs the requested program with stdout redirected to the
    // socket. On every terminal path this process removes the socket from
    // epoll and closes it.
    void process() {
        while (true) {
            int idx = read_idx_;
            // buf_ always keeps one byte for the terminating '\0'. If it is
            // full and no "\r\n" has arrived, the request is too long.
            if (idx >= BUFFER_SIZE - 1) {
                removefd(epollfd_, sockfd_);
                break;
            }

            ssize_t ret = recv(sockfd_, buf_ + idx, BUFFER_SIZE - 1 - idx, 0);
            if (ret < 0) {
                // Real error: drop the connection. EAGAIN only means "no more data for now".
                if (errno != EAGAIN) {
                    removefd(epollfd_, sockfd_);
                }
                break;
            }
            if (ret == 0) {
                // The peer closed the connection; close ours too.
                removefd(epollfd_, sockfd_);
                break;
            }

            read_idx_ += static_cast<int>(ret);
            // Terminate the data just read so bytes left over from an earlier,
            // longer request on a reused object are not printed.
            buf_[read_idx_] = '\0';
            printf("user content is: %s", buf_);

            // Scan the newly read bytes for CRLF. Checking buf_[idx - 1] also
            // catches a '\r' that ended the previous chunk.
            for (; idx < read_idx_; ++idx) {
                if ((idx >= 1) && (buf_[idx - 1] == '\r') && (buf_[idx] == '\n')) {
                    break;
                }
            }
            if (idx == read_idx_) {
                continue;   // no complete request yet: read more
            }
            buf_[idx - 1] = '\0';   // cut off "\r\n"; buf_ now holds the program path

            const char* file_name = buf_;
            // Only run a file that exists and that this process may execute.
            if (access(file_name, X_OK) == -1) {
                removefd(epollfd_, sockfd_);
                break;
            }

            fflush(stdout);   // so the child does not inherit pending log output
            pid_t pid = fork();
            if (pid == -1) {
                removefd(epollfd_, sockfd_);
                break;
            }
            if (pid > 0) {
                // Parent: the child now owns the client; drop our copy of the socket.
                removefd(epollfd_, sockfd_);
                break;
            }

            // Child: make the socket our stdout, then run the CGI program.
            if (dup2(sockfd_, STDOUT_FILENO) == -1) {
                _exit(EXIT_FAILURE);
            }
            if (sockfd_ != STDOUT_FILENO) {
                close(sockfd_);
            }
            // execl's argument list must end with a null *pointer*; a bare 0 is an int.
            execl(file_name, file_name, (char*)NULL);
            // Only reached if execl failed. _exit() skips stdio flushing, so no
            // inherited buffered output is written into the socket.
            _exit(EXIT_FAILURE);
        }
    }
private:
    // Size of the read buffer, including the terminating '\0'.
    static const int BUFFER_SIZE = 1024;
    // The worker's epoll instance, shared by all connections in this process.
    static int  epollfd_;
    int         sockfd_;
    sockaddr_in address_;
    char        buf_[BUFFER_SIZE];
    // One past the last byte of client data in buf_.
    int         read_idx_;
};

int cgi_conn::epollfd_ = -1;
// CGI server entry point. Usage: `prog ip_address port_number`.
// Creates the listening socket, then hands it to a pool of cgi_conn workers.
int main(int argc, char** argv)
{
    if (argc <= 2) {
        printf("usage: %s ip_address port_number\n", basename(argv[0]));
        return -1;
    }

    const char* ip = argv[1];
    // strtol instead of atoi so that garbage or out-of-range ports are rejected.
    char* end = NULL;
    errno = 0;
    long port = strtol(argv[2], &end, 10);
    if (errno != 0 || end == argv[2] || *end != '\0' || port <= 0 || port > 65535) {
        printf("invalid port: %s\n", argv[2]);
        return -1;
    }

    // Explicit checks rather than assert(), so errors are still caught under NDEBUG.
    int listenfd = socket(AF_INET, SOCK_STREAM, 0);
    if (listenfd < 0) {
        printf("socket: %m\n");
        return -1;
    }

    struct sockaddr_in address;
    memset(&address, 0, sizeof(address));
    address.sin_family = AF_INET;
    if (inet_pton(AF_INET, ip, &address.sin_addr) != 1) {
        // Otherwise an invalid address would silently become 0.0.0.0.
        printf("invalid IPv4 address: %s\n", ip);
        close(listenfd);
        return -1;
    }
    address.sin_port = htons(static_cast<unsigned short>(port));

    int on = 1;
    if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) == -1) {
        printf("setsockopt: %m\n");   // not fatal: only affects quick restarts
    }

    int ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address));
    if (ret == -1) {
        printf("what: %m\n");
        close(listenfd);
        return -1;
    }

    ret = listen(listenfd, 5);
    if (ret == -1) {
        printf("listen: %m\n");
        close(listenfd);
        return -1;
    }

    process_pool<cgi_conn>* pool = process_pool<cgi_conn>::create(listenfd);
    if (pool) {
        pool->run();
        delete pool;
    }

    close(listenfd);

    return 0;
}

CGI程序:
#include <stdio.h>

/* CGI program: writes a fixed greeting (no trailing newline) to stdout. The
 * CGI server runs it with stdout redirected to the client's socket. */
int main(void)
{
    fputs("hello,client", stdout);
    return 0;
}

将CGI程序编译为可执行文件,并按客户端请求的名字(本例中为 hello)放在服务器进程的当前工作目录下,然后启动服务器等候客户端请求。

客户端程序:
# Test client for the CGI server. It asks the server to run the program
# "hello" and prints whatever that program writes back. Works on Python 2 and 3.
import socket

server_address = ('192.168.33.147', 6666)   # must match the server's ip/port arguments

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
    sock.connect(server_address)
    # The request is the program name terminated by CRLF; sockets carry bytes.
    sock.sendall(b'hello\r\n')

    # The connection is closed once the CGI program exits, so read until EOF:
    # a single recv() may return only part of the output.
    chunks = []
    while True:
        chunk = sock.recv(1024)
        if not chunk:
            break
        chunks.append(chunk)
finally:
    sock.close()

print(b''.join(chunks).decode('utf-8', 'replace'))

然后客户端就会输出“hello,client”(前提是服务器以客户端中写的地址和端口启动,即 192.168.33.147 6666)。