使用版本是0.19.2,据说0.20以后,MultipleOutputFormat不好使,不知道真假
api可以参考
http://hadoop.apache.org/common/docs/r0.19.2/api/
但是说老实话,光看api有的时候有点混乱,每个函数到底影响些啥呢?
protected K |
generateActualKey(K key, V value) Generate the actual key from the given key/value. |
protected V |
generateActualValue(K key, V value) Generate the actual value from the given key and value. |
protected String |
generateFileNameForKeyValue(K key, V value, String name) Generate the file output file name based on the given key and the leaf file name. |
protected String |
generateLeafFileName(String name) Generate the leaf name for the output file name. |
protected abstract RecordWriter<K,V> |
getBaseRecordWriter(FileSystem fs, JobConf job, String name, Progressable arg3) |
protected String |
getInputFileBasedOutputFileName(JobConf job, String name) Generate the outfile name based on a given anme and the input file name. |
RecordWriter<K,V> |
getRecordWriter(FileSystem fs, JobConf job, String name, Progressable arg3) Create a composite record writer that can write key/value data to different output files |
现在简单介绍了下调用的过程
ReduceTask.java文件中
1
public
void run(JobConf job,
final TaskUmbilicalProtocol umbilical)
throws IOException
2 {
3 ..........
4
5 String finalName = getOutputName(getPartition()); // return "part-" + NUMBER_FORMAT.format(partition);依据taskid产生诸如part-00000这样的文件名
6
7 FileSystem fs = FileSystem.get(job);
8
9 final RecordWriter out = job.getOutputFormat(). getRecordWriter(fs, job, finalName, reporter); // finalName=part-00000
10
11 .............
12 }
2 {
3 ..........
4
5 String finalName = getOutputName(getPartition()); // return "part-" + NUMBER_FORMAT.format(partition);依据taskid产生诸如part-00000这样的文件名
6
7 FileSystem fs = FileSystem.get(job);
8
9 final RecordWriter out = job.getOutputFormat(). getRecordWriter(fs, job, finalName, reporter); // finalName=part-00000
10
11 .............
12 }
在MultipleOutputFormat.java里面,请注意这些个函数的调用顺序
public RecordWriter<K, V> getRecordWriter(FileSystem fs, JobConf job, String name, Progressable arg3)
throws IOException
{
final FileSystem myFS = fs;
final String myName = generateLeafFileName(name); //在这里可以硬性的指定文件名名称
final JobConf myJob = job;
final Progressable myProgressable = arg3;
return new RecordWriter<K, V>() {
// a cache storing the record writers for different output files.
TreeMap<String, RecordWriter<K, V>> recordWriters = new TreeMap<String, RecordWriter<K, V>>();
public void write(K key, V value) throws IOException
{
// get the file name based on the key
String keyBasedPath = generateFileNameForKeyValue(key, value, myName); //一般依据key来决定文件名的时候 就在这个函数
// get the file name based on the input file name
String finalPath = getInputFileBasedOutputFileName(myJob, keyBasedPath); //如果想依据jobconf配置来确定名称的话 就在这个函数里实现 finalPath 就是最终的文件名
// get the actual key
K actualKey = generateActualKey(key, value);
V actualValue = generateActualValue(key, value);
RecordWriter<K, V> rw = this.recordWriters.get(finalPath);
if (rw == null)
{
// if we don't have the record writer yet for the final path, create one and add it to the cache
rw = getBaseRecordWriter(myFS, myJob, finalPath, myProgressable);//必须自己实现的
this.recordWriters.put(finalPath, rw);
}
rw.write(actualKey, actualValue);//
};
{
final FileSystem myFS = fs;
final String myName = generateLeafFileName(name); //在这里可以硬性的指定文件名名称
final JobConf myJob = job;
final Progressable myProgressable = arg3;
return new RecordWriter<K, V>() {
// a cache storing the record writers for different output files.
TreeMap<String, RecordWriter<K, V>> recordWriters = new TreeMap<String, RecordWriter<K, V>>();
public void write(K key, V value) throws IOException
{
// get the file name based on the key
String keyBasedPath = generateFileNameForKeyValue(key, value, myName); //一般依据key来决定文件名的时候 就在这个函数
// get the file name based on the input file name
String finalPath = getInputFileBasedOutputFileName(myJob, keyBasedPath); //如果想依据jobconf配置来确定名称的话 就在这个函数里实现 finalPath 就是最终的文件名
// get the actual key
K actualKey = generateActualKey(key, value);
V actualValue = generateActualValue(key, value);
RecordWriter<K, V> rw = this.recordWriters.get(finalPath);
if (rw == null)
{
// if we don't have the record writer yet for the final path, create one and add it to the cache
rw = getBaseRecordWriter(myFS, myJob, finalPath, myProgressable);//必须自己实现的
this.recordWriters.put(finalPath, rw);
}
rw.write(actualKey, actualValue);//
};
.......
};
}
}
上述函数,除了getInputFileBasedOutputFileName,其他的红色函数基本上都只是简单的返回输入值.