1、esper的处理模型是持续性的——根据statement中事件流(event stream)、视图(views)、过滤器(filters)等的选择,esper引擎一旦处理事件数据,就会变更statement中监听或subscriber接收到事件信息。
2、insert Stream — 表示新事件进入到引擎,并进入到事件窗口等。
先看个例子 :
select * from Withdrawal
这个例子的IStream 其实就是进入引擎的withdrawal事件流,并作为新事件被推送给listener。如下图所示:
说明:按照上面的EPL语句:select * from withdrawal; 每当有一个withdrawal事件进入时,如W1、W2等,一旦进入到引擎,就会当成新事件(New Events)立刻推送给updatelistener。
3、IRStream — Insert and Remove Stream
当使用IRStream时,EPL中就会有事件窗口——长度窗口(length window)或者 时间窗口(time window)的使用。比如下面的EPL语句:
select * from Withdrawal.win:length(5)
这个EPL语句使用了长度窗口(length window)— win:length(N)。表示引擎会将过去的n条事件保存在事件流中。如下图所示:
说明:上图表示的是使用长度为5的事件窗口,事件的进入以及stream中事件信息的变化,通过引擎推送给监听的事件信息。
win:length(5) 在Stream中最多保存5条数据,参考W5、W6事件的进入。当W5作为新事件进入到事件窗口时,此时窗口中的数据条数为5,达到了窗口的最大长度;W6事件进入时,则把W1从窗口中移除出去——遵循的是FIFO原则(先进先出),每一个进入的新事件都会在监听中作为新事件输出,只有窗口长度5的情况下,才会有旧事件的输出。比如当W6进入时,监听中的新事件为W6,而W1则作为旧事件被监听获取。
4、过滤器(filter)和where语句
首先,从一个EPL开始:
select * from Withdrawal(amount>=200).win:length(5)
在这个EPL中,有一个特殊的语法也就是 Withdrawal(amount>=200),通过Stream(表达式)的语法,即为filter。其实现的功能是对即将进入到事件窗口的事件进行过滤,满足条件的事件,则被放入到窗口中。上面EPL表达的是 只有amount >= 200的withdrawal事件,才可以被放入到长度为5的事件窗口。换句话说,这个事件窗口中所有的事件,其amount属性都不小于200。如下图:
说明:每一个进入的事件,首先通过filter,当满足fiter条件时,才会放入到事件窗口;而进入事件窗口的同时,引擎也会将该事件作为新事件推送给监听或者subscriber。
where语句与filter有所不同,如EPL语句:
select * from Withdrawal.win:length(5) where amount >= 200
以及事件通过where过滤的处理模型如下:
说明:当有新事件进入时,会先进入到事件窗口;在引擎要将事件推送给监听之前,判断where条件,满足where条件的事件,才会作为新事件传送给监听。
5、时间窗口(time window)
时间窗口,是一个滑动的事件窗口,其以系统时间为准,延伸到过去指定的时间间隔。比如win:time(10 seconds),这个时间窗口保存的事件是当前时间以及此前10秒这一时间间隔的所有事件。比如下面的EPL语句:
select * from Withdrawal.win:time(4 sec)
表示时间窗口中的事件是过去4秒钟所有的withdrawal事件。如下图所示:
说明:当第一个事件W1在t+4时刻进入到引擎时,其时间窗口从t到t+4这一时间段,只有一个事件W1,同时该事件作为新事件推送给监听;当在t+5时刻,W2进入到引擎,此时事件窗口的时间范围为 t+1 ~ t+5,窗口数据为 W1和W2,而此时W2也作为新事件输出到监听。时间窗口随着系统时间的变化,其窗口表示的时间范围也发送变化,当在t+8时,因为在t+4(其实是个临界点)这个时刻进入的W1,因为已经不在该时间窗口,故W1作为旧事件被推送给监听。
6、批量窗口(batch window)
批量窗口包括 时间批量窗口(win:time_batch)和长度批量窗口(win:length_batch)。
首先从时间批量开始,Time bath view缓存事件信息并且按照指定时间间隔在一次变更中释放所有缓存的事件。EPL如下:
select * from Withdrawal.win:time_batch(4 sec)
上述时间批量窗口表示每隔4s形成一个事件窗口,老的窗口中的所有事件则作为新事件推送给监听。如下图:
说明:
· t+1时,W1事件发生并进入批量缓存,此时不会通知监听。
· t+3时,W2事件发生并进入批量缓存,不通知监听。
· t+4时,满足了窗口间隔时间,此时缓存中有两个事件W1和W2,引擎处理,并通知监听,此时输出事件为 W1和 W2。此时创建一个新的bath buffer。
· t+6 与 t+7之间有事件 W3进入bath buffer,监听无动作。
· t+8时,引擎处理bath缓存中的事件,并传递给监听。此时输出事件为 W3。Old Events 中包括了 W1和W2.
长度批量窗口,基本上与时间批量窗口一样,比如:
select * from withdrawal.win:length_batch(5)
上面的长度批量窗口,每当窗口事件总数达到5条时,则创建一个新的batch buffer,而老的事件窗口中5条事件作为新事件输出到监听。
【总结】filter和where的区别在于条件执行的时机——fiter是事件进入事件窗口之前就进行了过滤,不满足条件的事件不会进入到窗口,更不会交付给引擎进行处理;而where则是从事件窗口中取出事件,通过引擎进行条件筛选,满足条件的事件则作为新事件交付给监听。从这个地方,可以看出,在过滤相同条件时,filter的效率会高于where,所以在能使用filter的时候,尽量不要使用where语句进行事件筛选。
事件窗口——时间窗口和长度窗口,这里时间窗口时一个滑动的窗口,随着时间推移,窗口也在不断移动;长度窗口更像是一个固定长度的queue,当达到窗口的总容量时,移除窗口中最先进入的事件(FIFO),并作为旧事件交付给监听。
批量窗口,其实就是每个多久或者每个多少条事件做一次输出,本次输出的内容为新事件;当下一次输出时,上一次输出的新事件也就成了本次输出的旧事件。