MapReduce中自定义文件输出名

时间:2023-01-27 09:29:41

MR的输出结果默认为part-r-00000,我们可自定义易识别的名字替代part,如score-r-00000

[java] view plain copy
  1.     job.setOutputFormatClass(MyOut.class);  
  2.       
  3.       
  4.     MyOut.setOutputName(job, "score");//自定义输出名  
  5.       
  6.     job.waitForCompletion(true);  
  7.   
  8.        //自定义MyOut类继承TextOutPutFormat,并覆盖其中的setOutPutName方法,此方法在FileOutputFormat类中为protected修饰,不能直接调用  
  9. private static class MyOut extends TextOutputFormat{  
  10.       
  11.       protected static void setOutputName(JobContext job, String name) {  
  12.             job.getConfiguration().set(BASE_OUTPUT_NAME, name);  
  13.           }  
  14. }  

上述方法仅能简单的替代文件名part,要想全部自定义文件名,需要重写RecordWriter

[java] view plain copy
  1. /** 
  2.  * 自定义MyFileOutputFormat继承FileOutputFormat,实现其中的getRecordWriter方法; 
  3.  * 该方法返回一个RecordWriter对象,需先创建此对象,实现其中的write、close方法; 
  4.  * 文件通过FileSystem在write方法中写出到hdfs自定义文件中 
  5.  */  
  6. public class MyFileOutputFormat extends FileOutputFormat<Text, Text> {  
  7.   
  8.     @Override  
  9.     public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext job)  
  10.             throws IOException, InterruptedException {  
  11.           
  12.         FileSystem fs = FileSystem.newInstance(job.getConfiguration());  
  13.           
  14.         //自定义输出路径及文件名,把数学成绩和英语成绩分别输出到不同的文件中  
  15.         final FSDataOutputStream math = fs.create(new Path("/score/math.txt"));  
  16.         final FSDataOutputStream english = fs.create(new Path("/score/english.txt"));  
  17.   
  18.         RecordWriter<Text, Text> recordWriter = new RecordWriter<Text, Text>() {  
  19.   
  20.             @Override  
  21.             public void write(Text key, Text value) throws IOException,  
  22.                     InterruptedException {  
  23.                 if(key.toString().contains("math")){  
  24.                     math.writeUTF(key.toString());  
  25.                 }  
  26.                 if(key.toString().contains("english")){  
  27.                     english.writeUTF(key.toString());  
  28.                 }  
  29.                   
  30.             }  
  31.               
  32.             @Override  
  33.             public void close(TaskAttemptContext context) throws IOException,  
  34.                     InterruptedException {  
  35.                 if (math!=null) {  
  36.                     math.close();  
  37.                 }  
  38.                 if (english!=null) {  
  39.                     english.close();  
  40.                 }  
  41.                             }  
  42.         };  
  43.           
  44.         return recordWriter;  
  45.     }  
  46.