对于DataStream,可以选择如下的Strategy,
/**
* Sets the partitioning of the {@link DataStream} so that the output elements
* are broadcasted to every parallel instance of the next operation.
*
* @return The DataStream with broadcast partitioning set.
*/
public DataStream<T> broadcast() {
return setConnectionType(new BroadcastPartitioner<T>());
} /**
* Sets the partitioning of the {@link DataStream} so that the output elements
* are shuffled uniformly randomly to the next operation.
*
* @return The DataStream with shuffle partitioning set.
*/
@PublicEvolving
public DataStream<T> shuffle() {
return setConnectionType(new ShufflePartitioner<T>());
} /**
* Sets the partitioning of the {@link DataStream} so that the output elements
* are forwarded to the local subtask of the next operation.
*
* @return The DataStream with forward partitioning set.
*/
public DataStream<T> forward() {
return setConnectionType(new ForwardPartitioner<T>());
} /**
* Sets the partitioning of the {@link DataStream} so that the output elements
* are distributed evenly to instances of the next operation in a round-robin
* fashion.
*
* @return The DataStream with rebalance partitioning set.
*/
public DataStream<T> rebalance() {
return setConnectionType(new RebalancePartitioner<T>());
} /**
* Sets the partitioning of the {@link DataStream} so that the output elements
* are distributed evenly to a subset of instances of the next operation in a round-robin
* fashion.
*
* <p>The subset of downstream operations to which the upstream operation sends
* elements depends on the degree of parallelism of both the upstream and downstream operation.
* For example, if the upstream operation has parallelism 2 and the downstream operation
* has parallelism 4, then one upstream operation would distribute elements to two
* downstream operations while the other upstream operation would distribute to the other
* two downstream operations. If, on the other hand, the downstream operation has parallelism
* 2 while the upstream operation has parallelism 4 then two upstream operations will
* distribute to one downstream operation while the other two upstream operations will
* distribute to the other downstream operations.
*
* <p>In cases where the different parallelisms are not multiples of each other one or several
* downstream operations will have a differing number of inputs from upstream operations.
*
* @return The DataStream with rescale partitioning set.
*/
@PublicEvolving
public DataStream<T> rescale() {
return setConnectionType(new RescalePartitioner<T>());
} /**
* Sets the partitioning of the {@link DataStream} so that the output values
* all go to the first instance of the next processing operator. Use this
* setting with care since it might cause a serious performance bottleneck
* in the application.
*
* @return The DataStream with shuffle partitioning set.
*/
@PublicEvolving
public DataStream<T> global() {
return setConnectionType(new GlobalPartitioner<T>());
}
逻辑都是由Partitoner来实现的,
BroadcastPartitioner
public class BroadcastPartitioner<T> extends StreamPartitioner<T> {
private static final long serialVersionUID = 1L; int[] returnArray;
boolean set;
int setNumber; @Override
public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
int numberOfOutputChannels) {
if (set && setNumber == numberOfOutputChannels) {
return returnArray;
} else {
this.returnArray = new int[numberOfOutputChannels];
for (int i = 0; i < numberOfOutputChannels; i++) {
returnArray[i] = i;
}
set = true;
setNumber = numberOfOutputChannels;
return returnArray;
}
}
int[] returnArray, 数组,select的channel id
broadcast,要发到所有channel,所以returnArray要包含所有的channel id
ShufflePartitioner,随机选一个channel
public class ShufflePartitioner<T> extends StreamPartitioner<T> {
private static final long serialVersionUID = 1L; private Random random = new Random(); private int[] returnArray = new int[1]; @Override
public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
int numberOfOutputChannels) {
returnArray[0] = random.nextInt(numberOfOutputChannels);
return returnArray;
}
ForwardPartitioner,对于forward,应该只有一个输出channel,所以就选第一个channel就可以
public class ForwardPartitioner<T> extends StreamPartitioner<T> {
private static final long serialVersionUID = 1L; private int[] returnArray = new int[] {0}; @Override
public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record, int numberOfOutputChannels) {
return returnArray;
}
RebalancePartitioner,就是roundrobin,循环选择
public class RebalancePartitioner<T> extends StreamPartitioner<T> {
private static final long serialVersionUID = 1L; private int[] returnArray = new int[] {-1}; @Override
public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
int numberOfOutputChannels) {
this.returnArray[0] = (this.returnArray[0] + 1) % numberOfOutputChannels;
return this.returnArray;
}
GlobalPartitioner,默认选第一个
public class GlobalPartitioner<T> extends StreamPartitioner<T> {
private static final long serialVersionUID = 1L; private int[] returnArray = new int[] { 0 }; @Override
public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
int numberOfOutputChannels) {
return returnArray;
}
在RecordWriter中,emit会调用selectChannels来选取channel
public void emit(T record) throws IOException, InterruptedException {
for (int targetChannel : channelSelector.selectChannels(record, numChannels)) {
sendToTarget(record, targetChannel);
}
}