Hadoop源代码分析(一)——输入(TextInputFormat,FileSplit,LineRecordReader)

时间:2021-09-26 17:32:00

简介

Hadoop在运行一个MapReduce作业的时候,我们需要为作业指定它的输入格式。

在默认情况下:

    - 输入格式:TextInputFormat

    - 输入分片:FileSplit

    - 记录读取器:LineRecordReader

这三个类对数据做了初始化的处理,最后以的形式将数据交给Mapper处理。本篇文章将会通过源代码分析数据处理的过程。

TextInputFormat

    /** An {@link InputFormat} for plain text files. Files are broken into lines. 
* Either linefeed or carriage-return are used to signal end of line. Keys are
* the position in the file, and values are the line of text.. */
public class TextInputFormat extends FileInputFormat<LongWritable, Text> {

@Override
public RecordReader<LongWritable, Text>
createRecordReader(InputSplit split,
TaskAttemptContext context) {
return new LineRecordReader();
}

@Override
protected boolean isSplitable(JobContext context, Path file) {
CompressionCodec codec =
new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
if (null == codec) {
return true;
}
return codec instanceof SplittableCompressionCodec;
}

}
这个类中主要有两个方法,一个是CreateRecordReader。用于新建一个记录读取器;另一个isSplitable返回一个布尔值决定数据是否需要分片。

那么这个类完全和数据源没有联系啊,别急。我们再看看它的父类FileInputFormat

FileInputFormat

这个类最重要的就是这个getSplits方法,这个方法实现了读取Hdfs上的数据,同时实现了Hadoop最重要的分片功能。

   //生成文件list,放进filesplits 
public List<InputSplit> getSplits(JobContext job) throws IOException {
Stopwatch sw = new Stopwatch().start();
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)); //获取输入分片最小值,默认为1
long maxSize = getMaxSplitSize(job); //输入分片的最大值,默认为Long.MaxValue

// generate splits
List<InputSplit> splits = new ArrayList<InputSplit>(); //初始化分组保存分片
List<FileStatus> files = listStatus(job); //获得所有输入文件列表
for (FileStatus file: files) { //读取输入文件列表所有文件的地址并且获取信息
Path path = file.getPath(); //获得文件路径
long length = file.getLen();
if (length != 0) {
BlockLocation[] blkLocations;
if (file instanceof LocatedFileStatus) {
blkLocations = ((LocatedFileStatus) file).getBlockLocations();
} else {
FileSystem fs = path.getFileSystem(job.getConfiguration());
blkLocations = fs.getFileBlockLocations(file, 0, length);
}
if (isSplitable(job, path)) { //如果读取到的文件可以切分(即在TextFormat里面重写的函数返回值为true)则对其进行切分
long blockSize = file.getBlockSize(); //文件系统数据块大小(可以在hdfs-site.xml中定义dfs.block.size),默认为64M
long splitSize = computeSplitSize(blockSize, minSize, maxSize); //计算分片大小,取max(min(maxsize,blocksize),minsize),所以默认值为blocksize

long bytesRemaining = length;
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { //如果未切割部分比分片的1.1倍要大,那么就创建一个FileSplit分片
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
bytesRemaining -= splitSize;
}

if (bytesRemaining != 0) { //剩余部分小于1.1倍,将整个部分作为FileSplit分片处理
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
}
} else { // 如果不可分割,整个文件作为FileSplit
splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
blkLocations[0].getCachedHosts()));
}
} else {
//Create empty hosts array for zero length files
splits.add(makeSplit(path, 0, length, new String[0]));
}
}
// Save the number of input files for metrics/loadgen
job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
sw.stop();
if (LOG.isDebugEnabled()) {
LOG.debug("Total # of splits generated by getSplits: " + splits.size()
+ ", TimeTaken: " + sw.elapsedMillis());
}
return splits;
}
以上的注释大概解释了分片的过程,那么所谓的片FileSplit又是什么呢?

FileSpilt

该类中,有三个重要变量

private Path file;//该输入分片所在的文件
private long start;//该输入分片在文件中的起始位置
private long length;//该输入分片的大小
private String[] hosts;//保存输入该分片的主机列表
每个变量都有其对应的Get方法,除此之外,这个类就剩下

public void write();
public void readFields();

这两个序列化和反序列化方法了。

相信聪明的读者看到这就可以理解所谓FileSplit根本没有读取文件的内容,它只是根据在FileInputFormat里面GetSplit的方法中的splitSize把文件分成一个个的索引片。这些所以片会为谁而用呢?就是LineRecordReader。

LineRecordReader

此处删减掉了一些不太重要的代码

public class LineRecordReader extends RecordReader<LongWritable, Text> {
private static final Log LOG = LogFactory.getLog(LineRecordReader.class);
public static final String MAX_LINE_LENGTH =
"mapreduce.input.linerecordreader.line.maxlength";

private long start;//分片在文件中开始的位置
private long pos;//当前读取到的位置
private long end;//分片在文件中结束的位置
private LineReader in;//读取一行文本的行读取器
private int maxLineLength;
private LongWritable key;
private Text value;
private byte[] recordDelimiterBytes;//可自定义分隔符

public LineRecordReader() {
}

public LineRecordReader(byte[] recordDelimiter) {
this.recordDelimiterBytes = recordDelimiter;
}

//每有一个Split,该函数就运行一次
public void initialize(InputSplit genericSplit,
TaskAttemptContext context) throws IOException {
FileSplit split = (FileSplit) genericSplit;
Configuration job = context.getConfiguration();
this.maxLineLength = job.getInt(MAX_LINE_LENGTH, Integer.MAX_VALUE);

 start = split.getStart();
end = start + split.getLength();
final Path file = split.getPath();

// 打开文件
final FileSystem fs = file.getFileSystem(job);
fileIn = fs.open(file);

//压缩文件处理方法开始
CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
if (null!=codec) {
isCompressedInput = true;
decompressor = CodecPool.getDecompressor(codec);
if (codec instanceof SplittableCompressionCodec) {
final SplitCompressionInputStream cIn =
((SplittableCompressionCodec)codec).createInputStream(
fileIn, decompressor, start, end,
SplittableCompressionCodec.READ_MODE.BYBLOCK);
in = new CompressedSplitLineReader(cIn, job,
this.recordDelimiterBytes);
start = cIn.getAdjustedStart();
end = cIn.getAdjustedEnd();
filePosition = cIn;
} else {
in = new SplitLineReader(codec.createInputStream(fileIn,
decompressor), job, this.recordDelimiterBytes);
filePosition = fileIn;
}
 //压缩文件处理方法结束
 } else {
    //如果不是首行将会读取需要开始读取的位置
if (start != 0) {
skipFirstLine=true;
fileIn.seek(start);
}
 in = new LineReader( fileIn, job);
if(skipFirstLine){
start+=in.readLine(new Text(),0,(int)Math.min((long)Integer.MAX_VALUE,end-start);//跳到开始的位置
}
this.pos=start;//从此处开始读取
 } this.pos = start; } private int maxBytesToConsume(long pos); private long getFilePosition(); public boolean nextKeyValue() throws IOException { 
   if (key == null) {  

     key = new LongWritable();  

   } 

   key.set(pos);  //key设置为行偏移量

   if (value == null) { 

     value = new Text();  

   } 
//实例化Key和Value
   int newSize = 0

   while (getFilePosition() <= end) { 
//当没有读取到文件结尾时候循环进行
     newSize = in.readLine(value, maxLineLength,  

         Math.max(maxBytesToConsume(pos), maxLineLength));  //通过此方法将读取到的每一行内容保存到value的值内

     if (newSize == 0) { 

       break;  

     }  

     pos += newSize;  //pos移动

     if (newSize < maxLineLength) {  

       break

     }  

 

     // line too long. try again 

     LOG.info("Skipped line of size " + newSize + " at pos " +   

              (pos - newSize));  

   } 

   if (newSize == 0) {  

     key = null

     value = null;  

     return false;  

   } else { 

     return true;  

   }  

 }

public LongWritable getCurrentKey(); public Text getCurrentValue(); public float getProgress();获取进度 public synchronized void close();关闭函数}

通过读取每个分片,LineRecordReader用LineReader的readline()方法将每一行的值保存到value中,而key则设置为pos,即行偏移量(该分片离该文件头的距离pos-start)该键值对将会传输到map函数中执行mapreduce操作,且每一个分片都会开启一个map任务。

应用

1.可以自定义一个MyInputFormat继承FileInputFormat然后重写getSplit()方法,直接修改splitsize的值,可以自定义分片大小,对于小文件的处理有所帮助

2.LineRecordReader有一个自定义分隔符的构造方法

 public LineRecordReader(byte[] recordDelimiter) {
this.recordDelimiterBytes = recordDelimiter;
}
可以自定义一个RecordReader继承LineRecordReader,调用此构造方法,实现不同分隔符的输入