关于MultipleOutputFormat若干小记

时间:2023-01-26 04:03:05

使用版本是0.19.2,据说0.20以后,MultipleOutputFormat不好使,不知道真假

api可以参考

http://hadoop.apache.org/common/docs/r0.19.2/api/

但是说老实话,光看api有的时候有点混乱,每个函数到底影响些啥呢?

protected  K generateActualKey(K key, V value)
          Generate the actual key from the given key/value.
protected  V generateActualValue(K key, V value)
          Generate the actual value from the given key and value.
protected  String generateFileNameForKeyValue(K key, V value, String name)
          Generate the file output file name based on the given key and the leaf file name.
protected  String generateLeafFileName(String name)
          Generate the leaf name for the output file name.
protected abstract  RecordWriter<K,V> getBaseRecordWriter(FileSystem fs, JobConf job, String name, Progressable arg3)
           
protected  String getInputFileBasedOutputFileName(JobConf job, String name)
          Generate the outfile name based on a given anme and the input file name.
 RecordWriter<K,V> getRecordWriter(FileSystem fs, JobConf job, String name, Progressable arg3)
          Create a composite record writer that can write key/value data to different output files

 

现在简单介绍了下调用的过程

ReduceTask.java文件中

 1  public  void run(JobConf job,  final TaskUmbilicalProtocol umbilical)  throws IOException
 2 {
 3 ..........
 4 
 5 String finalName =  getOutputName(getPartition()); // return "part-" + NUMBER_FORMAT.format(partition);依据taskid产生诸如part-00000这样的文件名
 6 
 7 FileSystem fs = FileSystem.get(job);
 8 
 9  final RecordWriter out = job.getOutputFormat(). getRecordWriter(fs, job, finalName, reporter); // finalName=part-00000
10 
11 .............
12 }

 

 在MultipleOutputFormat.java里面,请注意这些个函数的调用顺序

 

     public RecordWriter<K, V> getRecordWriter(FileSystem fs, JobConf job, String name, Progressable arg3) throws IOException
    {
         final FileSystem myFS = fs;
         final String myName =  generateLeafFileName(name); //在这里可以硬性的指定文件名名称
         final JobConf myJob = job;
         final Progressable myProgressable = arg3;

         return  new RecordWriter<K, V>() {
             //  a cache storing the record writers for different output files.
            TreeMap<String, RecordWriter<K, V>> recordWriters =  new TreeMap<String, RecordWriter<K, V>>();

             public  void write(K key, V value)  throws IOException
            {
                 //  get the file name based on the key
                String keyBasedPath =  generateFileNameForKeyValue(key, value, myName); //一般依据key来决定文件名的时候 就在这个函数

                 //  get the file name based on the input file name
                String finalPath =  getInputFileBasedOutputFileName(myJob, keyBasedPath); //如果想依据jobconf配置来确定名称的话 就在这个函数里实现  finalPath 就是最终的文件名

                 //  get the actual key
                K actualKey =  generateActualKey(key, value);
                V actualValue =  generateActualValue(key, value);

                RecordWriter<K, V> rw =  this.recordWriters.get(finalPath);
                 if (rw ==  null)
                {
                     //  if we don't have the record writer yet for the final path, create one and add it to the cache
                      rw = getBaseRecordWriter(myFS, myJob, finalPath, myProgressable);//必须自己实现的
                     this.recordWriters.put(finalPath, rw);
                }
                rw.write(actualKey, actualValue);//
            };
 
             .......
 
        };
    }

 

 上述函数,除了getInputFileBasedOutputFileName,其他的红色函数基本上都只是简单的返回输入值.