ASIO 之 剖 析--(3)  以Proactor模式的角度来剖析ASIO

时间:2022-06-10 05:08:47

从上面对于Proactor的分析可知,首先用户以Initiator角色发起一个读写事件。Initiator则通过stream_socket_service来进行相关IO操作。故而,顺着着这条主线,下面我们就进入stream_socket_service的内部来一探究竟。在stream_socket_service.hpp文件中我们看到如下一段语句,而该段代码也将会在以后很多的文件中看到。这段代码的主要作用就是:在完成对所运行的平台进行相关的判定后,决定其所使用的具体实现方式。若所运行环境处于windows平台下则采用iocp的方式来完成异步IO,若处于Linux/Unix/MacOS等环境下则采用select/poll/epoll的Reactor模式来实现。

 l  stream_socket_service类

 #if defined(BOOST_ASIO_HAS_IOCP)

# include <boost/asio/detail/win_iocp_socket_service.hpp>

#else

# include <boost/asio/detail/reactive_socket_service.hpp>

#endif

而 BOOST_ASIO_HAS_IOCP则在detail/config.h文件中给出其定义,如下所示:

/Windows: IO Completion Ports.

#ifdefined(BOOST_WINDOWS) || defined(__CYGWIN__)

# if defined(_WIN32_WINNT) && (_WIN32_WINNT >= 0x0400)

#  if !defined(UNDER_CE)

#   if !defined(BOOST_ASIO_DISABLE_IOCP)

#    define BOOST_ASIO_HAS_IOCP 1

#   endif // !defined(BOOST_ASIO_DISABLE_IOCP)

#  endif // !defined(UNDER_CE)

# endif// defined(_WIN32_WINNT) && (_WIN32_WINNT >= 0x0400)

#endif// defined(BOOST_WINDOWS) || defined(__CYGWIN__)

//Linux: epoll, eventfd and timerfd.

#ifdefined(__linux__)

#include <linux/version.h>

# if!defined(BOOST_ASIO_DISABLE_EPOLL)

#  if LINUX_VERSION_CODE>= KERNEL_VERSION(2,5,45)

#   define BOOST_ASIO_HAS_EPOLL 1

#  endif // LINUX_VERSION_CODE >=KERNEL_VERSION(2,5,45)

# endif// !defined(BOOST_ASIO_DISABLE_EVENTFD)

# if!defined(BOOST_ASIO_DISABLE_EVENTFD)

#  if LINUX_VERSION_CODE >=KERNEL_VERSION(2,6,22)

#   define BOOST_ASIO_HAS_EVENTFD 1

#  endif // LINUX_VERSION_CODE >=KERNEL_VERSION(2,6,22)

# endif// !defined(BOOST_ASIO_DISABLE_EVENTFD)

# ifdefined(BOOST_ASIO_HAS_EPOLL)

#  if (__GLIBC__ > 2) || (__GLIBC__ == 2&& __GLIBC_MINOR__ >= 8)

#   define BOOST_ASIO_HAS_TIMERFD 1

#  endif // (__GLIBC__ > 2) || (__GLIBC__ ==2 && __GLIBC_MINOR__ >= 8)

# endif// defined(BOOST_ASIO_HAS_EPOLL)

#endif// defined(__linux__)

看到了这里,我想大家都可以清晰的了解到在许多类的定义中均用到的BOOST_ASIO_HAS_IOCP的真实含义了。

同时,在文件中对于stream_socket_service所给出的定义来看,其所继承的基类有两种不同的选择。(如蓝色代码所示)。同样,根据其所处的平台不同,其具体所使用的实现方式也有所不同。

template <typename Protocol>

class stream_socket_service

#if defined(GENERATING_DOCUMENTATION)

  : publicboost::asio::io_service::service

#else

:publicboost::asio::detail::service_base<stream_socket_service<Protocol> >

#endif

{

//而其中关于不同平台上所涉及的实现如下: 

// The type of the platform-specific implementation.

#if defined(BOOST_ASIO_HAS_IOCP)

typedef detail::win_iocp_socket_service<Protocol>service_impl_type; //完成具体的socket通信

#else

typedef detail::reactive_socket_service<Protocol>service_impl_type;

#endif

...

service_impl_typeservice_impl_;

};

而service_impl_其中之一是便是由win_iocp_socket_service<Protocol>模板类来给出其定义,或者由reactive_socket_service<Protocol>来给出。

那么win_iocp_socket_service<Protocol>类则是来自何方?win_iocp_socket_service派生于win_iocp_socket_service_base。相应,对于非windows平台的实现:reactive_socket_service则继承于reactive_socket_service_base类。

上述的类之间关系可用下图描述:

 

或者

 

为了更近一步的探索,我们将走进其中之一的win_iocp_socket_service_base类,做进一步的剖析。,相应的Xnix平台下分析方法类似。

 

win_iocp_socket_server_base

在win_iocp_socket_service_base.xpp,, detail/socket_ops.hpp类中则实现了windows平台下的完成端口的主要功能。

在reactive_socket_service_base, detail/socket_ops.hpp类中则实现了基础的IO操作功能,同样我们在文件中可以看出:相关的实现可以在xxx.ipp文件中寻得。其中xxx是相应的类名称。win_iocp_socket_server_base类的主要结构如下:

而在类中的async_send或async_receive等函数中使用到win_iocp_socket_send_op或win_iocp_socket_recv_op类,例如我们在asyn_send函数中可以看到如下的代码:

template <typename ConstBufferSequence,typename Handler>

  voidasync_send(base_implementation_type& impl,

     const ConstBufferSequence& buffers,

     socket_base::message_flags flags, Handler handler)

{

 typedef win_iocp_socket_send_op<ConstBufferSequence,Handler>op;

   typename op::ptr p = { boost::addressof(handler),

     boost_asio_handler_alloc_helpers::allocate(

       sizeof(op), handler), 0 };

   p.p = new (p.v) op (impl.cancel_token_, buffers, handler);

   buffer_sequence_adapter<boost::asio::const_buffer,

       ConstBufferSequence> bufs(buffers);

   start_send_op (impl, bufs.buffers(), bufs.count(), flags,

       (impl.state_ & socket_ops::stream_oriented) != 0 &&bufs.all_empty(),

       p.p);

   p.v = p.p = 0;

}

而其中的start_send_op(

   win_iocp_socket_service_base::base_implementation_type& impl,

   WSABUF* buffers, std::size_t buffer_count,

socket_base::message_flags flags,bool noop, operation* op)函数的定义如下:

{

update_cancellation_thread_id(impl);

 iocp_service_.work_started();

 

 if (noop)

    iocp_service_.on_completion(op);

 else if (!is_open(impl))

    iocp_service_.on_completion(op,boost::asio::error::bad_descriptor);

 else

 {

    DWORD bytes_transferred = 0;

    int result = ::WSASend(impl.socket_,buffers,

        static_cast<DWORD>(buffer_count),&bytes_transferred, flags, op, 0);

    DWORD last_error = ::WSAGetLastError();

    if (last_error == ERROR_PORT_UNREACHABLE)

      last_error = WSAECONNREFUSED;

    if (result != 0 && last_error !=WSA_IO_PENDING)

      iocp_service_.on_completion(op,last_error, bytes_transferred);

    else

      iocp_service_.on_pending(op);

 }

}

从中我们可以看出当系统在通过WSASend完成数据的传输后(如蓝色代码所示),立刻将该完成消息通知并投递到完成消息队列中(如红色代码所示)。而在win_iocp_io_service::on_completion的定义如下,其中红色代码表明其对于完成事件在系统层的投递,以及事件进入事件队列中:

 

win_iocp_io_service::on_completion(win_iocp_operation*op,

    DWORD last_error, DWORD bytes_transferred)

{

  // Flag that the operation is ready forinvocation.

  op->ready_ = 1;

 

  // Store results in the OVERLAPPED structure.

  op->Internal =reinterpret_cast<ulong_ptr_t>(

     &boost::asio::error::get_system_category());

  op->Offset = last_error;

  op->OffsetHigh = bytes_transferred;

 

  // Enqueue the operation on the I/Ocompletion port.

  if (!::PostQueuedCompletionStatus(iocp_.handle,

        0, overlapped_contains_result, op))

  {

    // Out of resources. Put on completed queueinstead.

    mutex::scoped_lock lock(dispatch_mutex_);

    completed_ops_.push(op);

   ::InterlockedExchange(&dispatch_required_, 1);

  }

}

对于其中的完成事件消息队列的数据结构的详细描述见下文。