在多线程环境下开始接口是这样实现的。在打开流通道前已经设置好了连接参数,以及将要在打开流通道后监视的设备列表,这是第一步。第二步是创建三个线程,pangu的三个静态函数分别是这三个线程的入口函数。然后再调用开始接口。开始接口只是触发启动事件对象,这个事件对象用来唤醒三个线程中其中一个线程。这个被唤醒的线程将执行打开流通道,以及监视大量设备。
首先是调用打开流通道接口。这是个同步调用,即阻塞式。然后是启动另三个线程协同完成监视大量设备的任务。当这些工作都完成后,再通知其他线程可以进入工作状态。
启动过程也分为几个步骤:打开流通道、查询所有待监视设备的类型和监视设备。改采用什么技术实现三个步骤之间的转换。首先想到的是uv_timer_t定时器。当需转到下一个步骤时,启动下一步的定时器。还有另一种方案是,启动过程作为一个整体由状态决定执行具体步骤。有优劣之分吗?感觉上由定时器触发更符合libuv,转承更顺畅。每个函数也更小巧和单一,易于维护。
这段是采用定时器实现后具体的文件规划。每个步骤的代码将完全放置一个单独的cpp文件中。与cpp文件相对应的hpp头文件将对外只公布一个函数。如此实现的好处就是修改其中一个步骤的具体实现不会影响项目中其他cpp文件的编译,这样会加快编译速度。假定三个步骤分别对应这三个cpp文件:openstream.cpp、qrydevinfo.cpp和monitordevices.cpp。向外对应的函数分别是void openstream()、void startup_query_devices()和void startup_monitor_devices()。三个函数都没有输入参数。
如果采用由状态决定步骤执行,可以想见的是,还必须再创建一个包含这三个cpp文件的总过程文件。采用定时器实现方案就可以减少一个cpp文件。
打开流通道
使用libuv时必须记住阻塞式函数必须放置在工作线程内完成。在这一步,acsOpenStream函数就是个阻塞函数,所以必须将它交给uv在后台执行。uv_queue_work可以完成这个工作。我们将创建一个操作对象,称它为Baton。使用libuv进行开发的人都喜欢使用这个名称。这个Baton也将被传入uv_queue_work函数。但必须再提供给它两个函数。一个用于在后台线程中执行acsOpenStream函数用,由uv负责调用。一个用于在后台线程执行完毕后在主线程内被调用,也由uv负责调用。这两个函数被调用时都可以取得那个Baton对象。在主线程内被调用的回调函数的主要作用是检查打开流通道成功事件是否已收到。如果还未收到,再次调用uv_queue_work。依据那个Baton对象的成员属性可以知道是否是在等待事件还是还未被执行。如果已经被执行过了,在后台线程内不会再调用acsOpenStream,而是休眠100毫秒。Baton对象内还有一个计数用的成员属性,用来判断是否已超过等待时长的上限。如果收到了打开流通道成功事件,则启动下一步的定时器。
#include <log4cxx/logger.h>头文件内容非常简单。
#include <uv.h>
#include "openstream.hpp"
#include "../baton.hpp"
#include "qrydevinfo.hpp"
using namespace log4cxx;
extern LoggerPtrg_zhougongLog;
extern uv_loop_t *loop;
extern MyNvWapangu;
boolg_open_stream_flag = false;
struct OpenStreamBaton : public Baton
{
boolwaitConfirmationFlag_;
size_tcount_;
OpenStreamBaton(MyNvWa * p_pg) :Baton(p_pg), count_(0), waitConfirmationFlag_(false){}
};
void open_stream(uv_work_t *req)
{
OpenStreamBaton *baton = (OpenStreamBaton*) req->data;
if ( baton->waitConfirmationFlag_ == false )
{
baton->pg_->OpenStream("AVAYA#S8300#CSTA#AES1", "ctiuser", "Ctiuser01!");
baton->waitConfirmationFlag_ = true;
}
else
{
++(baton->count_);
Sleep(100);
}
}
void open_stream_cb(uv_work_t *req, int status)
{
OpenStreamBaton *baton = (OpenStreamBaton*) req->data;
if ( baton->pg_->GetEventPoll() == true )
{
CSTAEvent_t * pCstaEvent = (CSTAEvent_t *)baton->pg_->m_szPBXEvent;
if ( pCstaEvent->eventHeader.eventClass == ACSCONFIRMATION &&
pCstaEvent->eventHeader.eventType == ACS_OPEN_STREAM_CONF )
{
g_open_stream_flag = true;
LOG4CXX_INFO(g_zhougongLog, "open stream confirmation received.\r\n");
startup_query_devices();
}
else
{
std::stringstreamsstream;
sstream << "it is not a open stream confirmation " << pCstaEvent->eventHeader.eventClass << " " << pCstaEvent->eventHeader.eventType << "\r\n";
LOG4CXX_INFO(g_zhougongLog, sstream.str());
}
free(baton);
}
else
{
if ( baton->count_ < 20 )
{
LOG4CXX_INFO(g_zhougongLog, "wait for confirmation event.\r\n");
uv_queue_work(loop, &baton->work, open_stream, open_stream_cb);
}
else
{
free(baton);
LOG4CXX_INFO(g_zhougongLog, "can not receive open stream confirmation.\r\n");
}
}
}
void open_stream()
{
OpenStreamBaton * openStreamBaton = new OpenStreamBaton(&pangu);
openStreamBaton->work.data = (void *)openStreamBaton;
uv_queue_work(loop, &openStreamBaton->work, open_stream, open_stream_cb);
}
#include "../baton.hpp"
void open_stream();