[Boost基础]并发编程——asio网络库——定时器deadline_timer

时间:2023-03-08 15:32:38

asio库基于操作系统提供的异步机制,采用前摄器设计模式(Proactor)实现了可移植的异步(或者同步)IO操作,而且并不要求使用多线程和锁定,有些的避免了多线程编程带来的诸多有害副作用(如条件竞争、死锁等)。
目前asio主要关注网络通信方面,使用大量的类和函数封装了socket API,提供了一个现代C++风格的网络编程接口,支持TCP,ICMP,UDP等网络通信协议。但asio的异步操作并不局限于网络编程,他还支持串口读写,定时器,SSL等功能,而且asio是一个很好的富有弹性的框架,可以扩展到其他有异步操作需要的领域。

asio概述

asio库基于前摄模式(Proactor)封装了操作系统的select,poll/epoll,kqueue,overlapped I/O等机制,实现了异步IO模型。他的核心类是io_service,相当于前摄模式中的Proactor角色,asio的任何操作都需要有io_service的参与。
在同步模式下,程序发起一个IO操作,向io_service提交请求,io_service把操作转给操作系统,同步的等待。当IO操作完成时,操作系统通知io_service,然后io_service在把结果发回给程序,完成整个同步流程。这个处理流程与多线程join()等待方式很相似。
在异步模式下,程序除了要发起的IO操作,还要定义一个用于回调的完成处理函数。io_service同样把IO操作转交给操作系统执行,但它不同步等待,而是立即返回。调用io_service的run()成员函数可以等待异步操作完成,当异步操作完成时io_service从操作系统获取执行结果,调用完成处理函数。
asio不直接使用操作系统提供的线程,而且定义了一个自己的线程概念:strand,它保证在多线程的环境中代码可以正确的执行,而无需使用互斥量。io_serviec::strand::wrap()函数可以包装在一个函数在strand中执行。
IO操作会经常使用到缓冲区,asio库专门用两个类mutable_buffer和const_buffer来封装这个概念,他们可以被安全的应用在异步的读写操作中,使用*函数buffer()能够包装常用的C++容器类型,如数组、arrary、vector、string等,用read(),write()函数来读取缓冲区。
asio是一个相当复杂的库,本书只涉及其中的基本概念,还有许多异步IO相关的概念,在此不一一介绍。

定时器

定时器是asio库里最简单的一个IO模型示范,提供等待时间终止的功能,通过它我们可以快速熟悉asio的基本使用方法。

#include <iostream>
#include <conio.h>
using namespace std;
#include <boost/asio.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/function.hpp>
#include <boost/bind.hpp>
using namespace boost;
using namespace boost::asio;
//同步定时器
void test1()
{
io_service ios;//所有的asio程序必须要有一个io_service对象
deadline_timer t(ios,boost::posix_time::seconds(2));//两秒钟后定时器终止
cout<<t.expires_at()<<endl;//查看终止的决定时间
t.wait();//调用wait()同步等待
cout<<"hello asio"<<endl;
//可以把它与thread库的sleep()函数对比研究一下,两种虽然都是等待,但内部机制完全不同:thread库的sleep()使用了互斥量和条件变量,在线程中等待,而asio则是调用了操作系统的异步机制,如select,epool等完成。
//同步定时器的用法很简单,但它演示了asio程序的基本结构和流程:一个asio程序首先要定义一个io_service对象,它是前摄器模式中最重的proactor角色,然后我们声明一个IO操作(这里是定时器),并把它挂接在io_service上,再然后就可以执行后续的 同步或异步操作。
}
//异步定时器:代码大致与同步定时器相同,增加了回调函数,并使用io_service_run()和定时器的async_wait()
//首先我们要定义回调函数,asio库要求回调函数只能有一个参数,而且这个参数必须是const asio::error_code&类型 void myprint(boost::system::error_code ec)
{
cout<<"hello asio"<<endl;
}
void test2()
{
io_service ios;
deadline_timer t(ios,boost::posix_time::seconds(3));
t.async_wait(myprint);//异步等待,传入回调函数,立即返回
cout<<"it show before t expired."<<endl;
ios.run();//很重要,异步IO必须
cout<<"runned"<<endl;//将与hello asio一起输出,说明run()是阻塞函数
//异步等待async_wait(),它通知io_service异步的执行IO操作,并且注册了回调哈思楠,用于在IO操作完成时有事件多路分离器分派返回值(error_code)调用。
//最后我们必须调用io_service的run()成员函数,它启动前摄器的事件处理循环,阻塞等待所有的操作完成并分派事件。如果不调用run()那么虽然被异步执行了,但没有一个等待他完成的机制,回调函数将得不到执行的机会。
}
//下面我们实现一个可以定时执行任意函数的定时器 a_timer(asyc timer),它持有一个asio定时器对象和一个计数器,还有一个function对象用来保存回调函数
class a_timer
{
private:
int count,cout_max; //计数器成员变量
function<void()> f; //function对象,持有无参数无返回的可调用物
deadline_timer t;//asio定时器
public:
//构造函数初始化成员变量,将计数器清理,设置计数器的上限,拷贝存储回调函数,并立即启动定时器
//之所以要"立即"启动,是因为我们必须包装在io_service.run()之前至少有一个异步操作在执行,否则io_service.run()会因为没有事件处理而立即不等待返回。
template<typename F>a_timer(io_service& ios,int x,F func):f(func),cout_max(x),count(0),t(ios,posix_time::microsec(500))//模板类型,可接受任意可调用物
{
t.async_wait(bind(&a_timer::call_func,this,placeholders::error));//命名空间下asio::placeholders的一个占位符error,他的作用类似于bind库的占位符_1,_2,用于传递errror_code值。
} //call_func()内部累加计数器,如果计数器未达到上限则调用function对象f,然后重新设置定时器的终止时间,再次异步等待被调用,从而达到反复执行的目的。
void call_func(const boost::system::error_code &)
{
if(count >= cout_max)
{
return;
}
++count;
f();
t.expires_at(t.expires_at()+posix_time::millisec(500));//设置定时器的终止时间为0.5秒之后
t.async_wait(bind(&a_timer::call_func,this,placeholders::error));
}
};
void print1(int index)
{
cout<<"hello asio:"<<index<<endl;
}
void print2()
{
cout<<"hello boost"<<endl;
}
void test3()
{
io_service ios;
a_timer a(ios,10,bind(print1,2));//启用第一个定时器
a_timer at(ios,5,print2);//启用第二个定时器
ios.run();//ios等待异步调用结束
}
//查询网络地址
/*之前关于TCP通信的所有论述我们都是使用直接的IP地址,但在实际生活中大多数时候我们不可能知道socket连接另一端的地址,而只有一个域名,这时候我们就需要使用resolver类来通过域名获得可用的IP,它可用实现与IP版本无关的网址解析。
resolver使用内部类query和iterator共同完成查询IP地址的工作:首先使用网址和服务器名创建query对象,然后由resolver()函数生产iterator对象,它代表了查询到的ip端点。之后就可以使用socket对象尝试连接,直到找到一个可用的为止。
*/
#include <boost/lexical_cast.hpp>
void resolv_connect(ip::tcp::socket &sock,const char* website ,int port)//使用此函数来封装resolver的调用过程
{
ip::tcp::resolver rlv(sock.get_io_service());//resolver对象
ip::tcp::resolver::query qry(website,lexical_cast<string>(port));//创建一个query对象;query只接受字符串参数 ip::tcp::resolver::iterator iter=rlv.resolve(qry);
ip::tcp::resolver::iterator end;//逾尾迭代器
system::error_code ec=error::host_not_found;
for (; ec && iter != end;++iter)
{
sock.close();
sock.connect(*iter,ec);//尝试连接端点
}
if (ec)//只有当ec为0时,才会连接成功
{
cout<<"can't connect."<<endl;
throw system::system_error(ec);
}
cout<<"connect success."<<endl;
}
void test4()
{
try
{
io_service ios;
ip::tcp::socket sock(ios);
resolv_connect(sock,"www.boost.org",80);
//其他操作
ios.run();
}
catch (std::exception& e)
{
cout<<e.what()<<endl;
}
}
void test(char t)
{
std::cout<<"press key====="<<t<<std::endl;
switch (t)
{
case '1':test1();break;
case '2':test2();break;
case '3':test3();break;
case '4':test4();break;
case 27:
case 'q':exit(0);break;
default: std::cout<<"default "<<t<<std::endl;break;
}
}
void main()
{
while(1)
test(getch());
}