MapReduce对输入多文件的处理2自定义FileInputFormat类

时间:2021-03-02 18:21:05

多种自定义文件格式的文件输入处理

MultipleInputs可以让MR支持多种输入格式
比如我们有两种文件格式,那么我们就要有两套Record Class,RecordReader和InputFormat
InputFormat(extends FileInputFormat)--->RecordReader(extends RecordReader)--->RecordClass(implements Writable)
MultipleInpts需要不同的InputFormat,一种InputFormat使用一种RecordReader来读取文件并返回一种Record格式的值
这就是这三个典型的关系,也是map过程中涉及的三个步骤的工具和产物


数据准备
a文件
1t80
2t90
3t100
4t50
5t73


b文件
1tlilit3
2txiaomingt3
3tfeifeit3
4tzhangsant3
5tlisit3


t表示分隔符

设计思路

将t前面的Text表示给map将要输入的key
t后面的作为给map要输入的value

要求自定义实现InputFormat,输出key,value格式数据。以产生Map的输入的数据(key,value)

 

!!!三个文件步骤!!!

InputFormat(extends FileInputFormat)--->RecordReader(extends RecordReader)--->RecordClass(implements Writable)

本例是对两个文件操作

1.两个RecordClass类(实现Writable接口)

package test.mr.multiinputs2;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/*
* 对map输入的value的预处理
* 对原始数据的预加工
*/
/*
* 第一张表数据
*/
public class FirstClass implements Writable {
private String value;

public String getValue() {
return value;
}

public void setValue(String value) {
this.value = value;
}

public FirstClass() {
super();
// TODO Auto-generated constructor stub
}

public FirstClass(String value) {
super();
this.value = value;
}

@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(this.value);

}

@Override
public void readFields(DataInput in) throws IOException {
this.value = in.readUTF();
}

@Override
public String toString() {
return "FirstClasst" + value;
}
}
package test.mr.multiinputs2;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;import org.apache.hadoop.io.Writable;/* * 对map输入的value的预处理 * 对原始数据的预加工 *//* * 第二张表数据 */public class SecondClass implements Writable {private String username;private int classNo;public SecondClass() {super();}public SecondClass(String username, int classNo) {super();this.username = username;this.classNo = classNo;}public String getUsername() {return username;}public void setUsername(String username) {this.username = username;}public int getClassNo() {return classNo;}public void setClassNo(int classNo) {this.classNo = classNo;}@Overridepublic void write(DataOutput out) throws IOException {out.writeUTF(username);out.writeInt(classNo);}@Overridepublic void readFields(DataInput in) throws IOException {this.username = in.readUTF();this.classNo = in.readInt();}@Overridepublic String toString() {return "SecondClasst" + username + "t" + classNo;}}


2.两个自定义RecordReader类(继承RecordReader类)

package test.mr.multiinputs2;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

public class FirstRecordReader extends RecordReader<Text, FirstClass> {

// 定义一个真正读取split中文件的读取器
private LineRecordReader lineRecordReader = null;
private Text key = null;
private FirstClass value = null;

@Override
public void initialize(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
close();
lineRecordReader = new LineRecordReader();
lineRecordReader.initialize(split, context);
}

@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
// 没有读取到东西
if (!lineRecordReader.nextKeyValue()) {
key = null;
value = null;
return false;
}
Text val = lineRecordReader.getCurrentValue();
String line = val.toString();
String[] str = line.split("t");
key = new Text(str[0]);
value = new FirstClass(str[1].trim()); // 实现对原始数据的预分割
return true;
}

// 读取key的当前值
@Override
public Text getCurrentKey() throws IOException, InterruptedException {
return key;
}

// 读取value的当前值
@Override
public FirstClass getCurrentValue() throws IOException,
InterruptedException {
return value;
}

@Override
public float getProgress() throws IOException, InterruptedException {
return lineRecordReader.getProgress();
}

@Override
public void close() throws IOException {
if (null != lineRecordReader) {
lineRecordReader.close();
lineRecordReader = null;
}
key = null;
value = null;
}

}


 

package test.mr.multiinputs2;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

public class SecondRecordReader extends RecordReader<Text, SecondClass> {
// 定义一个真正读取split中文件的读取器
private LineRecordReader lineRecordReader = null;
private Text key = null;
private SecondClass value = null;

@Override
public void initialize(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
close();
lineRecordReader = new LineRecordReader();
lineRecordReader.initialize(split, context);
}

@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if (!lineRecordReader.nextKeyValue()) {
key = null;
value = null;
return false;
}
Text val = lineRecordReader.getCurrentValue();
String line = val.toString();
String str[] = line.split("t");
key = new Text(str[0]);
value = new SecondClass(str[1], Integer.parseInt(str[2]));
return true;
}

@Override
public Text getCurrentKey() throws IOException, InterruptedException {
return key;
}

@Override
public SecondClass getCurrentValue() throws IOException,
InterruptedException {
return value;
}

@Override
public float getProgress() throws IOException, InterruptedException {
return lineRecordReader.getProgress();
}

@Override
public void close() throws IOException {
if (null != lineRecordReader) {
lineRecordReader.close();
lineRecordReader = null;
}
key = null;
value = null;
}

}


3.自定义两个FileInputFormat类(继承FileInputFormat类)

package test.mr.multiinputs2;

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class FirstInputFormat extends FileInputFormat<Text, FirstClass> {

@Override
public RecordReader<Text, FirstClass> createRecordReader(InputSplit split,
TaskAttemptContext context) throws IOException,
InterruptedException {
return new FirstRecordReader();
}
}


 

package test.mr.multiinputs2;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class SecondInputFormat extends FileInputFormat<Text, SecondClass> {

@Override
public RecordReader<Text, SecondClass> createRecordReader(InputSplit split,
TaskAttemptContext context) throws IOException,
InterruptedException {
return new SecondRecordReader();
}

}


4.两个Map类

package test.mr.multiinputs2;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FirstMap extends Mapper<Text, FirstClass, Text, Text> {
@Override
protected void map(Text key, FirstClass value,
Mapper<Text, FirstClass, Text, Text>.Context context)
throws IOException, InterruptedException {
context.write(key, new Text(value.toString()));
}
}


 

package test.mr.multiinputs2;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SecondMap extends Mapper<Text, SecondClass, Text, Text> {
@Override
protected void map(Text key, SecondClass value,
Mapper<Text, SecondClass, Text, Text>.Context context)
throws IOException, InterruptedException {
context.write(key, new Text(value.toString()));
}
}


5.reduce类

package test.mr.multiinputs2;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MultiInputsRedu extends Reducer<Text, Text, Text, Text> {
@Override
protected void reduce(Text key, Iterable<Text> values,
Reducer<Text, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
for (Text val : values) {
context.write(key, val);
}
}
}


6.Job类

/*
* 要求自定义实现InputFormat,输出key,value格式数据
*/
public class MultiInputsMain extends Configuration implements Tool {
private String input1 = null; // 定义的多个输入文件
private String input2 = null;
private String output = null;

@Override
public void setConf(Configuration conf) {

}

@Override
public Configuration getConf() {
return new Configuration();
}

@Override
public int run(String[] args) throws Exception {
setArgs(args);
checkParam();// 对参数进行检测

Configuration conf = new Configuration();
Job job = new Job(conf);
job.setJarByClass(MultiInputsMain.class);

job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);

job.setReducerClass(MultiInputsRedu.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

// MultipleInputs类添加文件路径
// 添加上自定义的fileInputFormat(分别是FirstInputFormat和SecondInputFormat)格式
MultipleInputs.addInputPath(job, new Path(input1),
FirstInputFormat.class, FirstMap.class);
MultipleInputs.addInputPath(job, new Path(input2),
SecondInputFormat.class, SecondMap.class);

FileOutputFormat.setOutputPath(job, new Path(output));
job.waitForCompletion(true);
return 0;
}

private void checkParam() {
if (input1 == null || "".equals(input1.trim())) {
System.out.println("no input phone-data path");
userMaunel();
System.exit(-1);
}
if (input2 == null || "".equals(input2.trim())) {
System.out.println("no input user-data path");
userMaunel();
System.exit(-1);
}
if (output == null || "".equals(output.trim())) {
System.out.println("no output path");
userMaunel();
System.exit(-1);
}
}

// 用户手册
private void userMaunel() {
System.err.println("Usage:");
System.err.println("-i1 input \t phone data path.");
System.err.println("-i2 input \t user data path.");
System.err.println("-o output \t output data path.");
}

// 对属性进行赋值
// 设置输入的格式:-i1 xxx(输入目录) -i2 xxx(输入目录) -o xxx(输出目录)
private void setArgs(String[] args) {
for (int i = 0; i < args.length; i++) {
if ("-i1".equals(args[i])) {
input1 = args[++i]; // 将input1赋值为第一个文件的输入路径
} else if ("-i2".equals(args[i])) {
input2 = args[++i];
} else if ("-o".equals(args[i])) {
output = args[++i];
}
}
}

public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
ToolRunner.run(conf, new MultiInputsMain(), args); // 调用run方法
}
}