Hadoop
MapReduce 自定义输入格式
一.首先写一个类继承RecordReader<?,?> 这个泛型类型对应输入的格式
定义成员变量
private FileSplit fs;
private IntWritable key;
private Text value;
//导包是hadoop.util中的
private LineReader reader;
private int count;
实现五个方法
1.初始化方法 主要是获取文件系统 获取reader 以便读取一行数据
/*
* 初始化
*/
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
fs=(FileSplit) split;
Path path = fs.getPath();
Configuration conf = new Configuration();
//获取文件系统
FileSystem system = path.getFileSystem(conf);
FSDataInputStream in = system.open(path);
reader = new LineReader(in);
}
2.第二个方法 有一行数据就开始读取一行数据 没有数据返回false
/*
* 知识点1:这个方法会被调用多次 这个方法的返回值如果是true就会被调用一次
* 知识点2:每当nextKeyValue被调用一次 ,getCurrentKey,getCurrentValue也会被跟着调用一次
* 知识点3:getCurrentKey,getCurrentValue给Map传key,value
*/
public boolean nextKeyValue() throws IOException, InterruptedException {
key = new IntWritable();
value = new Text();
Text tmp = new Text();
//readLine是读取一行的内容
int length = reader.readLine(tmp);
//length=0表示读到最后一行 没数据了
if(length==0){
return false;
}else{
//把读取的每行数据给value
value = tmp;
count++;
key.set(count);
return true;
}
}
3.第三个和第四个方法 获取设置的key和value 以及progress(不用理会)
@Override
public IntWritable getCurrentKey() throws IOException, InterruptedException {
return key;
}
@Override
public Text getCurrentValue() throws IOException, InterruptedException {
return value;
}
@Override
public float getProgress() throws IOException, InterruptedException {
return 0;
}
4.最后一个方法 关闭reader 节省资源
@Override
public void close() throws IOException {
if(reader!=null){
reader=null;
}
}
二.写一个类AuthInputFormat继承FileInputFormat<IntWritable, Text> 这个泛型中的值里的两个泛型是输入的key value的类型
实现RecordReader方法 new出来我们的第一个类的对象
/**
* hadoop默认用的是LineRecordReader 把每行的行首偏移量和内容作为key value
*/
public RecordReader<IntWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
return new AuthReader();
}
三.写Mapper
四.写Driver
在driver中注意 设置输入的格式 为我们自己写的类job.setInputFormatClass(AuthInputFormat.class);