进程间通信(Process To Process Communication)是指在并行计算过程中,各进程之间进行数据交互或消息传递,其通信量的大小主要取决于并行设计的粒度划分和各个执行进程之间的相对独立性。也就是在多进程环境下,使用的数据交互、事件通知等方法使各进程协同工作。
进程通信的目的
-
数据传输:一个进程需要将它的数据发送给另一个进程
-
资源共享:多个进程之间共享同样的资源。
-
通知事件:一个进程需要向另一个或一组进程发送消息,通知它(它们)发生了某种事件(如进程终止 时要通知父进程)。
-
进程控制:有些进程希望完全控制另一个进程的执行(如Debug进程),此时控制进程希望能够拦截另 一个进程的所有陷入和异常,并能够及时知道它的状态改变。
进程间的通信:本质就是让不同的进程,看向同一份资源
进程通信方式一:
管道通信:
管道是Unix中最古老的进程间通信的形式。
我们把从一个进程连接到另一个进程的一个数据流称为一个“管道”
匿名管道:
用法: pipe函数 传入一个数组 返回一个数组 里面包含两个数字 即是文件描述符 0是读写 1是写入
pipe函数用来创建一个无名管道,这个管道本质是个伪文件即内核缓冲区,通常大小为4KB 供给两个进程之间的数据交互
1.用pipe函数模拟两个进程之间交互数据
#include <iostream> #include <vector> #include <cstdio> #include <cstring> #include <unordered_map> #include <ctime> #include <cstdlib> #include <sys/wait.h> #include <sys/types.h> #include <unistd.h> #include <cassert> #include <string> using namespace std; typedef void (*functor)();//函数指针 vector<functor> functors;//存的是函数指针类型 unordered_map<uint32_t, string> info; void f1() { cout<<"这是一个处理日志的任务,执行的进程 ID ["<<getpid()<<"]" <<"执行时间是["<<time(nullptr)<<"]\n"<<endl; } void f2() { cout<<"这是一个备份数据的任务,执行的进程 ID ["<<getpid()<<"]" <<"执行时间是["<<time(nullptr)<<"]\n"<<endl; } void f3() { cout<<"这是一个处理网络连接的任务,执行的进程 ID ["<<getpid()<<"]" <<"执行时间是["<<time(nullptr)<<"]\n"<<endl; } void loadFunctor() { //对每个任务进行描述 info.insert({functors.size(),"处理日志的任务"}); functors.push_back(f1);//里面存的并不是函数 而是指针 每个指针指向对应的函数 info.insert({functors.size(),"备份数据的任务"}); functors.push_back(f2); info.insert({functors.size(),"处理网络连接的任务"}); functors.push_back(f3); } //pipe通信 ---匿名管道 int main() { //1.创建管道 int pipefd[2]={0}; if(pipe(pipefd)!=0)//如果创建失败 { cerr<<"pipe error"<<endl; return 1; } //2.创建子进程 pid_t id=fork(); if(id<0)//如果创建失败 { cerr<<"fork error"<<endl; return 2; } else if(id==0)//当是子进程时 { #define NUM 1024 char buffer[NUM]; //子进程用来进程读取 那么就关掉它的写 close(pipefd[1]); while(1) { memset(buffer,0,sizeof(buffer));//把数组内全部修改为0 ssize_t s=read(pipefd[0],buffer,sizeof(buffer)-1);//从pipefd 0下标读到buffer内 \0不用读 if(s>0) { //读取成功 buffer[s]='\0'; cout<<"子进程收到信息,内容是:\n"<<buffer; } else if(s==0) { cout<<"父进程写完了,我也退出了"<<endl; break; } else { //位置错误 } } close(pipefd[0]);//当都读完时,就关闭读 exit(0); } else { //父进程用来写 就要关掉读 close(pipefd[0]); const char *msg="你好子进程,我是父进程.\n"; int cnt=0; while(cnt<5) { write(pipefd[1],msg,strlen(msg)); cnt++; } //写完就关闭写 close(pipefd[1]); cout<<"父进程写完了"<<endl; } pid_t res=waitpid(id,nullptr,0); if(res>0) { cout<<"等待子进程成功"<<endl; } return 0; }
2.父进程控制子进程
#include <iostream> #include <vector> #include <cstdio> #include <cstring> #include <unordered_map> #include <ctime> #include <cstdlib> #include <sys/wait.h> #include <sys/types.h> #include <unistd.h> #include <cassert> #include <string> using namespace std; typedef void (*functor)();//函数指针 vector<functor> functors;//存的是函数指针类型 unordered_map<uint32_t, string> info; void f1() { cout<<"这是一个处理日志的任务,执行的进程 ID ["<<getpid()<<"]" <<"执行时间是["<<time(nullptr)<<"]\n"<<endl; } void f2() { cout<<"这是一个备份数据的任务,执行的进程 ID ["<<getpid()<<"]" <<"执行时间是["<<time(nullptr)<<"]\n"<<endl; } void f3() { cout<<"这是一个处理网络连接的任务,执行的进程 ID ["<<getpid()<<"]" <<"执行时间是["<<time(nullptr)<<"]\n"<<endl; } void loadFunctor() { //对每个任务进行描述 info.insert({functors.size(),"处理日志的任务"}); functors.push_back(f1);//里面存的并不是函数 而是指针 每个指针指向对应的函数 info.insert({functors.size(),"备份数据的任务"}); functors.push_back(f2); info.insert({functors.size(),"处理网络连接的任务"}); functors.push_back(f3); } //父进程控制子进程 int main() { //0.加载任务列表 loadFunctor(); //1.创建管道 int pipefd[2]={0}; if(pipe(pipefd)!=0)//创建失败时.. { cerr<<"pipe error"<<endl; return 1; } //2.创建子进程 pid_t id=fork(); if(id<0)//创建失败时.. { cerr<<"fork error"<<endl; return 2; } else if(id==0)//子进程 { //3.关闭不需要的文件描述符fd //子进程要读 所以关闭写 close(pipefd[1]); //4.业务处理 while(true) { uint32_t operatorType=0; //如果有数据,就读取,如果没有数据,就阻塞等待,等待任务的到来 ssize_t s=read(pipefd[0],&operatorType,sizeof(uint32_t)); if(s==0) { cout<<"父进程结束写入 子进程也结束读取"<<endl; break; } assert(s==sizeof(uint32_t)); //assert断言 是编译有效 debug模式 //release模式 断言就没有 //一旦断言没有,s变量就被定义 没有使用 relseasemos中,就可能会有错误 //void(s); //当取到的数在正常范围时 if(operatorType<functors.size()) { functors[operatorType]();//调用对应的函数指针 } else//当取到的数不在正常范围时 { cerr<<"bug? operaotrType="<<operatorType<<std::endl; } } close(pipefd[0]);//停止读取 关闭 exit(0); } else//父进程 { srand((long long)time(nullptr)); //关闭不需要的文件描述符 //父进程写入 所以关闭读取 close(pipefd[0]); //4.指派任务 int num=functors.size(); int cnt=10; while(cnt--) { //5.形成任务码 uint32_t commandCode=rand()%num; cout<<"父进程指派任务完成,任务是:"<<info[commandCode] <<"任务的编号:"<<cnt<<endl; //派发任务 //向文件缓冲区内写入uin32_t类型的整数 write(pipefd[1],&commandCode,sizeof(uint32_t)); sleep(1); } cout<<"父进程结束写入"<<endl; //父进程写一次 子进程读一次 close(pipefd[1]); pid_t res=waitpid(id,nullptr,0); if(res) cout<<"wait success"<<endl; } return 0; }
3.父进程控制多条进程
#include <iostream> #include <vector> #include <cstdio> #include <cstring> #include <unordered_map> #include <ctime> #include <cstdlib> #include <sys/wait.h> #include <sys/types.h> #include <unistd.h> #include <cassert> #include <string> using namespace std; typedef void (*functor)();//函数指针 vector<functor> functors;//存的是函数指针类型 unordered_map<uint32_t, string> info; void f1() { cout<<"这是一个处理日志的任务,执行的进程 ID ["<<getpid()<<"]" <<"执行时间是["<<time(nullptr)<<"]\n"<<endl; } void f2() { cout<<"这是一个备份数据的任务,执行的进程 ID ["<<getpid()<<"]" <<"执行时间是["<<time(nullptr)<<"]\n"<<endl; } void f3() { cout<<"这是一个处理网络连接的任务,执行的进程 ID ["<<getpid()<<"]" <<"执行时间是["<<time(nullptr)<<"]\n"<<endl; } void loadFunctor() { //对每个任务进行描述 info.insert({functors.size(),"处理日志的任务"}); functors.push_back(f1);//里面存的并不是函数 而是指针 每个指针指向对应的函数 info.insert({functors.size(),"备份数据的任务"}); functors.push_back(f2); info.insert({functors.size(),"处理网络连接的任务"}); functors.push_back(f3); } typedef pair<int32_t,int32_t> elem; int processNum=1000; void work(int blockFD) { cout<<"进程["<<getpid()<<"]"<<"开始工作"<<endl; //子进程核心工作的代码 while(true) { uint32_t operaotrCode=0; //阻塞等待 等待父进程写入数据 ssize_t s=read(blockFD,&operaotrCode,sizeof(uint32_t)); //获取到的数据存入operaotrCode if(s==0)break; assert(s==sizeof(uint32_t)); (void)s; //处理任务 if(operaotrCode<functors.size()) { functors[operaotrCode](); } } cout<<"进程["<<getpid()<<"]"<<"结束工作"<<endl; } void blanceSendTask(const vector<elem> &processFDs) { srand((long long)time(nullptr)); while(true) { sleep(1); //选择一个进程 processFDs内是存储进程个数和信息的 uint32_t pick=rand()%processFDs.size(); //选择一个任务 uint32_t task=rand()%functors.size(); //把任务给指定进程 并分配任务 write(processFDs[pick].second,&task,sizeof(uint32_t)); //打印对应的提示信息 cout<<"父进程指派任务->"<<info[task] <<"给进程:"<<processFDs[pick].first <<"编号: "<<pick<<endl; } } //父进程控制多条进程 int main() { //加载内存 loadFunctor(); //存储进程的信息 类型为pair pair的first为进程的id second为进程写的文件描述符 vector<elem> assignMap; //创建processNum个进程 for(int i=0;i<processNum;i++) { //保管fd的对象 int pipefd[2]={0}; //创建管道 if(pipe(pipefd)!=0) { cerr<<"pipe ?"<<endl; return 1; } pid_t id=fork(); if(id==0) { //子进程读取 所以关闭写入 close(pipefd[1]); //执行任务 work(pipefd[0]); //执行后 关闭读 close(pipefd[0]); exit(0); } //父进程 //做写入 所以关闭读 close(pipefd[0]); //用pair存储进程的信息 id和他写的文件描述符 elem e(id,pipefd[1]); //并且把pair同一放在一个位置 进行管理 assignMap.push_back(e); } cout<<"n个进程已经全部生成,并且全部备份完毕"<<endl; //父进程 派发任务 blanceSendTask(assignMap); //回收资源 for(int i=0;i<processNum;i++) { if(waitpid(assignMap[i].first,nullptr,0)>0)//等待进程 并且成功 { cout<<"wait for:pid="<<assignMap[i].first<<"wait success!" <<"number: "<<i<<endl; close(assignMap[i].second); } } return 0; }
管道读写规则
当管道内为空是 读会等待写的写入 即 pipe内部自带访问控制机制 同步和互斥机制
管道的特点
- 只能用于具有共同祖先的进程(具有亲缘关系的进程)之间进行通信;通常,一个管道由一个进程创 建,然后该进程调用fork,此后父、子进程之间就可应用该管道。
- 管道提供流式服务
- 一般而言,进程退出,管道释放,所以管道的生命周期随进程
- 一般而言,内核会对管道操作进行同步与互斥
- 管道是半双工的,数据只能向一个方向流动;需要双方通信时,需要建立起两个管道
- 半双公是一方读/写 全双公是一方同时可以读写。
命名管道:
mkfifo函数 使用文件名创建FIFO。但FIFO也是文件,是个特殊类型的文件。允许两个进程进行交互数据的特殊文件,默认权限为0666但有umask过滤。
命名管道与匿名管道的区别
- 匿名管道只能让有血缘关系的进程之间通信,而命名管道能让无血缘关系的两个进程之间通信
- 匿名管道由pipe函数提供的数组内的文件描述符打开。
- 命名管道由mkfifo函数创建,本质也是文件,打开用open。
- FIFO(命名管道)与pipe(匿名管道)之间唯一的区别在它们创建与打开的方式不同,一但这些工作完 成之后,它们具有相同的语义。
- 虽然命名和匿名都叫管道,但是他们的管道是不一样的。命名管道是个文件,特殊的文件FIFO,而匿名的管道是个伪文件即内核缓冲区。
用mkfifo函数实现两个进程交互数据
comm.h文件
#pragma once #include <iostream> #include <cstdio> #include <string> #include <cstring> #include <cerrno> #include <sys/types.h> #include <sys/stat.h> #include <fcntl.h> #include <unistd.h> #define IPC_PATH "./.fifo"//提供一个文件
serveFifo.cpp
#include "comm.h" using namespace std; //写入 int main() { umask(0); if(mkfifo(IPC_PATH,0600)!=0)//提供一个文件 创建FIFO //命名管道 路径,权限 { cerr<<"mkfifo error"<<endl; return 1; } //返回文件描述符 int pipeFd=open(IPC_PATH,O_RDONLY);//以读的方式打开文件 if(pipeFd<0) { cerr<<"open fifo error"<<endl; return 2; } #define NUM 1024 char buffer[NUM]; while(true) { ssize_t s=read(pipeFd,buffer,sizeof(buffer)-1);//从指定文件描述符 //,读取,放入到buffer内 if(s>0) { buffer[s]='\0'; cout<<"客户端->服务器"<<buffer<<endl; } else if(s==0) { cout<<"客户端退出 服务器退出"; break; } else { cout<<"read: "<<strerror(errno)<<endl; break; } } close(pipeFd); cout<<"客户端退出"<<endl; unlink(IPC_PATH);//关闭指定文件 return 0; }
clientFifo.cpp
#include "comm.h" using namespace std; //读取 int main() { int pipeFd=open(IPC_PATH,O_WRONLY); //打开文件路径,以写的方式打开 if(pipeFd<0) { cerr<<"open: "<<strerror(errno)<<endl; return 1; } #define NUM 1024 char line[NUM]; while(true) { printf("服务器->客户端:"); fflush(stdout);//刷新缓冲区 memset(line,0,sizeof(line)); if(fgets(line,sizeof(line),stdin)!=nullptr)//从键盘读取 放入到line { line[strlen(line)-1]='\0';//去掉回车 write(pipeFd,line,strlen(line));//从指定文件描述符写入 把line的数据写入到文件 } else//读取失败时 结束 { break; } } close(pipeFd);//关闭文件 cout<<"客户端退出"<<endl; return 0; }
进程通信方式二:
共享内存
共享内存区是最快的IPC形式。一旦这样的内存映射到共享它的进程的地址空间,这些进程间数据传递不再涉及到 内核,换句话说是进程不再通过执行进入内核的系统调用来传递彼此的数据
选项
- IPC_CREAT:创建共享内存,如果存在,就获取,不存在,就创建。
- IPC_EXCL:不单独使用,必须和IPC_CREAT配合,如果不存在指定的共享内存,就创建,如果存在,就出错返回.
- IPC_RMID:删除共享内存段
shmget函数
创建共享内存
int shmget(key_t key, size_t size, int shmflg);
- key共享内存的标识符 共享内存段的名字
- size共享内存的大小 共享内存的单位4KB为单位,例:我创建了4KB以内的共享内存,但操作系统依旧给我提供了4KB的空间,但我依旧只能使用我创建的空间。如果我创建了4KB以上8KB以下的空间,那么操作系统会提供8KB的空间,但我依旧只能使用我创建的空间。4KB*N
- shmflg 选项
返回值是共享内存段的标识符 让不同进程能使用相同的共享内存 就靠标识符 失败返回-1
shmat函数
将共享内存段链接到进程地址空间
void *shmat(int shmid, const void *shmaddr, int shmflg);
- shmid:共享内存标识符
- shmaddr:指定链接的地址
- shmflg:选项
返回值是一个指针,指向共享内存第一个节,失败返回-1
- shmaddr为NULL,核心自动选择一个地址
- shmaddr不为NULL且shmflg无SHM_RND标记,则以shmaddr为连接地址。
- shmaddr不为NULL且shmflg设置了SHM_RND标记,则连接的地址会自动向下调整为SHMLBA的整数倍。公式:shmaddr - (shmaddr % SHMLBA)
- shmflg=SHM_RDONLY,表示连接操作用来只读共享内存
shmdt函数
将共享内存段与当前进程脱离
int shmdt(const void *shmaddr);
- shmdaddr 由shmat返回的指针
返回值 成功返回0 失败返回-1
将共享内存段与进程脱离,不代表删除共享内存
shmctl函数
用于控制共享内存
int shmctl(int shmid, int cmd, struct shmid_ds *buf);
- shmid:共享内存标识符
- cmd:选项
- buf:指向一个保存着共享内存的模式状态和访问权限的数据结构
返回值:成功返回0,失败返回-1
ftok函数
系统建立IPC通讯(如消息队列、共享内存时)必须指定一个ID值。通常情况下,该id值通过ftok函数得到
key_t ftok(const char *pathname, int proj_id);
- const char *pathname:用于产生key_t值的文件名(文件必须存在),
- int proj_id:proj_id :是子序号,虽然为int,但是只有8个比特被使用(0-255)
用共享内存实现两个进程同时使用共享内存
comm.hpp
#include <sys/types.h> #include <sys/ipc.h> #include <sys/shm.h> #include <sys/stat.h> #include <fcntl.h> #include "log.hpp" #define PATH_NAME "/home/moxuan/"//路径 #define PROJ_ID 0x14//随便一个值 #define MEM_SIZE 4096 key_t CreateKey() { key_t key=ftok(PATH_NAME,PROJ_ID); //给定一个路径,和一个值 通过算法创建一个不会重复的key值 if(key<0) { std::cerr<<"ftok: "<<strerror(errno)<<std::endl; exit(1); } return key; }
log.hpp
#pragma once #include<iostream> #include<ctime> std::ostream &Log() { //自己写一个打印cout 现在前面打印下面这些 最后在打印要打印的东西 std::cout<<"Fot Debug |"<<" timestamp "<<(uint64_t)time(nullptr)<<" | "; return std::cout; }
IpcShmCli.cpp
#include"Comm.hpp" #include"log.hpp" #include<cstdio> #include<unistd.h> using namespace std; int main() { //使用共享内存的角色 //获取相同的key值 key_t key=CreateKey(); Log()<<"key: "<<key<<endl; //获取共享内存 int shmid=shmget(key,MEM_SIZE,IPC_CREAT); if(shmid<0) { Log()<<"shmget: "<<strerror(errno)<<endl; return 2; } //将共享内存与当前进程挂接 char *str=(char*)shmat(shmid,nullptr,0); //使用... sleep(3); //去关联 shmdt(str); //在使用者的角度 你不需要去关闭共享内存 你不用了 直接去关联即可 return 0; }
IpcShmSer.cpp
#include"Comm.hpp" #include"log.hpp" #include<unistd.h> using namespace std; const int flags=IPC_CREAT|IPC_EXCL; //在创建者的角度 需要删除共享内存 //共享内存不会随进程而销毁,而是需要手动去释放 int main() { key_t key=CreateKey();//获取key Log()<<"key: "<<key<<endl; int shmid=shmget(key,MEM_SIZE,flags|0666); //创建共享内存 key值 大小 操作 权限(可有可无 没写默认为0) 尽量要写权限 if(shmid<0)//-1时创建失败 { Log()<<"shmget "<<strerror(errno)<<endl; return 2; } Log()<<"create shm success,shmid: "<<shmid<<endl; //将共享内存与进程长产生关联 char *str=(char*)shmat(shmid,nullptr,0); Log()<<"attach shm:"<<shmid<<"success"<<endl; //用.. sleep(5); //去关联 shmdt(str); //传入chmat返回的指针即可 Log()<<"detach shm: "<<shmid<<"success"<<endl; //删除共享内存 shmctl(shmid,IPC_RMID,nullptr); Log()<<"delete shm: "<<shmid<<" success"<<endl; return 0; }
随后先执行Ser创建共享内存,再执行Cli去使用共享内存
通过命令 ipcs -m 可以发现连接的进程由0->1->2->1->0
原来没创建共享内存,连接数为0 随后创建了共享内存并连接了1个,再使用共享内存连接了1个,使用后,分别调开两个连接,变0,再删除共享内存。
注意:共享内存与malloc和new不同,共享内存不会随着进程结束而删除,必须手动删除
- 命令行删除: ipcrm -m shmid
- shmctl函数删除