今天看了ASIO的介绍,不太明白asio在POSIX上如何用reactor模拟proactor。所以稍微看了下源代码,此文当作笔记。
ASIO Proactor:
Proactor design pattern (adapted from [POSA2])
— Asynchronous Operation
Defines an operation that is executed asynchronously, such as an asynchronous read or write on a socket.
— Asynchronous Operation Processor
Executes asynchronous operations and queues events on a completion event queue when operations complete. From a high-level point of view, services like
stream_socket_service
are asynchronous operation processors.
— Completion Event Queue
Buffers completion events until they are dequeued by an asynchronous event demultiplexer.
— Completion Handler
Processes the result of an asynchronous operation. These are function objects, often created using
boost::bind
.
— Asynchronous Event Demultiplexer
Blocks waiting for events to occur on the completion event queue, and returns a completed event to its caller.
— Proactor
Calls the asynchronous event demultiplexer to dequeue events, and dispatches the completion handler (i.e. invokes the function object) associated with the event. This abstraction is represented by the
io_service
class.
— Initiator
Application-specific code that starts asynchronous operations. The initiator interacts with an asynchronous operation processor via a high-level interface such as
basic_stream_socket
, which in turn delegates to a service likestream_socket_service
.
Implementation Using Reactor
On many platforms, Asio implements the Proactor design pattern in terms of a Reactor, such as select
, epoll
or kqueue
. This implementation approach corresponds to the Proactor design pattern as follows:
— Asynchronous Operation Processor
A reactor implemented using
select
,epoll
orkqueue
. When the reactor indicates that the resource is ready to perform the operation, the processor executes the asynchronous operation and enqueues the associated completion handler on the completion event queue.
— Completion Event Queue
A linked list of completion handlers (i.e. function objects).
— Asynchronous Event Demultiplexer
This is implemented by waiting on an event or condition variable until a completion handler is available in the completion event queue.
1. Initiator使用Asynchronous Operation Processor发起异步I/O操作2. 保存每个异步I/O操作的参数,包括回调函数的地址,并将其放入Completion Event Queue3. Proactor调用Asynchronous Event Demultiplexer检测完成事件。4. 当检测到I/O操作完成事件,从Completion Event Queue中取出对应的异步I/O操作,并且分派到相应的Completion Handler。5. Completion Handler调用回调函数。
reactor模拟的实现:
task_io_service<reactor> io_service_impl。
// Run the event loop until interrupted or no more work. size_t run(boost::system::error_code& ec) { ec = boost::system::error_code(); if (outstanding_work_ == 0) { stop(); return 0; } typename call_stack<task_io_service>::context ctx(this); idle_thread_info this_idle_thread; this_idle_thread.next = 0; boost::asio::detail::mutex::scoped_lock lock(mutex_); size_t n = 0; for (; do_one(lock, &this_idle_thread); lock.lock()) if (n != (std::numeric_limits<size_t>::max)()) ++n; return n; }
1. this_idle_thread是个保存idle线程的链表。
2. 如果没有异步操作要处理,那么就加入到idle线程的链表中去,一直在那边阻塞等下去。
size_t do_one(boost::asio::detail::mutex::scoped_lock& lock, idle_thread_info* this_idle_thread) { ...... else if (this_idle_thread) { // Nothing to run right now, so just wait for work to do. this_idle_thread->next = first_idle_thread_; first_idle_thread_ = this_idle_thread; this_idle_thread->wakeup_event.clear(lock); this_idle_thread->wakeup_event.wait(lock); } else { return 0; } } return 0; }
socket_service:
类型:
datagram_socket_service
raw_socket_service
socket_acceptor_service
stream_socket_service
当构造socket_service的时候间接会调用init_task():
// Initialise the task, if required. void init_task() { boost::asio::detail::mutex::scoped_lock lock(mutex_); if (!shutdown_ && !task_) { task_ = &use_service<Task>(this->get_io_service()); op_queue_.push(&task_operation_); wake_one_thread_and_unlock(lock); } }
1. 这边的task_operation就是用来表示异步操作即将开始的,而后面会发现op_queue中不但出存放异步操作即将开始的指针,还会存放完成队列指针。
2. 唤醒一个当前的idle线程。
这个时候idle线程就会继续循环,下面我们来看完整的do_one()函数:
size_t do_one(boost::asio::detail::mutex::scoped_lock& lock, idle_thread_info* this_idle_thread) { bool polling = !this_idle_thread; bool task_has_run = false; while (!stopped_) { if (!op_queue_.empty()) // 操作队列不为空 { // Prepare to execute first handler from queue. operation* o = op_queue_.front(); op_queue_.pop(); bool more_handlers = (!op_queue_.empty()); if (o == &task_operation_) // 如果是异步操作即将开始,即socket_service创建完成 { task_interrupted_ = more_handlers || polling; // If the task has already run and we're polling then we're done. if (task_has_run && polling) { task_interrupted_ = true; op_queue_.push(&task_operation_); return 0; } task_has_run = true; if (!more_handlers || !wake_one_idle_thread_and_unlock(lock)) lock.unlock(); op_queue<operation> completed_ops; task_cleanup c = { this, &lock, &completed_ops }; // 当所在的block结束的时候,即下面的通过reactor去等待并且执行操作,去将任务加入到完成队列中,还会添加一个task_operation_ (void)c; // Run the task. May throw an exception. Only block if the operation // queue is empty and we're not polling, otherwise we want to return // as soon as possible. task_->run(!more_handlers && !polling, completed_ops); // 通过reactor去等待并且执行操作 } else { if (more_handlers) // 如果操作队列中还有需要处理的任务 wake_one_thread_and_unlock(lock); // 唤醒idle线程 else lock.unlock(); // Ensure the count of outstanding work is decremented on block exit. work_finished_on_block_exit on_exit = { this }; (void)on_exit; // Complete the operation. May throw an exception. o->complete(*this); // deletes the operation object //异步操作完成回调 return 1; } } else if (this_idle_thread) // 加入到idle线程链表中,并且wait阻塞。 { // Nothing to run right now, so just wait for work to do. this_idle_thread->next = first_idle_thread_; first_idle_thread_ = this_idle_thread; this_idle_thread->wakeup_event.clear(lock); this_idle_thread->wakeup_event.wait(lock); } else { return 0; } } return 0; }
struct task_cleanup { ~task_cleanup() { 添加完成操作指针到操作队列,并且重新插入task_operation_ lock_->lock(); task_io_service_->task_interrupted_ = true; task_io_service_->op_queue_.push(*ops_); task_io_service_->op_queue_.push(&task_io_service_->task_operation_); } task_io_service* task_io_service_; boost::asio::detail::mutex::scoped_lock* lock_; op_queue<operation>* ops_; };
这边还会有很多细节,等到啥时有空的时候来个深入分析。