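1. Overview
1) Purpose
flatMap first maps each element and then flattens the results: for every input element it can emit zero, one, or many output elements.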
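The "zero to many outputs per input" behavior can be illustrated without Flink. The following is a minimal sketch that uses only the JDK's java.util.stream API; the class name FlatMapSemanticsSketch and the helper splitWords are hypothetical names chosen for this illustration, and the JDK Stream.flatMap is used here only as an analogy for the Flink operator.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class FlatMapSemanticsSketch {
    // Map each line to its words, then flatten the per-line results into one list.
    // A blank line contributes zero elements; a line with n words contributes n.
    static List<String> splitWords(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.split(" "))
                        .filter(word -> !word.isEmpty()))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Three inputs: two words, no words, one word.
        System.out.println(splitWords(Arrays.asList("hello world", "", "flink")));
    }
}
```

Three input lines produce three output words here, but the counts match only by coincidence: the empty line emits nothing and the first line emits two elements.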
2) Usage
1. Anonymous inner class
2. Lambda expression
3. Implement the FlatMapFunction interface
4. Extend RichFlatMapFunction
2. Code implementation
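import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;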
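public class MyFlatmapDemo {
    public static void main(String[] args) throws Exception {
        // Create the environment configuration and set the web UI port
        // (the local web UI needs the flink-runtime-web dependency on the classpath)
        Configuration configuration = new Configuration();
        configuration.setInteger("rest.port", 8081);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
        // Read lines of text from a socket
        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);
        // 1. Anonymous inner class
        SingleOutputStreamOperator<String> flatMapStream1 = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                for (String word : value.split(" ")) {
                    out.collect(word);
                }
            }
        });
        // 2. Lambda expression; returns(...) supplies the output type that erasure hides
        SingleOutputStreamOperator<String> flatMapStream2 = lines.flatMap((FlatMapFunction<String, String>) (value, out) -> {
            for (String word : value.split(" ")) {
                out.collect(word);
            }
        }).returns(Types.STRING);
        // 3. Implement FlatMapFunction
        SingleOutputStreamOperator<String> flatMapStream3 = lines.flatMap(new MyFlatmapFunc());
        // 4. Extend RichFlatMapFunction
        SingleOutputStreamOperator<String> flatMapStream4 = lines.flatMap(new MyRichFlatMapFunc());
        // Print all four streams, then start the job
        flatMapStream1.print();
        flatMapStream2.print();
        flatMapStream3.print();
        flatMapStream4.print();
        env.execute();
    }
}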
class MyFlatmapFunc implements FlatMapFunction<String, String> {
    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        // Emit one output element per space-separated word
        for (String word : value.split(" ")) {
            out.collect(word);
        }
    }
}
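class MyRichFlatMapFunc extends RichFlatMapFunction<String, String> {
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // Note: timers need a process function such as KeyedProcessFunction,
        // not a RichFlatMapFunction
        System.out.println("open(): a rich function can initialize state here");
    }
    @Override
    public void close() throws Exception {
        super.close();
    }
    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        for (String word : value.split(" ")) {
            out.collect(word);
        }
    }
}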
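MyRichFlatMapFunc overrides open and close in addition to flatMap. Flink calls open once per parallel instance before any element is processed, and close once when the instance shuts down. The sketch below mimics that call order in plain Java so it can be run without a cluster. WordSplitter, run, and the calls list are hypothetical stand-ins invented for this illustration, not Flink types, and a java.util.function.Consumer takes the place of Flink's Collector.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

class RichLifecycleSketch {
    // Hypothetical stand-in for a rich flat-map function; NOT a Flink class.
    static class WordSplitter {
        final List<String> calls = new ArrayList<>();

        void open() {
            calls.add("open"); // one-time setup would go here
        }

        void flatMap(String value, Consumer<String> out) {
            calls.add("flatMap");
            for (String word : value.split(" ")) {
                if (!word.isEmpty()) {
                    out.accept(word);
                }
            }
        }

        void close() {
            calls.add("close"); // one-time cleanup would go here
        }
    }

    // Hypothetical driver: open once, flatMap per element, close once.
    static List<String> run(WordSplitter fn, List<String> input) {
        List<String> output = new ArrayList<>();
        fn.open();
        try {
            for (String value : input) {
                fn.flatMap(value, output::add);
            }
        } finally {
            fn.close();
        }
        return output;
    }

    public static void main(String[] args) {
        WordSplitter fn = new WordSplitter();
        System.out.println(run(fn, Arrays.asList("hello world", "")));
        System.out.println(fn.calls);
    }
}
```

The try/finally in the driver reflects only the intent that cleanup runs once after processing; how Flink itself schedules these calls is not modeled here.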
3. Execution result
1) Input test data
nc -lk 8888
hello world
2) Console output
3> hello
3> world
5> hello
5> world
8> hello
8> world
7> hello
7> world
Each of the four streams prints every word once, so each word appears four times. The number before ">" is the index of the parallel print subtask that emitted the line.