C++ 网络编程学习五

时间:2024-03-15 21:07:15

C++网络编程学习五

    • 网络结构的更新
    • 单例模式
      • 懒汉单例模式
      • 饿汉单例模式
      • 懒汉式指针
      • 智能指针设计单例类
    • 服务器优雅退出
    • asio的多线程模型IOService
    • asio多线程IOThreadPool
      • epoll 和 iocp的一些知识点

网络结构的更新

  1. asio网络层,会使用io_context进行数据封装,底层的话,在linux中就是epoll模型,在windows就是iocp模型。
    在这里插入图片描述
  2. 当服务器的接受数据较多时,又要处理接收到的信息的逻辑处理,逻辑处理一般会放到一个逻辑处理队列中进行处理。因为有时候逻辑比较复杂。
  3. 通过一个队列,单独线程从队列中取出逻辑函数。从而实现网络线程和逻辑线程分开,由一个队列进行连接。极大地提升网络线程的收发能力,并且可以用多线程的方式管理网络层。
  4. asio的多线程模式:
    • 启动n个线程,每个线程都有一个iocontext,每个线程负责一部分的socket。
    • 一个ioconext由多个线程共享。也可以一定程度上减轻readhandler的负担。
  5. 逻辑处理一般都是单线程的,因为大量的用户同时处理一个逻辑过程的时候,频繁地加锁取消锁,还不如就单线程的来做。

完善消息结构:
消息 = 消息id + 消息长度 + 消息内容。 前两部分统一封装到消息头里,tlv格式。消息id占2个字节,消息长度占2个字节,消息头共4个字节。

更新消息节点:将收取消息的类和发送消息的类,继承自消息基类。

//HEAD_TOTAL_LEN = 4 包含id 和 消息长度。
SendNode::SendNode(const char* msg, short max_len, short msg_id):MsgNode(max_len + HEAD_TOTAL_LEN)
, _msg_id(msg_id){
    //先发送id, 本机字节序转为网络字节序
    short msg_id_host = boost::asio::detail::socket_ops::host_to_network_short(msg_id);
    memcpy(_data, &msg_id_host, HEAD_ID_LEN); // 变量按照字节流的方式,写到_data的前两个字节。
    //消息长度和消息体本身  转为网络字节序
    short max_len_host = boost::asio::detail::socket_ops::host_to_network_short(max_len);
    // 消息体长度信息的拷贝。
    memcpy(_data + HEAD_ID_LEN, &max_len_host, HEAD_DATA_LEN);
    // 消息体本身的拷贝。
    memcpy(_data + HEAD_ID_LEN + HEAD_DATA_LEN, msg, max_len);
}

单例模式

懒汉单例模式

  1. 通过静态成员变量实现单例。存在隐患,对于多线程方式生成的实例可能是多个。
class Single2 {
private:
	Single2() {}
	Single2(const Single2&) = delete;
	Single2& operator=(const Single2&) = delete;
public:
	// 静态局部变量实现单例
	static Single2& GetInst() {
		static Single2 single; // 生命周期和进程一样,函数的局部静态变量生命周期随着进程结束而结束。
		return single;
	}
};

饿汉单例模式

class Single2Hungry
{
private:
    Single2Hungry()
    {
    }
    Single2Hungry(const Single2Hungry &) = delete;
    Single2Hungry &operator=(const Single2Hungry &) = delete;
public:
    static Single2Hungry *GetInst()
    {
        if (single == nullptr)
        {
            single = new Single2Hungry();
        }
        return single;
    }
private:
    static Single2Hungry *single;
};
// 初始化
Single2Hungry *Single2Hungry::single = Single2Hungry::GetInst();

饿汉式是在程序启动时就进行单例的初始化,这种方式也可以通过懒汉式调用,无论饿汉式还是懒汉式都存在一个问题,就是什么时候释放内存?多线程情况下,释放内存就很难了,还有二次释放内存的风险。

懒汉式指针

单例模式的单例由指针存在,创建单例的时候,用加锁的方式进行判断。防止在加锁的过程中,出现单例类被创建的情况。

class SinglePointer
{
private:
    SinglePointer()
    {
    }
    SinglePointer(const SinglePointer &) = delete;
    SinglePointer &operator=(const SinglePointer &) = delete;
public:
    static SinglePointer *GetInst()
    {
        if (single != nullptr)
        {
            return single;
        }
        s_mutex.lock();
        if (single != nullptr)
        {
            s_mutex.unlock();
            return single;
        }
        single = new SinglePointer();
        s_mutex.unlock();
        return single;
    }
private:
    static SinglePointer *single;
    static mutex s_mutex;
};

智能指针设计单例类

class SingleAuto
{
private:
	SingleAuto()
	{
	}
	SingleAuto(const SingleAuto&) = delete;
	SingleAuto& operator=(const SingleAuto&) = delete;
public:
	~SingleAuto()
	{
		cout << "single auto delete success " << endl;
	}
	static std::shared_ptr<SingleAuto> GetInst()
	{
		if (single != nullptr)
		{
			return single;
		}
		s_mutex.lock();
		if (single != nullptr)
		{
			s_mutex.unlock();
			return single;
		}
		single = std::shared_ptr<SingleAuto>(new SingleAuto);
		s_mutex.unlock();
		return single;
	}
private:
	static std::shared_ptr<SingleAuto> single;
	static mutex s_mutex;
};

服务器优雅退出

  • 服务器退出之前,要把服务器逻辑队列中的服务执行完成。
  • asio提供的信号建立的方式,利用signal_set 定义了一系列信号合集,并且绑定了一个匿名函数,匿名函数捕获了io_context的引用,并且函数中设置了停止操作,也就是说当捕获到SIGINT,SIGTERM等信号时,会调用io_context.stop
int main()
{
    try {
        boost::asio::io_context  io_context;
        // 绑定信号 想捕获的信号,都加进去就行了 捕获io_context的收到的信号
        boost::asio::signal_set signals(io_context, SIGINT, SIGTERM);
        // 异步等待方式,等待停止信号
        // 绑定了一个匿名函数,匿名函数捕获了io_context的引用,并且函数中设置了停止操作,也就是说当捕获到SIGINT,SIGTERM等信号时,会调用io_context.stop
        signals.async_wait([&io_context](auto, auto) {
            io_context.stop();
            });
        CServer s(io_context, 10086);
        io_context.run();
    }
    catch (std::exception& e) {
        std::cerr << "Exception: " << e.what() << endl;
    }
}

asio的多线程模型IOService

  1. 第一个是启动多个线程,每个线程管理一个iocontext。第二种是只启动一个iocontext,被多个线程共享。
    在这里插入图片描述

  2. 启动线程的个数,不要超过核数。

  3. 每个线程独立调用io_context,一个socket会被注册在同一个io_context里,它的回调函数也会被单独的一个线程回调:

    • 那么对于同一个socket,他的回调函数每次触发都是在同一个线程里,就不会有线程安全问题,网络io层面上的并发是线程安全的。
    • 如果两个socket对应的上层逻辑处理,如果有交互或者访问共享区,会存在线程安全问题。可以通过加锁或者逻辑队列的方式解决安全问题。
  4. 多线程的优势:提升了并发能力, 单线程仅有一个io_context服务用来监听读写事件,就绪后回调函数在一个线程里串行调用。如果一个回调函数的调用时间较长肯定会影响后续的函数调用。

  5. 通过逻辑队列的方式将网络线程和逻辑线程解耦合,不会出现前一个调用时间影响下一个回调触发的问题。

class AsioIOServicePool :public Singleton<AsioIOServicePool>
{
    friend Singleton<AsioIOServicePool>; //声明友元,用这个类访问构造函数
public:
    using IOService = boost::asio::io_context; // 定义别名
    // 通常会将一些异步操作提交给io_context进行处理,然后该操作会被异步执行,而不会立即返回结果。
    // 如果没有其他任务需要执行,那么io_context就会停止工作,导致所有正在进行的异步操作都被取消。
    // 这时,我们需要使用boost::asio::io_context::work对象来防止io_context停止工作。
    using Work = boost::asio::io_context::work; // work,防止io_context在没有被注册事件的时候退出。
    using WorkPtr = std::unique_ptr<Work>; //work不被改变。

    ~AsioIOServicePool();
    AsioIOServicePool(const AsioIOServicePool&) = delete;//不加引用的话,会造成一个递归构造的危险,不允许拷贝构造
    AsioIOServicePool& operator=(const AsioIOServicePool&) = delete;
    // 使用 round-robin 的方式返回一个 io_service
    boost::asio::io_context& GetIOService();// 轮询的方式
    void Stop();// 停止ioservice
private:
    // 不让外界直接调用
    AsioIOServicePool(std::size_t size = std::thread::hardware_concurrency()); //根据CPU核数去创建线程数量
    std::vector<IOService> _ioServices; //IOService的vector变量,用来存储初始化的多个IOService。
    std::vector<WorkPtr> _works;
    std::vector<std::thread> _threads;
    std::size_t   _nextIOService;//下标
};

// 构造函数,初始化size个ioservice,size个_works, _ioServices绑定到works
// _ioServices放到不同的线程中。
AsioIOServicePool::AsioIOServicePool(std::size_t size) :_ioServices(size),
_works(size), _nextIOService(0) {
    for (std::size_t i = 0; i < size; ++i) {
        _works[i] = std::unique_ptr<Work>(new Work(_ioServices[i]));
    }
    //遍历多个ioservice,创建多个线程,每个线程内部启动ioservice
    for (std::size_t i = 0; i < _ioServices.size(); ++i) {
        _threads.emplace_back([this, i]() {
            _ioServices[i].run();// 如果不绑定work,run()就会直接返回了
            });
    }
}

AsioIOServicePool::~AsioIOServicePool() {
    std::cout << "destruct" << std::endl;
}

// 轮询的方式,每次取出来的ioservice都不同。
boost::asio::io_context& AsioIOServicePool::GetIOService() {
    auto& service = _ioServices[_nextIOService++]; //取出service
    if (_nextIOService == _ioServices.size()) {
        _nextIOService = 0;
    }
    return service;
}

void AsioIOServicePool::Stop() {
    for (auto& work : _works) {
        work.reset();// 想要Service停掉,要先work.reset();
        // work.reset()是让unique指针置空并释放,那么work的析构函数就会被调用,work被析构,其管理的io_service在没有事件监听时就会被释放。
    }
    for (auto& t : _threads) {
        t.join();
    }
}

asio多线程IOThreadPool

  1. 一个IOServicePool开启n个线程和n个iocontext,每个线程内独立运行iocontext, 各个iocontext监听各自绑定的socket是否就绪,如果就绪就在各自线程里触发回调函数。
  2. IOThreadPool:初始化一个iocontext用来监听服务器的读写事件,包括新连接到来的监听也用这个iocontext。只是我们让iocontext.run在多个线程中调用,这样回调函数就会被不同的线程触发,从这个角度看回调函数被并发调用了。
  3. 线程池统一管理一个io_context,每个线程调用一个io_context,会话session都注册到一个,哪个线程调用了io_context.run,哪个线程去就绪队列取出回调函数。
  4. 回调函数对同一个session来说就是不安全的。
class AsioThreadPool :public Singleton<AsioThreadPool>
{
public:
    friend class Singleton<AsioThreadPool>;
    ~AsioThreadPool() {}
    AsioThreadPool& operator=(const AsioThreadPool&) = delete;
    AsioThreadPool(const AsioThreadPool&) = delete;
    boost::asio::io_context& GetIOService();
    void Stop();
private:
    AsioThreadPool(int threadNum = std::thread::hardware_concurrency());
    boost::asio::io_context _service;
    std::unique_ptr<boost::asio::io_context::work> _work; //没有客人来,我也不会让饭店关门
    std::vector<std::thread> _threads;
};

// 初始化列表进行初始化
AsioThreadPool::AsioThreadPool(int threadNum) :_work(new boost::asio::io_context::work(_service)) {
    for (int i = 0; i < threadNum; ++i) {
        _threads.emplace_back([this]() {
            _service.run();
            });
    }
}
  • 线程池里每个线程都会运行_service.run函数,这就是多线程调用一个io_context的逻辑。
  • 因为回调函数是在不同的线程里调用的,所以会存在不同的线程调用同一个socket的回调函数的情况。
  • _service.run 内部在Linux环境下调用的是epoll_wait返回所有就绪的描述符列表在windows上会循环调用GetQueuedCompletionStatus函数返回就绪的描述符,二者原理类似,进而通过描述符找到对应的注册的回调函数,然后调用回调函数。

epoll 和 iocp的一些知识点

IOCP的使用主要分为以下几步:
1 创建完成端口(iocp)对象。
2 创建一个或多个工作线程,在完成端口上执行并处理投递到完成端口上的I/O请求。
3 Socket关联iocp对象,在Socket上投递网络事件。
4 工作线程调用GetQueuedCompletionStatus函数获取完成通知封包,取得事件信息并进行处理。

epoll_wait的工作方式:
1 调用epoll_creat在内核中创建一张epoll表。
2 开辟一片包含n个epoll_event大小的连续空间。
3 将要监听的socket注册到epoll表里。
4 调用epoll_wait,传入之前我们开辟的连续空间,epoll_wait返回就绪的epoll_event列表,
epoll会将就绪的socket信息写入我们之前开辟的连续空间。
  • 使用这种方式,有可能会存在隐患,不同的线程有可能处理同一块Read回调处理函数,存在网络上的并行。
    改进方法:再添加一个strand管理的队列,asio的strand是一个安全队列,里面进行独立的单线程访问。
  • 回调处理放在_strand中进行执行。
void CSession::Start(){
    ::memset(_data, 0, MAX_LENGTH);
    _socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
        boost::asio::bind_executor(_strand, std::bind(&CSession::HandleRead, this,
            std::placeholders::_1, std::placeholders::_2, SharedSelf())));
}
  • IOThreadPool相比于IOServicePool,速度慢一些。

参考列表
https://www.bilibili.com/video/BV1FV4y1U7oo/
https://llfc.club/category?catid=225RaiVNI8pFDD5L4m807g7ZwmF#!aid/2Qld2hoFIu8ycYBJXQdxwyWEBfy