Flink CEP能够从事件流中匹配特定的事件模式 (事件中定义的一组条件和规则), 并对其进行相应的操作和处理。例如:在金融领域中检测欺诈交易行为时,需要考虑多笔交易之间的时序和金额关系。在物联网领域中需要在设备运行状态事件流中实时监测设备状态变化及设备之间协作。
1. Flink CEP开发步骤
使用Flink CEP对事件进行模式匹配时,从四个步骤入手
#1.定义输入事件流
DataStream<Event> input = ...;
#2.定义模式匹配规则
Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
.where(SimpleCondition.of(event -> event.getId() == 42))
.next("middle")
.subtype(SubEvent.class)
.where(SimpleCondition.of(subEvent -> subEvent.getVolume() >= 10.0))
.followedBy("end")
.where(SimpleCondition.of(event -> event.getName().equals("end")));
#3.模式匹配应用在事件流上检测
PatternStream<Event> patternStream = CEP.pattern(input, pattern);
#4.对匹配的复杂事件进行结果输出
DataStream<Alert> result = patternStream.process(
new PatternProcessFunction<Event, Alert>() {
@Override
public void processMatch(Map<String, List<Event>> pattern,
Context ctx,Collector<Alert> out) throws Exception {
out.collect(createAlertFrom(pattern));
}
});
2.Pattern API设置事件模式
使用 量词和 条件 设置事件模式
2.1 量词
oneOrMore()
指定模式期望匹配到的事件至少出现一次。假设pattern表示匹配a事件,pattern.oneOrMore()表示匹配a事件一次到多次。
times(#ofTimes)
指定模式期望匹配到的事件正好出现的次数。假设pattern表示匹配a事件,pattern.times(2)表示匹配a事件2次。
times(#fromTimes,$toTimes)
指定模式期望匹配到的事件出现次数在#fromTimes和#toTimes之间。假设pattern表示匹配a事件,pattern.times(2,4)表示匹配a事件2、3、4次。
timesOrMore(#times)
指定模式期望匹配到的事件至少出现times次。假设pattern表示匹配a事件,pattern.timesOrMore(2)表示匹配a事件2次到多次。
optional()
指定该模式是可选的,表示该模式可以出现也可以不出现,对前面这些量词都适用,在多个单独模式组合成组合模式中使用才有意义。假设pattern表示匹配a事件,pattern.times(2,4).optional()表示匹配a事件0、2、3、4次。
greedy()
指定该模式是贪心的,会重复尽可能多的次数,只对量词适用,不支持模式组,在 多个单独模式组合成组合模式 中使用才有意义。假设pattern表示匹配a事件,pattern.times(2,4).greedy()表示尽可能多的匹配a事件,如果输入4次a,那么2次a和3次a事件都不算匹配事件
2.2 条件
2.2.1 简单条件
简单条件非常简单,从事件本身中获取信息来进行判断,只需要考虑当前事件本身即可。
start.where(SimpleCondition.of(value -> value.getName().startsWith("foo")));
2.2.2 组合条件
组合条件是将简单条件进行合并,通常情况下也可以使用where方法进行条件的组合,默认每个条件通过AND逻辑相连。如果需要使用OR逻辑,直接使用or方法连接条件即可。
pattern
.where(SimpleCondition.of(value -> ... /*一些判断条件*/))
.or(SimpleCondition.of(value -> ... /*一些判断条件*/));
2.2.3 迭代条件
迭代条件需要在 当前事件处理中获取到该模式先前匹配的事件 进行对比或处理才能决定当前事件是否被匹配,这种需要对该模式先前匹配事件进行处理作为判断当前事件是否匹配模式的条件叫做迭代条件。
middle.oneOrMore()
.subtype(SubEvent.class)
.where(new IterativeCondition<SubEvent>() {
@Override
public boolean filter(SubEvent value, Context<SubEvent> ctx) throws Exception {
//判断当前事件是否以foo开头
if (!value.getName().startsWith("foo")) {
return false;
}
double sum = value.getPrice();
//获取当前模式先前已经匹配的事件,计算sum总和
for (Event event : ctx.getEventsForPattern("middle")) {
sum += event.getPrice();
}
return Double.compare(sum, 5.0) < 0;
}
});
3. 事件模式的种类
3.1 单例模式
一个事件的匹配,比如:基站日志通话数据流匹配通话时长大于10s的事件
3.2 组合模式
多个单独模式组合在一起就形成了模式序列,即:组合模式。
可以对单独模式通过调用next、followedBy、followedByAny方法进行连接,不同方法表示了事件之间不同的连续策略
分别对应严格邻近(Strict Contiguity)、宽松邻近(Relaxed Contiguity)、非确定宽松邻近(Non-Deterministic Relaxed Contiguity)三种事件连续策略。
3.2.1 严格邻近
严格邻近中期望所有匹配的事件严格一个接一个出现,中间没有任何不匹配的事件
3.2.2 宽松邻近
宽松邻近并不会像严格邻近这么严格,两个匹配的事件之间可以有其他不匹配的事件出现,也就是说宽松邻近会忽略没有成功匹配的事件直到下一个匹配事件出现为止。
3.2.3 非确定宽松邻近
非确定宽松邻近策略相比于宽松邻近策略更加宽松,可以忽略已经匹配的事件,也就是说可以重复使用匹配过的事件,非确定宽松邻近比宽松邻近匹配的结果往往更多。
3.3 循环模式
某个单独模式使用了oneOrMore()/times这些量词形成循环模式,默认循环中匹配各个事件之间连续策略为宽松邻近策略
4. 跳过策略
Flink CEP中对于给定的一个模式匹配,同一个事件可能会分配到多个匹配结果中。例如,复杂事件模式为a+ b (表示a事件可以有1到多个,再往后可以跟上1个b事件),输入数据流:a1,a2,b 输出结果会有{a1,a2,b、a1,b、a2,b} ,a1事件就会输出到多个匹配结果中,一些情况下我们希望对这些匹配的结果进行精简,可以使用Flink CEP中提供的“匹配后跳过策略”(After Match Skip Strategy)。
AfterMatchSkipStrategy.noSkip()
AfterMatchSkipStrategy.skipToNext()
AfterMatchSkipStrategy.skipPastLastEvent()
AfterMatchSkipStrategy.skipToFirst(patternName)
AfterMatchSkipStrategy.skipToLast(patternName)