Flink 最后一个窗口一直没有新数据导致窗口不关闭的问题

时间:2025-04-08 19:20:42
public static class WatermarkDemoFunction implements WatermarkStrategy<JSONObject>{ private Tuple2<Long,Boolean> state = Tuple2.of(0L,true); @Override public WatermarkGenerator<JSONObject> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) { return new WatermarkGenerator<JSONObject>() { private long maxWatermark; @Override public void onEvent(JSONObject waterSensor, long l, WatermarkOutput watermarkOutput) { maxWatermark = Math.max(maxWatermark,waterSensor.getLong("ts")); state.f0 = System.currentTimeMillis(); System.out.println("maxWatermark is " + maxWatermark); state.f1 = false; } @Override public void onPeriodicEmit(WatermarkOutput watermarkOutput) { //乱序时间 long outOfTime = 3000L; if (maxWatermark - outOfTime <=0){ } else { // 10s内没有数据则关闭当前窗口 System.out.println("() - state.f0:" + (System.currentTimeMillis() - state.f0)); System.out.println("state.f1:" + state.f1); if (System.currentTimeMillis() - state.f0 >= 9000L && !state.f1){ watermarkOutput.emitWatermark(new Watermark(maxWatermark + 6000L)); state.f1 = true; System.out.println("触发窗口,maxWatermark + 6000L:" + (maxWatermark + 6000L)); } else { System.out.println("正常发送水印"); watermarkOutput.emitWatermark(new Watermark(maxWatermark - outOfTime)); } } } }; } }