Android异步消息框架

时间:2022-08-27 16:15:38
自从rtsp从stagefright播放器移植到NuPlayer之后,你会发现相关的类中存在许多类似下面的代码: ================================11111111111111==========================
NuPlayerDriver::NuPlayerDriver()     : mLooper(new ALooper) {    mLooper->setName("NuPlayerDriver Looper");    mLooper->start(            false,            true,              PRIORITY_AUDIO);    mPlayer = new NuPlayer;    mLooper->registerHandler(mPlayer);}====================222222222222222=============================    sp msg = new AMessage(kWhatPerformSeek, mReflector->id());    msg->setInt32("generation", ++mSeekGeneration);    msg->setInt64("timeUs", seekTimeUs);    msg->post(200000ll);=====================333333333333333============================    void NuPlayer::RTSPSource::onMessageReceived(const sp &msg) {    }=================================================================这就是谷歌在Android native层实现的一个异步消息机制,在这个机制中几乎不存在同步锁,所有的处理都是异步的,将变量封装到一个消息AMessage结构体中,然后放到队列中去,后台专门有一个线程会从这个队列中取出消息然后执行,执行函数就是onMessageReceived,这个函数中会有很多分支,用于处理不同的消息;在很多类中都会有各种消息post出来,而后台的异步消息处理线程又是怎么知道发送给哪个类的onMessageReceived函数处理呢,要搞懂这个问题,就需要把谷歌实现的这个异步消息处理框架搞明白,下面就来分析这个框架。
一:消息异步消息处理框架,消息是载体,所以我们先了解消息这个类AMessage类struct AMessage : public RefBase {    构造函数,包括两个参数,第一个参数指明这是个什么消息,用于在onMessageReceived处理分支中进行匹配,第二个参数target用于后台线程在处理这个消息的时候知道发给哪个类处理,后面看构造这个消息的时候是这两个参数是怎么传进来的    AMessage(uint32_t what = 0, ALooper::handler_id target = 0);    void setWhat(uint32_t what);    uint32_t what() const;    void setTarget(ALooper::handler_id target);    ALooper::handler_id target() const;
    void clear();    这个消息类中定义了一堆set和find方法,用于在在传递消息过程中携带各种信息    void setObject(const char *name, const sp &obj);    void setBuffer(const char *name, const sp &buffer);    void setMessage(const char *name, const sp &obj);    bool findBuffer(const char *name, sp *buffer) const;    bool findMessage(const char *name, sp *obj) const;    void post(int64_t delayUs = 0);protected:    virtual ~AMessage();  析构函数private:    uint32_t mWhat;    ALooper::handler_id mTarget;    两个重要的私有成员变量};
正如开头部分看的那样,构造一个消息的过程如下:
void NuPlayer::start() {    (new AMessage(kWhatStart, id()))->post();}void AMessage::post(int64_t delayUs) {    gLooperRoster.postMessage(this, delayUs);}post之后发生的事情由gLooperRoster负责,我们暂且不管。

二:框架搭建现在来看处理异步消息框架的搭建,这里在copy一段搭建框架的几行代码  1      sp《ALooper》mNetLooper;  2      mNetLooper(new ALooper),  3      mNetLooper->setName("rtsp net");  4      mNetLooper->start(false ,  5      (1 ? mNetLooper : looper())->registerHandler(mRTPConn);
ALooper类struct ALooper : public RefBase {    typedef int32_t event_id;    typedef int32_t handler_id;    定义的两个整形变量    ALooper();    // Takes effect in a subsequent call to start().    void setName(const char *name);    handler_id registerHandler(const sp &handler);    void unregisterHandler(handler_id handlerID);    status_t start(            bool runOnCallingThread = false,            bool canCallJava = false,            int32_t priority = PRIORITY_DEFAULT            );private:    friend struct ALooperRoster; 这个类比较重要,后面消息post的流程和其相关    事件的结构体封装    struct Event {        int64_t mWhenUs;        sp mMessage;    };   非常重要,后台存放事件的链表    List《Event》 mEventQueue;    struct LooperThread;    sp mThread;    后台处理线程    void post(const sp &msg, int64_t delayUs);    bool loop();};
搭建框架1ALooper::ALooper()    : mRunningLocally(false) {}搭建框架2void ALooper::setName(const char *name) {    mName = name;  //AString mName;}搭建框架3status_t ALooper::start(        bool runOnCallingThread, bool canCallJava, int32_t priority) {    开启真正的后台处理线程    mThread = new LooperThread(this, canCallJava);    status_t err = mThread->run(            mName.empty() ? "ALooper" : mName.c_str(), priority);给这个线程设置一个名字    return err;} run之后就会执行其线程函数threadLooper(),后面有分析。
搭建框架4-1
ALooper::handler_id ALooper::registerHandler(const sp括号AHandler括号 &handler) {    return gLooperRoster.registerHandler(this, handler);}从这个函数的参数来看,能作为handler进行注册的类都必须是继承自AHandler这个类,注册的过程也是交给gLooperRoster处理。
AHandler类分析----消息处理类的父类因为后面要用到AHandler里面的一个非常重要的函数id(),所以我们先分析一下AHandler这个公共的父类:struct AMessage;struct AHandler : public RefBase {    AHandler()        : mID(0) {   mID的初始值为0    }    ALooper::handler_id id() const {        return mID;   id()这个函数用于返回内部变量mID的值,其初始值为0,但是会通过setID函数设置    }    sp looper();protected:    virtual void onMessageReceived(const sp &msg) = 0;private:    friend struct ALooperRoster;    ALooper::handler_id mID;    下面这个函数正式在其友元类ALooperRoster的registerHandler中调用的    void setID(ALooper::handler_id id) {        mID = id;    }};
ALooperRoster类分析post消息和注册handler都要用到gLooperRoster这么一个对象,那下面我就要分析对应的ALooperRoster类struct ALooperRoster {    ALooperRoster();    ALooper::handler_id registerHandler(            const sp括号ALooper括号 looper, const sp括号AHandler括号 &handler);    void unregisterHandler(ALooper::handler_id handlerID);    status_t postMessage(const sp &msg, int64_t delayUs = 0);    void deliverMessage(const sp &msg);    status_t postAndAwaitResponse(            const sp &msg, sp *response);    void postReply(uint32_t replyID, const sp &reply);    sp findLooper(ALooper::handler_id handlerID);private:    struct HandlerInfo {        wp mLooper;        wp mHandler;    };    Mutex mLock;    KeyedVector mHandlers;    ALooper::handler_id mNextHandlerID;    uint32_t mNextReplyID;    Condition mRepliesCondition;    KeyedVector > mReplies;    status_t postMessage_l(const sp &msg, int64_t delayUs);};然后来看一些关键函数的实现搭建框架4-2ALooper::handler_id ALooperRoster::registerHandler(        const sp looper, const sp &handler) {    Mutex::Autolock autoLock(mLock);    还记得上面我们构造一个AMessage的时候有一个参数就是调用id()这个函数返回的一个ALooper::handler_id这个无符号整形变量,id()是AHandler这个类的一个函数,上面已经有分析,其返回的是各个Handler的mID的值,那么这个值是在哪里设置的呢?请继续往下看。    if (handler->id() != 0) {        CHECK(!"A handler must only be registered once.");        return INVALID_OPERATION;    }    HandlerInfo info;    info.mLooper = looper;    info.mHandler = handler;    ALooper::handler_id handlerID = mNextHandlerID++;    mHandlers.add(handlerID, info);    handler->setID(handlerID);    这段代码非常重要,针对每个handler调用registerHandler的时候都会设置一个独一无二的handler_id,最后发送消息进行处理的时候就是通过这个独一无二的handler_id这个变量找到处理handler类的。    return handlerID;    在这个函数中针对这个looper和handler会构造一个HandlerInfo结构体,然后放到pair容器中。这里说一下,一个looper可以有多个handler,但是一一个handler只能跟一个looper。}***********************************************************************好了框架搭建好了,下面就是消息的传递处理过程了,这个留着下回更新。明天又是周一,早睡早起,好好工作,好好研究数据结构。===========================================================================数据结构看到非常经典的几章,本想在博客上好好整理一下学习成果,但是鉴于每一篇博客都不能有始无终,所以还是先把这篇博客补充完整吧*************************************************************************
四:消息处理第一部分,我们看到post之后调用了ALooperRoster的postMessage函数status_t ALooperRoster::postMessage(        const sp<AMessage> &msg, int64_t delayUs) {    Mutex::Autolock autoLock(mLock);    return postMessage_l(msg, delayUs);}status_t ALooperRoster::postMessage_l(        const sp<AMessage> &msg, int64_t delayUs) {    msg->target() 返回的是id, 在上面registerHandler过程中,每个id和其对应的info组成一个pair放到了mHandlers这个容器中,现在通过这个id在找回这个pair    ssize_t index = mHandlers.indexOfKey(msg->target());      if (index < 0) {        ALOGW("failed to post message. Target handler not registered.");        return -ENOENT;    }    找到这个pair的位置index,然后取出对应的HandlerInfo,有了HandlerInfo,也就能知道对应的ALooper了    const HandlerInfo &info = mHandlers.valueAt(index);    sp<ALooper> looper = info.mLooper.promote();    looper->post(msg, delayUs);    return OK;}void ALooper::post(const sp<AMessage> &msg, int64_t delayUs) {    Mutex::Autolock autoLock(mLock);    int64_t whenUs;    if (delayUs > 0) {        whenUs = GetNowUs() + delayUs;    } else {        whenUs = GetNowUs();    }    List<Event>::iterator it = mEventQueue.begin();    while (it != mEventQueue.end() && (*it).mWhenUs <= whenUs) {        ++it;    }    上面这一段代码,主要是遍历消息队列中的消息的时间,然后和我们的消息做对比,最终目的就是所有的消息必须按照时间先后顺序放在队列中等待执行。    构造一个Event消息,将我们的msg设置进去    Event event;    event.mWhenUs = whenUs;    event.mMessage = msg;    将我们的消息放入到消息队列中,等待执行    mEventQueue.insert(it, event);}
五:后台线程执行消息还记得我们在ALooper的start函数中启动了一个后台线程吗,下面我们来分析它status_t ALooper::start(        bool runOnCallingThread, bool canCallJava, int32_t priority) {    ........    mThread = new LooperThread(this, canCallJava);    status_t err = mThread->run(            mName.empty() ? "ALooper" : mName.c_str(), priority);    if (err != OK) {        mThread.clear();    }    return err;}LooperThread线程启动后,就会循环执行线程函数threadLoop,我们看其实现:    virtual bool threadLoop() {        return mLooper->loop();    }调用start时传进来的ALooper的looper方法,    bool ALooper::loop() {    Event event;    {        Mutex::Autolock autoLock(mLock);        ........        取出队头消息,取出后然后把它从队列中删掉,然后调用gLooperRoster的deliverMessage进行发送出去执行        event = *mEventQueue.begin();        mEventQueue.erase(mEventQueue.begin());    }    gLooperRoster.deliverMessage(event.mMessage);    这个函数如果返回true,则线程会不断执行又会进入到这个函数中来,如果返回false,线程就会停止了    return true;}void ALooperRoster::deliverMessage(const sp<AMessage> &msg) {    sp<AHandler> handler;    {        Mutex::Autolock autoLock(mLock);        通过这个消息的id找到保存在mHandlers中的HandlerInfo,然后从HandlerInfo中取出对应的handler,然后调用这个handler这个的onMessageReceived方法进行处理消息。        ssize_t index = mHandlers.indexOfKey(msg->target());        const HandlerInfo &info = mHandlers.valueAt(index);        handler = info.mHandler.promote();    }    handler->onMessageReceived(msg);}如在NuPlayer.cpp中void NuPlayer::onMessageReceived(const sp<AMessage> &msg) {    switch (msg->what()) {        case kWhatSetDataSource:        case kWhatVideoNotify:        case kWhatAudioNotify:        case kWhatRendererNotify:        case kWhatMoreDataQueued:        case kWhatReset:        case kWhatSeek:        case kWhatPause:        case kWhatResume:        default:    }}各个case中的条件就对应msg中的what了,根据不同的what,走不同的case处理。
至此,整个消息的处理过程完毕,总结如下:1:首先需要构造一个AMessage,必须携带两个参数:what(什么消息)和id(谁处理);2:ALooper中有一个后台线程,LooperThread,该线程维护着一个消息队列List<Event> mEventQueue,线程函数不断从这个队列中取出消息执行;3:消息的发送和取出都是调用辅助类ALooperRoster,这个类设置为各个类的友元类,可以方位ALooper,AMessage,AHandler等的受保护的函数。