/**
 * Excerpt of the old-API ({@code org.apache.hadoop.mapred}) InputFormat contract.
 * An InputFormat first cuts the job input into InputSplits, then supplies a
 * RecordReader that turns each split into <K, V> records.
 */
public interface InputFormat<K, V> {
// Splits the job's input; the number of splits returned determines the number of map tasks.
// NOTE(review): numSplits appears to be a hint only — confirm against the Hadoop Javadoc.
InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;
// Returns a reader that parses the given split into <K, V> records.
RecordReader<K, V> getRecordReader(InputSplit split,
JobConf job,
Reporter reporter) throws IOException;
方法getSplits将输入数据切分成splits,splits的个数即为map tasks的个数,splits的大小默认为块大小,即64M(Hadoop 1.x 的默认块大小;Hadoop 2.x 起默认为128M)
方法getRecordReader将每个split解析成records, 再依次将record解析成<K,V>对
InputFile --> splits --> <K,V>
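系统常用的 InputFormat 又有哪些呢?常见的有 TextInputFormat(按行读取,key 为行在文件中的字节偏移量,value 为该行内容)、KeyValueTextInputFormat、SequenceFileInputFormat 和 NLineInputFormat 等。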

然而系统所提供的这几种固定的将 InputFile转换为<K,V>的方式有时候并不能满足我们的需求:
此时需要我们自定义 InputFormat ,从而使Hadoop框架按照我们预设的方式来将 InputFile 解析为<K,V>。
在学习如何自定义 InputFormat 之前,需要先弄懂以下几个抽象类、接口及其之间的关系:
InputFormat(interface), FileInputFormat(abstract class), TextInputFormat(class),
RecordReader(interface), LineRecordReader(class)的关系
FileInputFormat implements InputFormat
TextInputFormat extends FileInputFormat
TextInputFormat.getRecordReader calls LineRecordReader
LineRecordReader implements RecordReader
由此可见,自定义 InputFormat 时只需继承 FileInputFormat 并实现其 getRecordReader 方法即可,而该方法的核心是调用LineRecordReader(即由LineRecordReader类来实现 "将每个split解析成records, 再依次将record解析成<K,V>对"),LineRecordReader类实现了接口RecordReader:
/**
 * Excerpt of the old-API ({@code org.apache.hadoop.mapred}) RecordReader contract:
 * iterates over the <K, V> records of a single InputSplit.
 */
public interface RecordReader<K, V> {
// Fills the caller-supplied key and value with the next record.
// Presumably returns false once the split has no more records (callers below rely on this).
boolean next(K key, V value) throws IOException;
// Factory methods for the reusable key/value objects passed to next().
K createKey();
V createValue();
// Current position in the input.
long getPos() throws IOException;
// Releases the underlying input.
public void close() throws IOException;
// How much of the input has been consumed (presumably a fraction in 0.0-1.0 — confirm).
float getProgress() throws IOException;
示例,数据每一行为 “物体 x坐标,y坐标,z坐标”(物体名与坐标之间以空白分隔,三个坐标之间以逗号分隔):
ball 3.5,12.7,9.0
car 15,23.76,42.23
device 0.0,12.4,-67.1
每一行将要被解析为<Text, Point3D>(Point3D是我们在上一篇日志中自定义的数据类型)
public class ObjectPositionInputFormat extends FileInputFormat<Text, Point3D> { public RecordReader<Text, Point3D> getRecordReader( InputSplit input, JobConf job, Reporter reporter) throws IOException { reporter.setStatus(input.toString()); return new ObjPosRecordReader(job, (FileSplit)input); } } class ObjPosRecordReader implements RecordReader<Text, Point3D> { private LineRecordReader lineReader; private LongWritable lineKey; private Text lineValue; public ObjPosRecordReader(JobConf job, FileSplit split) throws IOException { lineReader = new LineRecordReader(job, split); lineKey = lineReader.createKey(); lineValue = lineReader.createValue(); } public boolean next(Text key, Point3D value) throws IOException { // get the next line if (!lineReader.next(lineKey, lineValue)) { return false; } // parse the lineValue which is in the format: // objName, x, y, z String [] pieces = lineValue.toString().split(","); if (pieces.length != 4) { throw new IOException("Invalid record received"); } // try to parse floating point components of value float fx, fy, fz; try { fx = Float.parseFloat(pieces[1].trim()); fy = Float.parseFloat(pieces[2].trim()); fz = Float.parseFloat(pieces[3].trim()); } catch (NumberFormatException nfe) { throw new IOException("Error parsing floating point value in record"); } // now that we know we'll succeed, overwrite the output objects key.set(pieces[0].trim()); // objName is the output key. value.x = fx; value.y = fy; value.z = fz; return true; } public Text createKey() { return new Text(""); } public Point3D createValue() { return new Point3D(); } public long getPos() throws IOException { return lineReader.getPos(); } public void close() throws IOException { lineReader.close(); } public float getProgress() throws IOException { return lineReader.getProgress(); } }
public class ObjectPositionInputFormat extends FileInputFormat<Text, Point3D> { @Override protected boolean isSplitable(JobContext context, Path filename) { // TODO Auto-generated method stub return false; } @Override public RecordReader<Text, Point3D> createRecordReader(InputSplit inputsplit, TaskAttemptContext context) throws IOException, InterruptedException { // TODO Auto-generated method stub return new objPosRecordReader(); } public static class objPosRecordReader extends RecordReader<Text,Point3D>{ public LineReader in; public Text lineKey; public Point3D lineValue; public StringTokenizer token=null; public Text line; @Override public void close() throws IOException { // TODO Auto-generated method stub } @Override public Text getCurrentKey() throws IOException, InterruptedException { // TODO Auto-generated method stub System.out.println("key"); //lineKey.set(token.nextToken()); System.out.println("hello"); return lineKey; } @Override public Point3D getCurrentValue() throws IOException, InterruptedException { // TODO Auto-generated method stub return lineValue; } @Override public float getProgress() throws IOException, InterruptedException { // TODO Auto-generated method stub return 0; } @Override public void initialize(InputSplit input, TaskAttemptContext context) throws IOException, InterruptedException { // TODO Auto-generated method stub FileSplit split=(FileSplit)input; Configuration job=context.getConfiguration(); Path file=split.getPath(); FileSystem fs=file.getFileSystem(job); FSDataInputStream filein=fs.open(file); in=new LineReader(filein,job); line=new Text(); lineKey=new Text(); lineValue=new Point3D(); } @Override public boolean nextKeyValue() throws IOException, InterruptedException { // TODO Auto-generated method stub int linesize=in.readLine(line); if(linesize==0) return false; token=new StringTokenizer(line.toString()); String []temp=new String[2]; if(token.hasMoreElements()){ temp[0]=token.nextToken(); if(token.hasMoreElements()){ 
temp[1]=token.nextToken(); } } System.out.println(temp[0]); System.out.println(temp[1]); String []points=temp[1].split(","); System.out.println(points[0]); System.out.println(points[1]); System.out.println(points[2]); lineKey.set(temp[0]); lineValue.set(Float.parseFloat(points[0]),Float.parseFloat(points[1]), Float.parseFloat(points[2])); System.out.println("pp"); return true; } } }