从上面对于Proactor的分析可知,首先用户以Initiator角色发起一个读写事件。Initiator则通过stream_socket_service来进行相关IO操作。故而,顺着着这条主线,下面我们就进入stream_socket_service的内部来一探究竟。在stream_socket_service.hpp文件中我们看到如下一段语句,而该段代码也将会在以后很多的文件中看到。这段代码的主要作用就是:在完成对所运行的平台进行相关的判定后,决定其所使用的具体实现方式。若所运行环境处于windows平台下则采用iocp的方式来完成异步IO,若处于Linux/Unix/MacOS等环境下则采用select/poll/epoll的Reactor模式来实现。
l stream_socket_service类
#if defined(BOOST_ASIO_HAS_IOCP)
# include <boost/asio/detail/win_iocp_socket_service.hpp>
#else
# include <boost/asio/detail/reactive_socket_service.hpp>
#endif
而 BOOST_ASIO_HAS_IOCP则在detail/config.h文件中给出其定义,如下所示:
/Windows: IO Completion Ports.
#ifdefined(BOOST_WINDOWS) || defined(__CYGWIN__)
# if defined(_WIN32_WINNT) && (_WIN32_WINNT >= 0x0400)
# if !defined(UNDER_CE)
# if !defined(BOOST_ASIO_DISABLE_IOCP)
# define BOOST_ASIO_HAS_IOCP 1
# endif // !defined(BOOST_ASIO_DISABLE_IOCP)
# endif // !defined(UNDER_CE)
# endif// defined(_WIN32_WINNT) && (_WIN32_WINNT >= 0x0400)
#endif// defined(BOOST_WINDOWS) || defined(__CYGWIN__)
//Linux: epoll, eventfd and timerfd.
#ifdefined(__linux__)
#include <linux/version.h>
# if!defined(BOOST_ASIO_DISABLE_EPOLL)
# if LINUX_VERSION_CODE>= KERNEL_VERSION(2,5,45)
# define BOOST_ASIO_HAS_EPOLL 1
# endif // LINUX_VERSION_CODE >=KERNEL_VERSION(2,5,45)
# endif// !defined(BOOST_ASIO_DISABLE_EVENTFD)
# if!defined(BOOST_ASIO_DISABLE_EVENTFD)
# if LINUX_VERSION_CODE >=KERNEL_VERSION(2,6,22)
# define BOOST_ASIO_HAS_EVENTFD 1
# endif // LINUX_VERSION_CODE >=KERNEL_VERSION(2,6,22)
# endif// !defined(BOOST_ASIO_DISABLE_EVENTFD)
# ifdefined(BOOST_ASIO_HAS_EPOLL)
# if (__GLIBC__ > 2) || (__GLIBC__ == 2&& __GLIBC_MINOR__ >= 8)
# define BOOST_ASIO_HAS_TIMERFD 1
# endif // (__GLIBC__ > 2) || (__GLIBC__ ==2 && __GLIBC_MINOR__ >= 8)
# endif// defined(BOOST_ASIO_HAS_EPOLL)
#endif// defined(__linux__)
看到了这里,我想大家都可以清晰的了解到在许多类的定义中均用到的BOOST_ASIO_HAS_IOCP的真实含义了。
同时,在文件中对于stream_socket_service所给出的定义来看,其所继承的基类有两种不同的选择。(如蓝色代码所示)。同样,根据其所处的平台不同,其具体所使用的实现方式也有所不同。
template <typename Protocol>
class stream_socket_service
#if defined(GENERATING_DOCUMENTATION)
: publicboost::asio::io_service::service
#else
:publicboost::asio::detail::service_base<stream_socket_service<Protocol> >
#endif
{
//而其中关于不同平台上所涉及的实现如下:
// The type of the platform-specific implementation.
#if defined(BOOST_ASIO_HAS_IOCP)
typedef detail::win_iocp_socket_service<Protocol>service_impl_type; //完成具体的socket通信
#else
typedef detail::reactive_socket_service<Protocol>service_impl_type;
#endif
...
service_impl_typeservice_impl_;
};
而service_impl_其中之一是便是由win_iocp_socket_service<Protocol>模板类来给出其定义,或者由reactive_socket_service<Protocol>来给出。
那么win_iocp_socket_service<Protocol>类则是来自何方?win_iocp_socket_service派生于win_iocp_socket_service_base。相应,对于非windows平台的实现:reactive_socket_service则继承于reactive_socket_service_base类。
上述的类之间关系可用下图描述:
或者
为了更近一步的探索,我们将走进其中之一的win_iocp_socket_service_base类,做进一步的剖析。,相应的Xnix平台下分析方法类似。
l win_iocp_socket_server_base
在win_iocp_socket_service_base.xpp,, detail/socket_ops.hpp类中则实现了windows平台下的完成端口的主要功能。
在reactive_socket_service_base, detail/socket_ops.hpp类中则实现了基础的IO操作功能,同样我们在文件中可以看出:相关的实现可以在xxx.ipp文件中寻得。其中xxx是相应的类名称。win_iocp_socket_server_base类的主要结构如下:
而在类中的async_send或async_receive等函数中使用到win_iocp_socket_send_op或win_iocp_socket_recv_op类,例如我们在asyn_send函数中可以看到如下的代码:
template <typename ConstBufferSequence,typename Handler>
voidasync_send(base_implementation_type& impl,
const ConstBufferSequence& buffers,
socket_base::message_flags flags, Handler handler)
{
typedef win_iocp_socket_send_op<ConstBufferSequence,Handler>op;
typename op::ptr p = { boost::addressof(handler),
boost_asio_handler_alloc_helpers::allocate(
sizeof(op), handler), 0 };
p.p = new (p.v) op (impl.cancel_token_, buffers, handler);
buffer_sequence_adapter<boost::asio::const_buffer,
ConstBufferSequence> bufs(buffers);
start_send_op (impl, bufs.buffers(), bufs.count(), flags,
(impl.state_ & socket_ops::stream_oriented) != 0 &&bufs.all_empty(),
p.p);
p.v = p.p = 0;
}
而其中的start_send_op(
win_iocp_socket_service_base::base_implementation_type& impl,
WSABUF* buffers, std::size_t buffer_count,
socket_base::message_flags flags,bool noop, operation* op)函数的定义如下:
{
update_cancellation_thread_id(impl);
iocp_service_.work_started();
if (noop)
iocp_service_.on_completion(op);
else if (!is_open(impl))
iocp_service_.on_completion(op,boost::asio::error::bad_descriptor);
else
{
DWORD bytes_transferred = 0;
int result = ::WSASend(impl.socket_,buffers,
static_cast<DWORD>(buffer_count),&bytes_transferred, flags, op, 0);
DWORD last_error = ::WSAGetLastError();
if (last_error == ERROR_PORT_UNREACHABLE)
last_error = WSAECONNREFUSED;
if (result != 0 && last_error !=WSA_IO_PENDING)
iocp_service_.on_completion(op,last_error, bytes_transferred);
else
iocp_service_.on_pending(op);
}
}
从中我们可以看出当系统在通过WSASend完成数据的传输后(如蓝色代码所示),立刻将该完成消息通知并投递到完成消息队列中(如红色代码所示)。而在win_iocp_io_service::on_completion的定义如下,其中红色代码表明其对于完成事件在系统层的投递,以及事件进入事件队列中:
win_iocp_io_service::on_completion(win_iocp_operation*op,
DWORD last_error, DWORD bytes_transferred)
{
// Flag that the operation is ready forinvocation.
op->ready_ = 1;
// Store results in the OVERLAPPED structure.
op->Internal =reinterpret_cast<ulong_ptr_t>(
&boost::asio::error::get_system_category());
op->Offset = last_error;
op->OffsetHigh = bytes_transferred;
// Enqueue the operation on the I/Ocompletion port.
if (!::PostQueuedCompletionStatus(iocp_.handle,
0, overlapped_contains_result, op))
{
// Out of resources. Put on completed queueinstead.
mutex::scoped_lock lock(dispatch_mutex_);
completed_ops_.push(op);
::InterlockedExchange(&dispatch_required_, 1);
}
}
对于其中的完成事件消息队列的数据结构的详细描述见下文。