FlinkAPI_Environment_InputSources_OperatorTransformationFlow
import com.regotto.entity.SensorReading;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.util.Collections;
/**
* @author regotto
*/
public class TransformTest {
private static StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
public static void main(String[] args) throws Exception {
// set the parallelism before building the pipeline so every operator inherits it
env.setParallelism(1);
DataStream<String> dataStream = env.readTextFile("D:\\");
// map: transform each comma-separated line into a SensorReading
DataStream<SensorReading> map = dataStream.map(value -> {
String[] fields = value.split(",");
return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
});
map.print("map");
// flatMap: split each record and emit the pieces individually
dataStream.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
for (String s : value.split(",")) {
out.collect(s);
}
}
}).print("flatMap");
// filter: keep only records matching the predicate
dataStream.filter((FilterFunction<String>) value -> value.startsWith("1")).print("filter");
// rolling aggregation: current max temperature per key; keyBy accepts a position
// index (Tuple streams only), a field name, or a custom KeySelector (sketch below)
KeyedStream<SensorReading, String> keyedStream = map.keyBy(SensorReading::getId);
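// The same keying, spelled out with an explicit KeySelector (a sketch for
// illustration; keyBy("id") by POJO field name would also work):
KeyedStream<SensorReading, String> keyedById = map.keyBy(new KeySelector<SensorReading, String>() {
@Override
public String getKey(SensorReading value) throws Exception {
return value.getId();
}
});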
keyedStream.max("temperature").print("max temperature");
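// Note: max() only updates the aggregated field, keeping the remaining fields of
// the first record seen; maxBy() returns the complete record holding the maximum:
keyedStream.maxBy("temperature").print("maxBy temperature");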
// reduce: running max temperature, paired with the latest timestamp
keyedStream.reduce(new ReduceFunction<SensorReading>() {
@Override
public SensorReading reduce(SensorReading curData, SensorReading newData) throws Exception {
return new SensorReading(curData.getId(),
newData.getTimestamp(),
Math.max(curData.getTemperature(),
newData.getTemperature()));
}
}).print("最大温度下的最新时间");
// split & select: route records into high- and low-temperature streams
// (deprecated API; see the side-output sketch below)
SplitStream<SensorReading> splitStream = keyedStream.split(new OutputSelector<SensorReading>() {
@Override
public Iterable<String> select(SensorReading value) {
return value.getTemperature() > 36 ? Collections.singletonList("high") : Collections.singletonList("low");
}
});
DataStream<SensorReading> high = splitStream.select("high");
DataStream<SensorReading> low = splitStream.select("low");
DataStream<SensorReading> all = splitStream.select("high", "low");
high.print("high-temp stream");
low.print("low-temp stream");
all.print("all");
// connect & coMap: map the high stream to tuples, connect it with the low stream,
// and emit per-stream status info
ConnectedStreams<Tuple2<String, Double>, SensorReading> connectedStream =
high.map(new MapFunction<SensorReading, Tuple2<String, Double>>() {
@Override
public Tuple2<String, Double> map(SensorReading value) throws Exception {
return new Tuple2<>(value.getId(), value.getTemperature());
}
}).connect(low);
connectedStream.map(new CoMapFunction<Tuple2<String, Double>, SensorReading, Object>() {
@Override
public Object map1(Tuple2<String, Double> value) throws Exception {
return new Tuple3<>(value.f0, value.f1, "high temperature alert");
}
@Override
public Object map2(SensorReading value) throws Exception {
return new Tuple2<>(value.getId(), "temperature normal");
}
}).print("connect&coMap");
// union: merge high, low, and all into one stream (all already contains both
// substreams, so every record prints twice here)
high.union(low, all).print("union");
env.execute();
}
}