Flink Window中的window 和 apply方法详解

时间:2025-04-09 07:02:31
import org.apache.commons.lang3.time.FastDateFormat; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; /** * @author liu a fu * @version 1.0 * @date 2021/3/7 0007 * @DESC TODO: 窗口统计案例演示:滑动时间窗口(Sliding Time Window),实时交通卡口车流量统计 * * 前面讲解Window窗口案例中,尤其是时间窗口TimeWindow中, * 没有看见`Window`大小(起始时间,结束时间),使用`apply`函数,就可以获取窗口大小。 */ public class StreamSlidingTimeWindowApply { public static void main(String[] args) throws Exception { // 1. 执行环境-env StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 2. 数据源-source DataStreamSource<String> inputDataStream = env.socketTextStream("", 9999); // 3. 数据转换-transformation /* 数据: a,3 a,2 a,7 d,9 b,6 a,5 b,3 e,7 e,4 */ SingleOutputStreamOperator<Tuple2<String, Integer>> mapDataStream = inputDataStream .filter(line -> null != line && line.trim().split(",").length == 2) .map(new MapFunction<String, Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> map(String line) throws Exception { String[] split = line.trim().split(","); return new Tuple2<String, Integer>(split[0], Integer.parseInt(split[1])); } }); // TODO: 先按照卡口分组,再进行窗口操作,最后聚合累加 SingleOutputStreamOperator<Tuple2<String, Integer>> sumDataStream = mapDataStream .keyBy(0) // 下标索引,卡口编号 // public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size, Time slide) .timeWindow(Time.seconds(10),Time.seconds(5)) // TODO: 滑动时间窗口,每5s统计最近10s的数据 .apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() { private FastDateFormat format = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss") ; @Override public void apply(Tuple tuple, // 表示分组Key,封装到元组中 TimeWindow window, // 时间窗口,获取startTime和endTime Iterable<Tuple2<String, Integer>> input, // 窗口中数据,进行聚合操作 Collector<Tuple2<String, Integer>> out) throws Exception { // 获取分组Key String key = (String)((Tuple1)tuple).f0 ; // 获取窗口开始时间和结束时间 long start = window.getStart(); long end = window.getEnd(); // 输出内容 String output = "[" + format.format(start) + "~" + format.format(end) + "] -> " + key ; // 对窗口中的数据进行聚合 int count = 0 ; for (Tuple2<String, Integer> item: input){ count += item.f1 ; } // 最后输出 out.collect(Tuple2.of(output, count)); } }); // 4. 数据终端-sink sumDataStream.printToErr(); // 5. 触发执行-execute env.execute(StreamSlidingTimeWindowApply.class.getSimpleName()) ; } }