MapReduce中自定义文件输出名

时间:2023-01-27 09:29:29

MR的输出结果默认为part-r-00000,我们可自定义易识别的名字替代part,如score-r-00000

job.setOutputFormatClass(MyOut.class);


MyOut.setOutputName(job, "score");//自定义输出名

job.waitForCompletion(true);

//自定义MyOut类继承TextOutPutFormat,并覆盖其中的setOutPutName方法,此方法在FileOutputFormat类中为protected修饰,不能直接调用
private static class MyOut extends TextOutputFormat{

protected static void setOutputName(JobContext job, String name) {
job.getConfiguration().set(BASE_OUTPUT_NAME, name);
}
}

上述方法仅能简单的替代文件名part,要想全部自定义文件名,需要重写RecordWriter

/**
* 自定义MyFileOutputFormat继承FileOutputFormat,实现其中的getRecordWriter方法;
* 该方法返回一个RecordWriter对象,需先创建此对象,实现其中的write、close方法;
* 文件通过FileSystem在write方法中写出到hdfs自定义文件中
*/
public class MyFileOutputFormat extends FileOutputFormat<Text, Text> {

@Override
public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext job)
throws IOException, InterruptedException {

FileSystem fs = FileSystem.newInstance(job.getConfiguration());

//自定义输出路径及文件名,把数学成绩和英语成绩分别输出到不同的文件中
final FSDataOutputStream math = fs.create(new Path("/score/math.txt"));
final FSDataOutputStream english = fs.create(new Path("/score/english.txt"));

RecordWriter<Text, Text> recordWriter = new RecordWriter<Text, Text>() {

@Override
public void write(Text key, Text value) throws IOException,
InterruptedException {
if(key.toString().contains("math")){
math.writeUTF(key.toString());
}
if(key.toString().contains("english")){
english.writeUTF(key.toString());
}

}

@Override
public void close(TaskAttemptContext context) throws IOException,
InterruptedException {
if (math!=null) {
math.close();
}
if (english!=null) {
english.close();
}
}
};

return recordWriter;
}

}