package test01;
import .common.;
import .configuration.Configuration;
import .configuration.MemorySize;
import .file.;
import ;
import ;
import ;
import .environment.StreamExecutionEnvironment;
import ;
import ;
import java.time.Duration;
public class TestOutputFile {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment executionEnvironment = (new Configuration());
(1);
//监听数据端口
DataStreamSource<String> dataSource = ("localhost", 9999);
//开启checkpoint,这样到了一定节点就会关闭文件,否则文件一直都是inprogress,此处设置的检查点是2秒。
(2000, CheckpointingMode.EXACTLY_ONCE);
//输出至文件
FileSink<String> fileSink = FileSink
//设置按行输出,指定输出的路径及编码格式,这里的泛型指定的是字符串类型。
.<String>forRowFormat(new Path("D:/IT/testfilnk"), new SimpleStringEncoder<>("UTF-8"))
//设置输出文件名的前缀和后缀
.withOutputFileConfig(()
.withPartPrefix("test-flink-output-")
.withPartSuffix(".log")
.build())
//设置文件滚动策略,这里设置的是20s和1024B(1KB),滚动策略满足其一就会重新写新文件。
.withRollingPolicy(
()
.withRolloverInterval((20))
.withMaxPartSize(new MemorySize(1024))
.build()
)
.build();
(fileSink);
();
}
}