《Linux高性能服务器编程》学习总结(十三)——多进程编程

时间:2022-01-11 17:56:12

  在多进程编程中,我们用fork系统调用创建子进程,值得注意的是,fork函数复制当前进程并在内核进程表中创建一个新的表项,其堆、栈指针,标志寄存器的值都和父进程相同,但是其ppid被设置成父进程pid,信号位图被清除。而子进程代码和父进程完全相同,其数据也会复制自父进程,但是其复制过程是写时复制,即父子任意进程对数据执行写操作时才会复制,首先是缺页中断,然后操作系统给子进程分配空间并复制数据。此外,创建子进程后父进程中打开的文件描述符在子进程中也是默认打开的,其文件描述符引用计数加一,父进程的用户根目录,当前工作目录等变量的引用计数均会加一。

  僵尸进程是多进程编程中需要了解和避免的情况,僵尸进程的产生是由于子进程先于父进程退出,而此时父进程没有正常接收子进程退出状态,导致子进程脱离父进程存在于操作系统中,还占据了原有的进程描述符和资源,造成了资源的浪费。避免僵尸进程的产生共有三种方法:第一种是处理SIGCHLD信号,当子进程退出时会向父进程发送SIGCHLD信号,我们可以指明信号处理函数来进行处理;第二种是拖管法,由父进程创建一个中间进程,再由这个中间进程创建子进程后直接退出,这样子进程变成了孤儿进程,会由init进程托管,由init进程负责回收其资源,但是这样就破坏了父子进程之间的血缘关系;第三种就是阻塞法,调用wait函数等待子进程退出,缺点就是会使父进程进入阻塞状态。对于wait函数,其进入阻塞状态显然是我们所不允许的,所以waitpid函数就解决了这个问题。

  在多进程编程中,一个最为重要的需要解决的问题就是进程间通信IPC,我们常用的IPC方法有:管道、信号量、共享内存和消息队列。其系统调用较为简单,我们只来简单说明一下这四种方式的区别和异同。首先管道是我们在前文就提到过的,而且讲过了再网络编程中经常使用socketpair函数创建一个双向管道,管道分为无名管道和有名管道,无名管道只能用于有亲缘关系的进程之间进行通信,而且只能用低级文件编程库中的读写函数,而有名管道就是创建一个管道文件,通过文件进行信息交流,所以没有亲缘关系的限制。信号量是用来实现多进程之间的互斥与同步的,其本质是一个整形的数,我们用pv操作来对其进行操作,如果是二进制的信号量,则可以用信号量保证对于临界资源来讲同一时刻只有一段代码能对其进行访问。共享内存是在内存空间创建或获取一段空间来让各个进程进行共享,通过这段空间进行信息交流,而这种方式就比较灵活,可以自定其数据结构,完成各式各样的功能。消息队列就是一个能存放消息的队列,各个进程将消息发送到消息队列中,并指明要发送的对象,其他程序可以直接监听这个队列,收取发给自己的消息。几种进程间通信的方式都很普遍,我们来看一个用共享内存实现的聊天室服务器程序:

  

  1 /*************************************************************************
  2     > File Name: 13-4.cpp
  3     > Author: Torrance_ZHANG
  4     > Mail: 597156711@qq.com
  5     > Created Time: Wed 14 Feb 2018 04:18:04 PM PST
  6  ************************************************************************/
  7 
  8 #include"head.h"
  9 using namespace std;
 10 
 11 #define USER_LIMIT 5
 12 #define BUFFER_SIZE 1024
 13 #define FD_LIMIT 65535
 14 #define MAX_EVENT_NUMBER 1024
 15 #define PROCESS_LIMIT 65536
 16 
 17 struct client_data {
 18     sockaddr_in address;
 19     int connfd;
 20     pid_t pid;
 21     int pipefd[2];
 22 };
 23 
 24 static const char* shm_name = "/my_shm";
 25 int sig_pipefd[2];
 26 int epollfd;
 27 int listenfd;
 28 int shmfd;
 29 char* share_mem = 0;
 30 client_data* users = 0;
 31 int* sub_process = 0;
 32 int user_count = 0;
 33 bool stop_child = false;
 34 
 35 int setnonblocking(int fd) {
 36     int old_option = fcntl(fd, F_GETFL);
 37     int new_option = old_option | O_NONBLOCK;
 38     fcntl(fd, F_SETFL, new_option);
 39     return old_option;
 40 }
 41 
 42 void addfd(int epollfd, int fd) {
 43     epoll_event event;
 44     event.data.fd = fd;
 45     event.events = EPOLLIN | EPOLLET;
 46     epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
 47     setnonblocking(fd);
 48 }
 49 
 50 void sig_handler(int sig) {
 51     int save_errno = errno;
 52     int msg = sig;
 53     send(sig_pipefd[1], (char*)&msg, 1, 0);
 54     errno = save_errno;
 55 }
 56 
 57 void addsig(int sig, void(*handler)(int), bool restart = true) {
 58     struct sigaction sa;
 59     memset(&sa, 0, sizeof(sa));
 60     sa.sa_handler = handler;
 61     if(restart) sa.sa_flags |= SA_RESTART;
 62     sigfillset(&sa.sa_mask);
 63     assert(sigaction(sig, &sa, NULL) != -1);
 64 }
 65 
 66 void del_resource() {
 67     close(sig_pipefd[0]);
 68     close(sig_pipefd[1]);
 69     close(listenfd);
 70     close(epollfd);
 71     shm_unlink(shm_name);
 72     delete [] users;
 73     delete [] sub_process;
 74 }
 75 
 76 void child_term_handler(int sig) {
 77     stop_child = true;
 78 }
 79 
 80 int run_child(int idx, client_data* users, char* share_mem) {
 81     epoll_event events[MAX_EVENT_NUMBER];
 82     int child_epollfd = epoll_create(5);
 83     assert(child_epollfd != -1);
 84     int connfd = users[idx].connfd;
 85     addfd(child_epollfd, connfd);
 86     int pipefd = users[idx].pipefd[1];
 87     addfd(child_epollfd, pipefd);
 88     int ret;
 89     addsig(SIGTERM, child_term_handler, false);
 90 
 91     while(!stop_child) {
 92         int number = epoll_wait(child_epollfd, events, MAX_EVENT_NUMBER, -1);
 93         if((number < 0) && (errno != EINTR)) {
 94             printf("epoll failure\n");
 95             break;
 96         }
 97         for(int i = 0; i < number; i ++) {
 98             int sockfd = events[i].data.fd;
 99             if((sockfd == connfd) && (events[i].events & EPOLLIN)) {
100                 memset(share_mem + idx * BUFFER_SIZE, 0, BUFFER_SIZE);
101                 ret = recv(connfd, share_mem + idx * BUFFER_SIZE, BUFFER_SIZE - 1, 0);
102                 if(ret < 0) {
103                     if(errno != EAGAIN) {
104                         stop_child = true;
105                     }
106                 }
107                 else if(ret == 0) {
108                     stop_child = true;
109                 }
110                 else {
111                     send(pipefd, (char*)&idx, sizeof(idx), 0);
112                 }
113             }
114             else if((sockfd == pipefd) && (events[i].events & EPOLLIN)) {
115                 int client = 0;
116                 ret = recv(sockfd, (char*)&client, sizeof(client), 0);
117                 if(ret < 0) {
118                     if(errno != EAGAIN) {
119                         stop_child = true;
120                     }
121                 }
122                 else if(ret == 0) {
123                     stop_child = true;
124                 }
125                 else {
126                     send(connfd, share_mem + client * BUFFER_SIZE, BUFFER_SIZE, 0);
127                 }
128             }
129             else continue;
130         }
131     }
132     close(connfd);
133     close(pipefd);
134     close(child_epollfd);
135     return 0;
136 }
137 
138 int main(int argc, char** argv) {
139     if(argc <= 2) {
140         printf("usage: %s ip_address port_number\n", basename(argv[0]));
141         return 1;
142     }
143     const char* ip = argv[1];
144     int port = atoi(argv[2]);
145 
146     int ret = 0;
147     struct sockaddr_in address;
148     bzero(&address, sizeof(address));
149     address.sin_family = AF_INET;
150     inet_pton(AF_INET, ip, &address.sin_addr);
151     address.sin_port = htons(port);
152 
153     listenfd = socket(AF_INET, SOCK_STREAM, 0);
154     assert(listenfd >= 0);
155 
156     ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address));
157     assert(ret != -1);
158 
159     ret = listen(listenfd, 5);
160     assert(ret != -1);
161 
162     user_count = 0;
163     users = new client_data[USER_LIMIT + 1];
164     sub_process = new int[PROCESS_LIMIT];
165     for(int i = 0; i < PROCESS_LIMIT; i ++) {
166         sub_process[i] = -1;
167     }
168     epoll_event events[MAX_EVENT_NUMBER];
169     epollfd = epoll_create(5);
170     assert(epollfd != -1);
171     addfd(epollfd, listenfd);
172     
173     ret = socketpair(AF_UNIX, SOCK_STREAM, 0, sig_pipefd);
174     assert(ret != -1);
175     setnonblocking(sig_pipefd[1]);
176     addfd(epollfd, sig_pipefd[0]);
177 
178     addsig(SIGCHLD, sig_handler);
179     addsig(SIGTERM, sig_handler);
180     addsig(SIGINT, sig_handler);
181     addsig(SIGPIPE, SIG_IGN);
182     bool stop_server = false;
183     bool terminate = false;
184 
185     shmfd = shm_open(shm_name, O_CREAT | O_RDWR, 0666);
186     assert(shmfd != -1);
187     ret = ftruncate(shmfd, USER_LIMIT * BUFFER_SIZE);
188     assert(ret != -1);
189     
190     share_mem = (char*)mmap(NULL, USER_LIMIT * BUFFER_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, shmfd, 0);
191     assert(share_mem != MAP_FAILED);
192     close(shmfd);
193 
194     while(!stop_server) {
195         int number = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);
196         if((number < 0) && (errno != EINTR)) {
197             printf("epoll failure\n");
198             break;
199         }
200         for(int i = 0; i < number; i ++) {
201             int sockfd = events[i].data.fd;
202             if(sockfd == listenfd) {
203                 struct sockaddr_in client_address;
204                 socklen_t client_addrlength = sizeof(client_address);
205                 int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlength);
206                 if(connfd < 0) {
207                     printf("errno is: %d\n", errno);
208                     continue;
209                 }
210                 if(user_count >= USER_LIMIT) {
211                     const char* info = "too many users\n";
212                     printf("%s", info);
213                     send(connfd, info, strlen(info), 0);
214                     close(connfd);
215                     continue;
216                 }
217                 users[user_count].address = client_address;
218                 users[user_count].connfd = connfd;
219                 ret = socketpair(AF_UNIX, SOCK_STREAM, 0, users[user_count].pipefd);
220                 assert(ret != -1);
221                 pid_t pid = fork();
222                 if(pid < 0) {
223                     close(connfd);
224                     continue;
225                 }
226                 else if(pid == 0) {
227                     close(epollfd);
228                     close(listenfd);
229                     close(users[user_count].pipefd[0]);
230                     close(sig_pipefd[0]);
231                     close(sig_pipefd[1]);
232                     run_child(user_count, users, share_mem);
233                     munmap((void*)share_mem, USER_LIMIT* BUFFER_SIZE);
234                     exit(0);
235                 }
236                 else {
237                     close(connfd);
238                     close(users[user_count].pipefd[1]);
239                     addfd(epollfd, users[user_count].pipefd[0]);
240                     users[user_count].pid = pid;
241                     sub_process[pid] = user_count;
242                     user_count ++;
243                 }
244             }
245             else if((sockfd == sig_pipefd[0]) && (events[i].events & EPOLLIN)) {
246                 int sig;
247                 char signals[1024];
248                 ret = recv(sig_pipefd[0], signals, sizeof(signals), 0);
249                 if(ret == -1) continue;
250                 else if(ret == 0) continue;
251                 else {
252                     for(int i = 0; i < ret; i ++) {
253                         switch(signals[i]) {
254                             case SIGCHLD: {
255                                 pid_t pid;
256                                 int stat;
257                                 while((pid = waitpid(-1, &stat, WNOHANG)) > 0) {
258                                     int del_user = sub_process[pid];
259                                     sub_process[pid] = -1;
260                                     if((del_user < 0) || (del_user > USER_LIMIT)) {
261                                         continue;
262                                     }
263                                     epoll_ctl(epollfd, EPOLL_CTL_DEL, users[del_user].pipefd[0], 0);
264                                     close(users[del_user].pipefd[0]);
265                                     users[del_user] = users[-- user_count];
266                                     sub_process[users[del_user].pid] = del_user;
267                                 }
268                                 if(terminate && user_count == 0) {
269                                     stop_server = true;
270                                 }
271                                 break;
272                             }
273                             case SIGTERM:
274                             case SIGINT: {
275                                 printf("kill all the child now\n");
276                                 if(user_count == 0) {
277                                     stop_server = true;
278                                     break;
279                                 }
280                                 for(int i = 0; i < user_count; i ++) {
281                                     int pid = users[i].pid;
282                                     kill(pid, SIGTERM);
283                                 }
284                                 terminate = true;
285                                 break;
286                             }
287                             default : break;
288                         }
289                     }
290                 }
291             }
292             else if(events[i].events & EPOLLIN) {
293                 int child = 0;
294                 ret = recv(sockfd, (char*)&child, sizeof(child), 0);
295                 printf("read data from child accross pipe\n");
296                 if(ret == -1) continue;
297                 else if(ret == 0) continue;
298                 else {
299                     for(int j = 0; j < user_count; j ++) {
300                         if(users[j].pipefd[0] != sockfd) {
301                             printf("send data to child accross pipe\n");
302                             send(users[j].pipefd[0], (char*)&child, sizeof(child), 0);
303                         }
304                     }
305                 }
306             }
307         }
308     }
309     del_resource();
310     return 0;
311 }

《Linux高性能服务器编程》学习总结(十三)——多进程编程

  服务器接收到客户端数据时,通过管道告知专门为其他服务器服务的子进程向对应客户端发送数据。

  我们知道,当父进程创建子进程之后,父进程中打开的文件描述符仍然保持打开,所以文件描述符可以很方便地从父进程传递到子进程。需要特别注意的是进程描述符的传递并不是单纯地传送一个值,而是要在接收进程中创建一个新的文件描述符,并且该文件描述符和发送进程中被传递的文件描述符指向内核中的同一个文件表项。那么问题就来了,我们如何从子进程将文件描述符传送给父进程呢?推广来说,如何在不相关的两个进程之间传送文件描述符呢?这时我们就需要利用UNIX域socket在进程间传递特殊的辅助数据,我们来看一个例子:

 1 /*************************************************************************
 2     > File Name: 13-5.cpp
 3     > Author: Torrance_ZHANG
 4     > Mail: 597156711@qq.com
 5     > Created Time: Tue 27 Feb 2018 03:35:13 AM PST
 6  ************************************************************************/
 7 
 8 #include"head.h"
 9 using namespace std;
10 
11 static const int CONTROL_LEN = CMSG_LEN(sizeof(int));
12 
13 void send_fd(int fd, int fd_to_send) {
14     struct iovec iov[1];
15     struct msghdr msg;
16     char buf[0];
17 
18     iov[0].iov_base = buf;
19     iov[0].iov_len = 1;
20     msg.msg_name = NULL;
21     msg.msg_namelen = 0;
22     msg.msg_iov = iov;
23     msg.msg_iovlen = 1;
24 
25     cmsghdr cm;
26     cm.cmsg_len = CONTROL_LEN;
27     cm.cmsg_level = SOL_SOCKET;
28     cm.cmsg_type = SCM_RIGHTS;
29     *(int *)CMSG_DATA(&cm) = fd_to_send;
30     msg.msg_control = &cm;
31     msg.msg_controllen = CONTROL_LEN;
32 
33     sendmsg(fd, &msg, 0);
34 }
35 
36 int recv_fd(int fd) {
37     struct iovec iov[1];
38     struct msghdr msg;
39     char buf[0];
40 
41     iov[0].iov_base = buf;
42     iov[0].iov_len = 1;
43     msg.msg_name = NULL;
44     msg.msg_namelen = 0;
45     msg.msg_iov = iov;
46     msg.msg_iovlen = 1;
47     
48     cmsghdr cm;
49     msg.msg_control = &cm;
50     msg.msg_controllen = CONTROL_LEN;
51     recvmsg(fd, &msg, 0);
52 
53     int fd_to_read = *(int*)CMSG_DATA(&cm);
54     return fd_to_read;
55 }
56 
57 int main() {
58     int pipefd[2];
59     int fd_to_pass = 0;
60     int ret = socketpair(AF_UNIX, SOCK_DGRAM, 0, pipefd);
61     assert(ret != -1);
62 
63     pid_t pid = fork();
64     assert(pid >= 0);
65 
66     if(pid == 0) {
67         close(pipefd[0]);
68         fd_to_pass = open("test.txt", O_RDWR, 0666);
69         send_fd(pipefd[1], (fd_to_pass > 0) ? fd_to_pass : 0);
70         close(fd_to_pass);
71         exit(0);
72     }
73 
74     close(pipefd[1]);
75     fd_to_pass = recv_fd(pipefd[0]);
76     char buf[1024];
77     memset(buf, 0, sizeof(buf));
78     read(fd_to_pass, buf, 1024);
79     printf("I got fd %d and data %s\n", fd_to_pass, buf);
80     close(fd_to_pass);
81 }

《Linux高性能服务器编程》学习总结(十三)——多进程编程