On MapReduce input splitting

Date: 2020-12-19 21:45:38

First look at minSize, which comes from two values: getFormatMinSplitSize() and getMinSplitSize(job).
The first always returns 1:
protected long getFormatMinSplitSize() {
  return 1;
}
The second returns the configuration parameter mapreduce.input.fileinputformat.split.minsize (defaulting to 1):
public static long getMinSplitSize(JobContext job) {
  return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L);
}

minSize is the maximum of these two values.
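Inside getSplits() that combination is a single line; in the Hadoop source it reads essentially:

long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));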
Next, maxSize: it returns the configuration parameter mapreduce.input.fileinputformat.split.maxsize (defaulting to Long.MAX_VALUE):
public static long getMaxSplitSize(JobContext context) {
  return context.getConfiguration().getLong(SPLIT_MAXSIZE,
                                            Long.MAX_VALUE);
}
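As a side note, both parameters can be set from the driver. A minimal sketch, assuming the standard org.apache.hadoop.mapreduce.lib.input.FileInputFormat helpers (the 64 MB / 256 MB values are arbitrary examples, not recommendations):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

Job job = Job.getInstance(new Configuration(), "split-demo");
// Writes mapreduce.input.fileinputformat.split.minsize
FileInputFormat.setMinInputSplitSize(job, 64L * 1024 * 1024);   // 64 MB
// Writes mapreduce.input.fileinputformat.split.maxsize
FileInputFormat.setMaxInputSplitSize(job, 256L * 1024 * 1024);  // 256 MB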

List<FileStatus> files = listStatus(job); gives the list of input files. For each file, look at these two lines:
long blockSize = file.getBlockSize();
long splitSize = computeSplitSize(blockSize, minSize, maxSize);
blockSize is the HDFS block size configured for the file (128 MB by default in Hadoop 2.x):
protected long computeSplitSize(long blockSize, long minSize,
                                long maxSize) {
  return Math.max(minSize, Math.min(maxSize, blockSize));
}
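To see how this max/min clamp behaves, here is a small self-contained sketch (my own demo, not Hadoop code) that replays the formula with a few sample values:

public class SplitSizeDemo {
  // Same formula as computeSplitSize above.
  static long computeSplitSize(long blockSize, long minSize, long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
  }

  public static void main(String[] args) {
    long mb = 1024L * 1024;
    // Defaults (minSize = 1, maxSize = Long.MAX_VALUE): splitSize = blockSize
    System.out.println(computeSplitSize(128 * mb, 1L, Long.MAX_VALUE) / mb);        // 128
    // maxSize below the block size shrinks the split
    System.out.println(computeSplitSize(128 * mb, 1L, 64 * mb) / mb);               // 64
    // minSize above the block size enlarges the split
    System.out.println(computeSplitSize(128 * mb, 256 * mb, Long.MAX_VALUE) / mb);  // 256
  }
}

So with default settings a split is exactly one block; changing the split size means raising minSize above the block size or lowering maxSize below it.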

With splitSize in hand, the loop below carves each file into splits:
while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
  int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
  splits.add(makeSplit(path, length - bytesRemaining, splitSize,
                       blkLocations[blkIndex].getHosts(),
                       blkLocations[blkIndex].getCachedHosts()));
  bytesRemaining -= splitSize;
}

if (bytesRemaining != 0) {
  int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
  splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
                       blkLocations[blkIndex].getHosts(),
                       blkLocations[blkIndex].getCachedHosts()));
}

The loop keeps emitting splits of size splitSize while the remaining bytes divided by splitSize exceed SPLIT_SLOP (1.1); once the ratio drops to 1.1 or below, the nonzero remainder becomes one final split instead of a full split plus a tiny one.
This is how the number of splits is determined.
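A concrete run of that logic (again my own sketch mirroring the quoted loop, not Hadoop code): a 260 MB file with a 128 MB splitSize first passes 260/128 ≈ 2.03 > 1.1 and emits a 128 MB split; the remaining 132 MB gives 132/128 ≈ 1.03 ≤ 1.1, so it becomes one final 132 MB split instead of 128 MB + 4 MB.

import java.util.ArrayList;
import java.util.List;

public class SplitCountDemo {
  private static final double SPLIT_SLOP = 1.1;  // same constant as in FileInputFormat

  // Mirrors the while/if pair above, returning only the split lengths.
  static List<Long> splitLengths(long fileLength, long splitSize) {
    List<Long> splits = new ArrayList<>();
    long bytesRemaining = fileLength;
    while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
      splits.add(splitSize);
      bytesRemaining -= splitSize;
    }
    if (bytesRemaining != 0) {
      splits.add(bytesRemaining);  // tail of up to 1.1 * splitSize stays in one split
    }
    return splits;
  }

  public static void main(String[] args) {
    long mb = 1024L * 1024;
    // 260 MB file, 128 MB splitSize: prints 128 then 132 (two splits, not three)
    splitLengths(260 * mb, 128 * mb).forEach(len -> System.out.println(len / mb));
  }
}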