目录
0. 相关文章链接
1. Flink中的累加器概述
2. 编码步骤
3. 代码演示
0. 相关文章链接
Flink文章汇总
1. Flink中的累加器概述
Flink中的累加器,与Mapreduce counter的应用场景类似,可以很好地观察task在运行期间的数据变化,如在Flink job任务中的算子函数中操作累加器,在任务执行结束之后才能获得累加器的最终结果。
Flink有以下内置累加器,每个累加器都实现了Accumulator接口。
- IntCounter
- LongCounter
- DoubleCounter
2. 编码步骤
- 创建累加器:private IntCounter numLines = new IntCounter();
- 注册累加器:getRuntimeContext().addAccumulator("num-lines", );
- 使用累加器:(1);
- 获取累加器的结果:("num-lines")
3. 代码演示
import ;
import ;
import ;
import ;
import ;
import ;
import ;
import ;
/**
* Author itcast
* Desc 演示Flink累加器,统计处理的数据条数
*/
public class OtherAPI_Accumulator {
public static void main(String[] args) throws Exception {
//
ExecutionEnvironment env = ();
//
DataSource<String> dataDS = ("aaa", "bbb", "ccc", "ddd");
//
MapOperator<String, String> result = (new RichMapFunction<String, String>() {
//-1.创建累加器
private IntCounter elementCounter = new IntCounter();
Integer count = 0;
@Override
public void open(Configuration parameters) throws Exception {
(parameters);
//-2注册累加器
getRuntimeContext().addAccumulator("elementCounter", elementCounter);
}
@Override
public String map(String value) throws Exception {
//-3.使用累加器
(1);
count+=1;
("不使用累加器统计的结果:"+count);
return value;
}
}).setParallelism(2);
//
("data/output/test", );
//
//-4.获取加强结果
JobExecutionResult jobResult = ();
int nums = ("elementCounter");
("使用累加器统计的结果:"+nums);
}
}
此博客根据某马2020年贺岁视频改编而来:【狂野大数据】Flink1.12从入门到精通#2021#流批一体#黑马程序员#大数据_哔哩哔哩_bilibili
注:其他相关文章链接由此进 -> Flink文章汇总