Linux系统基础-进程间通信(4)_模拟实现进程池

时间:2024-10-24 07:00:24

个人主页:C++忠实粉丝
欢迎 点赞???? 收藏✨ 留言✉ 加关注????本文由 C++忠实粉丝 原创

Linux系统基础-进程间通信(4)_模拟实现进程池

收录于专栏[Linux学习]
本专栏旨在分享学习Linux的一点学习笔记,欢迎大家在评论区交流讨论????

目录

1. 基础库引入模块

2. Channel类模块

成员变量

构造函数

成员函数

析构函数

总体分析

3. 进程与管道创建模块

参数

函数逻辑

创建管道:

创建子进程:

父进程操作:

总体分析

4. 任务分配与执行控制模块

NextChannel 函数

SendTaskCommand 函数

ctrlProcessOnce 函数

ctrlProcess 函数

5. 清理与资源回收模块

6. 主函数模块

7. 任务管理模块

8. 效果展示


1. 基础库引入模块

功能 : 包含标准库和自定义任务头文件.

#include <iostream>      // 标准输入输出流
#include <string>       // 字符串处理
#include <vector>       // 向量(动态数组)
#include <unistd.h>     // UNIX 标准函数
#include <sys/types.h>  // 数据类型定义
#include <sys/wait.h>   // 进程等待
#include "Task.hpp"     // 自定义任务头文件,假设其中定义了任务相关的函数和类型

 这里引入必要的标准库和自定义库, 以支持后续功能的实现

2. Channel类模块

功能 : 封装管道及进程信息

class Channel
{
public:
    Channel(int wfd, pid_t id, const std::string& name)
        :_wfd(wfd), _subprocessid(id), _name(name)
    {
    }
    int GetWfd() {return _wfd;}
    pid_t GetProcessId() {return _subprocessid;}
    std::string GetName() {return _name;}
    void closeChannel()
    {
        close(_wfd);
    }
    void Wait()
    {
        pid_t rid = waitpid(_subprocessid, nullptr, 0);
        if(rid > 0)
        {
            std::cout << "wait " << rid << "success" << std::endl;
        }
    }
    ~Channel()
    {
    }

private:
    int _wfd;
    pid_t _subprocessid;
    std::string _name;
};

成员变量

int _wfd : 代表写入文件描述符, 用于进程间通信

pid_t _subprocessid : 存储子进程的pid, 用于管理子进程

_name : 存储通道的名称

构造函数

构造函数初始化成员变量, 使用初始化列表初始化 _wfd, _subprocessid, _name

成员函数

int GetWfd() : 

返回写入文件描述符, 方便其他类或函数使用这个文件描述符进行写操作

pid_t GetProcessid() : 

返回子进程pid

std::string GetName() :

返回通道名称

void closeChannel() :

关闭写入文件描述符, 释放系统资源, 函数中使用 close(_wd) 确保在不需要时清理资源

void Wait() : 

等待进程结束, 使用waitpid 函数来阻塞当前进程, 直到指定进程结束, 如果waitpid返回值大于0, 表示成功等待子进程, 控制台会输出成功信息.

析构函数

由于文件描述符在 closeChannel 中关闭, 析构时不需要额外操作.

总体分析

这个Channel类为管理进程间提供了一个简单而有效的封装, 它使得管道的创建和管理变得更为方便, 提供了很好的接口来处理与子进程的交互

3. 进程与管道创建模块

功能 : 创建管道和子进程的功能

//进程与管道创建模块
void CreateChannelAndSub(int num, std::vector<Channel> *channels, task_t task)
{
    for(int i = 0; i < num; i++)
    {
        //1. 创建管道
        int pipefd[2] = {0};
        int n = pipe(pipefd);
        if(n < 0) exit(1);

        //2. 创建子进程
        pid_t id = fork();
        if(id == 0)
        {
            if(!channels->empty())
            {
                //第二次之后, 开始创建新的管道
                for(auto &channel : *channels) channel.CloseChannel();
            }
            //child - read
            close(pipefd[1]);
            dup2(pipefd[0], 0); //将管道的读端重定向到标准输入
            task();
            close(pipefd[0]);
            exit(0);
        }

        //3. 构建一个channel名称
        std::string channel_name = "Channel-" + std::to_string(i);
        //父进程
        close(pipefd[0]);
        //a. 子进程的pid b. 父进程关心的管道的w端
        channels->push_back(Channel(pipefd[1], id, channel_name)); 
    }
}

参数

int num:要创建的子进程数量。

std::vector<Channel> *channels:

指向 Channel 对象的指针,用于存储每个子进程及其对应的管道信息。

task_t task:传入的任务函数指针,子进程执行的任务。

函数逻辑

创建管道:

使用 pipe(pipefd) 创建一个管道,pipefd[0] 为读端,pipefd[1] 为写端。若创建失败,调用 exit(1) 终止程序。

创建子进程:

使用 fork() 创建子进程。返回值 id:

如果 id 为 0,表示当前进程为子进程。

子进程执行以下操作:

        如果 channels 不为空,关闭已存在的管道的写端,确保没有文件描述符泄露。

        关闭管道的写端 (close(pipefd[1])),并使用 dup2(pipefd[0], 0) 将管道的读端重定向到标准输入,以便子进程可以从管道读取数据。

        执行传入的 task() 函数。

        关闭读端并退出。

父进程操作:

父进程关闭管道的读端 (close(pipefd[0])),因为只关心写端。

创建一个通道名称,如 "Channel-0",然后将子进程的 PID 和写端的文件描述符封装到 Channel 对象中,并将其添加到 channels 中。

总体分析

这个函数高效地管理了多进程创建与通信。它确保每个子进程拥有独立的管道,并且在创建新的管道前关闭不再使用的管道,避免资源泄露。task 函数的执行允许用户定义子进程的具体工作,实现灵活的任务分配。这种结构适合需要并行处理的场景,如数据处理或并发计算。

4. 任务分配与执行控制模块

功能 : 负责任务的选择和分配给子进程

//0 1 2 3 4 channelnum
int NextChannel(int channelnum)
{
    static int next = 0;
    int channel = next;
    next++;
    next %= channelnum;
    return channel;
}

//任务分配与执行控制模块
void SendTaskCommand(Channel &Channel, int taskcommand)
{
    write(Channel.GetWfd(), &taskcommand, sizeof(taskcommand));
}

void ctrlProcessOnce(std::vector<Channel> &Channels)
{
    sleep(1);
    //1. 选择一个任务
    int taskcommand = SelectTask();
    //2. 选择一个信道和进程
    int channel_index = NextChannel(Channels.size());
    //3. 发送任务
    SendTaskCommand(Channels[channel_index], taskcommand);
    std::cout << std::endl;
    std::cout << "taskcommand: " << taskcommand << "channel: " << Channels[channel_index].GetName() << "sub process: " << Channels[channel_index].GetProcessId() << std::endl;
}


void ctrlProcess(std::vector<Channel> &channels, int times = -1)
{
    if(times > 0)
    {
        while(times--)
        {
            ctrlProcessOnce(channels);
        }
    }
    else
    {
        while(true)
        {
            ctrlProcessOnce(channels);
        }
    }
} 

NextChannel 函数

功能:循环返回下一个信道的索引。

实现:

使用静态变量 next 来保存当前的信道索引。

每次调用时返回当前的 next 值,并将其增加,随后使用取模操作确保它不会超过 channelnum - 1。

特点:保证信道的选择是循环的,这样可以平均分配任务到各个信道上。

SendTaskCommand 函数

功能:发送任务命令到指定的信道。

实现:

使用 write 函数将 taskcommand 的值写入信道的写端。

Channel.GetWfd() 获取信道的写文件描述符(WFD),确保命令能够被子进程接收。

特点:该函数直接进行系统调用以发送数据.

ctrlProcessOnce 函数

功能:控制单次任务分配和发送。

实现:

首先调用 sleep(1) 暂停执行,以控制任务发送的频率。

使用 SelectTask() 选择一个任务命令。

调用 NextChannel 获取下一个信道的索引。

发送选定的任务到对应信道。

打印发送的任务信息,包括任务命令、信道名称和子进程 ID。

ctrlProcess 函数

功能:控制多个任务的发送。

实现:

接受一个可选的参数 times,用于指定任务发送的次数。

如果 times 大于 0,则执行 times 次任务发送;否则,进入无限循环,不断发送任务。

特点:通过这个函数,用户可以灵活控制任务的发送次数,适应不同的需求场景。

5. 清理与资源回收模块

功能 : 负责关闭管道和回收子进程

void CleanUpChannel(std::vector<Channel> &channels)
{
    for(auto &channels : channels)
    {
        channels.CloseChannel();
        channels.Wait();
    }
}

Wait() 会调用操作系统的 wait 系统调用,确保主进程在子进程结束之前不会继续执行,从而避免僵尸进程。 

6. 主函数模块

功能 : 处理命令行参数和程序的主逻辑。

//主函数模块
//处理命令行参数和程序的主逻辑。

int main(int argc, char *argv[])
{
    if (argc != 2)
    {
        std::cerr << "Usage: " << argv[0] << " processnum" << std::endl;
        return 1;
    }
    int num = std::stoi(argv[1]);
    LoadTask();

    std::vector<Channel> channels;
    // 1. 创建信道和子进程
    CreateChannelAndSub(num, &channels, work1);

    // 2. 通过channel控制子进程
    ctrlProcess(channels, 5);

    // 3. 回收管道和子进程. a. 关闭所有的写端 b. 回收子进程
    CleanUpChannel(channels);

    // sleep(100);
    return 0;
}

int main(int argc, char *argv[]): 这是程序的入口点。argc 表示命令行参数的数量,argv 是一个字符串数组,存储所有参数。

if (argc != 2): 这个条件检查传入的参数数量是否等于 2。第一个参数(argv[0])是程序本身的名称,第二个参数(argv[1])应该是用户提供的。

7. 任务管理模块

功能 : 主要用于创建, 加载, 和执行一些简单的任务 

#pragma once

#include <iostream>
#include <ctime>
#include <cstdlib>
#include <sys/types.h>
#include <unistd.h>

#define TaskNum 3

typedef void (*task_t)(); // task_t 函数指针类型

void Print()
{
    std::cout << "I am print task" << std::endl;
}
void DownLoad()
{
    std::cout << "I am a download task" << std::endl;
}
void Flush()
{
    std::cout << "I am a flush task" << std::endl;
}

task_t tasks[TaskNum];

void LoadTask()
{
    srand(time(nullptr) ^ getpid() ^ 17777);
    tasks[0] = Print;
    tasks[1] = DownLoad;
    tasks[2] = Flush;
}

void ExcuteTask(int number)
{
    if (number < 0 || number > 2)
        return;
    tasks[number]();
}

int SelectTask()
{
    return rand() % TaskNum;
}

void work()
{
    while (true)
    {
        int command = 0;
        int n = read(0, &command, sizeof(command));
        if (n == sizeof(int))
        {
            std::cout << "pid is : " << getpid() << " handler task" << std::endl;
            ExcuteTask(command);
        }
        else if (n == 0)
        {
            std::cout << "sub process : " << getpid() << " quit" << std::endl;
            break;
        }
    }
}

void work1()
{
    while (true)
    {
        int command = 0;
        int n = read(0, &command, sizeof(command));
        if (n == sizeof(int))
        {
            std::cout << "pid is : " << getpid() << " handler task" << std::endl;
            ExcuteTask(command);
        }
        else if (n == 0)
        {
            std::cout << "sub process : " << getpid() << " quit" << std::endl;
            break;
        }
    }
}

void work2()
{
    while (true)
    {
        int command = 0;
        int n = read(0, &command, sizeof(command));
        if (n == sizeof(int))
        {
            std::cout << "pid is : " << getpid() << " handler task" << std::endl;
            ExcuteTask(command);
        }
        else if (n == 0)
        {
            std::cout << "sub process : " << getpid() << " quit" << std::endl;
            break;
        }
    }
}

typedef void (*task_t)();:

这行代码的意思是:task_t 是一个新的类型别名,它代表一个指向返回类型为 void,且不接受任何参数的函数的指针。

8. 效果展示