【Linux】匿名管道通信场景——进程池

时间:2024-12-02 08:06:05
???? 个人主页:大耳朵土土垚
???? 所属专栏:Linux系统编程

这里将会不定期更新有关Linux的内容,欢迎大家点赞,收藏,评论????????????????????

文章目录

  • 1. 初始化进程池
  • 2. 进程池执行任务
    • 2.1 任务管理
    • 2.2 执行任务
  • 3. 清理进程池
  • 4. 封装与完整实现
  • 5. 结语

1. 初始化进程池

  进程池的实现是依靠匿名管道,通过进程间通信使得父进程能够管理多个进程任务,相当于父进程拥有了很多个进程——进程池,通过不同的进程完成指定的任务。
  所以我们需要创建多个匿名管道和子进程,进行进程间通信,发送信息给子进程让它们根据接收到的信息处理相关任务。
  因为有多个管道和子进程,为了方便父进程使用不同管道发送对应信息给子进程,我们需要将管道的文件描述符以及对应子进程的pid保存起来,我们选择将它们封装在一个Channel类中。又因为有多个匿名管道和子进程,所以将多个Channel类对象储存在C++STL中的容器vector中来方便父进程进行管理进程池。

代码如下:

int InitProcesspool(int num,std::vector<Channel>& channels)
{
    for(int i = 0; i < num; i++)//使用循环创建多个匿名管道和子进程
    {
        //1.创建匿名管道
        int pipefd[2] = {0};
        int n = pipe(pipefd);
        if(n < 0) return 2;//根据不同的返回值判断原因,也可以使用枚举来约定返回值代表的内容

        //2.创建子进程
        pid_t id = fork();
        if(id < 0) return 3;

        //3.建立通信管道,父子进程关闭读端或写端
        if(id == 0)//子进程
        {
            //子进程读取,关闭写端
            ::close(pipefd[1]);
            //dup2
            dup2(pipefd[0],0);
            //子进程需要执行的内容
            Work();
            ::exit(0);
        }

        //父进程
        //父进程写入,关闭读端
        ::close(pipefd[0]);
        channels.emplace_back(pipefd[1],id);//保存在channel对象中并存入vector
    }
    return 0;
}

对子进程内部,我们使用dup2系统调用将匿名管道读端文件描述符与标准输入stdin交换,这样我们就不需要保存不同进程对应匿名管道的读端文件描述符,只需要统一从0号文件描述符中读取内容即可。

对于Channel类

class Channel{
public:
    Channel(int fd,pid_t who):_fd(fd),_who(who)
    {
        _name = "Channel-"+std::to_string(fd)+"-"+std::to_string(who);
    }

    std::string GetName()
    {
        return _name;
    }
    int GetFd()
    {
        return _fd;
    }
    pid_t GetWho()
    {
        return _who;
    }
    void Send(int num)//父进程往匿名管道发送信息
    {
        ::write(_fd,&num,sizeof(num));
    }
    ~Channel()
    {

    }
private:
    int _fd;//保存匿名管道通信的文件描述符
    std::string _name;//名字(自己取的)
    pid_t _who;//子进程pid
};

对于父进程发送给子进程的信息我们选择约定一个数字对应一个任务,不同数字对应不同需要完成的任务,子进程接收到信息后就可以根据数字来确定不同的任务。

2. 进程池执行任务

2.1 任务管理

  执行任务之前我们需要先确定有哪些任务,怎么执行…所以我们需要进行任务管理,同样我们也是使用一个类TaskManager来进行任务管理:

#include<iostream>
#include<unordered_map>
#include<functional>
#include<ctime>

using task_t = std::function<void()>;//函数指针


//不同任务函数
    void Load()
    {
        std::cout<<"正在执行加载任务..."<<std::endl;
    }
    void Del()
    {
        std::cout<<"正在执行删除任务..."<<std::endl;
    }
    void Log()
    {
        std::cout<<"正在执行日志任务..."<<std::endl;
    }

static int number = 0;
class TaskManager
{
public:
    TaskManager()
    {
        srand(time(nullptr));
        InsertTask(Load);
        InsertTask(Del);
        InsertTask(Log);
    }
    int SelectTask()
    {
        return rand()%number;
    }
    void InsertTask(task_t t)
    {
        m[number++] = t;
    }
    void Excute(int num)
    {
        if(m.find(num) == m.end())
            return;
        m[num]();//执行任务
    }
    ~TaskManager()
    {
    }
private:
    std::unordered_map<int,task_t> m;//使用map封装数字与对应的任务函数指针
};

TaskManager tm;

  选择新创建一个源文件Task.hpp来封装上述内容,上述任务管理类中我们使用map来保存数字与任务函数指针的相关关系,这样通过数字就可以确定是哪一个任务函数;此外选择任务使用的方法是随机数的方法,大家可以根据自己的想法确定不同的方式。

2.2 执行任务

  • 发送任务

使用按顺序轮询的方式派发任务给不同的子进程——设置10次任务循环,先通过任务管理类中的选择函数获取任务编号,然后通过父进程进程池管理类将任务编号发送给子进程。

void ExcuteTask(std::vector<Channel>& channels)
{
    int n = 0;
    int count = 10;
    while(count--)//执行10次任务
    {
        //1.选择任务,获取任务编号
        int tasknum = tm.SelectTask();

        //2.选择子进程,使用轮询选择,派发任务
        channels[n++].Send(tasknum);
        n%=channels.size();

        std::cout<<std::endl;
        std::cout<<"*****成功发送"<<10-count<<"个任务*****"<<std::endl;
        
        sleep(2);//每个2s发送一个任务

    }
}

  • 接受并执行任务

子进程接受并执行任务——先通过匿名管道接受父进程发送的任务编号,然后通过任务管理类对象执行任务编号所对应的任务函数。

//子进程接受并执行任务
void Work()
{
    while(true)
    {
        int num = 0;
        int n = ::read(0,&num,sizeof(num));
        if(n == sizeof(num))//读取成功
            tm.Excute(num);//不要发成n
        else if(n == 0)
        {
            break;
        }
        else
        {
            break;
        }
    }
}

3. 清理进程池

我们需要回收匿名管道的文件描述符和子进程

void CleanProcesspool(std::vector<Channel>& channels)
{
    for(auto& c : channels)
        ::close(c.GetFd());
    for(auto& c : channels)
    {
        pid_t rid = ::waitpid(c.GetWho(),nullptr,0);
        if(rid <= 0)
        {
            std::cout<<std::endl;
            std::cout<<"清理子进程失败..."<<std::endl;
            return;
        }
    }

    std::cout<<std::endl;
    std::cout<<"成功清理子进程..."<<std::endl;
}

  注意这里不能使用一个循环来进行清理,如下面代码是错误的:

void CleanProcesspool(std::vector<Channel>& channels)
{
    for(auto& c : channels)//只使用一次循环
    {
    	::close(c.GetFd());
        pid_t rid = ::waitpid(c.GetWho(),nullptr,0);
        if(rid <= 0)
        {
            std::cout<<std::endl;
            std::cout<<"清理子进程失败..."<<std::endl;
            return;
        }
    }

    std::cout<<std::endl;
    std::cout<<"成功清理子进程..."<<std::endl;
}

这是因为在创建子进程时,子进程会继承父进程的文件描述符表,因此在第一个匿名管道创建后,例如父进程的4号文件描述符指向该匿名管道写端,那么在创建第二个子进程时,该子进程会继承4号文件描述符也指向第一个匿名管道写端,因此创建的子进程越多,前面匿名管道写端被指向的就越多,所以仅仅关闭一个进程的写端指向,还有其他的写端指向,所以读端无法读到0,也就无法退出,如下图所示:
在这里插入图片描述
在这里插入图片描述

当创建2个子进程时,第一个匿名管道写端就有两个进程指向,当创建的进程越多,该写端指向的也就越多。

如果要使用一个循环来清理回收子进程,我们可以从后往前关闭文件描述符,因为最后一个管道写端只有父进程指向。

4. 封装与完整实现

  对于父进程管理进程池我们可以封装一个类来更好的进行管理与实现:

#include <iostream>
#include <string>
#include <vector>
#include <unistd.h>
#include <sys/types.h>
#include <stdlib.h>
#include <sys/wait.h>

#include "Task.hpp"
#include "Channel.hpp"
// master


class ProcessPool
{
public:
    int InitProcesspool(int num)
    {
        for (int i = 0; i < num; i++)
        {
            // 1.创建匿名管道
            int pipefd[2] = {0};
            int n = pipe(pipefd);
            if (n < 0)
                return 2;

            // 2.创建子进程
            pid_t id = fork();
            if (id < 0)
                return 3;

            // 3.建立通信管道,父子进程关闭读端或写端
            if (id == 0) // 子进程
            {
                // 子进程读取,关闭写端
                ::close(pipefd[1]);
                // dup2
                dup2(pipefd[0], 0);
                Work();
                ::exit(0);
            }

            // 父进程
            // 父进程写入,关闭读端
            ::close(pipefd[0]);
            channels.emplace_back(pipefd[1], id);
        }
        return 0;
    }
    void ExcuteTask()
    {
        int n = 0;
        int count = 10;
        while (count--) // 执行10次任务
        {
            // 1.选择任务,获取任务编号
            int tasknum = tm.SelectTask();

            // 2.选择子进程,使用轮询选择,派发任务
            channels[n++].Send(tasknum);
            n %= channels.size();

            std::cout << std::endl;
            // std::cout<<"**************************"<<std::endl;
            std::cout << "*****成功发送" << 10 - count << "个任务*****" << std::endl;
            // std::cout<<"**************************"<<std::endl;
            // std::cout<<std::endl;

            sleep(3);
        }
    }

    void CleanProcesspool()
    {
        for (auto &c : channels)
            ::close(c.GetFd());

        for (auto &c : channels)
        {
            pid_t rid = ::waitpid(c.GetWho(), nullptr, 0);
            if (rid <= 0)
            {
                std::cout << std::endl;
                std::cout << "清理子进程失败..." << std::endl;
                return;
            }
        }

        std::cout << std::endl;
        std::cout << "成功清理子进程..." << std::endl;
    }

private:
    std::vector<Channel> channels;
};

main函数:

#include "ProcessPool.hpp"

int main(int argc, char* argv[])
{
    //0.获取应该创建管道个数num个
    if(argc!=2)
    {
        std::cout<<"请输入管道个数."<<std::endl;
        return 1;
    }
    int num = std::stoi(argv[1]);
    std::vector<Channel> channels;
    
    ProcessPool* pp = new ProcessPool;

    //1.初始化进程池——创建进程池
    pp->InitProcesspool(num);
    
    //2.执行任务
    pp->ExcuteTask();

    //3.任务执行完成,回收子进程
    pp->CleanProcesspool();

    delete pp;
    
    return 0;
}

运行结果如下:

在这里插入图片描述

5. 结语

  以上就是基于匿名管道通信实现的进程池,子进程会根据接受到的信息执行不同的任务,父进程可以看作Master来进行管理创建的多个进程。关键点在于对进程管理的封装以及回收子进程时会有多个进程指向匿名管道的读端,所以回收时要注意可能会出现的bug。