#创作者激励#消息通知之消息订阅相关流程

时间:2021-08-12 00:59:04

【本文正在参加2023年第一期优质创作者激励计划】

前言

之前我们介绍了在Openharmony消息通知机制中的系统层事件发布流程,了解事件发布的来龙去脉,但消息订阅和事件发布在Openharmony中是息息相关的,在实际的消息使用过程中,我们只有发布消息事件后,才能订阅相关消息,消息订阅也是消息通知中重要不可缺少的一环,因此通过消息订阅,我们才能逐步地展露Openharmony消息通知机制神秘面纱。这篇文章我们来介绍消息订阅在Openharmony 3.1Release中整体流程,深入的分析消息订阅实现原理,消息订阅的函数调用关系,使开发者能够从整体了解消息订阅流程,便于快速适配消息和处理消息通知相关问题。

整体流程

#创作者激励#消息通知之消息订阅相关流程

代码流程

1、在消息使用的过程中,首先初始化订阅消息相关参数,添加具体事件如:

COMMON_EVENT_PACKAGE_REMOVED,然后进一步处理调用函数CommonEventManager ::SubscribeCommonEvent。

matchingSkills.AddEvent(EventFwk::CommonEventSupport::COMMON_EVENT_PACKAGE_REMOVED);
OHOS::EventFwk::CommonEventSubscribeInfo subscriberInfo(matchingSkills);
systemEventSubscriber_ = std::make_shared<SystemEventSubscriber>(subscriberInfo);
if (systemEventSubscriber_ == nullptr) {
    HKS_LOG_E("huks system subscriber nullptr");
    return false;
}

return OHOS::EventFwk::CommonEventManager::SubscribeCommonEvent(systemEventSubscriber_);


2、CommonEventManager::SubscribeCommonEvent的实现,关联到CommonEvent类的订阅函数SubscribeCommonEvent上。

{
    return DelayedSingleton<CommonEvent>::GetInstance()->SubscribeCommonEvent(subscriber);
}  

3、CommonEvent::SubscribeCommonEvent的具体实现,订阅初始化调用CommonEventProxy的实现类

{
    …..
    sptr<IRemoteObject> commonEventListener = nullptr;
    uint8_t subscribeState = CreateCommonEventListener(subscriber, commonEventListener);
    if (subscribeState == INITIAL_SUBSCRIPTION) {
        EVENT_LOGD("before SubscribeCommonEvent proxy valid state is %{public}d", isProxyValid_);
        return commonEventProxy_->SubscribeCommonEvent(subscriber->GetSubscribeInfo(), commonEventListener);
    } else if (subscribeState == ALREADY_SUBSCRIBED) {
        return true;
    } else {
        return false;
    }
}

4、订阅事件通过代理端向调用SendRequest函数发送CES_SUBSCRIBE_COMMON_EVENT消息到服务端。

    const CommonEventSubscribeInfo &subscribeInfo, const sptr<IRemoteObject> &commonEventListener)
{
    EVENT_LOGD("start");

    MessageParcel data;
    MessageParcel reply;
    ……
    bool ret = SendRequest(ICommonEvent::Message::CES_SUBSCRIBE_COMMON_EVENT, data, reply);
    if (ret) {
        ret = reply.ReadBool();
    }

    EVENT_LOGD("end");
    return ret;
}

5、服务端的stub函数接收消息,对不同的消息类型进行处理,当遇到CES_SUBSCRIBE_COMMON_EVENT消息进行处理,调用SubscribeCommonEvent函数

{
    if (data.ReadInterfaceToken() != GetDescriptor()) {
        EVENT_LOGE("local descriptor is not equal to remote");
        return ERR_TRANSACTION_FAILED;
    }

    switch (code) {
        …….
        case static_cast<uint32_t>(ICommonEvent::Message::CES_SUBSCRIBE_COMMON_EVENT): {
                 std::unique_ptr<CommonEventSubscribeInfo> subscribeInfo(data.ReadParcelable<CommonEventSubscribeInfo>());
            sptr<IRemoteObject> commonEventListener = data.ReadRemoteObject();
            if (!subscribeInfo) {
                EVENT_LOGE("Failed to ReadParcelable<CommonEventSubscribeInfo>");
                return ERR_INVALID_VALUE;
            }
            if (commonEventListener == nullptr) {
                EVENT_LOGE("Failed to ReadParcelable<IRemoteObject>");
                return ERR_INVALID_VALUE;
            }
            bool ret = SubscribeCommonEvent(*subscribeInfo, commonEventListener);
            if (!reply.WriteBool(ret)) {
                EVENT_LOGE("Failed to write reply");
                return ERR_INVALID_VALUE;
            }
            break;
        }

        …….

        default:
            EVENT_LOGW("unknown, code = %{public}u, flags= %{public}u", code, option.GetFlags());
            return IPCObjectStub::OnRemoteRequest(code, data, reply, option);
    }

    return NO_ERROR;
}

6、IPC服务框架Stub调用server服务端SubscribeCommonEvent函数,通过std::bind绑定函数,绑定到InnerCommonEventManager::SubscribeCommonEvent 函数

    const CommonEventSubscribeInfo &subscribeInfo, const sptr<IRemoteObject> &commonEventListener)
{
    …….
    std::function<void()> SubscribeCommonEventFunc = std::bind(&InnerCommonEventManager::SubscribeCommonEvent,
        innerCommonEventManager_,subscribeInfo,commonEventListener,recordTime,IPCSkeleton::GetCallingPid(),
        callingUid,callerToken,bundleName);
    return handler_->PostTask(SubscribeCommonEventFunc);
}

7、绑定函数InnerCommonEventManager::SubscribeCommonEvent,详细实现,调用PublishStickyEvent函数

    const sptr<IRemoteObject> &commonEventListener, const struct tm &recordTime, const pid_t &pid, const uid_t &uid,
    const Security::AccessToken::AccessTokenID &callerToken, const std::string &bundleName)
{
    ……

    auto record = DelayedSingleton<CommonEventSubscriberManager>::GetInstance()->InsertSubscriber(
        sp, commonEventListener, recordTime, eventRecordInfo);

    PublishStickyEvent(sp, record);

    return true;
};

8、PublishStickyEvent函数接收处理事件,调用并调用controlPtr_->PublishStickyCommonEvent函数

    const std::shared_ptr<CommonEventSubscribeInfo> &sp, const std::shared_ptr<EventSubscriberRecord> &subscriberRecord)
{
    …….
    for (auto commonEventRecord : commonEventRecords) {
        if (!commonEventRecord) {
            EVENT_LOGW("commonEventRecord is nullptr and get next");
            continue;
        }
        EVENT_LOGI("publish sticky event : %{public}s",
            commonEventRecord->commonEventData->GetWant().GetAction().c_str());
        commonEventRecord->publishInfo->SetOrdered(false);
        if (!controlPtr_) {
            EVENT_LOGE("CommonEventControlManager ptr is nullptr");
            return false;
        }
        controlPtr_->PublishStickyCommonEvent(*commonEventRecord, subscriberRecord);
    }

    return true;
}

9、PublishStickyCommonEvent先检查订阅记录的合法性,然后调用ProcessUnorderedEvent处理相关事件。

    const CommonEventRecord &eventRecord, const std::shared_ptr<EventSubscriberRecord> &subscriberRecord)
{
    EVENT_LOGI("enter");

    if (!subscriberRecord) {
        EVENT_LOGE("subscriberRecord is null");
        return false;
    }
    return ProcessUnorderedEvent(eventRecord, subscriberRecord);
}

10、无序事件处理,绑定函数std::bind调用NotifyUnorderedEvent

    const CommonEventRecord &eventRecord, const std::shared_ptr<EventSubscriberRecord> &subscriberRecord)
{
    …….

    std::function<void()> innerCallback =
        std::bind(&CommonEventControlManager::NotifyUnorderedEvent, this, eventRecordPtr);

    if (eventRecord.isSystemEvent) {
        ret = handler_->PostImmediateTask(innerCallback);
    } else {
        ret = handler_->PostTask(innerCallback);
    }

    return ret;
} 

11、通知处理函数NotifyUnorderedEvent 通过CommonEventListener调用到代理类EventReceiveProxy 的NotifyEvent函数

{
    ……
    for (auto vec : eventRecord->receivers) {
        size_t index = eventRecord->nextReceiver++;
        eventRecord->curReceiver = vec->commonEventListener;
        if (vec->isFreeze) {
            eventRecord->deliveryState[index] = OrderedEventRecord::SKIPPED;
            DelayedSingleton<CommonEventSubscriberManager>::GetInstance()->InsertFrozenEvents(vec, *eventRecord);
        } else {
            ……
            if (ret == OrderedEventRecord::DELIVERED) {
                eventRecord->state = OrderedEventRecord::RECEIVEING;
                commonEventListenerProxy->NotifyEvent(
                    *(eventRecord->commonEventData), false, eventRecord->publishInfo->IsSticky());
                eventRecord->state = OrderedEventRecord::RECEIVED;
            }
        }
    }
    ……
}

12、在EventReceiveProxy::NotifyEvent函数中发送消息CES_NOTIFY_COMMON_EVENT

{
    ……
    int32_t result = remote->SendRequest(
        static_cast<uint32_t>(IEventReceive::Message::CES_NOTIFY_COMMON_EVENT), data, reply, option);
    if (result != OHOS::NO_ERROR) {
        EVENT_LOGE("Failed to SendRequest, error code: %{public}d", result);
        return;
    }

    EVENT_LOGD("end");
}

服务端接收CES_NOTIFY_COMMON_EVENT消息然后调用NotifyEvent函数处理

{
    if (data.ReadInterfaceToken() != GetDescriptor()) {
        EVENT_LOGE("local descriptor is not equal to remote");
        return ERR_TRANSACTION_FAILED;
    }
    switch (code) {
        case static_cast<uint32_t>(IEventReceive::Message::CES_NOTIFY_COMMON_EVENT): {
            std::unique_ptr<CommonEventData> eventData(data.ReadParcelable<CommonEventData>());
            bool ordered = data.ReadBool();
            bool sticky = data.ReadBool();
            if (eventData == nullptr) {
                EVENT_LOGE("callback stub receive common event data is nullptr");
                return ERR_INVALID_VALUE;
            }
            NotifyEvent(*eventData, ordered, sticky);
            break;
        }

        default:
            EVENT_LOGW("event receive stub receives unknown code, code = %{public}u", code);
            return IPCObjectStub::OnRemoteRequest(code, data, reply, option);
    }

    return NO_ERROR;
}

13、CommonEventListener是EventReceiveStub服务端的实现类因此调用到NotifyEvent函数,最终通过绑定函数std::bind调用OnReceiveEvent函数进行消息的接口处理

{
    EVENT_LOGI("enter");

    std::lock_guard<std::mutex> lock(mutex_);
    if (!IsReady()) {
        EVENT_LOGE("not ready");
        return;
    }

    std::function<void()> onReceiveEventFunc =
        std::bind(&CommonEventListener::OnReceiveEvent, this, commonEventData, ordered, sticky);
    handler_->PostTask(onReceiveEventFunc);
}


14、绑定函数CommonEventListener::OnReceiveEvent最终接收和处理消息

const bool &ordered, const bool &sticky)
{
    ……
    commonEventSubscriber_->SetAsyncCommonEventResult(result);

    commonEventSubscriber_->OnReceiveEvent(commonEventData);

    if ((commonEventSubscriber_->GetAsyncCommonEventResult() != nullptr) && ordered) {
        commonEventSubscriber_->GetAsyncCommonEventResult()->FinishCommonEvent();
    }
    EVENT_LOGI("end");
}

15、接收订阅消息的函数处理COMMON_EVENT_PACKAGE_REMOVED相关消息

{
    auto want = data.GetWant();
    std::string action = want.GetAction();
    if (action == OHOS::EventFwk::CommonEventSupport::COMMON_EVENT_PACKAGE_REMOVED) {
        int uid = want.GetIntParam(AppExecFwk::Constants::UID, -1);
        //消息处理
        ……
    } 
}

从上面我们就梳理整个消息订阅流程,希望对大家有所帮助。

附件链接:https://ost.51cto.com/resource/2572

本文作者:软通动力HOS

想了解更多关于开源的内容,请访问:​

​51CTO 开源基础软件社区​

​https://ost.51cto.com/#bkwz​