Flink(18):Flink之累加器

时间:2025-04-13 07:34:04

目录

0. 相关文章链接

1. Flink中的累加器概述

2. 编码步骤

3. 代码演示


0. 相关文章链接

Flink文章汇总

1. Flink中的累加器概述

        Flink中的累加器,与Mapreduce counter的应用场景类似可以很好地观察task在运行期间的数据变化,如在Flink job任务中的算子函数中操作累加器,在任务执行结束之后才能获得累加器的最终结果。

Flink有以下内置累加器每个累加器都实现了Accumulator接口。

  • IntCounter
  • LongCounter
  • DoubleCounter

2. 编码步骤

  1. 创建累加器:private IntCounter numLines = new IntCounter();
  2. 注册累加器:getRuntimeContext().addAccumulator("num-lines", );
  3. 使用累加器:(1);
  4. 获取累加器的结果:("num-lines")

3. 代码演示

import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;

/**
 * Author itcast
 * Desc 演示Flink累加器,统计处理的数据条数
 */
public class OtherAPI_Accumulator {
    public static void main(String[] args) throws Exception {
        //
        ExecutionEnvironment env = ();

        //
        DataSource<String> dataDS = ("aaa", "bbb", "ccc", "ddd");

        //
        MapOperator<String, String> result = (new RichMapFunction<String, String>() {
            //-1.创建累加器
            private IntCounter elementCounter = new IntCounter();
            Integer count = 0;

            @Override
            public void open(Configuration parameters) throws Exception {
                (parameters);
                //-2注册累加器
                getRuntimeContext().addAccumulator("elementCounter", elementCounter);
            }

            @Override
            public String map(String value) throws Exception {
                //-3.使用累加器
                (1);
                count+=1;
                ("不使用累加器统计的结果:"+count);
                return value;
            }
        }).setParallelism(2);

        //
        ("data/output/test", );

        //
        //-4.获取加强结果
        JobExecutionResult jobResult = ();
        int nums = ("elementCounter");
        ("使用累加器统计的结果:"+nums);
    }
}

此博客根据某马2020年贺岁视频改编而来:【狂野大数据】Flink1.12从入门到精通#2021#流批一体#黑马程序员#大数据_哔哩哔哩_bilibili

注:其他相关文章链接由此进 -> Flink文章汇总