flink window的early计算

时间:2020-12-31 05:04:24

转发请注明原创地址:https://www.cnblogs.com/dongxiao-yang/p/9391815.html

 

背景
flink 提供了完善的窗口机制, api中支持常见的三种窗口形式,滚动窗口,滑动窗口和session窗口。下面的图片显示了三种窗口的划分区别:
滚动窗口
flink window的early计算
滑动窗口
flink window的early计算
session窗口
flink window的early计算

Tumbing Windows:滚动窗口,窗口之间时间点不重叠。它是按照固定的时间,或固定的事件个数划分的,分别可以叫做滚动时间窗口和滚动事件窗口。
Sliding Windows:滑动窗口,窗口之间时间点存在重叠。对于某些应用,它们需要的时间是不间断的,需要平滑的进行窗口聚合。例如,可以每30s记算一次最近1分钟用户所购买的商品数量的总数,这个就是时间滑动窗口;或者每10个客户点击购买,然后就计算一下最近100个客户购买的商品的总和,这个就是事件滑动窗口。
Session Windows:会话窗口,经过一段设置时间无数据认为窗口完成。

在默认的场景下,所有的窗口都是到达时间语义上的windown end time后触发对整个窗口元素的计算,但是在部分场景的情况下,业务方需要在窗口时间没有结束的情况下也可以获得当前的聚合结果,比如每隔五分钟获取当前小时的sum值,这种情况下,官方提供了对于上述窗口的定制化计算器ContinuousEventTimeTriggerContinuousProcessingTimeTrigger

下面是一个使用ContinuousProcessingTimeTrigger的简单例子:

 

public class ContinueTriggerDemo {

    public static void main(String[] args) throws Exception {
        // TODO Auto-generated method stub

        String hostName = "localhost";
        Integer port = Integer.parseInt("8001");
        ;

        // set up the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment
                .getExecutionEnvironment();

        // 从指定socket获取输入数据
        DataStream<String> text = env.socketTextStream(hostName, port);

        text.flatMap(new LineSplitter()) //数据语句分词
                .keyBy(0) // 流按照单词分区
                .window(TumblingProcessingTimeWindows.of(Time.seconds(120)))// 设置一个120s的滚动窗口
                .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(20)))//窗口每统计一次当前计算结果
                .sum(1)// count求和
                .map(new Mapdemo())//输出结果加上时间戳
                .print();

        env.execute("Java WordCount from SocketTextStream Example");

    }

    /**
     * Implements the string tokenizer that splits sentences into words as a
     * user-defined FlatMapFunction. The function takes a line (String) and
     * splits it into multiple pairs in the form of "(word,1)" (Tuple2<String,
     * Integer>).
     */
    public static final class LineSplitter implements
            FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // normalize and split the line
            String[] tokens = value.toLowerCase().split("\\W+");

            // emit the pairs
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<String, Integer>(token, 1));
                }
            }
        }
    }

    public static final class Mapdemo
            implements
            MapFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>> {

        @Override
        public Tuple3<String, String, Integer> map(Tuple2<String, Integer> value)
                throws Exception {
            // TODO Auto-generated method stub

            DateFormat format2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            String s = format2.format(new Date());

            return new Tuple3<String, String, Integer>(value.f0, s, value.f1);
        }
    }
    


}

在本地启动端口 :nc -lk 8001 并启动flink程序
输入数据:

           aa
           aa
           bb

观察程序数据结果日志

5> (aa,2018-07-30 16:08:20,2)
5> (bb,2018-07-30 16:08:20,1)
5> (aa,2018-07-30 16:08:40,2)
5> (bb,2018-07-30 16:08:40,1)
5> (aa,2018-07-30 16:09:00,2)
5> (bb,2018-07-30 16:09:00,1)
5> (aa,2018-07-30 16:09:20,2)
5> (bb,2018-07-30 16:09:20,1)
5> (aa,2018-07-30 16:09:40,2)
5> (bb,2018-07-30 16:09:40,1)

在上述输入后继续输入

    aa

日志结果统计为

5> (aa,2018-07-30 16:10:00,3)
5> (bb,2018-07-30 16:10:00,1)

根据日志数据可见,flink轻松实现了一个窗口时间长度为120s并每20s向下游发送一次窗口当前聚合结果的功能。