【Flink-1.17-教程】-【四】Flink DataStream API(5)转换算子(Transformation)【分流】-1)使用 filter 简单实现

时间:2024-02-01 12:45:28

其实根据条件筛选数据的需求,本身非常容易实现:只要针对同一条流多次独立调用 .filter() 方法进行筛选,就可以得到拆分之后的流了。

案例需求:读取一个整数数字流,将数据流划分为奇数流和偶数流。

public class SplitByFilterDemo {
    public static void main(String[] args) throws Exception {
//        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        env.setParallelism(1);

        DataStreamSource<String> socketDS = env.socketTextStream("hadoop102", 7777);

        /**
         * TODO 使用filter来实现分流效果
         * 缺点: 同一个数据,要被处理两遍(调用两次filter)
          */

        SingleOutputStreamOperator<String> even = socketDS.filter(value -> Integer.parseInt(value) % 2 == 0);
        SingleOutputStreamOperator<String> odd = socketDS.filter(value -> Integer.parseInt(value) % 2 == 1);


        even.print("偶数流");
        odd.print("奇数流");


        env.execute();
    }
}

这种实现非常简单,但代码显得有些冗余——我们的处理逻辑对拆分出的三条流其实是一样的,却重复写了三次。而且这段代码背后的含义,是将原始数据流 stream 复制三份,然后对每一份分别做筛选;这明显是不够高效的。我们自然想到,能不能不用复制流,直接用一个算子就把它们都拆分开呢?