IO模型
io_service对象是asio框架中的调度器,所有异步io事件都是通过它来分发处理的(io对象的构造函数中都需要传入一个io_service对象)。
asio::io_service io_service;
asio::ip::tcp::socket socket(io_service);
在asio框架中,同步的io主要流程如下:
-
应用程序调用IO对象成员函数执行IO操作
-
IO对象向io_service 提出请求.
-
io_service 调用操作系统的功能执行连接操作.
-
操作系统向io_service 返回执行结果.
-
io_service将错误的操作结果翻译为boost::system::error_code类型,再传递给IO对象.
-
如果操作失败,IO对象抛出boost::system::system_error类型的异常.
而异步IO的处理流程则有些不同:
-
应用程序调用IO对象成员函数执行IO操作
-
IO对象请求io_service的服务
-
io_service 通知操作系统其需要开始一个异步连接.
-
操作系统指示连接操作完成, io_service从队列中获取操作结果
-
应用程序必须调用io_service::run()以便于接收结果
-
调用io_service::run()后,io_service返回一个操作结果,并将其翻译为error_code,传递到事件回调函数中
io_service对象
io_servuce的作用: io_servie 实现了一个任务队列,这里的任务就是void(void)的函数。Io_servie最常用的两个接口是post和run,post向任务队列中投递任务,run是执行队列中的任务,直到全部执行完毕,并且run可以被N个线程调用。Io_service是完全线程安全的队列。
io_service对象提供的接口有run、run_one、poll、poll_one、stop、reset、dispatch、post,最常用的是run、post、stop:
-
post用于发布io事件,如timer,socket读写等,一般由asio框架相应对象调用,无需我们显式调用。
-
run用于监听io事件响应,并执行响应回调,对于异步io操作需要在代码中显式调用,对于同步io操作则由io对象隐式调用(并不是run函数,不过也是等待io事件)。
可见,io_service提供的是一个生产者消费者模型。在异步io操作中需要我们手动控制消费者,调用run函数,它的基本工作模式如下:
-
等待io事件响应,如果所有io事件响应完成则退出
-
等待到io事件响应后,执行其对应的回调
-
继续等待下一个io事件,重复1-2
Io_servie 实现代码的基本类结构:
l Io_servie是接口类,为实现跨平台,采用了策略模式,所有接口均有impl_type实现。根据平台不同impl_type分为
n win_iocp_io_service Win版本的实现,这里主要分析Linux版本。
n task_io_service 非win平台下的实现,其代码结构为:
u detail/task_io_service_fwd.hpp 简单声明task_io_service名称
u detail/task_io_service.hpp 声明task_io_service的方法和属性
u detail/impl/task_io_service.ipp 具体实现文件
u 队列中的任务类型为opertioan,原型其实是typedef task_io_service_operation operation,其实现文件在detail/task_io_service_operation.hpp中,当队列中的任务被执行时,就是task_io_service_operation:: complete被调用的时候。
Io_servie::Post方法的实现
Post向队列中投递任务,然后激活空闲线程执行任务。其实现流程如下:
l Post接收handler作为参数,实际上是个仿函数,通过此仿函数构造出completion_handler对象,completion_handler继承自operation。然后调用post_immediate_completion。
l post_immediate_completion首先将outstanding_work_增加,然后调用post_deferred_completion。
l post_deferred_completion首先加锁将任务入列,然后调用wake_one_thread_and_unlock
l wake_one_thread_and_unlock尝试唤醒当前空闲的线程,其实现中特别之处在于,若没有空闲线程,但是有线程在执行task->run,即阻塞在epoll_wait上,那么先中断epoll_wait执行任务队列完成后再执行epoll_wait。
l first_idle_thread_维护了所有当前空闲线程,实际上使用了Leader/Follower模式,每次唤醒时只唤醒空闲线程的第一个。
Io_servie::run方法的实现
Run方法执行队列中的所有任务,直到任务执行完毕。
l run方法首先构造一个idle_thread_info,和first_idle_thread_类型相同,即通过first_idle_thread_将所有线程串联起来,它这个串联不是立即串联的,当该线程无任务可做是加入到first_idle_thread_的首部,有任务执行时,从first_idle_thread_中断开。这很正常,因为first_idle_thread_维护的是当前空闲线程。
l 加锁,循环执行do_one方法,直到do_one返回false
l do_one每次执行一个任务。首先检查队列是否为空,若空将此线程追加到first_idle_thread_的首部,然后阻塞在条件变量上,直到被唤醒。
l 当被唤醒或是首次执行,若stopped_为true(即此时stop方法被调用了),返回0
l 队列非空,pop出一个任务,检查队列无任务那么简单的解锁,若仍有,调用wake_one_thread_and_unlock尝试唤醒其他空闲线程执行。然后执行该任务,返回1.
l 实际上在执行队列任务时有一个特别的判断if (o == &task_operation_),那么将会执行task_->run,task_变量类型为reactor,在linux平台实现为epoll_reactor,实现代码文件为detail/impl/epoll_reactor.ipp,run方法实际上执行的是epoll_wait,run阻塞在epoll_wait上等待事件到来,并且处理完事件后将需要回调的函数push到io_servie的任务队列中,虽然epoll_wait是阻塞的,但是它提供了interrupt函数,该interrupt是如何实现的呢,它向epoll_wait添加一个文件描述符,该文件描述符中有8个字节可读,这个文件描述符是专用于中断epoll_wait的,他被封装到select_interrupter中,select_interrupter实际上实现是eventfd_select_interrupter,在构造的时候通过pipe系统调用创建两个文件描述符,然后预先通过write_fd写8个字节,这8个字节一直保留。在添加到epoll_wait中采用EPOLLET水平触发,这样,只要select_interrupter的读文件描述符添加到epoll_wait中,立即中断epoll_wait。很是巧妙。!!!实际上就是因为有了这个reactor,它才叫io_servie,否则就是一个纯的任务队列了。
l Run方法的原则是:
n 有任务立即执行任务,尽量使所有的线程一起执行任务
n 若没有任务,阻塞在epoll_wait上等待io事件
n 若有新任务到来,并且没有空闲线程,那么先中断epoll_wait,先执行任务
n 若队列中有任务,并且也需要epoll_wait监听事件,那么非阻塞调用epoll_wait(timeout字段设置为0),待任务执行完毕在阻塞在epoll_wait上。
n 几乎对线程的使用上达到了极致。
n 从这个函数中可以知道,在使用ASIO时,io_servie应该尽量多,这样可以使其epoll_wait占用的时间片最多,这样可以最大限度的响应IO事件,降低响应时延。但是每个io_servie::run占用一个线程,所以io_servie最佳应该和CPU的核数相同。
Io_servie::stop的实现
l 加锁,调用stop_all_threads
l 设置stopped_变量为true,遍历所有的空闲线程,依次唤醒
l task_interrupted_设置为true,调用task_的interrupt方法
l task_的类型为reactor,在run方法中已经做了分析
从中可以看出,io_service是一个工作队列的模型。在使用过程中一般有如下几个需要注意的地方:
1. run函数在io事件完成后会退出,导致后续基于该对象的异步io任务无法执行
由于io_service并不会主动常见调度线程,需要我们手动分配,常见的方式是给其分配一个线程,然后执行run函数。但run函数在io事件完成后会退出,线程会终止,后续基于该对象的异步io任务无法得到调度。
解决这个问题的方法是通过一个asio::io_service::work对象来守护io_service。这样,即使所有io任务都执行完成,也不会退出,继续等待新的io任务。
boost::asio::io_service io;
boost::asio::io_service::work work(io);
io.run();
2. 回调在run函数的线程中同步执行,当回调处理时间较长时阻塞后续io响应
解决这个问题的方法有两种:1. 启动多线程执行run函数(run函数是线程安全的),2. 新启动一个线程(或通过线程池)来执行回调函数。一般来讲,如果回调处理事件不是特别短,应该使用在线程池中处理回调的方式。
3. 回调在run函数的线程中同步执行,io事件较多的时候得不到及时响应
这个其实是性能问题了,在多核cpu上可以通过在多个线程中执行run函数来解决这一问题。这种方式也只能充分利用cpu性能,本身性能问题就不是光靠软件就能解决的。
.net中的异步io调度方式
和io_service这种手动控制的方式比起来,.net则是纯粹的自动档了。IO调度由CLR托管了,无需手动控制。回调也是在线程池中执行,无需担心影响后续IO响应。
正是由于CLR的托管,在.net 的异步IO框架中,就没有类似io_service的调度对象存在,这也符合.net的一贯简洁做法。
◆boost::asio::io_service使用时的注意事项:
①请让boost::asio::io_service和boost::asio::io_service::work搭配使用。
②想让event按照进入(strand)时的顺序被执行,需要boost::asio::io_service要和boost::asio::io_service::strand搭配使用。
③一般情况下,io_service的成员函数的使用顺序:
boost::asio::io_service构造,
boost::asio::io_service::run(),
boost::asio::io_service::stop(),
boost::asio::io_service::reset(),
boost::asio::io_service::run(),
......
boost::asio::io_service析构,
④不论有没有使用io_service::work,run()都会执行完io_service里面的event,(若没有用work,run就会退出)。
⑤一个新创建的io_service不需要执行reset()函数。
⑥在调用stop()后,在调用run()之前,请先调用reset()函数。
⑦函数stop()和reset()并不能清除掉io_service里面尚未执行的event。
我个人认为,也只有析构掉io_service,才能清空它里面的那些尚未执行的event了。(可以用智能指针)。
⑧函数stop(),stopped(),reset(),很简单,请单步调试,以明白它在函数里做了什么。
⑨boost的.hpp文件里面(一般情况下)有各个函数的使用说明,你可以随时查看。
◆下面是boost::asio::io_service的stop()和reset()函数的注释的翻译:
void boost::asio::io_service::stop(); BOOST_ASIO_DECL void stop(); /// Stop the io_service object's event processing loop. /// 停止io_service对象的事件处理循环。 /** * This function does not block, but instead simply signals the io_service to * stop. All invocations of its run() or run_one() member functions should * return as soon as possible. Subsequent calls to run(), run_one(), poll() * or poll_one() will return immediately until reset() is called. */ /** 这个函数不阻塞,而是仅仅表示io_service停止了。 它的run()或run_one()成员函数的调用应当尽快返回。 对run()、run_one()、poll()、poll_one()的随后的调用将会立即返回直到reset()函数被调用了。 */ void boost::asio::io_service::reset(); BOOST_ASIO_DECL void reset(); /// Reset the io_service in preparation for a subsequent run() invocation. /// 重置io_service对象,为随后的run()调用做准备。 /** * This function must be called prior to any second or later set of * invocations of the run(), run_one(), poll() or poll_one() functions when a * previous invocation of these functions returned due to the io_service * being stopped or running out of work. After a call to reset(), the * io_service object's stopped() function will return @c false. * * This function must not be called while there are any unfinished calls to * the run(), run_one(), poll() or poll_one() functions. */ /** io_service被停止,或者执行完handler而缺乏工作时,run()、run_one()、poll()、poll_one()函数的调用会被返回。 这些函数在被调用之前,必须先调用reset函数。 在reset函数被调用后,io_service对象的stopped函数将会返回false。 当run()、run_one()、poll()、poll_one()函数的任何的调用未结束时,这个函数一定不能被调用。 */
◆对stop()和reset()函数的一点说明(是我单步调试时看到的):
在Windows下,boost::asio::io_service类里面有一个数据成员为"stopped_"(Flag to indicate whether the event loop has been stopped.)。它是一个标志,它标志着事件循环是不是被stopped了。而boost::asio::io_service::reset()函数仅仅是赋值"stopped_=0"。boost::asio::io_service::stopped()函数仅仅是判断"0!=stopped_"的真假。你单步调试一下,就什么都知道了。
◆下面是我验证boost::asio::io_service的一个例子:
1 #include <boost/asio.hpp> 2 #include <boost/thread.hpp> 3 #include <boost/atomic.hpp> 4 #include <boost/shared_ptr.hpp> 5 #include <boost/date_time/posix_time/ptime.hpp> 6 #include <boost/date_time.hpp>//boost::posix_time::to_iso_extended_string()需要此头文件。 7 8 //boost::atomic_bool coutFlag = false; 9 //error C2440: 'initializing' : cannot convert from 'bool' to 'boost::atomics::atomic<bool>' 10 //故意写错,可以根据错误信息知道某类型的详细信息。 11 boost::atomic_bool g_coutFlag(false); 12 boost::atomic_int g_numIn(0); 13 boost::atomic_int g_numOut(0); 14 15 boost::thread_group g_thgp; 16 boost::asio::io_service g_io; 17 boost::shared_ptr<boost::asio::io_service::work> g_pWork = \ 18 boost::shared_ptr<boost::asio::io_service::work>(new boost::asio::io_service::work(g_io)); 19 boost::asio::io_service::strand g_strand(g_io); 20 std::vector<boost::posix_time::ptime> g_vecTimes; 21 22 void my_run_4_io_service(boost::asio::io_service& _io, int _idx) 23 { 24 _io.run(); 25 //想得到boost::asio::io_service::run()退出时的时刻,只能对io_service进行封装了。 26 g_vecTimes[_idx] = boost::posix_time::microsec_clock::local_time(); 27 } 28 29 void outFun(int idx) 30 {// io_service执行的handler。 31 ++g_numOut; 32 if (g_coutFlag.load()) 33 std::cout << "outFun: index=" << idx << std::endl; 34 boost::this_thread::sleep_for(boost::chrono::milliseconds(500)); 35 } 36 37 void inFun() 38 { 39 for (int i = 1; i <= 10; ++i) 40 { 41 g_strand.post(boost::bind(outFun, i)); 42 ++g_numIn; 43 boost::this_thread::sleep_for(boost::chrono::milliseconds(100)); 44 } 45 g_coutFlag = true; 46 g_io.stop();//调用它后,不论io_service有没有使用io_service::work类,各个线程的run()都会立即返回。 47 g_vecTimes[0] = boost::posix_time::microsec_clock::local_time(); 48 int numDelta = g_numIn - g_numOut; 49 std::cout << "inFun: numDelta=" << numDelta << std::endl;//还剩多少event没有被执行。 50 } 51 52 int main() 53 { 54 int vecNum = 5; 55 g_vecTimes.reserve(vecNum); g_vecTimes.resize(vecNum); 56 //一个容纳 void fun(int i) 函数的 function对象。 57 boost::function<void(int)> my_lambda_function_object = [vecNum](int secs) 58 { 59 boost::this_thread::sleep_for(boost::chrono::microseconds(1000 * 1000 * secs)); 60 std::cout << "now, time is " << boost::posix_time:: 61 to_iso_extended_string(boost::posix_time::microsec_clock::local_time()) << std::endl; 62 for (int i = 0; i < vecNum; ++i) 63 std::cout << i << " : " << boost::posix_time::to_iso_extended_string(g_vecTimes[i]) << std::endl; 64 }; 65 66 for (int i = 1; i < vecNum; ++i) 67 g_thgp.create_thread(boost::bind(my_run_4_io_service, boost::ref(g_io), i)); 68 g_thgp.create_thread(inFun); 69 //等待5秒,确保执行完毕我设计的那些操作。 70 my_lambda_function_object(5); 71 //析构掉io_service对应的io_service::work对象,此时io_service里面还有event。 72 g_pWork = nullptr; 73 boost::this_thread::sleep_for(boost::chrono::milliseconds(1000 * 1)); 74 g_io.reset(); 75 boost::this_thread::sleep_for(boost::chrono::seconds(1)); 76 //因为work被析构掉了,所以启动的那些线程在执行完event后,都自行退出了。 77 for (int i = 1; i < vecNum; ++i) 78 g_thgp.create_thread(boost::bind(my_run_4_io_service, boost::ref(g_io), i)); 79 //等待6秒,确保io_service中剩余的event被执行完毕。 80 my_lambda_function_object(6); 81 std::cout << "done." << std::endl; 82 int cmd_val = getchar(); 83 return 0; 84 }