十八、Flink自定义多并行Source

时间:2025-03-02 07:56:14
1、概述

1)作用

自定义多并行的Source,即Source的并行度可以是1到多个。

2)实现

1.继承RichParallelSourceFunction,重写run()方法。

2、代码实现
import ;
import ;
import ;
import ;
import ;

public class CustomerParallelSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = ();

        DataStreamSource<String> nums = (new ParallelSourceFunc());

        ("自定义ParallelSourceFunc得到的DataStream的并行度为:" + ());
        ();

        ();
    }

    private static class ParallelSourceFunc extends RichParallelSourceFunction<String> {
        private boolean flag = true;

        public ParallelSourceFunc() {
            ("构造方法执行了!!!!!!!!!!!");
        }

        /**
         * 先调用Open方法
         *
         * @param parameters
         * @throws Exception
         */
        @Override
        public void open(Configuration parameters) throws Exception {
            int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
            (indexOfThisSubtask + ": Open方法被调用了");
        }

        /**
         * open方法调用完后再调用run方法
         * run方法task启动后会执行一次
         * 如果run方法一直不退出,就是一个无限的数据流
         * 如果数据读取完了,run方法退出,就是一个有限的数据流,Source退出,job也停止了
         *
         * @param ctx
         * @throws Exception
         */
        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
            (indexOfThisSubtask + " : Run方法被调用了");

            Random random = new Random();

            //获取当前SubTask的Index
            while (flag) {
                int i = (100);
                (indexOfThisSubtask + " --> " + i);

                (1000);
            }
        }


        /**
         * task cancel会执行一次
         */
        @Override
        public void cancel() {
            int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
            (indexOfThisSubtask + " : Cancel方法被调用了~~~~~");
            flag = false;
        }


        /**
         * 如果人为将job cancel先调用cancel方法再调用close方法
         * 如果没有将job人为的cancel,任务停掉前一定会调用close方法
         *
         * @throws Exception
         */
        @Override
        public void close() throws Exception {
            int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
            (indexOfThisSubtask + " : Close方法被调用了");
        }
    }
}