Framework事件机制—Event Hub原理及事件解析

时间:2021-10-04 01:05:18

EventHub简述

Android系统基于Linux系统,由多个子系统组合而成,各子系统分工合作,在各自功能域中扮演关键角色。其中一个比较重要的子系统是Input子系统,正如其名地,挂载于Android的各输入设备的输入事件,会通过Input子系统传输到上层(Android框架层或事件处理层)执行处理流程。

EventHub在Input子系统中可以看做起到连结上下层的一个重要模块:对下,它监听Input设备的加入与删除,获取Input设备的输入事件;往上,它将输入事件整合由InputReader获取并分发至各Android上层服务或应用。简单来说,EventHub的工作原理是,InputReader中持续循环线程loopOnce()调用EventHub的getEvents(),该函数epoll_wait等待着Input设备的输入事件到来。EventHub的工作又可以细分为监听新设备加入与监听设备事件获取两部分。

基本概念

EventHub(事件管理中心)是系统为开发者提供的一种事件管理机制,包含事件订阅、事件注销、事件发送等功能。目的是为了方便开发者在不同组件之间进行通信。

实现原理

EventHub内部持有了一个key-value的数据结构,当事件接收方订阅事件后,EventHub会以事件名为key,回调函数为value将其保存下来。当事件发送方发送事件时,EventHub根据事件名查找对应回调函数,然后调用回调函数完成事件通知。

Framework事件机制—Event Hub原理及事件解析

约束与限制

  • EventHub只有Stage模型的应用才可以使用
  • 在发送和接收事件时需要使用同一个context实例里面的EventHub
  • EventHub采用的是key-value存储,如果在同一个EventHub中订阅了同名的事件,后面订阅的事件会被直接丢弃掉

开发步骤

事件接收方

  1. 保存context实例到globalThis中。 事件发送方和接收方需要使用同一个context实例中的EventHub才可以进行通信,所以这里把context实例保存到globalThis中,接收方才能拿到这个context实例。
  2. 编写回调函数 编写收到事件后需要进行的业务处理函数。
  3. 订阅事件 调用context接口中的eventHub.on来订阅事件
import Ability from '@ohos.application.Ability'
​
export default class MainAbility extends Ability {
    onCreate(want, launchParam) {
        console.log("[Demo] MainAbility onCreate")
        globalThis.abilityWant = want;
        // 1. 保存context实例到globalThis中。
        globalThis.context = this.context;
    }
​
    onDestroy() {
        console.log("[Demo] MainAbility onDestroy")
    }
​
    onWindowStageCreate(windowStage) {
        // Main window is created, set main page for this ability
        console.log("[Demo] MainAbility onWindowStageCreate")
​
        globalThis.startOtherAbility = () => {
            let want = {
                "bundleName": "com.example.eventhub",
                "abilityName": "MainAbility1"
            }
            this.context.startAbility(want);
        }
        // 2. 创建回调函数
        let callback = (a, b) => {
            console.log("a + b = " + (a + b))
        }
        // 3. 订阅事件
        globalThis.context.eventHub.on("calculate",callback);
​
        windowStage.loadContent("pages/index", (err, data) => {
            if (err.code) {
                console.error('Failed to load the content. Cause:' + JSON.stringify(err));
                return;
            }
            console.info('Succeeded in loading the content. Data: ' + JSON.stringify(data))
        });
    }
​
    onWindowStageDestroy() {
        // Main window is destroyed, release UI related resources
        console.log("[Demo] MainAbility onWindowStageDestroy")
    }
​
    onForeground() {
        // Ability has brought to foreground
        console.log("[Demo] MainAbility onForeground")
    }
​
    onBackground() {
        // Ability has back to background
        console.log("[Demo] MainAbility onBackground")
    }
};

事件发送方

  1. 使用和接收方同一个context实例,调用其中的eventHub.emit发送事件
import Ability from '@ohos.application.Ability'
​
export default class MainAbility1 extends Ability {
    onCreate(want, launchParam) {
        console.log("[Demo] MainAbility onCreate")
        globalThis.abilityWant = want;
    }
​
    onDestroy() {
        console.log("[Demo] MainAbility onDestroy")
    }
​
    onWindowStageCreate(windowStage) {
        // Main window is created, set main page for this ability
        console.log("[Demo] MainAbility onWindowStageCreate")
​
        globalThis.sendData = () => {
            // 1. 调用同一个context实例发送事件
            globalThis.context.eventHub.emit("calculate");
        }
​
        windowStage.loadContent("pages/index1", (err, data) => {
            if (err.code) {
                console.error('Failed to load the content. Cause:' + JSON.stringify(err));
                return;
            }
            console.info('Succeeded in loading the content. Data: ' + JSON.stringify(data))
        });
    }
​
    onWindowStageDestroy() {
        // Main window is destroyed, release UI related resources
        console.log("[Demo] MainAbility onWindowStageDestroy")
    }
​
    onForeground() {
        // Ability has brought to foreground
        console.log("[Demo] MainAbility onForeground")
    }
​
    onBackground() {
        // Ability has back to background
        console.log("[Demo] MainAbility onBackground")
    }
};

Event Hub事件中心

Event Hub可以处理多大的数据?

每秒可以处理百万级别的事件(event)。这里的“事件”:就是你收发的数据。 - https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-about,

什么是托管式服务?Managed Service

Event Hub是全托管式的服务。什么意思呢?就是如果不使用Event Hub这样的托管式服务,那么开发者需要自己对大数据流平台进行管理。比如Apache Kafka也是一个大数据流平台,但是它不是全托管的,这意味着开发者将需要自行搭建和管理大数据流的处理,比如搭建(购买和配置)虚拟机集群、安装和管理Kafka、管理储存,也就是说,开发者需要自行进行管理所有涉及的服务、更新、包、版本,或者需要再使用其他平台的服务代为完成这些步骤。而Event Hub为开发者全托管,开发者只需要创建Event Hub,然后就可以进行大数据的收发了,Event Hub保证中间的所有过程,提供稳定的服务。这样开发者对中间过程的控制变弱了,但是可以更加关注自己的业务逻辑。

在大数据流的链条中,client客户端产生数据,server服务器端接收数据和分析数据。Event Hub就像一个client和server中间的缓冲区域(buffer)。

为什么要一个buffer:因为没有buffer的话会造成依赖(dependency)和高耦合(tight coupling)。如果数据量大的话,就会出现问题。而buffer可以在这个数据生产线上起到控流的作用(flow control)。

Event Hub的典型用途是收集在远端产生的遥测(telemetry)数据,包括从1)网络应用的客户端和2)远端设备和门户(如散落各地的共享单车)上获取数据。

工作原理

本质上来讲,Event Hub就是一个暂放数据的地方。

当数据从终端产生,发送数据给一个Event Hub的时候,Event Hub就把数据收集然后写下来,写在其内部的储存里,然后我们就可以阅读这些Event Hub为我们收集的数据,进行可视化、数据分析等等,做我们想做的事情。

Event Hub就好像是一个笔记本,我们从头到尾写,也从头到尾读。

||||||||||||||||||||||||||||||||||||||||||||||||||| --> 写

读–>

笔记本上写下来的数据可以反复阅读,进入Event Hub的数据也可以被多次读取。读取数据的操作并不会将数据从Event Hub上删除。

然而,Event Hub上的数据不是永久保存的。数据到达Event Hub后,会被保存一段时间(这段时间被称作retention day,可以设置为1-7天)。就好比笔记本上,超过某段时间以后,老旧的笔记会被撕掉。

这就是Event Hub的基本工作原理。但是实际上还有一些更细节的概念,开发者们使用时必须要了解。其中最重要的包括Partition和Consumer Group。

什么是Partition?

到达Event Hub的数据其实不是写在同一个地方的,而是写在几个Partition上的。就好比其实Event Hub里面不只有一个笔记本,而是有多个笔记本来记录消息。

Partition好像是Event Hub储存空间的扇区一般。数据在到达Event Hub时,会被挨个分配到某个Partition上,分配默认是轮流的方式(Round Robin)。也就是说,第一条消息到来的时候,会被写到第一个笔记本上,而第二条消息到来的时候,会被写到第二个笔记本上,以此类推。

笔记本 A:1 4

笔记本 B:2 5

笔记本 C:3 <- 6

这种分成多区的形式,目的是为了提供并行接收(读取)的能力。

还是拿笔记本来打比方。比如,一开始的时候,你有一个Event Hub,里面有2个笔记本来记录数据,然后你请了一个人(读取数据的应用)来从这两个笔记本上读取数据(应用可以开两个线程来同时读取数据)。由于刚开始消息并不多,所以一个人的脑子还是够用的。

但是后面,你的业务越做越大,进入的数据流也越来越大,你请的人脑子不够用了(CPU不够,读取速度不够)。这时候,你就需要再请一个人(再开一个读取应用实例),那么这两个人可以每个人管一个笔记本,读取上面的数据。

然而你的业务越做越大,两个人也读不过来了,怎么办呢?这时候你可能要考虑再多加一个笔记本,这样就可以再多请一个人了。不过因为目前partition数量不能在Event Hub创建之后修改,所以只能重新创建一个有更多Partition数量的Event Hub才能满足要求。

当然,Partition并不是越多越好,因为每个Partition都要求有一个单独的Receiver来读取,而这意味着更多CPU资源和Socket连接的代价,所以要谨慎考虑增加Partition的数量,不要随意耗费资源。

最多可以有多少个Partition?

一个Event Hub允许2-32个Partition,在创建Event Hub时设置。目前的话,Event Hub一旦创建就不能修改(只能创建一个新的Event Hub),所以要创建的时候合理预估并行读取的数量。

什么是Consumer Group?

那Consumer Group是什么呢?在Event Hub的概念中,Consumer Group相当于一个读取时的视图(View),我们可以保存一个Consumer Group下,读取流的状态(读取的应用读到了什么位置,或者说偏移量)。这样的话,如果一个应用的读取连接因为某些原因而断开,要重新建立读取连接的话,我们就方便知道要从什么位置继续读取。

我们可以在一个Event Hub中创建多个Consumer Group(最多20个),这样不同的读取应用就可以使用不同的Consumer Group来进行读取。比如,一个Event Hub收集了所有的共享单车的状态数据,而在分析数据的时候,我们有一个应用是用来监测当前单车分布的位置是否需要派人调整,另一个应用则是需要进行用户的使用习惯、行动路线来的分析。两个应用目的不同,读取的频率也并不相同。这样种情况下,就可以使用两个Consumer Group,分别对应每个应用,这样两边的读取可以互不干扰。

如何使用

(*本文是一个介绍,并非教程。所以此处假设已经在Azure里创建了“事件中心命名空间 Event Hub Namespace”和“事件中心 Event Hub”,并且获取了Event Hub的Connection String。)

那么如何使用Event Hub API进行事件的收发呢?

发送接口 Sender API

有两种方式都可以实现将数据发送到Event Hub上的操作。一种是用基于HTTPS协议的REST API来进行发送,也就是在header里设置授权信息,把要发送的数据POST到相应的URL。*

但是更推荐的是第二种发送方式,使用EventClient API。它背后是AMQP协议,更为高效。不过这里我们并不需要理解AMQP的实现方式,只需要使用微软提供的接口。

下面以C#为例,简单介绍发送接口:

API接口在Microsoft.Azure.EventHubs的NuGet包里。

1. 创建 EventData,把要发送的消息放在EventData对象中。

var eventData = new EventData(byteArray);

2.创建 EventHubClient,这时要提供带有授权信息(Connection String)好连接到指定的Event Hub。

EventHubClient eventHubClient = EventHubClient.CreateFromConnectionString(connectionString);

3. 调用EventHubClient的发送API来发送数据

eventHubClient.SendAsync(eventData);

接收接口 Receiver API

从Event Hub里面获取出数据的接口基于AMQP协议(并没有基于HTTPS协议的API)。

其中,仍然有两种方式可以实现数据的获取:1)使用EventHubClient的PartitionReceiver;2)使用EventProcessorHost (EPH)API。下面分别介绍:https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-dotnet-standard-api-overview

使用PartitionReceiver

这是Event Hub提供的一个读取API,可以从一个指定的Partition上读取数据。

API接口在Microsoft.Azure.EventHubs的NuGet包里。

  1. 和发送数据一样,创建EventHubClient连接到Event Hub。这个EventHubClient在发送的时候也有使用到。

EventHubClient eventHubClient = EventHubClient.CreateFromConnectionString(connectionString);

  1. 用EventHubClient的CreateReceiver() API来创建PartitionReceiver:

PartitionReceiver partitionReceiver = eventHubClient.CreateReceiver(“$Default”, “0”, DateTime.Now); //从默认ConsumerGroup上,获取“0”号Partition上,从当前时刻以后的所有数据。

在这里,我们想要

  1. 调用PartitionReceiver的接收API来接收消息

var eventDatas = await partitionReceiver.ReceiveAsync(10); // 执行接收,设定最多接收10条数据

  1. 解析数据
​
foreach (var ehEvent in ehEvents)
​
{
​
var message = UnicodeEncoding.UTF8.GetString(ehEvent.Body.Array);
​
// 放入数据分析逻辑的代码
​
}

值得说明的是,一个PartitionReceiver只能从一个partition上读取信息。所以通常的做法是,对于一个Event Hub的每一个partition,都创建一个PartitionReceiver来进行读取,一一对应。

我们可以用EventHubClient的GetRuntimeInfoAsync() API来得到runtime信息,这里面我们可以知道所有的partition,从而一一创建PartitionReceiver:

var runTimeInformation = await eventHubClient.GetRuntimeInformationAsync();
​
foreach (var **partitionId** in **runTimeInformation.PartitionIds**) {
​
var receiver = eventHubClient.**CreateReceiver**(PartitionReceiver.DefaultConsumerGroupName, **partitionId**, Date.Time.Now);
​
}

EventProcessorHost

PartitionReceiver直接从一个指定的partition上读取数据,简单好用。然而,我们往往需要读取多个partition上的数据,并且在大数据的场景下对可扩展性有着相当的需求,这样我们既需要管“给每个partition创建PartitionReceiver”这个事情,还需要管“开了多个应用实例以后partition读取工作的分配”的事情,还要考虑“如果一个应用实例挂了,要怎么从之前的进度开始接着读取”,这样写起来十分复杂。

有没有更自动省力的方法呢?

当然有哒!如果不想要手写一个一个partition的读取、offset的记录、规模的缩放,我们可以使用EventProcessorHost(EPH)来进行处理。

本质上讲,EPH提供两个功能:1. EPH会自动把一个Event Hub中的每一个partition都创建一个EventProcessor(相当于一个receiver),并且把这些Processor平均分配给现有的应用实例去处理,并且实时监控这些应用实例;2. EPH会把读取的进度自动保存下来。这样,不管你有多少个实例在健康运行,或者某个实例挂掉了,都可以保证你的读取正常进行,提供高可用性(availability)。

简单点说,现在你有若干个个笔记本上的数据要读,如果使用PartitionReceiver的方法,就是你请了若干个人来读这些笔记本(创建若干App实例),你自己安排哪个人读哪个笔记本,那么如何分配你就得自己操心。如果使用EPH,那么相当于请了一个经理来管理,经理会根据你请了几个人(开几个App实例),自动分配每个人干的活。如果某个人请假了(某个App挂了),经理也会自动安排他的工作给其他人。同时,经理会记录每个笔记本读到了哪里,以便如果读取工作被中断(如读取连接断开),后面还可以继续从上次的地方接着读。

API接口在Microsoft.Azure.EventHubs.Processor的NuGet包里。**

首先,要实现一个IEventProcessor接口:

​
CloseAsync(), OpenAsync(), ProcessErrorAsync(), **ProcessEventsAsync****()**
​
public class YourEventProcessor : IEventProcessor{
​
public Task CloseAsync(PartitionContext context, CloseReason reason){
​
// your implementation when close
​
}
​
public Task OpenAsync(PartitionContext context){
​
// your implementation when open
​
}
​
public Task ProcessErrorAsync(PartitionContext context, Exception error){
​
// your implementation to process error
​
}
​
public Task ProcessEventAsync(PartitionContext context, IEnumerable<EventData> eventDatas){
​
// your implementation to process event data
​
if(eventDatas != null){
​
foreach(var eventData in eventDatas){
​
// process data here
​
}
​
}
​
return context.CheckpointAsync(); // save offset
​
}
​
}

上面的代码中,实际的数据分析逻辑就写在ProcessEventsAsync()里,最后 “return context.CheckpointAsync();” 进行读取进度的保存。

然后,在主程序中创建EventProcessorHost:

var yourEventProcessorHost = new EventProcessorHost(
​
eventHubPath,
​
consumerGroupName,
​
eventHubConnectionString,
​
storageConnectionString,
​
containerName);

其中,eventHubPath、consumerGroupName、eventHubConnectionString是你创建好的EventHub的验证信息,storageConnectionString和containerName是Azure Storage Account的验证信息,这是用来保存读取进度的,它也需要提前创建好(这里不介绍如何创建)。

下一步,要把这个EPH和刚刚创建的EventProcessor连接起来:

await yourEventProcessorHost.RegisterEventProcessorAsync<YourEventProcessor>();
​
最后,主程序退出时,取关EventProcessor。
​
await yourEventProcessorHost.UnregisterEventProcessorAsync();

这样,即使你有多个应用实例,EPH可以帮你管理,并且以你想要的方式处理。

也可以选择使用别的storage,需要使用ICheckpointManager,这里不详细介绍。

EPH接口正在进行重新优化设计,在本文发文后,接口可能会有较大变动,读者请以实时的官方文档为主。

Event Hub支持的语言以及API包:

- C# .Net https://docs.microsoft.com/en-us/dotnet/api/overview/azure/event-hubs?view=azure-dotnet
​
入门指导文档 https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-programming-guide
​
- Java https://github.com/Azure/azure-event-hubs-java
- Go https://godoc.org/github.com/Azure/azure-event-hubs-go
- Python https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhubs
- JavaScript (Node.js & Browser) https://github.com/Azure/azure-sdk-for-js/tree/master/sdk/eventhub/event-hubs

总结

EventHub的本质是messaging queue的变种——messaging log,是大数据时代下的产物,作为大数据分析流的门户,专门为大数据分析提供了一个缓冲、负载平衡,为大数据的收发提供可靠、易于操作的平台服务。希望这篇文章所介绍的Event Hub的定义、工作原理、使用方式能带给大家启发。也希望有说的不够清楚、不够严谨的地方,请大家多多指正 ????

Framework事件机制—Event Hub原理及事件解析