系列文章目录
文章目录
- 系列文章目录
- 前言
- 1. asio基础概念
- 1. 同步操作
- 2. 异步操作
- 3. 异步代理
- 4. 相关特征
- Buffers
- stream-oriented I/O objects
- 定制内存分配器, cancellation_slot
- debugging
- 异步操作组合
- Completion Token的适配器
- 5 网络
- 对sockets的高层次封装
- 6 一些异步封装
- 计时器
- 文件
- 管道
- 串口
- 信号处理
- 7 ssl支持
- 2. 计时器 & UDP/TCP编程
- 3. 系统提供的异步接口
- 4. 最佳实践
- references
前言
异步IO介绍见references
asio和boost.asio的区别 https://think-async.com/Asio/AsioAndBoostAsio.html
1. asio基础概念
host <-> hostname / domain / url
service <-> port
1. 同步操作
asio::io_context io_ctx; // io上下文相当于asio上层与操作系统之间的一个桥梁、中间件
asio::ip::tcp::socket socket(io_ctx);
socket.connect(server_endpoint); // 同步操作
// 应用通过通过connect接口告诉socket对象建立一个tcp链接,socket对象将其转发到io_context
// io execution context调用系统接口建立连接,将结果告诉socket对象,然年后它再返回告诉应用连接结果
2. 异步操作
asio::io_context io_ctx; // io上下文相当于asio上层与操作系统之间的一个桥梁、中间件
asio::ip::tcp::socket socket(io_ctx);
std::function<void(const asio::error_code&)> handler;
handler = [&](const asio::error_code& ec){ } // 在io_context::run()调用线程执行
socket.async_connect(server_endpoint, handler); // 异步操作分两步,1.初始化 2.执行handler函数
io_ctx.run(); // 执行流阻塞在这里,直到没有新的异步io handler可供处理
每调用一次async_X操作时好像入队(生产一个),每次事件来了时都会出队(消费一个),若 .run()时 队中有操作则阻塞,无操作则返回。
3. 异步代理
由于在调用handler之前,所有申请的资源都已释放,因此在handler函数内可继续执行初始化(回调函数注册)操作,此为异步代理。
异步代理包含多个异步操作
handler = [&](const asio::error_code& ec){
socket.async_connect(server_endpoint, handler); // 异步代理
} // 在io_context::run()调用线程执行
4. 相关特征
包括执行器用哪种,内存该如何分配,取消slot如何等
异步操作的实现类似下面:
template <typename CompletionToken, completion_signature... Signatures>
struct async_result
{
template <typename Initiation, completion_handler_for<Signatures...> CompletionHandler, typename... args>
static void initiate(Initiation&& initiation, CompletionHandler&& completion_handler, Args&&... args)
{
template <typename T> using sfw = std::forward<T>;
sfw<Initiation>(Initiation)(sfw<CompletionHandler>(completion_handler), sfw<Args>(args)...);
}
};
using FnType = void(asio::error_code, size_t);
template< FnType CompletionToken>
auto async_read_some(asio::ip::tcp& s, const asio::mutable_buffer& b, FnType&& token)
{
auto init = [](auto completion_handler, asio::ip::tcp::socket* s, const asio::mutable_buffer& b){
std::thread(
[](auto completion_handler, asio::ip::tcp::socket* s, const asio::mutable_buffer &b) {
asio::error_code ec;
size_t n = s->read_some(b, ec);
std::move(completion_handler)(ec, n);
}, std::move(completion_handler), s, b
).detach();
};
return asio::async_result<typename std::decay_t<FnType>, void(asio::error_code, size_t)>
::initiate(init, std::forward<CompletionToken>(token), &s, b);
}
高层次级别抽象
proactor模式和reactor模式的区别
reactor为同步IO模型,主线程负责通知子线程,子线程负责读写以及业务逻辑处理
proactor为异步IO模型,主线程和内核负责通知和读写数据,子线程负责业务逻辑处理
不同对象是线程安全的,同一个对象不能多线程同时读写,他不是安全的
Strands
保证strand中的异步操作不会并发执行,这在服务端处理用户可以避免数据竞争
#include <functional>
#include <iostream>
#include <asio.hpp>
#include <vector>
#include <thread>
using namespace std;
using namespace asio;
void multi_context()
{
vector<io_context*> ctxs;
for (int i = 0; i < 100; ++i)
{
ctxs.push_back(new io_context);
}
for (int i = 0; i < 100; ++i)
{
asio::post( *ctxs[i] , [](){
cout << gettid() << "]" << 1 << 2 << 3 << 4 << 5 << 6 << 7 << 8 << 9 << endl;
});
}
vector<std::thread> ths;
for (int i = 0; i < 100; ++i)
{
ths.emplace_back([i, &ctxs](){ ctxs[i]->run(); });
}
for (int i = 0; i < 100; ++i)
{
ths[i].join();
}
}
void no_strand()
{
io_context io_ctx;
// auto strand = asio::make_strand(io_ctx);
for (int i = 0; i < 100; ++i)
{
asio::post( io_ctx , [](){
cout << gettid() << "]" << 1 << 2 << 3 << 4 << 5 << 6 << 7 << 8 << 9 << endl;
});
}
vector<std::thread> ths;
for (int i = 0; i < 100; ++i)
{
ths.emplace_back([i, &io_ctx](){ io_ctx.run(); });
}
for (int i=0; i < 100; ++i)
{
ths[i].join();
}
}
void has_strand()
{
io_context io_ctx;
auto strand = asio::make_strand(io_ctx);
for (int i = 0; i < 100; ++i)
{
asio::post( strand , [](){
cout << gettid() << "]" << 1 << 2 << 3 << 4 << 5 << 6 << 7 << 8 << 9 << endl;
});
}
vector<std::thread> ths;
for (int i = 0; i < 100; ++i)
{
ths.emplace_back([i, &io_ctx](){ io_ctx.run(); });
}
for (int i=0; i < 100; ++i)
{
ths[i].join();
}
}
Buffers
- A scatter-read receives data into multiple buffers.
- A gather-write transmits multiple buffers.
stream-oriented I/O objects
- There are no message boundaries. The data being transferred is a continuous sequence of bytes.
- Read or write operations may transfer fewer bytes than requested. This is referred to as a short read or short write.
read_some()
async_read_some()
write_some()
async_write_some()
ip::tcp::socket
read(), async_read(), write() and async_write().
ip::tcp::socket socket(my_io_context);
...
socket.non_blocking(true);
...
socket.async_wait(ip::tcp::socket::wait_read, read_handler);
...
void read_handler(asio::error_code ec)
{
if (!ec)
{
std::vector<char> buf(socket.available());
socket.read_some(buffer(buf));
}
}
异步读,直到读到xxx
read_until() and async_read_until()
class http_connection
{
...
void start()
{
asio::async_read_until(socket_, data_, "\r\n",
boost::bind(&http_connection::handle_request_line, this, _1));
}
void handle_request_line(asio::error_code ec)
{
if (!ec)
{
std::string method, uri, version;
char sp1, sp2, cr, lf;
std::istream is(&data_);
is.unsetf(std::ios_base::skipws);
is >> method >> sp1 >> uri >> sp2 >> version >> cr >> lf;
...
}
}
...
asio::ip::tcp::socket socket_;
asio::streambuf data_;
};
// read_until支持多个重载
typedef asio::buffers_iterator<
asio::streambuf::const_buffers_type> iterator;
std::pair<iterator, bool>
match_whitespace(iterator begin, iterator end)
{
iterator i = begin;
while (i != end)
if (std::isspace(*i++))
return std::make_pair(i, true);
return std::make_pair(i, false);
}
...
asio::streambuf b;
asio::read_until(s, b, match_whitespace);
// 可调用类,特性萃取
class match_char
{
public:
explicit match_char(char c) : c_(c) {}
template <typename Iterator>
std::pair<Iterator, bool> operator()(
Iterator begin, Iterator end) const
{
Iterator i = begin;
while (i != end)
if (c_ == *i++)
return std::make_pair(i, true);
return std::make_pair(i, false);
}
private:
char c_;
};
namespace asio {
template <> struct is_match_condition<match_char>
: public boost::true_type {};
} // namespace asio
...
asio::streambuf b;
asio::read_until(s, b, match_char('a'));
定制内存分配器, cancellation_slot
当使用用户定义的对象 Handler h时,获取分配器使用如下方法
asio::associated_allocator_t<Handler> a = asio::get_associated_allocator(h);
class my_handler
{
public:
// Custom implementation of Allocator type requirements.
typedef my_allocator allocator_type;
// Return a custom allocator implementation.
allocator_type get_allocator() const noexcept
{
return my_allocator();
}
void operator()() { ... }
};
namespace asio {
template <typename Allocator>
struct associated_allocator<my_handler, Allocator>
{
// Custom implementation of Allocator type requirements.
typedef my_allocator type;
// Return a custom allocator implementation.
static type get(const my_handler&,
const Allocator& a = Allocator()) noexcept
{
return my_allocator();
}
};
} // namespace asio
asio::async_read(my_socket, my_buffer,
asio::bind_cancellation_slot(
my_cancellation_slot,
[](asio::error_code e, std::size_t n)
{
...
}
)
);
cancellation slot and signal. signal发送,slot接收(一个覆盖式handler)
class session
: public std::enable_shared_from_this<proxy>
{
...
void do_read()
{
auto self = shared_from_this();
socket_.async_read_some(
buffer(data_),
asio::bind_cancellation_slot(
cancel_signal_.slot(),
[self](std::error_code error, std::size_t n)
{
...
}
)
);
}
...
void request_cancel()
{
cancel_signal_.emit(asio::cancellation_type::total);
}
...
asio::cancellation_signal cancel_signal_;
};
debugging
通过加入一些宏
https://think-async.com/Asio/asio-1.30.2/doc/asio/overview/core/handler_tracking.html
显示调试信息
异步操作组合
将几个异步操作组成一个简单的状态机
struct async_echo_implementation
{
asio::ip::tcp::socket& socket_;
asio::mutable_buffer buffer_;
enum {starting, reading, writing} state_;
template <typename Self> void
operator()(Self& self, asio::error_code error = {}, size_t n = 0)
{
switch (state_)
{
case starting:
state_ = reading;
socket_.async_read_some(buffer_, std::move(self));
break;
case reading:
if (error)
{
self.complete(error, 0);
}
else
{
state_ = writing;
asio::async_write(socket_, buffer_, asio::transfer_exactly(n), std::move(self));
}
break;
case writing:
self.complete(error, n);
break;
}
}
};
template<typename CompletionToken>
auto async_echo(ip::tcp::socket& socket, mutable_buffer buffer, CompletionToken&& token) ->
typename async_result<