简介
Hadoop在运行一个MapReduce作业的时候,我们需要为作业指定它的输入格式。
在默认情况下:
- 输入格式:TextInputFormat
- 输入分片:FileSplit
- 记录读取器:LineRecordReader
这三个类对数据做了初始化的处理,最后以的形式将数据交给Mapper处理。本篇文章将会通过源代码分析数据处理的过程。
TextInputFormat
/** An {@link InputFormat} for plain text files. Files are broken into lines.
* Either linefeed or carriage-return are used to signal end of line. Keys are
* the position in the file, and values are the line of text.. */
public class TextInputFormat extends FileInputFormat<LongWritable, Text> {
@Override
public RecordReader<LongWritable, Text>
createRecordReader(InputSplit split,
TaskAttemptContext context) {
return new LineRecordReader();
}
@Override
protected boolean isSplitable(JobContext context, Path file) {
CompressionCodec codec =
new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
if (null == codec) {
return true;
}
return codec instanceof SplittableCompressionCodec;
}
}
这个类中主要有两个方法,一个是CreateRecordReader。用于新建一个记录读取器;另一个isSplitable返回一个布尔值决定数据是否需要分片。
那么这个类完全和数据源没有联系啊,别急。我们再看看它的父类FileInputFormat
FileInputFormat
这个类最重要的就是这个getSplits方法,这个方法实现了读取Hdfs上的数据,同时实现了Hadoop最重要的分片功能。
//生成文件list,放进filesplits
public List<InputSplit> getSplits(JobContext job) throws IOException {
Stopwatch sw = new Stopwatch().start();
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)); //获取输入分片最小值,默认为1
long maxSize = getMaxSplitSize(job); //输入分片的最大值,默认为Long.MaxValue
// generate splits
List<InputSplit> splits = new ArrayList<InputSplit>(); //初始化分组保存分片
List<FileStatus> files = listStatus(job); //获得所有输入文件列表
for (FileStatus file: files) { //读取输入文件列表所有文件的地址并且获取信息
Path path = file.getPath(); //获得文件路径
long length = file.getLen();
if (length != 0) {
BlockLocation[] blkLocations;
if (file instanceof LocatedFileStatus) {
blkLocations = ((LocatedFileStatus) file).getBlockLocations();
} else {
FileSystem fs = path.getFileSystem(job.getConfiguration());
blkLocations = fs.getFileBlockLocations(file, 0, length);
}
if (isSplitable(job, path)) { //如果读取到的文件可以切分(即在TextFormat里面重写的函数返回值为true)则对其进行切分
long blockSize = file.getBlockSize(); //文件系统数据块大小(可以在hdfs-site.xml中定义dfs.block.size),默认为64M
long splitSize = computeSplitSize(blockSize, minSize, maxSize); //计算分片大小,取max(min(maxsize,blocksize),minsize),所以默认值为blocksize
long bytesRemaining = length;
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { //如果未切割部分比分片的1.1倍要大,那么就创建一个FileSplit分片
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
bytesRemaining -= splitSize;
}
if (bytesRemaining != 0) { //剩余部分小于1.1倍,将整个部分作为FileSplit分片处理
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
}
} else { // 如果不可分割,整个文件作为FileSplit
splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
blkLocations[0].getCachedHosts()));
}
} else {
//Create empty hosts array for zero length files
splits.add(makeSplit(path, 0, length, new String[0]));
}
}
// Save the number of input files for metrics/loadgen
job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
sw.stop();
if (LOG.isDebugEnabled()) {
LOG.debug("Total # of splits generated by getSplits: " + splits.size()
+ ", TimeTaken: " + sw.elapsedMillis());
}
return splits;
}
以上的注释大概解释了分片的过程,那么所谓的片FileSplit又是什么呢?
FileSpilt
该类中,有三个重要变量
private Path file;//该输入分片所在的文件
private long start;//该输入分片在文件中的起始位置
private long length;//该输入分片的大小
private String[] hosts;//保存输入该分片的主机列表
每个变量都有其对应的Get方法,除此之外,这个类就剩下
public void write();
public void readFields();
这两个序列化和反序列化方法了。
相信聪明的读者看到这就可以理解所谓FileSplit根本没有读取文件的内容,它只是根据在FileInputFormat里面GetSplit的方法中的splitSize把文件分成一个个的索引片。这些所以片会为谁而用呢?就是LineRecordReader。
LineRecordReader
此处删减掉了一些不太重要的代码
public class LineRecordReader extends RecordReader<LongWritable, Text> {
private static final Log LOG = LogFactory.getLog(LineRecordReader.class);
public static final String MAX_LINE_LENGTH =
"mapreduce.input.linerecordreader.line.maxlength";
private long start;//分片在文件中开始的位置
private long pos;//当前读取到的位置
private long end;//分片在文件中结束的位置
private LineReader in;//读取一行文本的行读取器
private int maxLineLength;
private LongWritable key;
private Text value;
private byte[] recordDelimiterBytes;//可自定义分隔符
public LineRecordReader() {
}
public LineRecordReader(byte[] recordDelimiter) {
this.recordDelimiterBytes = recordDelimiter;
}
//每有一个Split,该函数就运行一次
public void initialize(InputSplit genericSplit,
TaskAttemptContext context) throws IOException {
FileSplit split = (FileSplit) genericSplit;
Configuration job = context.getConfiguration();
this.maxLineLength = job.getInt(MAX_LINE_LENGTH, Integer.MAX_VALUE);
start = split.getStart();
end = start + split.getLength();
final Path file = split.getPath();
// 打开文件
final FileSystem fs = file.getFileSystem(job);
fileIn = fs.open(file);
//压缩文件处理方法开始
CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
if (null!=codec) {
isCompressedInput = true;
decompressor = CodecPool.getDecompressor(codec);
if (codec instanceof SplittableCompressionCodec) {
final SplitCompressionInputStream cIn =
((SplittableCompressionCodec)codec).createInputStream(
fileIn, decompressor, start, end,
SplittableCompressionCodec.READ_MODE.BYBLOCK);
in = new CompressedSplitLineReader(cIn, job,
this.recordDelimiterBytes);
start = cIn.getAdjustedStart();
end = cIn.getAdjustedEnd();
filePosition = cIn;
} else {
in = new SplitLineReader(codec.createInputStream(fileIn,
decompressor), job, this.recordDelimiterBytes);
filePosition = fileIn;
}//压缩文件处理方法结束
} else {//如果不是首行将会读取需要开始读取的位置
in = new LineReader( fileIn, job);
if (start != 0) {
skipFirstLine=true;
fileIn.seek(start);
}
if(skipFirstLine){
start+=in.readLine(new Text(),0,(int)Math.min((long)Integer.MAX_VALUE,end-start);//跳到开始的位置
}
this.pos=start;//从此处开始读取
} this.pos = start; } private int maxBytesToConsume(long pos); private long getFilePosition(); public boolean nextKeyValue() throws IOException {
if (key == null) {
key = new LongWritable();
}
key.set(pos); //key设置为行偏移量
if (value == null) {
value = new Text();
}
//实例化Key和Value
int newSize = 0;
while (getFilePosition() <= end) { //当没有读取到文件结尾时候循环进行
newSize = in.readLine(value, maxLineLength,
Math.max(maxBytesToConsume(pos), maxLineLength)); //通过此方法将读取到的每一行内容保存到value的值内
if (newSize == 0) {
break;
}
pos += newSize; //pos移动
if (newSize < maxLineLength) {
break;
}
// line too long. try again
LOG.info("Skipped line of size " + newSize + " at pos " +
(pos - newSize));
}
if (newSize == 0) {
key = null;
value = null;
return false;
} else {
return true;
}
}
public LongWritable getCurrentKey(); public Text getCurrentValue(); public float getProgress();获取进度 public synchronized void close();关闭函数}通过读取每个分片,LineRecordReader用LineReader的readline()方法将每一行的值保存到value中,而key则设置为pos,即行偏移量(该分片离该文件头的距离pos-start)该键值对将会传输到map函数中执行mapreduce操作,且每一个分片都会开启一个map任务。应用
1.可以自定义一个MyInputFormat继承FileInputFormat然后重写getSplit()方法,直接修改splitsize的值,可以自定义分片大小,对于小文件的处理有所帮助
2.LineRecordReader有一个自定义分隔符的构造方法
public LineRecordReader(byte[] recordDelimiter) {
可以自定义一个RecordReader继承LineRecordReader,调用此构造方法,实现不同分隔符的输入
this.recordDelimiterBytes = recordDelimiter;
}