Flink Window 常见需求背景
需求描述
每隔 5 秒,计算最近 10 秒单词出现的次数 —— 滑动窗口
每隔 5 秒,计算最近 5 秒单词出现的次数 —— 滚动窗口
关于 Flink time 种类 TimeCharacteristic
- ProcessingTime
- IngestionTime
- EventTime
WindowAssigner 的子类
- SlidingProcessingTimeWindows
- SlidingEventTimeWindows
- TumblingEventTimeWindows
- TumblingProcessingTimeWindows
使用 EventTime + WaterMark 处理乱序数据
示意图:
- 使用 onPeriodicEmit 方法发送 watermark,默认每 200ms 发一次。
- 窗口起始时间默认按各个时区的整点时间,支持自定义 offset。
Flink Watermark 机制定义
有序的流的 Watermarks
无序的流的 Watermarks
多并行度流的 Watermarks
深入理解 Flink Watermark
Flink Window 触发的条件:
- watermark 时间 >= window_end_time
- 在 [window_start_time, window_end_time) 区间中有数据存在(注意是左闭右开的区间),而且是以 event time 来计算的
Flink 处理太过延迟数据
Flink 丢弃延迟太多的数据
企业生产中一般不用。
Flink 指定允许再次迟到的时间
治标不治本,企业生产中一般不用。
Flink 收集迟到的数据单独处理
企业生产中应用较为广泛。
Flink 多并行度 Watermark
一个 window 可能会接受到多个 waterMark,我们以最小的为准。
Flink Window 概述
官网介绍
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/windows/
Flink Window 分类
Flink 的 window 分为两种类型的 Window,分别是:Keyed Windows 和 Non-Keyed Windows,他们的使用方式不同:
// Keyed Windows
stream
.keyBy(...) <- keyed versus non-keyed windows
.window(...) <- required: "assigner"
[.trigger(...)] <- optional: "trigger" (else default trigger)
[.evictor(...)] <- optional: "evictor" (else no evictor)
[.allowedLateness(...)] <- optional: "lateness" (else zero)
[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
.reduce/aggregate/apply() <- required: "function"
[.getSideOutput(...)] <- optional: "output tag"
// Non-Keyed Windows
stream
.windowAll(...) <- required: "assigner"
[.trigger(...)] <- optional: "trigger" (else default trigger)
[.evictor(...)] <- optional: "evictor" (else no evictor)
[.allowedLateness(...)] <- optional: "lateness" (else zero)
[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
.reduce/aggregate/apply() <- required: "function"
[.getSideOutput(...)] <- optional: "output tag"
Window 的生命周期
- 当属于某个窗口的第一个元素到达的时候,就会创建一个窗口。
- 当时间(event or processing time)超过 window 的结束时间戳加上用户指定的允许延迟(Allowed Lateness)时,窗口将被完全删除。
- 每个 Window 之上,都绑定有一个 Trigger 或者一个 Function(ProcessWindowFunction, ReduceFunction, or AggregateFunction)用来执行窗口内数据的计算。
- 可以给 Window 指定一个 Evictor,它能够在 after the trigger fires 以及 before and/or after the function is applied 从窗口中删除元素。
Flink Window 类型
Flink 流批同一前后的 Window 分类:
tumblingwindows —— 滚动窗口
slidingwindows —— 滑动窗口
session windows —— 会话窗口
global windows —— 全局窗口
Flink Window 操作使用
高级玩法:自定义 Trigger、自定义 Evictor,读者可自行搜索相关文章与代码。
Flink Window 增量聚合
- reduce(ReduceFunction)
- aggregate(AggregateFunction)
- sum()
- min()
- max()
- sum()
Flink Window 全量聚合
- apply(WindowFunction)
- process(ProcessWindowFunction)
Flink Window Join
// 在 Flink 中对两个 DataStream 做 Join
// 1、指定两张表
// 2、指定这两张表的链接字段
stream.join(otherStream) // 两个流进行关联
.where(<KeySelector>) // 选择第一个流的key作为关联字段
.equalTo(<KeySelector>) // 选择第二个流的key作为关联字段
.window(<WindowAssigner>) // 设置窗口的类型
.apply(<JoinFunction>) // 对结果做操作 process apply = foreach
Tumbling Window Join
Sliding Window Join
Session Window Join
Interval Join
核心代码示例:
DataStream<Integer> orangeStream = ...;
DataStream<Integer> greenStream = ...;
orangeStream
.keyBy(<KeySelector>)
.intervalJoin(greenStream.keyBy(<KeySelector>))
.between(Time.milliseconds(-2), Time.milliseconds(1))
.process (new ProcessJoinFunction<Integer, Integer, String(){
@Override
public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
out.collect(first + "," + second);
}
});