1、概述
1)作用
自定义多并行的Source,即Source的并行度可以是1到多个。
2)实现
1.继承RichParallelSourceFunction,重写run()方法。
2、代码实现
import ;
import ;
import ;
import ;
import ;
public class CustomerParallelSource {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = ();
DataStreamSource<String> nums = (new ParallelSourceFunc());
("自定义ParallelSourceFunc得到的DataStream的并行度为:" + ());
();
();
}
private static class ParallelSourceFunc extends RichParallelSourceFunction<String> {
private boolean flag = true;
public ParallelSourceFunc() {
("构造方法执行了!!!!!!!!!!!");
}
/**
* 先调用Open方法
*
* @param parameters
* @throws Exception
*/
@Override
public void open(Configuration parameters) throws Exception {
int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
(indexOfThisSubtask + ": Open方法被调用了");
}
/**
* open方法调用完后再调用run方法
* run方法task启动后会执行一次
* 如果run方法一直不退出,就是一个无限的数据流
* 如果数据读取完了,run方法退出,就是一个有限的数据流,Source退出,job也停止了
*
* @param ctx
* @throws Exception
*/
@Override
public void run(SourceContext<String> ctx) throws Exception {
int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
(indexOfThisSubtask + " : Run方法被调用了");
Random random = new Random();
//获取当前SubTask的Index
while (flag) {
int i = (100);
(indexOfThisSubtask + " --> " + i);
(1000);
}
}
/**
* task cancel会执行一次
*/
@Override
public void cancel() {
int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
(indexOfThisSubtask + " : Cancel方法被调用了~~~~~");
flag = false;
}
/**
* 如果人为将job cancel先调用cancel方法再调用close方法
* 如果没有将job人为的cancel,任务停掉前一定会调用close方法
*
* @throws Exception
*/
@Override
public void close() throws Exception {
int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
(indexOfThisSubtask + " : Close方法被调用了");
}
}
}