【本文正在参加2023年第一期优质创作者激励计划】
前言
之前我们介绍了在Openharmony消息通知机制中的系统层事件发布流程,了解事件发布的来龙去脉,但消息订阅和事件发布在Openharmony中是息息相关的,在实际的消息使用过程中,我们只有发布消息事件后,才能订阅相关消息,消息订阅也是消息通知中重要不可缺少的一环,因此通过消息订阅,我们才能逐步地展露Openharmony消息通知机制神秘面纱。这篇文章我们来介绍消息订阅在Openharmony 3.1Release中整体流程,深入的分析消息订阅实现原理,消息订阅的函数调用关系,使开发者能够从整体了解消息订阅流程,便于快速适配消息和处理消息通知相关问题。
整体流程
代码流程
1、在消息使用的过程中,首先初始化订阅消息相关参数,添加具体事件如:
COMMON_EVENT_PACKAGE_REMOVED,然后进一步处理调用函数CommonEventManager ::SubscribeCommonEvent。
matchingSkills.AddEvent(EventFwk::CommonEventSupport::COMMON_EVENT_PACKAGE_REMOVED);
OHOS::EventFwk::CommonEventSubscribeInfo subscriberInfo(matchingSkills);
systemEventSubscriber_ = std::make_shared<SystemEventSubscriber>(subscriberInfo);
if (systemEventSubscriber_ == nullptr) {
HKS_LOG_E("huks system subscriber nullptr");
return false;
}
return OHOS::EventFwk::CommonEventManager::SubscribeCommonEvent(systemEventSubscriber_);
2、CommonEventManager::SubscribeCommonEvent的实现,关联到CommonEvent类的订阅函数SubscribeCommonEvent上。
{
return DelayedSingleton<CommonEvent>::GetInstance()->SubscribeCommonEvent(subscriber);
}
3、CommonEvent::SubscribeCommonEvent的具体实现,订阅初始化调用CommonEventProxy的实现类
{
…..
sptr<IRemoteObject> commonEventListener = nullptr;
uint8_t subscribeState = CreateCommonEventListener(subscriber, commonEventListener);
if (subscribeState == INITIAL_SUBSCRIPTION) {
EVENT_LOGD("before SubscribeCommonEvent proxy valid state is %{public}d", isProxyValid_);
return commonEventProxy_->SubscribeCommonEvent(subscriber->GetSubscribeInfo(), commonEventListener);
} else if (subscribeState == ALREADY_SUBSCRIBED) {
return true;
} else {
return false;
}
}
4、订阅事件通过代理端向调用SendRequest函数发送CES_SUBSCRIBE_COMMON_EVENT消息到服务端。
const CommonEventSubscribeInfo &subscribeInfo, const sptr<IRemoteObject> &commonEventListener)
{
EVENT_LOGD("start");
MessageParcel data;
MessageParcel reply;
……
bool ret = SendRequest(ICommonEvent::Message::CES_SUBSCRIBE_COMMON_EVENT, data, reply);
if (ret) {
ret = reply.ReadBool();
}
EVENT_LOGD("end");
return ret;
}
5、服务端的stub函数接收消息,对不同的消息类型进行处理,当遇到CES_SUBSCRIBE_COMMON_EVENT消息进行处理,调用SubscribeCommonEvent函数
{
if (data.ReadInterfaceToken() != GetDescriptor()) {
EVENT_LOGE("local descriptor is not equal to remote");
return ERR_TRANSACTION_FAILED;
}
switch (code) {
…….
case static_cast<uint32_t>(ICommonEvent::Message::CES_SUBSCRIBE_COMMON_EVENT): {
std::unique_ptr<CommonEventSubscribeInfo> subscribeInfo(data.ReadParcelable<CommonEventSubscribeInfo>());
sptr<IRemoteObject> commonEventListener = data.ReadRemoteObject();
if (!subscribeInfo) {
EVENT_LOGE("Failed to ReadParcelable<CommonEventSubscribeInfo>");
return ERR_INVALID_VALUE;
}
if (commonEventListener == nullptr) {
EVENT_LOGE("Failed to ReadParcelable<IRemoteObject>");
return ERR_INVALID_VALUE;
}
bool ret = SubscribeCommonEvent(*subscribeInfo, commonEventListener);
if (!reply.WriteBool(ret)) {
EVENT_LOGE("Failed to write reply");
return ERR_INVALID_VALUE;
}
break;
}
…….
default:
EVENT_LOGW("unknown, code = %{public}u, flags= %{public}u", code, option.GetFlags());
return IPCObjectStub::OnRemoteRequest(code, data, reply, option);
}
return NO_ERROR;
}
6、IPC服务框架Stub调用server服务端SubscribeCommonEvent函数,通过std::bind绑定函数,绑定到InnerCommonEventManager::SubscribeCommonEvent 函数
const CommonEventSubscribeInfo &subscribeInfo, const sptr<IRemoteObject> &commonEventListener)
{
…….
std::function<void()> SubscribeCommonEventFunc = std::bind(&InnerCommonEventManager::SubscribeCommonEvent,
innerCommonEventManager_,subscribeInfo,commonEventListener,recordTime,IPCSkeleton::GetCallingPid(),
callingUid,callerToken,bundleName);
return handler_->PostTask(SubscribeCommonEventFunc);
}
7、绑定函数InnerCommonEventManager::SubscribeCommonEvent,详细实现,调用PublishStickyEvent函数
const sptr<IRemoteObject> &commonEventListener, const struct tm &recordTime, const pid_t &pid, const uid_t &uid,
const Security::AccessToken::AccessTokenID &callerToken, const std::string &bundleName)
{
……
auto record = DelayedSingleton<CommonEventSubscriberManager>::GetInstance()->InsertSubscriber(
sp, commonEventListener, recordTime, eventRecordInfo);
PublishStickyEvent(sp, record);
return true;
};
8、PublishStickyEvent函数接收处理事件,调用并调用controlPtr_->PublishStickyCommonEvent函数
const std::shared_ptr<CommonEventSubscribeInfo> &sp, const std::shared_ptr<EventSubscriberRecord> &subscriberRecord)
{
…….
for (auto commonEventRecord : commonEventRecords) {
if (!commonEventRecord) {
EVENT_LOGW("commonEventRecord is nullptr and get next");
continue;
}
EVENT_LOGI("publish sticky event : %{public}s",
commonEventRecord->commonEventData->GetWant().GetAction().c_str());
commonEventRecord->publishInfo->SetOrdered(false);
if (!controlPtr_) {
EVENT_LOGE("CommonEventControlManager ptr is nullptr");
return false;
}
controlPtr_->PublishStickyCommonEvent(*commonEventRecord, subscriberRecord);
}
return true;
}
9、PublishStickyCommonEvent先检查订阅记录的合法性,然后调用ProcessUnorderedEvent处理相关事件。
const CommonEventRecord &eventRecord, const std::shared_ptr<EventSubscriberRecord> &subscriberRecord)
{
EVENT_LOGI("enter");
if (!subscriberRecord) {
EVENT_LOGE("subscriberRecord is null");
return false;
}
return ProcessUnorderedEvent(eventRecord, subscriberRecord);
}
10、无序事件处理,绑定函数std::bind调用NotifyUnorderedEvent
const CommonEventRecord &eventRecord, const std::shared_ptr<EventSubscriberRecord> &subscriberRecord)
{
…….
std::function<void()> innerCallback =
std::bind(&CommonEventControlManager::NotifyUnorderedEvent, this, eventRecordPtr);
if (eventRecord.isSystemEvent) {
ret = handler_->PostImmediateTask(innerCallback);
} else {
ret = handler_->PostTask(innerCallback);
}
return ret;
}
11、通知处理函数NotifyUnorderedEvent 通过CommonEventListener调用到代理类EventReceiveProxy 的NotifyEvent函数
{
……
for (auto vec : eventRecord->receivers) {
size_t index = eventRecord->nextReceiver++;
eventRecord->curReceiver = vec->commonEventListener;
if (vec->isFreeze) {
eventRecord->deliveryState[index] = OrderedEventRecord::SKIPPED;
DelayedSingleton<CommonEventSubscriberManager>::GetInstance()->InsertFrozenEvents(vec, *eventRecord);
} else {
……
if (ret == OrderedEventRecord::DELIVERED) {
eventRecord->state = OrderedEventRecord::RECEIVEING;
commonEventListenerProxy->NotifyEvent(
*(eventRecord->commonEventData), false, eventRecord->publishInfo->IsSticky());
eventRecord->state = OrderedEventRecord::RECEIVED;
}
}
}
……
}
12、在EventReceiveProxy::NotifyEvent函数中发送消息CES_NOTIFY_COMMON_EVENT
{
……
int32_t result = remote->SendRequest(
static_cast<uint32_t>(IEventReceive::Message::CES_NOTIFY_COMMON_EVENT), data, reply, option);
if (result != OHOS::NO_ERROR) {
EVENT_LOGE("Failed to SendRequest, error code: %{public}d", result);
return;
}
EVENT_LOGD("end");
}
服务端接收CES_NOTIFY_COMMON_EVENT消息然后调用NotifyEvent函数处理
{
if (data.ReadInterfaceToken() != GetDescriptor()) {
EVENT_LOGE("local descriptor is not equal to remote");
return ERR_TRANSACTION_FAILED;
}
switch (code) {
case static_cast<uint32_t>(IEventReceive::Message::CES_NOTIFY_COMMON_EVENT): {
std::unique_ptr<CommonEventData> eventData(data.ReadParcelable<CommonEventData>());
bool ordered = data.ReadBool();
bool sticky = data.ReadBool();
if (eventData == nullptr) {
EVENT_LOGE("callback stub receive common event data is nullptr");
return ERR_INVALID_VALUE;
}
NotifyEvent(*eventData, ordered, sticky);
break;
}
default:
EVENT_LOGW("event receive stub receives unknown code, code = %{public}u", code);
return IPCObjectStub::OnRemoteRequest(code, data, reply, option);
}
return NO_ERROR;
}
13、CommonEventListener是EventReceiveStub服务端的实现类因此调用到NotifyEvent函数,最终通过绑定函数std::bind调用OnReceiveEvent函数进行消息的接口处理
{
EVENT_LOGI("enter");
std::lock_guard<std::mutex> lock(mutex_);
if (!IsReady()) {
EVENT_LOGE("not ready");
return;
}
std::function<void()> onReceiveEventFunc =
std::bind(&CommonEventListener::OnReceiveEvent, this, commonEventData, ordered, sticky);
handler_->PostTask(onReceiveEventFunc);
}
14、绑定函数CommonEventListener::OnReceiveEvent最终接收和处理消息
const bool &ordered, const bool &sticky)
{
……
commonEventSubscriber_->SetAsyncCommonEventResult(result);
commonEventSubscriber_->OnReceiveEvent(commonEventData);
if ((commonEventSubscriber_->GetAsyncCommonEventResult() != nullptr) && ordered) {
commonEventSubscriber_->GetAsyncCommonEventResult()->FinishCommonEvent();
}
EVENT_LOGI("end");
}
15、接收订阅消息的函数处理COMMON_EVENT_PACKAGE_REMOVED相关消息
{
auto want = data.GetWant();
std::string action = want.GetAction();
if (action == OHOS::EventFwk::CommonEventSupport::COMMON_EVENT_PACKAGE_REMOVED) {
int uid = want.GetIntParam(AppExecFwk::Constants::UID, -1);
//消息处理
……
}
}
从上面我们就梳理整个消息订阅流程,希望对大家有所帮助。
附件链接:https://ost.51cto.com/resource/2572
本文作者:软通动力HOS