Linux进程间通信_管道

时间:2024-01-26 22:28:22

进程间通信概述

在开发过程中,有时会需要进程间进行一些交流和互动,比如一个进程向另一个进程发送数据、发送命令、发送通知或进行某种协同,进程之间的这些行为,其实是在进行进程间通信

进程间通信,即是让多个进程之间可以实现数据层面的交互。由于进程的独立性,进程间通信的成本并不低。进程间通信,本质是让不同的进程的进程看到同一份“资源”,这份“资源”是系统中特定形式的内存空间,而且“资源”的提供者不能是任意一个进程,因为这会破坏进程的独立性,是不被允许的。所以这份共享资源的提供者是操作系统。进程通过访问这份共享资源来进行通信,本质是在访问操作系统,所以进程间通信只能通过系统调用来完成。

Linux进程间通信_管道_进程池

承上,操作系统为用户提供系统调用接口以支持进程间通信,通信时,从底层设计,到接口设计,都由操作系统完成。操作系统一般会有一个独立的通信模块,这个模块往往隶属于文件系统,被称为IPC通信模块。常见的进程间通信方式有管道通信、消息队列和共享内存等,本系列文章会逐一对这些方式进行或浅或深的讨论。

匿名管道 pipe

管道是一种基于文件的进程间通信方式,也是类 UNIX 系统中最古老的一种通信方式。管道(pipe)是一种内存级文件,它在内存中存在自己的 file_struct 等内核数据结构和缓冲区,但不在磁盘中存在。将管道文件作为公共资源,管道文件的缓冲区即是一块共享内存空间,借此可以进行进程间通信。规定一个管道的通信方向是单向的。

基本原理

一种管道通信方式为,借助一个未命名的内存级文件实现通信,这种方式即为匿名管道通信。由于这个匿名的文件只能在继承体系中被父进程和其他子进程同时看到,所以匿名管道只能支持具有亲缘关系的进程之间的通信

首先,父进程会以读、写方式分别打开一个内存级文件两次,此时就会有两个 struct file,指向同一个缓冲区和 inode;然后创建子进程,子进程的 task_struct 和 files_struct 拷贝自父进程,此时父、子进程可以看到同一份文件资源;最后根据需要的通信方向,关闭父、子进程的读端或写端,即可实现父、子进程之间的单向通信。这即是匿名管道通信的基本原理和流程。

匿名管道通信不拘泥于父、子进程之间,只要是具有血缘关系的进程,都可以看到同一份管道资源,并借此进行通信,但这种通信方式仍最常见于父、子进程之间。父进程打开内存级文件两次,是为了使父、子进程在读、写同一个缓冲区时更好地进行协同。虽然可以建立两个管道实现进程之间的双向通信,但是一般不建议这样做,因为有更好的方式做到这点。

系统调用接口

使用 pipe(2) 系统调用来分别以读和写方式打开一个匿名文件两次,即创建一个匿名管道。函数原型为:

#include <unistd.h>

int pipe(int pipefd[2]);
//成功返回0; 失败返回-1

pipe(2) 的参数是一个作为输出型参数的数组 pipefd,pipefd 有两个元素,分别是匿名文件的读、写 struct file 的文件描述符。父进程 fork 出子进程后,子进程即可看到同一个管道。因为管道的本质是文件,所以用文件读写的方式向管道内进行读写数据即可。下面是一个使用 pipe(2) 进行简单的父子进程间通信的demo代码。

#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <cstring>
#include <iostream>

void ChildRead(int fd)
{
    char msg[1024]; msg[0] = '\0';
    ssize_t s = read(fd, msg, sizeof(msg));
    if(s > 0)
    {
        msg[s] = '\0';
        std::cout << msg << std::endl;
    }
}

void FatherWrite(int fd)
{
    sleep(3); //父进程先休眠3秒
    char msg[1024] = "I am father";
    write(fd, msg, strlen(msg));
}

int main()
{
    int pipefd[2];
    int ret = pipe(pipefd); if(ret < 0) { perror("pipe err"); exit(1); }
    pid_t id = fork();
    if(id < 0) { perror("fork err"); exit(1); }
    else if(id == 0)
    {
        close(pipefd[1]); //子进程关闭写端
        ChildRead(pipefd[0]); //子进程读
    }
    else
    {
        close(pipefd[0]); //父进程关闭读端
        FatherWrite(pipefd[1]); //父进程写
        waitpid(id, nullptr, 0); //等待子进程
    }
    return 0;
}
/*
运行程序,3秒后输出:
I am father
*/

Linux进程间通信_管道_进程池_02

运行程序,在3秒后输出子进程写入管道的信息,这表明在前3秒父进程在等待子进程向管道中写入,说明父、子进程在通信过程中是会进行协同的,具体的协同表现会在下面匿名管道的特性部分讨论。

特性

通过了解匿名管道的原理和代码,可以知道匿名管道具有以下几点特性:

  • 管道通信是单向的,不管是匿名管道还是命名管道;
  • 进行管道通信的进程之间会进行协同;
  • 管道通信是面向字节流的,进行单纯的管道通信时,读端的进程无法确定写端进程所写数据的具体格式;
  • 匿名管道文件的生命周期随进程,进程退出,匿名管道会自动被回收
  • 具有亲缘关系的进程才能进行匿名管道通信。

除此之外,管道通信中还可能出现 4 种特殊情况:

  • 读写端正常,管道为空,此时读端阻塞;
  • 读写端正常,管道填满,此时写端阻塞;
  • 读端正常,写端关闭,此时 read 返回 0,表明已经读到管道文件的结尾;
  • 写端正常,读端关闭,此时的写操作已经没有意义,操作系统会通过 SIGPIPE 信号杀死写端进程。

这四种情况,一定程度上反映了读、写端进程之间的协同。

基于匿名管道的进程池demo

进程池是匿名管道的一个实际应用场景。池化技术允许用户提前申请好一批资源以便后续使用,避免了在需要使用这些资源时多次重复申请所带来的效率问题,是一种以空间换时间的典型做法。进程池可以提前创建一批进程,并在需要时将任务分配给这些进程,避免重复创建和销毁进程,节省资源消耗。

基于匿名管道的进程池,其工作方式为,父进程根据需要创建一批子进程,并通过匿名管道与这些子进程建立通信,这样就可以将任务派发给子进程进而让子进程执行。

Linux进程间通信_管道_进程池_03

首先对管道进行描述和封装,本文所示的进程池demo是单向的,不考虑子进程向父进程发送执行结果的情况,所以只封装父进程关心的写端的文件描述符,而不必考虑子进程读端的文件描述符。

struct channel
{
    channel(int fd, pid_t SlaveId, int masterId, std::string& SlaveName)
        :_fd(fd), _SlaveId(SlaveId), _masterId(masterId), _SlaveName(SlaveName)
    { }

    int _fd; //写端fd
    pid_t _SlaveId; //子进程id
    pid_t _masterId; //父进程pid
    std::string _SlaveName; //子进程名
};

使用 vector 对 channel 进行组织:

private:
    std::vector<channel> _process_pool;

当进程池被构造时,父进程会创建指定数量个子进程和管道,与子进程建立单向通信,并对管道进行包装和组织;子进程转而去对执行 read(2),此时管道中没有数据,子进程会阻塞,直到父进程派发任务

//constructor
ProcessPool()
{
  srand(time(nullptr)); //这里初始化随机数种子是为了后面测试
  std::vector<int> oldfds; //保存已经存在的读端fd,原因在后文解释
  for(int i = 0; i < process_count; ++i)
  {
    int pipefd[2]; //建立管道
    int ret = pipe(pipefd); if(ret < 0) { perror("InitProcessPool:pipe err"); exit(1); }
    int id = fork(); //创建子进程
    if(id < 0) { perror("InitProcessPool:fork err"); exit(1); }
    else if(id == 0)
    {
      //逐个关闭 oldfds 中的读端fd,原因在后文解释
      for(auto fd : oldfds) { close(fd); }
      close(pipefd[1]); //关闭写端
      dup2(pipefd[0], 0); //将读端重定向到0,方便后面子进程进行读操作
      ChildDoTask(); //子进程读取并执行任务
      exit(0);
    }
    else
    {
      close(pipefd[0]); //关闭读端
      oldfds.push_back(pipefd[1]);
      //封装并保存写端fd
      std::string SlaveName = "Slave No." + std::to_string(i);
      _process_pool.push_back(channel(pipefd[1], id, getpid(), SlaveName));
    }
  }
}

析构时,父进程关闭所有写端 fd,并对子进程进行回收即可:

~ProcessPool()
{
  for(int i = 0; i < process_count; ++i) {
    close(_process_pool[i]._fd);
    std::cout << "channel fd closed -- fd:" << _process_pool[i]._fd << std::endl;
    waitpid(_process_pool[i]._SlaveId, nullptr, 0);
    std::cout << "child clean -- pid:" << _process_pool[i]._SlaveId << std::endl;
  }
}

进程池向外提供两个主要接口:任务投放和主动销毁。进程池收到任务之后,这里采用随机选择 channel 的方式来保证子进程的负载均衡。为了避免资源耗尽,保证系统的稳定性,这里使用单例模式组织整个进程池的代码,对外提供getProcessPool的静态接口。进程池demo的完整代码如下:

const int process_count = 10; //默认子进程数量

struct channel
{
    channel(int fd, pid_t SlaveId, int masterId, std::string& SlaveName)
        :_fd(fd), _SlaveId(SlaveId), _masterId(masterId), _SlaveName(SlaveName)
    { }

    int _fd;
    pid_t _SlaveId;
    pid_t _masterId;
    std::string _SlaveName;
};

class ProcessPool
{
public:
    static ProcessPool* getProcessPool()
    {
        if(_singletonProcessPool == nullptr) {
            _singletonProcessPool = new ProcessPool;
        }
        return _singletonProcessPool;
    }

    //传入任务的编号,投放给进程池
    void TaskAllocation(int taskNo)
    {
        //考虑子进程的负载均衡:随机channel
        int ChannelNo = rand() % process_count;
        channel slaveChannel = _process_pool[ChannelNo];
        int no = taskNo;
        write(slaveChannel._fd, &no, sizeof(taskNo));
        std::cout << "Father allocate a task complete -- taskNo:" << taskNo << std::endl;
    }

    void Destroy()
    {
        std::cout << "ProcessPool Destroy" << std::endl;
        this->~ProcessPool();
    }

    ~ProcessPool()
    {
        for(int i = 0; i < process_count; ++i) {
            close(_process_pool[i]._fd);
            std::cout << "channel fd closed -- fd:" << _process_pool[i]._fd << std::endl;
            waitpid(_process_pool[i]._SlaveId, nullptr, 0);
            std::cout << "child clean -- pid:" << _process_pool[i]._SlaveId << std::endl;
        }
        //sleep(5);
    }

private:
    void ChildDoTask()
    {
        int taskNo = 0;
        while(true)
        {
            ssize_t s = read(0, &taskNo, sizeof(taskNo));
            if(s > 0)
            {
                std::cout << "Child get a task -- taskNo:" << taskNo << std::endl;
                taskList[taskNo]();
            }
            else if(s == 0) { break; }
            else { perror("ChildDoTask:read err"); exit(1); }
        }
    }

private:
    std::vector<channel> _process_pool;

private:
    static ProcessPool* _singletonProcessPool;
    
    ProcessPool(const ProcessPool& p_p) = delete;
    ProcessPool& operator=(const ProcessPool& p_p) = delete;

    ProcessPool()
    {
        srand(time(nullptr));
        std::vector<int> oldfds;
        for(int i = 0; i < process_count; ++i)
        {
            int pipefd[2];
            int ret = pipe(pipefd); if(ret < 0) { perror("InitProcessPool:pipe err"); exit(1); }
            int id = fork();
            if(id < 0) { perror("InitProcessPool:fork err"); exit(1); }
            else if(id == 0)
            {
                for(auto fd : oldfds) { close(fd); }
                close(pipefd[1]);
                dup2(pipefd[0], 0);
                ChildDoTask();
                exit(0);
            }
            else
            {
                close(pipefd[0]);
                oldfds.push_back(pipefd[1]);
                std::string SlaveName = "Slave No." + std::to_string(i);
                _process_pool.push_back(channel(pipefd[1], id, getpid(), SlaveName));
            }
        }
    }
};

ProcessPool* ProcessPool::_singletonProcessPool = nullptr;

在这个进程池demo中,如果在循环创建子进程和管道时不加调整,会造成这样的问题:从第一个子进程往后,每个子进程都会继承父进程对前面所有子进程的写端,造成下面的情景:

Linux进程间通信_管道_进程池_04

在这个情形下,每个子进程都有能力向前面的子进程发送信息,为了保证每个管道只有一个读端和一个写端的单向通信,要避免这种情况发生。除了逻辑上不合理之外,由于除了最后一个管道之外,每个管道都有其他子进程存在对其的读端,所以当父进程从前向后依次关闭读端 fd 时,这个管道文件并不能被立即释放,在析构时可能会造成问题。一种解决方式为,在创建子进程时,让子进程关闭所有自己继承的、已经存在的管道的写端。于是就有了构造函数中的 oldfds 数组和代码for(auto fd : oldfds) { close(fd); }

命名管道 fifo

基本原理

命名管道的原理与匿名管道类似,都是让不同进程看到同一个内存级文件资源来实现通信。不同的是,命名管道可以支持不相关的进程进行通信,这需要让不同的进程知道打开的是同一个文件,这一点可以通过自定义文件的路径和文件名实现。即命名管道通过 文件路径+文件名 让不同的进程之间看到同一份文件资源,以实现通信

系统调用接口

命名管道有两个主要接口:

#include <sys/types.h>
#include <sys/stat.h>

int mkfifo(const char *pathname, mode_t mode);
//成功返回0,失败返回-1

mkfifo(3) 是一个3号文档的C语言接口。创建一个内存级文件,pathname 为文件的路径和文件名,mode 为这个文件的权限。要通信的进程,打开的文件  pathname 必须相同。一般将mode默认设置为 0666 即可。

#include <unistd.h>

int unlink(const char *pathname);
//成功返回0,失败返回-1

unlink(2) 可以使一个文件的硬链接数减一,在管道通信中可以释放一个管道文件。

创建管道文件成功后,还需要用 open(2) 打开这个管道文件;close(2) 一个管道文件后,还需要用 unlink(2) 将这个管道文件删除。除此之外,对管道的读写直接使用文件系列的系统调用接口即可。

下面是一份 Server 进程和 Client 进程使用命名管道进行通信的demo代码:

/*
filename:comm.hpp
创建和删除管道文件
*/
Log log;

const char* FIFONAME = "file.fifo";

class FIFO
{
public:
    static FIFO* getInstance()
    {
        if(instance == nullptr) {
            instance = new FIFO();
        }
        return instance;
    }

    const char* getFifoName()
    {
        return _fifo_name;
    }

    ~FIFO()
    {
        unlink(_fifo_name);
        log(LOG, "FIFO closed");
    }

private:
    FIFO(const FIFO& fifo) = delete;
    FIFO& operator=(const FIFO& fifo) = delete;

    FIFO(const char* fifoName = FIFONAME)
        :_fifo_name(fifoName)
    {
        int ret = mkfifo(fifoName, 0666);
        if(ret < 0) { log(FATAL, "FIFO:mkfifo err"); exit(1); }
        log(LOG, "FIFO mked");
    }

private:
    const char* _fifo_name;

    static FIFO* instance;
};

FIFO* FIFO::instance = nullptr;
/*
filename:server.cpp
接收信息
*/
int main()
{
    char buff[1024]; buff[0] = '\0';
    //以只读打开管道文件
    int fd = open(FIFO::getInstance()->getFifoName(), O_RDONLY);
    while(true)
    {
        //读取管道
        ssize_t s = read(fd, buff, sizeof(buff));
        if(s > 0)
        {
            buff[s] = '\0';
            std::cout << "Server get a meg: " << buff << std::endl;
            log(LOG, "Server get a meg...");
        } //client进程退出后,s返回0,server停止读
        else if(s == 0) { break; }
        else { log(ERROR, "%s:line:%d:read err", __FILE__, __LINE__); exit(1); }
    }
    FIFO::getInstance()->~FIFO(); //删除管道文件
    log(LOG, "server quit");
    return 0;
}
/*
filename:client.cpp
发送信息
*/
int main()
{
    //以追加写方式打开管道文件
    int fd = open("file.fifo", O_APPEND | O_WRONLY);
    if(fd < 0) { log(FATAL, "%s:%d:open err", __FILE__, __LINE__); exit(1); }
    std::string buff;
    while(true)
    {
        std::cout << "please enter@ ";
        std::getline(std::cin, buff);
        //写入管道
        ssize_t s = write(fd, buff.c_str(), buff.size());
        if(s < 0) { log(ERROR, "%s:%d:write err", __FILE__, __LINE__); exit(1); }
        else { log(LOG, "Clicent send a msg..."); }
    }
    log(LOG, "clicent quit");
    return 0;
}

特性

相比匿名管道,命名管道有以下几点不同:

  • 命名管道支持不相关的进程之间的通信;
  • 命名管道文件需要被主动删除,否则进程退出后这个文件仍然会存在。

除此之外,命名管道拥有与匿名管道相同的特性,并且读写端之间存在协同。