Flink API: Environment, Sources, and Operator Transformation Flow

Time: 2025-02-14 18:10:48
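The example below walks the DataStream API end to end: reading a text source, then applying map, flatMap, filter, keyBy with rolling aggregation, reduce, split/select, connect/coMap, and union.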
import com.regotto.entity.SensorReading;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.util.Collector;

import java.util.Collections;

/**
 * @author regotto
 */
public class TransformTest {

    private static StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    public static void main(String[] args) throws Exception {
        env.setParallelism(1);
        DataStream<String> dataStream = env.readTextFile("D:\\");

        // map: wrap each CSV line into a SensorReading
        DataStream<SensorReading> map = dataStream.map(value -> {
            String[] fields = value.split(",");
            return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
        });
        map.print("map");

        // flatMap: break each line apart and emit every field as its own record
        dataStream.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                for (String s : value.split(",")) {
                    out.collect(s);
                }
            }
        }).print("flatMap");

        // filter: keep only lines starting with "1"
        dataStream.filter((FilterFunction<String>) value -> value.startsWith("1")).print("filter");

        // keyBy + rolling aggregation: current max temperature per sensor.
        // keyBy accepts a field position, a field name, or a custom KeySelector.
        KeyedStream<SensorReading, String> keyedStream = map.keyBy(SensorReading::getId);
        keyedStream.max("temperature").print("max temperature");

        // reduce: rolling aggregation keeping the max temperature seen so far,
        // paired with the latest timestamp
        keyedStream.reduce(new ReduceFunction<SensorReading>() {
            @Override
            public SensorReading reduce(SensorReading curData, SensorReading newData) throws Exception {
                return new SensorReading(curData.getId(), newData.getTimestamp(),
                        Math.max(curData.getTemperature(), newData.getTemperature()));
            }
        }).print("latest timestamp at max temperature");

        // split & select: tag each record as high- or low-temperature
        SplitStream<SensorReading> splitStream = keyedStream.split(new OutputSelector<SensorReading>() {
            @Override
            public Iterable<String> select(SensorReading value) {
                return value.getTemperature() > 36
                        ? Collections.singletonList("high")
                        : Collections.singletonList("low");
            }
        });
        DataStream<SensorReading> high = splitStream.select("high");
        DataStream<SensorReading> low = splitStream.select("low");
        DataStream<SensorReading> all = splitStream.select("high", "low");
        high.print("high-temperature stream");
        low.print("low-temperature stream");
        all.print("all");

        // connect & coMap: map the high stream to tuples, connect it with the low
        // stream (the two sides may have different types), and emit a status message
        ConnectedStreams<Tuple2<String, Double>, SensorReading> connectedStream = high
                .map(new MapFunction<SensorReading, Tuple2<String, Double>>() {
                    @Override
                    public Tuple2<String, Double> map(SensorReading value) throws Exception {
                        return new Tuple2<>(value.getId(), value.getTemperature());
                    }
                })
                .connect(low);
        connectedStream.map(new CoMapFunction<Tuple2<String, Double>, SensorReading, Object>() {
            @Override
            public Object map1(Tuple2<String, Double> value) throws Exception {
                return new Tuple3<>(value.f0, value.f1, "high-temperature warning");
            }

            @Override
            public Object map2(SensorReading value) throws Exception {
                return new Tuple2<>(value.getId(), "temperature normal");
            }
        }).print("connect&coMap");

        // union: merge high, low, and all (all streams must have the same type)
        high.union(low, all).print("union");

        env.execute();
    }
}
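SensorReading comes from com.regotto.entity and is not shown in the post. A minimal POJO sketch matching the constructor and getters used above (the field names follow from the "temperature" field referenced by max()):

public class SensorReading {
    private String id;
    private Long timestamp;
    private Double temperature;

    // Flink POJO types need a public no-arg constructor
    public SensorReading() {
    }

    public SensorReading(String id, Long timestamp, Double temperature) {
        this.id = id;
        this.timestamp = timestamp;
        this.temperature = temperature;
    }

    public String getId() { return id; }
    public Long getTimestamp() { return timestamp; }
    public Double getTemperature() { return temperature; }

    public void setId(String id) { this.id = id; }
    public void setTimestamp(Long timestamp) { this.timestamp = timestamp; }
    public void setTemperature(Double temperature) { this.temperature = temperature; }

    @Override
    public String toString() {
        return "SensorReading{id='" + id + "', timestamp=" + timestamp + ", temperature=" + temperature + '}';
    }
}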
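The title mentions input sources, and the original imports referenced FlinkKafkaConsumer011 and SimpleStringSchema even though the snippet only uses readTextFile. A minimal Kafka-source sketch for that connector version (broker address, group id, and topic name are placeholders):

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import java.util.Properties;

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
properties.setProperty("group.id", "sensor-group");            // placeholder consumer group

// Consume each Kafka record as a raw String; parse into SensorReading downstream
DataStream<String> kafkaStream = env.addSource(
        new FlinkKafkaConsumer011<>("sensor", new SimpleStringSchema(), properties));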
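The imports also referenced SourceFunction and Random, suggesting the post originally included a custom source as well. A sketch of one, assuming ten sensors emitting Gaussian noise around 36 degrees (the sensor count, base temperature, and sleep interval are arbitrary):

import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.Random;

public class RandomSensorSource implements SourceFunction<SensorReading> {
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<SensorReading> ctx) throws Exception {
        Random random = new Random();
        while (running) {
            for (int i = 0; i < 10; i++) {
                ctx.collect(new SensorReading("sensor_" + i,
                        System.currentTimeMillis(),
                        36 + random.nextGaussian() * 2));
            }
            Thread.sleep(1000L);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

Registered with the environment via env.addSource(new RandomSensorSource()).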
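Note that split/select and OutputSelector were deprecated and removed in later Flink versions (gone as of Flink 1.12); side outputs via ProcessFunction and OutputTag are the replacement. A sketch of the same high/low routing using the map stream from the example above:

import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.OutputTag;

// Anonymous subclass so Flink can capture the element type at runtime
final OutputTag<SensorReading> lowTag = new OutputTag<SensorReading>("low") {};

SingleOutputStreamOperator<SensorReading> highStream = map
        .process(new ProcessFunction<SensorReading, SensorReading>() {
            @Override
            public void processElement(SensorReading value, Context ctx, Collector<SensorReading> out) {
                if (value.getTemperature() > 36) {
                    out.collect(value);        // main output: high temperature
                } else {
                    ctx.output(lowTag, value); // side output: low temperature
                }
            }
        });
DataStream<SensorReading> lowStream = highStream.getSideOutput(lowTag);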