Hadoop中MapReduce自定义输入格式

时间:2020-12-15 22:11:14

Hadoop 

MapReduce 自定义输入格式
一.首先写一个类继承RecordReader<?,?> 这个泛型类型对应输入的格式

定义成员变量
private FileSplit fs;  
private IntWritable key;
private Text value;
//导包是hadoop.util中的
private LineReader reader;
private int count;

实现五个方法

1.初始化方法 主要是获取文件系统 获取reader 以便读取一行数据
/*
* 初始化
*/
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
fs=(FileSplit) split;
Path path = fs.getPath();
Configuration conf = new Configuration();
//获取文件系统
FileSystem system = path.getFileSystem(conf);
FSDataInputStream in = system.open(path);
reader = new LineReader(in);
}

2.第二个方法  有一行数据就开始读取一行数据  没有数据返回false
/*
* 知识点1:这个方法会被调用多次   这个方法的返回值如果是true就会被调用一次
* 知识点2:每当nextKeyValue被调用一次 ,getCurrentKey,getCurrentValue也会被跟着调用一次
* 知识点3:getCurrentKey,getCurrentValue给Map传key,value
*/
public boolean nextKeyValue() throws IOException, InterruptedException {
key = new IntWritable();
value = new Text();
Text tmp = new Text();
//readLine是读取一行的内容
int length = reader.readLine(tmp);
//length=0表示读到最后一行 没数据了
if(length==0){
return false;
}else{
//把读取的每行数据给value
value = tmp;
count++;
key.set(count);
return true;
}
}

3.第三个和第四个方法 获取设置的key和value    以及progress(不用理会)
@Override
public IntWritable getCurrentKey() throws IOException, InterruptedException {
return key;
}
@Override
public Text getCurrentValue() throws IOException, InterruptedException {
return value;
}
@Override
public float getProgress() throws IOException, InterruptedException {
return 0;
}

4.最后一个方法  关闭reader  节省资源
@Override
public void close() throws IOException {
if(reader!=null){
reader=null;
}
}

      二.写一个类AuthInputFormat继承FileInputFormat<IntWritable, Text>  这个泛型中的值里的两个泛型是输入的key  value的类型

      实现RecordReader方法   new出来我们的第一个类的对象
      /**
* hadoop默认用的是LineRecordReader  把每行的行首偏移量和内容作为key value
*/
public RecordReader<IntWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
return new AuthReader();
}

     
      三.写Mapper

      四.写Driver  
在driver中注意 设置输入的格式  为我们自己写的类job.setInputFormatClass(AuthInputFormat.class);