libjingle源码分析之一:Thread和SocketServer

时间:2021-07-25 00:01:12


分类:libjingle2012-10-25 20:10249人阅读评论(0)收藏举报

·  摘要

      本文主要分析了libjingle源码中的ThreadSocketServer模块,以及它们是如何协同工作的。首先,介绍了ThreadSocketServer的模型,给出了如何使用Thread的示例。然后,分析了Thread中的默认消息循环的处理流程和如何自己处理消息。

·  概述

      libjingle源码中,ThreadSocketServer模块的原理如下图所示。整个模型实际上是一个消息模型,Thread主要负责处理消息,MessageQue表示的是当前的消息队列,MessageHandler由用户用来定义处理消息的动作。而ThreadManager为单实例,可以获取当前的Thread,这样用户可以往当前的Thread中投递消息。SocketServer代表的是用来侦听Socket的服务,它是一个独立的模块。

      消息的处理流程主要由Thread负责。上图中有两条处理流程,分别用两根带箭头的线表示。左边箭头的处理流程为:当消息队列中没有消息时,Thread将控制权转交给SocketServer,直到有消息时会通知SocketServer返回到Thread。也就是说Thread优先的是处理消息,在空闲时,会让SocketServer侦听socket。右边的箭头是正常的消息处理流程,获取消息并处理用户定义的对应的OnMessage函数。
      SocketServer
模块只是用在libjingle内部,用户并不需要直接使用它。P2P中使用了PhysicalSocketServer作为SocketServer,它的原理如下图所示:

     PhysicalSocketServer主要是侦听基于本地网卡的socketlibjingle中还有一些伪socket),然后分发socket事件到Dispatcher中。DispatcherPhysicalSocketServer的分发体,功能有点类似于MessageHandlerDispatcher中定义了感兴趣的socket事件和对应的处理。

类的关系

      本文提到的一些类的关系如下图所示。Thread类继承自MessageQue,可以通过Thread类来操作队列消息。PhysicalSocketServer除了实现SocketServer接口之外,还可以添加和删除Dispatcher。熟悉了这些类,基本上就了解Thread模块和SocketServer模块的工作原理。

·  使用

      Thread的使用示例参见下面的代码。获取当前线程是通过ThreadCurrent函数,它会转调ThreadManager对象的CurrentThread函数。由于Thread继承自MessageQue,可以直接通过Thread对象来投递消息,Post函数的第一个参数是OnMessage所处的对象,会被保存于Message对象中。处理消息只要重载MessageHandlerOnMessage函数即可。main函数则调用ThreadRun函数进入默认的消息处理循环,默认的消息处理循环在本示例中就是:循环取消息,调用MessageHandlerOnMessage函数。

[cpp]view plaincopyprint?

1.   #include <string>  

2.   #include <iostream>  

3.   #include "talk/base/thread.h"  

4.     

5.   class HelpData : public talk_base::MessageData  

6.   {  

7.   public:  

8.     std::string info_;  

9.   };  

10.   

11. class Police : public talk_base::MessageHandler  

12. {  

13. public:  

14.   enum {  

15.     MSG_HELP,  

16.   };  

17.   

18.   void Help(const std::string& info) {  

19.     HelpData* data = new HelpData;  

20.     data->info_ = info;  

21.     talk_base::Thread::Current()->Post(this, MSG_HELP, data);  

22.   }  

23.   

24.   virtual void OnMessage(talk_base::Message* msg) {  

25.     switch (msg->message_id) {  

26.     case MSG_HELP:  

27.       HelpData* data = (HelpData*)msg->pdata;  

28.       std::cout << "MSG_HELP : " << data->info_ << std::endl;  

29.       break;  

30.     }  

31.   }  

32. };  

33.   

34. int main(int argc, char** argv)  

35. {  

36.   Police p;  

37.   p.Help("Please help me!");  

38.   talk_base::Thread::Current()->Run();  

39.   return 0;  

40. }  

#include <string>

#include <iostream>

#include "talk/base/thread.h"

 

class HelpData : public talk_base::MessageData

{

public:

  std::string info_;

};

 

class Police : public talk_base::MessageHandler

{

public:

  enum {

    MSG_HELP,

  };

 

  void Help(const std::string&info) {

    HelpData* data = new HelpData;

    data->info_ = info;

   talk_base::Thread::Current()->Post(this, MSG_HELP, data);

  }

 

  virtual voidOnMessage(talk_base::Message* msg) {

    switch (msg->message_id) {

    case MSG_HELP:

      HelpData* data =(HelpData*)msg->pdata;

      std::cout <<"MSG_HELP : " << data->info_ << std::endl;

      break;

    }

  }

};

 

int main(int argc, char** argv)

{

  Police p;

  p.Help("Please helpme!");

  talk_base::Thread::Current()->Run();

  return 0;

}

·  处理消息

        Thread的默认消息处理流程可用下图表示。默认消息处理函数的入口为Thread::Run(),另一个内嵌循环是SocketServer::Wait()。箭头指向数据成员则表示,处理相关数据。

        要执行默认消息处理循环,使用下列语句即可:

[cpp]view plaincopyprint?

1.   talk_base::Thread::Current()->Run();  

 talk_base::Thread::Current()->Run();


       
当然,你也可以自己处理消息,可以参见pcp例子中的代码,这段代码用在登录阶段,等待登录操作完成。

[cpp]view plaincopyprint?

1.   // Wait until login succeeds.  

2.   std::vector<uint32> ids;  

3.   ids.push_back(MSG_LOGIN_COMPLETE);  

4.   ids.push_back(MSG_LOGIN_FAILED);  

5.   if (MSG_LOGIN_FAILED == Loop(ids))  

6.     FatalError("Failed to connect");  

  // Wait until login succeeds.

  std::vector<uint32> ids;

  ids.push_back(MSG_LOGIN_COMPLETE);

  ids.push_back(MSG_LOGIN_FAILED);

  if (MSG_LOGIN_FAILED == Loop(ids))

    FatalError("Failed toconnect");


       
首先设置关系的消息id集合,然后进入自定义的消息循环。

[cpp]view plaincopyprint?

1.   // Runs the current thread until a message with the given ID is seen.  

2.   uint32 Loop(const std::vector<uint32>& ids) {  

3.     talk_base::Message msg;  

4.     while (talk_base::Thread::Current()->Get(&msg)) {  

5.       if (msg.phandler == NULL) {  

6.         if (std::find(ids.begin(), ids.end(), msg.message_id) != ids.end())  

7.           return msg.message_id;  

8.         std::cout << "orphaned message: " << msg.message_id;  

9.         continue;  

10.     }  

11.     talk_base::Thread::Current()->Dispatch(&msg);  

12.   }  

13.   return 0;  

14. }  

// Runs the current thread until a message with the given ID is seen.

uint32 Loop(const std::vector<uint32>& ids) {

  talk_base::Message msg;

  while(talk_base::Thread::Current()->Get(&msg)) {

    if (msg.phandler == NULL) {

      if (std::find(ids.begin(),ids.end(), msg.message_id) != ids.end())

        return msg.message_id;

      std::cout <<"orphaned message: " << msg.message_id;

      continue;

    }

   talk_base::Thread::Current()->Dispatch(&msg);

  }

  return 0;

}


       
每次循环中获取消息,然后判断消息的id是否后符合要求。符合要求就返回(表示登录或登录失败),否则就派发消息(就是执行OnMessage函数,和Thread的默认消息处理一样)。

·         未完待续......