基于ASIO的异步IO编程

时间:2025-01-18 13:27:19

系列文章目录


文章目录

  • 系列文章目录
  • 前言
  • 1. asio基础概念
    • 1. 同步操作
    • 2. 异步操作
    • 3. 异步代理
    • 4. 相关特征
    • Buffers
    • stream-oriented I/O objects
      • 定制内存分配器, cancellation_slot
      • debugging
      • 异步操作组合
      • Completion Token的适配器
    • 5 网络
      • 对sockets的高层次封装
    • 6 一些异步封装
      • 计时器
      • 文件
      • 管道
      • 串口
      • 信号处理
    • 7 ssl支持
  • 2. 计时器 & UDP/TCP编程
  • 3. 系统提供的异步接口
  • 4. 最佳实践
  • references


前言

异步IO介绍见references
asio和boost.asio的区别
https://think-async.com/Asio/AsioAndBoostAsio.html


1. asio基础概念

host <-> hostname / domain / url
service <-> port

1. 同步操作

asio::io_context io_ctx;  // io上下文相当于asio上层与操作系统之间的一个桥梁、中间件
asio::ip::tcp::socket socket(io_ctx);
socket.connect(server_endpoint);  // 同步操作
// 应用通过通过connect接口告诉socket对象建立一个tcp链接,socket对象将其转发到io_context
// io execution context调用系统接口建立连接,将结果告诉socket对象,然年后它再返回告诉应用连接结果

2. 异步操作

asio::io_context io_ctx;  // io上下文相当于asio上层与操作系统之间的一个桥梁、中间件
asio::ip::tcp::socket socket(io_ctx);
std::function<void(const asio::error_code&)> handler;
handler = [&](const asio::error_code& ec){ }  // 在io_context::run()调用线程执行
socket.async_connect(server_endpoint, handler);  // 异步操作分两步,1.初始化 2.执行handler函数
io_ctx.run();  // 执行流阻塞在这里,直到没有新的异步io handler可供处理

每调用一次async_X操作时好像入队(生产一个),每次事件来了时都会出队(消费一个),若 .run()时 队中有操作则阻塞,无操作则返回。

3. 异步代理

由于在调用handler之前,所有申请的资源都已释放,因此在handler函数内可继续执行初始化(回调函数注册)操作,此为异步代理。
异步代理包含多个异步操作

handler = [&](const asio::error_code& ec){
    socket.async_connect(server_endpoint, handler);  // 异步代理
}  // 在io_context::run()调用线程执行

4. 相关特征

包括执行器用哪种,内存该如何分配,取消slot如何等

异步操作的实现类似下面:

template <typename CompletionToken, completion_signature... Signatures>
struct async_result
{
    template <typename Initiation, completion_handler_for<Signatures...> CompletionHandler, typename... args>
    static void initiate(Initiation&& initiation, CompletionHandler&& completion_handler, Args&&... args)
    {
        template <typename T> using sfw = std::forward<T>;
        sfw<Initiation>(Initiation)(sfw<CompletionHandler>(completion_handler), sfw<Args>(args)...);
    }
};


using FnType = void(asio::error_code, size_t);
template< FnType CompletionToken>
auto async_read_some(asio::ip::tcp& s, const asio::mutable_buffer& b, FnType&& token)
{
    auto init = [](auto completion_handler, asio::ip::tcp::socket* s, const asio::mutable_buffer& b){
        std::thread(
            [](auto completion_handler, asio::ip::tcp::socket* s, const asio::mutable_buffer &b) {
                asio::error_code ec;
                size_t n = s->read_some(b, ec);
                std::move(completion_handler)(ec, n);
            }, std::move(completion_handler), s, b
        ).detach();
    };
    return asio::async_result<typename std::decay_t<FnType>, void(asio::error_code, size_t)>
        ::initiate(init, std::forward<CompletionToken>(token), &s, b);
}

高层次级别抽象
在这里插入图片描述

proactor模式和reactor模式的区别

reactor为同步IO模型,主线程负责通知子线程,子线程负责读写以及业务逻辑处理
proactor为异步IO模型,主线程和内核负责通知和读写数据,子线程负责业务逻辑处理

不同对象是线程安全的,同一个对象不能多线程同时读写,他不是安全的

Strands
保证strand中的异步操作不会并发执行,这在服务端处理用户可以避免数据竞争

#include <functional>
#include <iostream>
#include <asio.hpp>
#include <vector>
#include <thread>

using namespace std;
using namespace asio;

void multi_context()
{
  vector<io_context*> ctxs;
  for (int i = 0; i < 100; ++i)
  {
    ctxs.push_back(new io_context);
  }
  for (int i = 0; i < 100; ++i)
  {
    asio::post( *ctxs[i] , [](){
      cout << gettid() << "]" << 1 << 2 << 3 << 4 << 5 << 6 << 7 << 8 << 9 << endl;
    });  
  }
  vector<std::thread> ths;
  for (int i = 0; i < 100; ++i)
  {
    ths.emplace_back([i, &ctxs](){ ctxs[i]->run(); });
  }
  for (int i = 0; i < 100; ++i)
  {
    ths[i].join();
  }
}
void no_strand()
{
  io_context io_ctx;
  // auto strand = asio::make_strand(io_ctx);
  for (int i = 0; i < 100; ++i)
  {
    asio::post( io_ctx , [](){
      cout << gettid() << "]" << 1 << 2 << 3 << 4 << 5 << 6 << 7 << 8 << 9 << endl;
    });  
  }
  vector<std::thread> ths;
  for (int i = 0; i < 100; ++i)
  {
    ths.emplace_back([i, &io_ctx](){ io_ctx.run(); });
  }

  for (int i=0; i < 100; ++i)
  {
    ths[i].join();
  }
}
void has_strand()
{
  io_context io_ctx;
  auto strand = asio::make_strand(io_ctx);
  for (int i = 0; i < 100; ++i)
  {
    asio::post( strand , [](){
      cout << gettid() << "]" << 1 << 2 << 3 << 4 << 5 << 6 << 7 << 8 << 9 << endl;
    });  
  }
  vector<std::thread> ths;
  for (int i = 0; i < 100; ++i)
  {
    ths.emplace_back([i, &io_ctx](){ io_ctx.run(); });
  }

  for (int i=0; i < 100; ++i)
  {
    ths[i].join();
  }
}

Buffers

  • A scatter-read receives data into multiple buffers.
  • A gather-write transmits multiple buffers.

stream-oriented I/O objects

  • There are no message boundaries. The data being transferred is a continuous sequence of bytes.
  • Read or write operations may transfer fewer bytes than requested. This is referred to as a short read or short write.

read_some()

async_read_some()

write_some()

async_write_some()

ip::tcp::socket

read(), async_read(), write() and async_write().

ip::tcp::socket socket(my_io_context);
...
socket.non_blocking(true);
...
socket.async_wait(ip::tcp::socket::wait_read, read_handler);
...
void read_handler(asio::error_code ec)
{
  if (!ec)
  {
    std::vector<char> buf(socket.available());
    socket.read_some(buffer(buf));
  }
}

异步读,直到读到xxx

read_until() and async_read_until()

class http_connection
{
  ...

  void start()
  {
    asio::async_read_until(socket_, data_, "\r\n",
        boost::bind(&http_connection::handle_request_line, this, _1));
  }

  void handle_request_line(asio::error_code ec)
  {
    if (!ec)
    {
      std::string method, uri, version;
      char sp1, sp2, cr, lf;
      std::istream is(&data_);
      is.unsetf(std::ios_base::skipws);
      is >> method >> sp1 >> uri >> sp2 >> version >> cr >> lf;
      ...
    }
  }

  ...

  asio::ip::tcp::socket socket_;
  asio::streambuf data_;
};

// read_until支持多个重载
typedef asio::buffers_iterator<
    asio::streambuf::const_buffers_type> iterator;

std::pair<iterator, bool>
match_whitespace(iterator begin, iterator end)
{
  iterator i = begin;
  while (i != end)
    if (std::isspace(*i++))
      return std::make_pair(i, true);
  return std::make_pair(i, false);
}
...
asio::streambuf b;
asio::read_until(s, b, match_whitespace);

// 可调用类,特性萃取
class match_char
{
public:
  explicit match_char(char c) : c_(c) {}

  template <typename Iterator>
  std::pair<Iterator, bool> operator()(
      Iterator begin, Iterator end) const
  {
    Iterator i = begin;
    while (i != end)
      if (c_ == *i++)
        return std::make_pair(i, true);
    return std::make_pair(i, false);
  }

private:
  char c_;
};
namespace asio {
  template <> struct is_match_condition<match_char>
    : public boost::true_type {};
} // namespace asio
...
asio::streambuf b;
asio::read_until(s, b, match_char('a'));

定制内存分配器, cancellation_slot

当使用用户定义的对象 Handler h时,获取分配器使用如下方法

asio::associated_allocator_t<Handler> a = asio::get_associated_allocator(h);
class my_handler
{
public:
  // Custom implementation of Allocator type requirements.
  typedef my_allocator allocator_type;

  // Return a custom allocator implementation.
  allocator_type get_allocator() const noexcept
  {
    return my_allocator();
  }

  void operator()() { ... }
};
namespace asio {

  template <typename Allocator>
  struct associated_allocator<my_handler, Allocator>
  {
    // Custom implementation of Allocator type requirements.
    typedef my_allocator type;

    // Return a custom allocator implementation.
    static type get(const my_handler&,
        const Allocator& a = Allocator()) noexcept
    {
      return my_allocator();
    }
  };

} // namespace asio


asio::async_read(my_socket, my_buffer,
    asio::bind_cancellation_slot(
      my_cancellation_slot,
      [](asio::error_code e, std::size_t n)
      {
        ...
      }
    )
  );

cancellation slot and signal. signal发送,slot接收(一个覆盖式handler)

class session
  : public std::enable_shared_from_this<proxy>
{
  ...

  void do_read()
  {
    auto self = shared_from_this();
    socket_.async_read_some(
        buffer(data_),
        asio::bind_cancellation_slot(
          cancel_signal_.slot(),
          [self](std::error_code error, std::size_t n)
          {
            ...
          }
        )
      );
  }

  ...

  void request_cancel()
  {
    cancel_signal_.emit(asio::cancellation_type::total);
  }

  ...

  asio::cancellation_signal cancel_signal_;
};

debugging

通过加入一些宏
https://think-async.com/Asio/asio-1.30.2/doc/asio/overview/core/handler_tracking.html
显示调试信息

异步操作组合

将几个异步操作组成一个简单的状态机

struct async_echo_implementation
{
    asio::ip::tcp::socket& socket_;
    asio::mutable_buffer buffer_;
    enum {starting, reading, writing} state_;
    
    template <typename Self> void 
    operator()(Self& self, asio::error_code error = {}, size_t n = 0)
    {
        switch (state_)
        {
        case starting:
            state_ = reading;
            socket_.async_read_some(buffer_, std::move(self));
            break;
        case reading:
            if (error)
            {
                self.complete(error, 0);
            }
            else
            {
                state_ = writing;
                asio::async_write(socket_, buffer_, asio::transfer_exactly(n), std::move(self));
            }
            break;
        case writing:
            self.complete(error, n);
            break;
        }
    }
};
template<typename CompletionToken>
auto async_echo(ip::tcp::socket& socket, mutable_buffer buffer, CompletionToken&& token) ->
    typename async_result<