"""Read a text file line by line with a bounded PyFlink FileSource and print each line.

Usage:
    python <this script> [--input PATH]

The job runs in BATCH runtime mode over a static file set, with parallelism 1,
and prints every line of the input file via ``DataStream.print()``.
"""
import argparse
import logging
import sys

from pyflink.common import WatermarkStrategy, Encoder, Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.datastream.connectors.file_system import (
    FileSource,
    StreamFormat,
    FileSink,
    OutputFileConfig,
    RollingPolicy,
)

DEFAULT_INPUT_PATH = "./test.csv"


def read_and_print(input_path: str = DEFAULT_INPUT_PATH) -> None:
    """Build and execute a batch job that prints every line of ``input_path``.

    Args:
        input_path: Path (or Flink filesystem URI) of the file to read. Each line
            is emitted as one string record; the file is not parsed as CSV.
    """
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_runtime_mode(RuntimeExecutionMode.BATCH)
    # A single parallel instance means one source reader and one print subtask.
    # Nothing is written to a file here; output goes through print() only.
    env.set_parallelism(1)

    # process_static_file_set() makes the source bounded, matching BATCH mode.
    source = (
        FileSource.for_record_stream_format(StreamFormat.text_line_format(), input_path)
        .process_static_file_set()
        .build()
    )
    ds = env.from_source(
        source=source,
        # Text lines carry no event timestamps and no event-time operators follow,
        # so watermarks serve no purpose in this batch job.
        watermark_strategy=WatermarkStrategy.no_watermarks(),
        source_name="aaaa",
    )
    ds.print()
    env.execute()


def main(argv=None) -> None:
    """Parse command-line options and run the job.

    Args:
        argv: Argument list to parse; ``None`` means ``sys.argv[1:]``.
    """
    parser = argparse.ArgumentParser(description="Print every line of a text file with PyFlink.")
    parser.add_argument(
        "--input",
        dest="input",
        default=DEFAULT_INPUT_PATH,
        help=f"Input file to read (default: {DEFAULT_INPUT_PATH}).",
    )
    # parse_known_args tolerates extra arguments a launcher may append.
    args, _ = parser.parse_known_args(argv)
    read_and_print(args.input)


if __name__ == "__main__":
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
    main()