boost.asio之异步编程

时间:2022-09-08 22:12:45

异步的需求

同步编程比异步编程简单很多。这是因为,线性的思考是很简单的(调用A,调用A结束,调用B,调用B结束,然后继续,这是以事件处理的方式来思考)。后面你会碰到这种情况,比如:五件事情,你不知道它们执行的顺序,也不知道他们是否会执行!

尽管异步编程更难,但是你会更倾向于选择使用它,比如:写一个需要处理很多并发访问的服务端。并发访问越多,异步编程就比同步编程越简单。

假设:你有一个需要处理1000个并发访问的应用,从客户端发给服务端的每个信息都会再返回给客户端,以‘\n’结尾。

同步方式的代码,1个线程:

using namespace boost::asio;
struct client {
ip::tcp::socket sock;
char buff[1024]; // 每个信息最多这么大
int already_read; // 你已经读了多少
};
std::vector<client> clients;
void handle_clients() {
while ( true)
for ( int i = 0; i < clients.size(); ++i)
if ( clients[i].sock.available() ) on_read(clients[i]);
}
void on_read(client & c) {
int to_read = std::min( 1024 - c.already_read, c.sock.available());
c.sock.read_some( buffer(c.buff + c.already_read, to_read));
c.already_read += to_read;
if ( std::find(c.buff, c.buff + c.already_read, '\n') < c.buff + c.already_read) {
int pos = std::find(c.buff, c.buff + c.already_read, '\n') - c.buff;
std::string msg(c.buff, c.buff + pos);
std::copy(c.buff + pos, c.buff + 1024, c.buff);
c.already_read -= pos;
on_read_msg(c, msg);
}
}
void on_read_msg(client & c, const std::string & msg) {
// 分析消息,然后返回
if ( msg == "request_login")
c.sock.write( "request_ok\n");
else if ...
}

有一种情况是在任何服务端(和任何基于网络的应用)都需要避免的,就是代码无响应的情况。在我们的例子里,我们需要handle_clients()方法尽可能少的阻塞。如果方法在某个点上阻塞,任何进来的信息都需要等待方法解除阻塞才能被处理。

为了保持响应,只在一个套接字有数据的时候我们才读,也就是说,if ( clients[i].sock.available() ) on_read(clients[i])。在on_read时,我们只读当前可用的;调用read_until(c.sock, buffer(…), ‘\n’)会是一个非常糟糕的选择,因为直到我们从一个指定的客户端读取了完整的消息之前,它都是阻塞的(我们永远不知道它什么时候会读取到完整的消息)

这里的瓶颈就是on_read_msg()方法;当它执行时,所有进来的消息都在等待。一个良好的on_read_msg()方法实现会保证这种情况基本不会发生,但是它还是会发生(有时候向一个套接字写入数据,缓冲区满了时,它会被阻塞)
同步方式的代码,10个线程

using namespace boost::asio;
struct client {
  // ... 和之前一样
bool set_reading() {
boost::mutex::scoped_lock lk(cs_);
if ( is_reading_) return false; // 已经在读取
else { is_reading_ = true; return true; }
}
void unset_reading() {
boost::mutex::scoped_lock lk(cs_);
is_reading_ = false;
}
private:
boost::mutex cs_;
bool is_reading_;
};
std::vector<client> clients;
void handle_clients() {
for ( int i = 0; i < 10; ++i)
boost::thread( handle_clients_thread);
}
void handle_clients_thread() {
while ( true)
for ( int i = 0; i < clients.size(); ++i)
if ( clients[i].sock.available() )
if ( clients[i].set_reading()) {
on_read(clients[i]);
clients[i].unset_reading();
}
}
void on_read(client & c) {
// 和之前一样
}
void on_read_msg(client & c, const std::string & msg) {
// 和之前一样
}

为了使用多线程,我们需要对线程进行同步,这就是set_reading()set_unreading()所做的。set_reading()方法非常重要,比如你想要一步实现“判断是否在读取然后标记为读取中”。但这是有两步的(“判断是否在读取”和“标记为读取中”),你可能会有两个线程同时为一个客户端判断是否在读取,然后你会有两个线程同时为一个客户端调用on_read,结果就是数据冲突甚至导致应用崩溃。

你会发现代码变得极其复杂。

同步编程有第三个选择,就是为每个连接开辟一个线程。但是当并发的线程增加时,这就成了一种灾难性的情况。

然后,让我们来看异步编程。我们不断地异步读取。当一个客户端请求某些东西时,on_read被调用,然后回应,然后等待下一个请求(然后开始另外一个异步的read操作)。

异步方式的代码,10个线程

using namespace boost::asio;
io_service service;
struct client {
ip::tcp::socket sock;
streambuf buff; // 从客户端取回结果
}
std::vector<client> clients;
void handle_clients() {
for ( int i = 0; i < clients.size(); ++i)
async_read_until(clients[i].sock, clients[i].buff, '\n', boost::bind(on_read, clients[i], _1, _2));
for ( int i = 0; i < 10; ++i)
boost::thread(handle_clients_thread);
}
void handle_clients_thread() {
service.run();
}
void on_read(client & c, const error_code & err, size_t read_bytes) {
std::istream in(&c.buff);
std::string msg;
std::getline(in, msg);
if ( msg == "request_login")
c.sock.async_write( "request_ok\n", on_write);
else if ...
...
// 等待同一个客户端下一个读取操作
async_read_until(c.sock, c.buff, '\n', boost::bind(on_read, c, _1, _2));
}

发现代码变得有多简单了吧?client结构里面只有两个成员,handle_clients()仅仅调用了async_read_until,然后它创建了10个线程,每个线程都调用service.run()。这些线程会处理所有来自客户端的异步read操作,然后分发所有向客户端的异步write操作。另外需要注意的一件事情是:on_read()一直在为下一次异步read操作做准备(看最后一行代码)。

异步run(), run_one(), poll(), poll_ one()

为了实现监听循环,io_service类提供了4个方法,比如:run(), run_one(), poll()poll_one()。虽然大多数时候使用service.run()就可以,但是你还是需要在这里学习其他方法实现的功能。

持续运行

再一次说明,如果有等待执行的操作,run()会一直执行,直到你手动调用io_service::stop()。为了保证io_service一直执行,通常你添加一个或者多个异步操作,然后在它们被执行时,你继续一直不停地添加异步操作,比如下面代码:

using namespace boost::asio;
io_service service;
ip::tcp::socket sock(service);
char buff_read[1024], buff_write[1024] = "ok";
void on_read(const boost::system::error_code &err, std::size_t bytes);
void on_write(const boost::system::error_code &err, std::size_t bytes)
{
sock.async_read_some(buffer(buff_read), on_read);
}
void on_read(const boost::system::error_code &err, std::size_t bytes)
{
// ... 处理读取操作 ...
sock.async_write_some(buffer(buff_write,3), on_write);
}
void on_connect(const boost::system::error_code &err) {
sock.async_read_some(buffer(buff_read), on_read);
}
int main(int argc, char* argv[]) {
ip::tcp::endpoint ep( ip::address::from_string("127.0.0.1"), 2001);
sock.async_connect(ep, on_connect);
service.run();
}
  1. service.run()被调用时,有一个异步操作在等待。
  2. 当socket连接到服务端时,on_connect被调用了,它会添加一个异步操作。
  3. on_connect结束时,我们会留下一个等待的操作(read)。
  4. on_read被调用时,我们写入一个回应,这又添加了另外一个等待的操作。
  5. on_read结束时,我们会留下一个等待的操作(write)。
  6. on_write操作被调用时,我们从服务端读取另外一个消息,这也添加了另外一个等待的操作。
  7. on_write结束时,我们有一个等待的操作(read)。
  8. 然后一直继续循环下去,直到我们关闭这个应用。

run_one(), poll(), poll_one() 方法

我在之前说过异步方法的handler是在调用了io_service::run的线程里被调用的。因为在至少90%~95%的时候,这是你唯一要用到的方法,所以我就把它说得简单了。对于调用了run_one(), poll()或者poll_one()的线程这一点也是适用的。

run_one()方法最多执行和分发一个异步操作:
* 如果没有等待的操作,方法立即返回0
* 如果有等待操作,方法在第一个操作执行之前处于阻塞状态,然后返回1

你可以认为下面两段代码是等效的:

io_service service;
service.run(); // 或者
while ( !service.stopped()) service.run_once();

你可以使用run_once()启动一个异步操作,然后等待它执行完成。

io_service service;
bool write_complete = false;
void on_write(const boost::system::error_code & err, size_t bytes)
{ write_complete = true; }

std::string data = "login ok”;
write_complete = false;
async_write(sock, buffer(data), on_write);
do service.run_once() while (!write_complete);

还有一些使用run_one()方法的例子,包含在Boost.Asio诸如blocking_tcp_client.cppblocking_udp_client.cpp的文件中。

poll_one方法以非阻塞的方式最多运行一个准备好的等待操作:
* 如果至少有一个等待的操作,而且准备好以非阻塞的方式运行,poll_one方法会运行它并且返回1
* 否则,方法立即返回0

操作正在等待并准备以非阻塞方式运行,通常意味着如下的情况:
* 一个计时器过期了,然后它的async_wait处理方法需要被调用
* 一个I/O操作完成了(比如async_read),然后它的hanlder需要被调用
* 之前被加入io_services实例队列中的自定义handler(这会在之后的章节中详解)

你可以使用poll_one去保证所有I/O操作的handler完成运行,同时做一些其他的工作

io_service service;
while ( true) {
// 运行所有完成了IO操作的handler
while ( service.poll_one()) ;
// ... 在这里做其他的事情 …
}

poll()方法会以非阻塞的方式运行所有等待的操作。下面两段代码是等效的:

io_service service;
service.poll(); // 或者
while ( service.poll_one()) ;

所有上述方法都会在失败的时候抛出boost::system::system_error异常。这是我们所不希望发生的事情;这里抛出的异常通常都是致命的,也许是资源耗尽,或者是你handler的其中一个抛出了异常。另外,每个方法都有一个不抛出异常,而是返回一个boost::system::error_code的重载:

io_service service;
boost::system::error_code err = 0;
service.run(err);
if ( err) std::cout << "Error " << err << std::endl;

异步工作

异步工作不仅仅指用异步地方式接受客户端到服务端的连接、异步地从一个socket读取或者写入到socket。它包含了所有可以异步执行的操作。

默认情况下,你是不知道每个异步handler的调用顺序的。除了通常的异步调用(来自异步socket的读取/写入/接收)。你可以使用service.post()来使你的自定义方法被异步地调用。例如:

#include <boost/thread.hpp>
#include <boost/bind.hpp>
#include <boost/asio.hpp>
#include <iostream>
using namespace boost::asio;
io_service service;
void func(int i) {
std::cout << "func called, i= " << i << std::endl;
}

void worker_thread() {
service.run();
}

int main(int argc, char* argv[]) {
for ( int i = 0; i < 10; ++i)
service.post(boost::bind(func, i));
boost::thread_group threads;
for ( int i = 0; i < 3; ++i)
threads.create_thread(worker_thread);
// 等待所有线程被创建完
boost::this_thread::sleep( boost::posix_time::millisec(500));
threads.join_all();
}

在上面的例子中,service.post(some_function)添加了一个异步方法调用。

这个方法在某一个调用了service.run()的线程中请求io_service实例,然后调用给定的some_funtion之后立即返回。在我们的例子中,这个线程是我们之前创建的三个线程中的一个。你不能确定异步方法调用的顺序。你不要期待它们会以我们调用post()方法的顺序来调用。下面是运行之前代码可能得到的结果:

func called, i= 0
func called, i= 2
func called, i= 1
func called, i= 4
func called, i= 3
func called, i= 6
func called, i= 7
func called, i= 8
func called, i= 5
func called, i= 9

有时候你会想让一些异步处理方法顺序执行。比如,你去一个餐馆(go_to_restaurant),下单(order),然后吃(eat)。你需要先去餐馆,然后下单,最后吃。这样的话,你需要用到io_service::strand,这个方法会让你的异步方法被顺序调用。看下面的例子:

using namespace boost::asio;
io_service service;
void func(int i) {
std::cout << "func called, i= " << i << "/" << boost::this_thread::get_id() << std::endl;
}
void worker_thread() {
service.run();
}
int main(int argc, char* argv[])
{
io_service::strand strand_one(service), strand_two(service);
for ( int i = 0; i < 5; ++i)
service.post( strand_one.wrap( boost::bind(func, i)));
for ( int i = 5; i < 10; ++i)
service.post( strand_two.wrap( boost::bind(func, i)));
boost::thread_group threads;
for ( int i = 0; i < 3; ++i)
threads.create_thread(worker_thread);
// 等待所有线程被创建完
boost::this_thread::sleep( boost::posix_time::millisec(500));
threads.join_all();
}

在上述代码中,我们保证前面的5个线程和后面的5个线程是顺序执行的。func called, i = 0func called, i = 1之前被调用,然后调用func called, i = 2……同样func called, i = 5func called, i = 6之前,func called, i = 6func called, i = 7被调用……你需要注意的是尽管方法是顺序调用的,但是不意味着它们都在同一个线程执行。运行这个程序可能得到的一个结果如下:

func called, i= 0/002A60C8
func called, i= 5/002A6138
func called, i= 6/002A6530
func called, i= 1/002A6138
func called, i= 7/002A6530
func called, i= 2/002A6138
func called, i= 8/002A6530
func called, i= 3/002A6138
func called, i= 9/002A6530
func called, i= 4/002A6138

异步post() VS dispatch() VS wrap()

Boost.Asio提供了三种让你把处理方法添加为异步调用的方式:
* service.post(handler):这个方法能确保其在请求io_service实例,然后调用指定的处理方法之后立即返回。handler稍后会在某个调用了service.run()的线程中被调用。
* service.dispatch(handler):这个方法请求io_service实例去调用给定的处理方法,但是另外一点,如果当前的线程调用了service.run(),它可以在方法中直接调用handler。
* service.wrap(handler):这个方法创建了一个封装方法,当被调用时它会调用service.dispatch(handler),这个会让人有点困惑,接下来我会简单地解释它是什么意思。

在之前的章节中你会看到关于service.post()的一个例子,以及运行这个例子可能得到的一种结果。我们对它做一些修改,然后看看service.dispatch()是怎么影响输出的结果的:

using namespace boost::asio;
io_service service;
void func(int i) {
std::cout << "func called, i= " << i << std::endl;
}
void run_dispatch_and_post() {
for ( int i = 0; i < 10; i += 2) {
service.dispatch(boost::bind(func, i));
service.post(boost::bind(func, i + 1));
}
}
int main(int argc, char* argv[]) {
service.post(run_dispatch_and_post);
service.run();
}

在解释发生了什么之前,我们先运行程序,观察结果:

func called, i= 0
func called, i= 2
func called, i= 4
func called, i= 6
func called, i= 8
func called, i= 1
func called, i= 3
func called, i= 5
func called, i= 7
func called, i= 9

偶数先输出,然后是奇数。这是因为我用dispatch()输出偶数,然后用post()输出奇数。dispatch()会在返回之前调用hanlder,因为当前的线程调用了service.run(),而post()每次都立即返回了。
现在,让我们讲讲service.wrap(handler)wrap()返回了一个仿函数,它可以用来做另外一个方法的参数:

using namespace boost::asio;
io_service service;
void dispatched_func_1() {
std::cout << "dispatched 1" << std::endl;
}
void dispatched_func_2() {
std::cout << "dispatched 2" << std::endl;
}
void test(boost::function<void()> func) {
std::cout << "test" << std::endl;
service.dispatch(dispatched_func_1);
func();
}
void service_run() {
service.run();
}
int main(int argc, char* argv[]) {
test( service.wrap(dispatched_func_2));
boost::thread th(service_run);
boost::this_thread::sleep( boost::posix_time::millisec(500));
th.join();
}

test(service.wrap(dispatched_func_2));会把dispatched_ func_2包装起来创建一个仿函数,然后传递给test当作一个参数。当test()被调用时,它会分发调用方法1,然后调用func()。这时,你会发现调用func()service.dispatch(dispatched_func_2)是等价的,因为它们是连续调用的。程序的输出证明了这一点:

test
dispatched 1
dispatched 2

io_service::strand 类(用来序列化异步调用)也包含了poll(), dispatch()wrap()等成员函数。它们的作用和io_servicepoll(), dispatch()wrap()是一样的。然而,大多数情况下你只需要把io_service::strand::wrap()方法做为io_service::poll()或者io_service::dispatch()方法的参数即可。

保持活动

假设你需要做下面的操作:

io_service service;
ip::tcp::socket sock(service);
char buff[512];
...
read(sock, buffer(buff));

在这个例子中,sockbuff的存在时间都必须比read()调用的时间要长。也就是说,在调用read()返回之前,它们都必须有效。这就是你所期望的;你传给一个方法的所有参数在方法内部都必须有效。当我们采用异步方式时,事情会变得比较复杂。

io_service service;
ip::tcp::socket sock(service);
char buff[512];
void on_read(const boost::system::error_code &, size_t) {}
...
async_read(sock, buffer(buff), on_read);

在这个例子中,sockbuff的存在时间都必须比read()操作本身时间要长,但是read操作持续的时间我们是不知道的,因为它是异步的。

当使用socket缓冲区的时候,你会有一个buffer实例在异步调用时一直存在(使用boost::shared_array<>)。在这里,我们可以使用同样的方式,通过创建一个类并在其内部管理socket和它的读写缓冲区。然后,对于所有的异步操作,传递一个包含智能指针的boost::bind仿函数给它:

using namespace boost::asio;
io_service service;
struct connection : boost::enable_shared_from_this<connection> {
typedef boost::system::error_code error_code;
typedef boost::shared_ptr<connection> ptr;
connection() : sock_(service), started_(true) {}
void start(ip::tcp::endpoint ep) {
sock_.async_connect(ep, boost::bind(&connection::on_connect, shared_from_this(), _1));
}
void stop() {
if ( !started_) return;
started_ = false;
sock_.close();
}
bool started() { return started_; }
private:
void on_connect(const error_code & err) {
// 这里你决定用这个连接做什么: 读取或者写入
if ( !err) do_read();
else stop();
}
void on_read(const error_code & err, size_t bytes) {
if ( !started() ) return;
std::string msg(read_buffer_, bytes);
if ( msg == "can_login") do_write("access_data");
else if ( msg.find("data ") == 0) process_data(msg);
else if ( msg == "login_fail") stop();
}
void on_write(const error_code & err, size_t bytes) {
do_read();
}
void do_read() {
sock_.async_read_some(buffer(read_buffer_), boost::bind(&connection::on_read, shared_from_this(), _1, _2));
}
void do_write(const std::string & msg) {
if ( !started() ) return;
// 注意: 因为在做另外一个async_read操作之前你想要发送多个消息,
// 所以你需要多个写入buffer
std::copy(msg.begin(), msg.end(), write_buffer_);
sock_.async_write_some(buffer(write_buffer_, msg.size()), boost::bind(&connection::on_write, shared_from_this(), _1, _2));
}

void process_data(const std::string & msg) {
// 处理服务端来的内容,然后启动另外一个写入操作
}
private:
ip::tcp::socket sock_;
enum { max_msg = 1024 };
char read_buffer_[max_msg];
char write_buffer_[max_msg];
bool started_;
};

int main(int argc, char* argv[]) {
ip::tcp::endpoint ep( ip::address::from_string("127.0.0.1"), 8001);
connection::ptr(new connection)->start(ep);
}

在所有异步调用中,我们传递一个boost::bind仿函数当作参数。这个仿函数内部包含了一个智能指针,指向connection实例。只要有一个异步操作等待时,Boost.Asio就会保存boost::bind仿函数的拷贝,这个拷贝保存了指向连接实例的一个智能指针,从而保证connection实例保持活动。问题解决!

当然,connection类仅仅是一个框架类;你需要根据你的需求对它进行调整(它看起来会和当前服务端例子的情况相当不同)。

你需要注意的是创建一个新的连接是相当简单的:connection::ptr(new connection)- >start(ep)。这个方法启动了到服务端的(异步)连接。当你需要关闭这个连接时,调用stop()

当实例被启动时(start()),它会等待客户端的连接。当连接发生时。on_connect()被调用。如果没有错误发生,它启动一个read操作(do_read())。当read操作结束时,你就可以解析这个消息;当然你应用的on_read()看起来会各种各样。而当你写回一个消息时,你需要把它拷贝到缓冲区,然后像我在do_write()方法中所做的一样将其发送出去,因为这个缓冲区同样需要在这个异步写操作中一直存活。最后需要注意的一点——当写回时,你需要指定写入的数量,否则,整个缓冲区都会被发送出去。

总结

网络api实际上要繁杂得多,这个章节只是做为一个参考,当你在实现自己的网络应用时可以回过头来看看。

Boost.Asio实现了端点的概念,你可以认为是IP和端口。如果你不知道准确的IP,你可以使用resolver对象将主机名,例如www.yahoo.com转换为一个或多个IP地址。

我们也可以看到API的核心——socket类。Boost.Asio提供了TCP、UDPICMP的实现。而且你还可以用你自己的协议来对它进行扩展;当然,这个工作不适合缺乏勇气的人。

异步编程是刚需。你应该已经明白为什么有时候需要用到它,尤其在写服务端的时候。调用service.run()来实现异步循环就已经可以让你很满足,但是有时候你需要更进一步,尝试使用run_one()、poll()或者poll_one()

当实现异步时,你可以异步执行你自己的方法;使用service.post()或者service.dispatch()

最后,为了使socket和缓冲区(read或者write)在整个异步操作的生命周期中一直活动,我们需要采取特殊的防护措施。你的连接类需要继承自enabled_shared_from_this,然后在内部保存它需要的缓冲区,而且每次异步调用都要传递一个智能指针给this操作。

参考资料: https://github.com/okingniko/boost-asio-cpp-network-programming-chinese