12. Custom FlatMap Functions in Flink

Date: 2025-03-02 07:55:19
1. Overview

1) Purpose

flatMap first maps each element and then flattens the results: for every input element it can emit zero or more output elements.
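
The "zero or more outputs per input" behaviour can be illustrated without Flink. The following is a minimal plain-Java sketch, not Flink code: `FlatMapSketch`, `splitWords`, and `flatMapAll` are hypothetical names chosen for illustration, and a `java.util.function.Consumer` stands in for Flink's `Collector`. The empty-word filter is an assumption added here so that a blank line emits nothing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical illustration class; not part of Flink.
final class FlatMapSketch {

    // Emits each non-empty word of the line; a blank line emits nothing.
    static void splitWords(String line, Consumer<String> out) {
        for (String word : line.split(" ")) {
            if (!word.isEmpty()) {
                out.accept(word);
            }
        }
    }

    // Applies splitWords to every input and gathers all emitted elements into one flat list.
    static List<String> flatMapAll(List<String> lines) {
        List<String> result = new ArrayList<>();
        for (String line : lines) {
            splitWords(line, result::add);
        }
        return result;
    }

    public static void main(String[] args) {
        // Three inputs that emit two, zero, and one element respectively.
        System.out.println(flatMapAll(List.of("hello world", "", "flink")));
    }
}
```

Three inputs produce three outputs here only by coincidence: the first line contributes two elements, the second none, and the third one.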

2) Usage

1. Anonymous inner class

2. Lambda expression

3. Implementing the FlatMapFunction interface

4. Extending RichFlatMapFunction
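
Of these four forms, the lambda is the one where the generic type arguments are not recorded in the generated class, which is why a Flink lambda that uses `Collector` is normally followed by a type hint such as `returns(Types.STRING)`. The plain-Java sketch below illustrates that difference under stated assumptions: `TypeHintSketch` and its nested `Splitter` interface are hypothetical stand-ins for `FlatMapFunction`, and the reflection check is only a simplified illustration, not how Flink's type extraction is actually implemented.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.function.Consumer;

// Hypothetical illustration class; not part of Flink.
final class TypeHintSketch {

    // Stand-in for a function interface with an input and an output type parameter.
    interface Splitter<I, O> {
        void flatMap(I value, Consumer<O> out);
    }

    // True if the object's class records concrete type arguments for an implemented interface.
    static boolean keepsTypeArguments(Splitter<?, ?> fn) {
        for (Type t : fn.getClass().getGenericInterfaces()) {
            if (t instanceof ParameterizedType) {
                return true;
            }
        }
        return false;
    }

    // Anonymous inner class: the compiler stores Splitter<String, String> in the class file.
    static Splitter<String, String> anonymousForm() {
        return new Splitter<String, String>() {
            @Override
            public void flatMap(String value, Consumer<String> out) {
                for (String word : value.split(" ")) {
                    out.accept(word);
                }
            }
        };
    }

    // Lambda: the runtime-generated class implements the raw interface only.
    static Splitter<String, String> lambdaForm() {
        return (value, out) -> {
            for (String word : value.split(" ")) {
                out.accept(word);
            }
        };
    }

    public static void main(String[] args) {
        System.out.println("anonymous class keeps type arguments: " + keepsTypeArguments(anonymousForm()));
        System.out.println("lambda keeps type arguments: " + keepsTypeArguments(lambdaForm()));
    }
}
```

Forms 3 and 4 behave like the anonymous class in this respect, because a named class that declares `implements FlatMapFunction<String, String>` also stores its type arguments.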

2. Code implementation
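import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;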

public class MyFlatmapDemo {
    public static void main(String[] args) throws Exception {
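        // Create the execution environment configuration and set the web UI port
        Configuration configuration = new Configuration();
        configuration.setInteger("rest.port", 8081);
        // The local web UI requires the flink-runtime-web dependency on the classpath
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);

        // Read lines of text from a socket
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);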

        // 1. Anonymous inner class
        SingleOutputStreamOperator<String> flatMapStream1 = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                // Emit one element per space-separated word
                for (String word : value.split(" ")) {
                    out.collect(word);
                }
            }
        });

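        // 2. Lambda expression (the output type is erased, so a type hint is required)
        SingleOutputStreamOperator<String> flatMapStream2 = lines.flatMap((FlatMapFunction<String, String>) (value, out) -> {
            for (String word : value.split(" ")) {
                out.collect(word);
            }
        }).returns(Types.STRING);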

        // 3. Class implementing FlatMapFunction
        SingleOutputStreamOperator<String> flatMapStream3 = lines.flatMap(new MyFlatmapFunc());

        // 4. Class extending RichFlatMapFunction
        SingleOutputStreamOperator<String> flatMapStream4 = lines.flatMap(new MyRichFlatMapFunc());

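        // Print all four result streams to the console
        flatMapStream1.print();
        flatMapStream2.print();
        flatMapStream3.print();
        flatMapStream4.print();

        // Submit the job
        env.execute();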
    }
}

class MyFlatmapFunc implements FlatMapFunction<String, String> {
    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        for (String word : value.split(" ")) {
            out.collect(word);
        }
    }
}

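class MyRichFlatMapFunc extends RichFlatMapFunction<String, String> {
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // Called once per parallel instance before any element is processed.
        // Note: timers need a process function (e.g. KeyedProcessFunction), not a RichFlatMapFunction.
        System.out.println("Rich functions can initialize state in open()");
    }

    @Override
    public void close() throws Exception {
        // Called once per parallel instance when the function shuts down
        super.close();
    }

    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        for (String word : value.split(" ")) {
            out.collect(word);
        }
    }
}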
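
The main thing a rich function adds over a plain FlatMapFunction is its lifecycle: open() runs once per parallel instance before the first element, flatMap() runs once per element, and close() runs once at the end. The sketch below is a simplified plain-Java model of that call order; `RichLifecycleSketch`, `WordSplitter`, and `runSubtask` are hypothetical names, and none of this is Flink's real runtime code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified model of a rich function's lifecycle; not Flink's real classes.
final class RichLifecycleSketch {

    // Records the order in which the lifecycle methods are invoked.
    static final class WordSplitter {
        final List<String> calls = new ArrayList<>();

        void open() {
            calls.add("open");
        }

        void flatMap(String value, Consumer<String> out) {
            calls.add("flatMap");
            for (String word : value.split(" ")) {
                out.accept(word);
            }
        }

        void close() {
            calls.add("close");
        }
    }

    // Plays the role of one parallel subtask: open once, process every record, close once.
    static List<String> runSubtask(WordSplitter fn, List<String> records, Consumer<String> out) {
        fn.open();
        try {
            for (String record : records) {
                fn.flatMap(record, out);
            }
        } finally {
            fn.close();
        }
        return fn.calls;
    }

    public static void main(String[] args) {
        List<String> emitted = new ArrayList<>();
        List<String> calls = runSubtask(new WordSplitter(), List.of("hello world", "hello flink"), emitted::add);
        System.out.println(calls);   // [open, flatMap, flatMap, close]
        System.out.println(emitted); // [hello, world, hello, flink]
    }
}
```

Because open() runs once per parallel instance, the message printed in `MyRichFlatMapFunc.open` would be expected to appear once for each subtask of that operator rather than once per element.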
3. Execution results

1) Enter test data

nc -lk 8888
hello world

2) Console output

3> hello
3> world
5> hello
5> world
8> hello
8> world
7> hello
7> world
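
In this output, the number before `>` is the 1-based index of the parallel subtask that printed the line, assuming the default behaviour of print() when the parallelism is greater than 1. Each of the four streams prints both words with the same number because the whole input line is handled by a single subtask of that stream's flatMap operator. The sketch below only mimics the printed layout; `PrintPrefixSketch` and its methods are hypothetical and are not Flink's sink implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the printed layout; not Flink's actual sink code.
final class PrintPrefixSketch {

    // Formats one record as: 1-based subtask number, "> ", value.
    static String format(int zeroBasedSubtaskIndex, String value) {
        return (zeroBasedSubtaskIndex + 1) + "> " + value;
    }

    // One input line handled by a single subtask: every word gets the same prefix.
    static List<String> printedLines(int zeroBasedSubtaskIndex, String line) {
        List<String> lines = new ArrayList<>();
        for (String word : line.split(" ")) {
            lines.add(format(zeroBasedSubtaskIndex, word));
        }
        return lines;
    }

    public static void main(String[] args) {
        // Subtask index 2 prints with the prefix "3> "
        printedLines(2, "hello world").forEach(System.out::println);
    }
}
```

Which subtask receives the line can differ between runs and between the four streams, so the numbers 3, 5, 8, and 7 above are specific to this run.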