Flink DataStream之输出数据到File中

时间:2024-10-30 08:59:25
  • package test01;
  • import .common.;
  • import .configuration.Configuration;
  • import .configuration.MemorySize;
  • import .file.;
  • import ;
  • import ;
  • import ;
  • import .environment.StreamExecutionEnvironment;
  • import ;
  • import ;
  • import java.time.Duration;
  • public class TestOutputFile {
  • public static void main(String[] args) throws Exception {
  • StreamExecutionEnvironment executionEnvironment = (new Configuration());
  • (1);
  • //监听数据端口
  • DataStreamSource<String> dataSource = ("localhost", 9999);
  • //开启checkpoint,这样到了一定节点就会关闭文件,否则文件一直都是inprogress,此处设置的检查点是2秒。
  • (2000, CheckpointingMode.EXACTLY_ONCE);
  • //输出至文件
  • FileSink<String> fileSink = FileSink
  • //设置按行输出,指定输出的路径及编码格式,这里的泛型指定的是字符串类型。
  • .<String>forRowFormat(new Path("D:/IT/testfilnk"), new SimpleStringEncoder<>("UTF-8"))
  • //设置输出文件名的前缀和后缀
  • .withOutputFileConfig(()
  • .withPartPrefix("test-flink-output-")
  • .withPartSuffix(".log")
  • .build())
  • //设置文件滚动策略,这里设置的是20s和1024B(1KB),滚动策略满足其一就会重新写新文件。
  • .withRollingPolicy(
  • ()
  • .withRolloverInterval((20))
  • .withMaxPartSize(new MemorySize(1024))
  • .build()
  • )
  • .build();
  • (fileSink);
  • ();
  • }
  • }