package
com.rosh.flink.test;
import
org.apache.flink.api.common.functions.MapFunction;
import
org.apache.flink.api.common.functions.ReduceFunction;
import
org.apache.flink.api.java.tuple.Tuple2;
import
org.apache.flink.streaming.api.datastream.DataStreamSource;
import
org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import
java.util.ArrayList;
import
java.util.List;
import
java.util.Random;
/**
 * Demonstrates chaining two keyed {@code reduce} operators in a Flink streaming job.
 *
 * <p>The stream is first keyed by user id and reduced into a running visit count per user
 * (the equivalent of {@code sum}). Every running count is then routed to one shared key and
 * reduced a second time, keeping whichever record carries the larger count (the equivalent of
 * {@code maxBy}) — that is, the user with the most visits seen so far.
 */
public class TransReduceTest {

    /**
     * Builds and runs the job: random user ids -> (id, 1) -> per-user count -> overall maximum.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails while executing
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Generate ten random user ids, each drawn from [0, 5).
        Random generator = new Random();
        List<Integer> userIds = new ArrayList<>();
        for (int remaining = 10; remaining > 0; remaining--) {
            userIds.add(generator.nextInt(5));
        }
        DataStreamSource<Integer> userIdDS = env.fromCollection(userIds);

        // Every id occurrence counts as exactly one visit.
        // Kept as an anonymous class (not a lambda) so the Tuple2 generic parameters
        // remain visible to Flink's type extraction.
        SingleOutputStreamOperator<Tuple2<Integer, Long>> mapDS = userIdDS.map(
                new MapFunction<Integer, Tuple2<Integer, Long>>() {
                    @Override
                    public Tuple2<Integer, Long> map(Integer userId) throws Exception {
                        return Tuple2.of(userId, 1L);
                    }
                });

        // Running number of visits per user: key by the id and add up the counters.
        SingleOutputStreamOperator<Tuple2<Integer, Long>> sumDS = mapDS
                .keyBy(visit -> visit.f0)
                .reduce((left, right) -> Tuple2.of(left.f0, left.f1 + right.f1));
        sumDS.print("sumDS ->>>>>>>>>>>>>");

        // Send every per-user count to the same constant key and keep the larger one.
        // On a tie the second argument is the one that is kept.
        SingleOutputStreamOperator<Tuple2<Integer, Long>> maxDS = sumDS
                .keyBy(count -> true)
                .reduce((first, second) -> first.f1 > second.f1 ? first : second);
        maxDS.print("maxDS ->>>>>>>>>>>");

        env.execute("TransReduceTest");
    }
}