Flink实现特定统计的归约聚合reduce操作

时间:2024-01-24 20:56:31


package com.rosh.flink.test;

import org.apache.flink.api.common.functions.MapFunction;

import org.apache.flink.api.common.functions.ReduceFunction;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.streaming.api.datastream.DataStreamSource;

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;

import java.util.List;

import java.util.Random;

/**
 * Demonstrates two chained keyed {@code reduce} aggregations over a small stream of user ids.
 *
 * <p>The stream is first partitioned by user id, and a reduce acting as a {@code sum} counts how
 * often each user appears. All per-user counts are then routed into a single group, where a second
 * reduce acting as a {@code maxBy} keeps the entry with the highest visit count, i.e. the user
 * with the most visits.
 */
public class TransReduceTest {

    /**
     * Builds the pipeline, prints both intermediate and final results, and runs the job.
     *
     * @param args command-line arguments; not used
     * @throws Exception if building or executing the job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        execEnv.setParallelism(1);

        // Generate ten random user ids, each drawn from [0, 5).
        List<Integer> visitorIds = new ArrayList<>();
        Random rng = new Random();
        for (int n = 0; n < 10; n++) {
            visitorIds.add(rng.nextInt(5));
        }
        DataStreamSource<Integer> visitorStream = execEnv.fromCollection(visitorIds);

        // Turn every id into an (id, 1) pair: one visit per record.
        // Kept as an anonymous class rather than a lambda so the generic Tuple2 result type
        // stays visible in the class signature without an explicit type hint.
        SingleOutputStreamOperator<Tuple2<Integer, Long>> singleVisits =
                visitorStream.map(new MapFunction<Integer, Tuple2<Integer, Long>>() {
                    @Override
                    public Tuple2<Integer, Long> map(Integer value) throws Exception {
                        return Tuple2.of(value, 1L);
                    }
                });

        // Count visits per user: key by the id field and add up the counts.
        SingleOutputStreamOperator<Tuple2<Integer, Long>> visitsPerUser =
                singleVisits
                        .keyBy(pair -> pair.f0)
                        .reduce((first, second) -> Tuple2.of(first.f0, first.f1 + second.f1));

        visitsPerUser.print("sumDS  ->>>>>>>>>>>>>");

        // Send every per-user count to the same key (constant true) and keep the pair with the
        // larger count. On a tie the second argument is returned.
        SingleOutputStreamOperator<Tuple2<Integer, Long>> busiestUser =
                visitsPerUser
                        .keyBy(ignored -> true)
                        .reduce((first, second) -> first.f1 > second.f1 ? first : second);

        busiestUser.print("maxDS ->>>>>>>>>>>");

        execEnv.execute("TransReduceTest");
    }
}