1 思路:
0.txt MapReduce is simple
1.txt MapReduce is powerfull is simple
2.txt Hello MapReduce bye MapReduce 1 map函数:context.write(word:docid, 1) 即将word:docid作为map函数的输出
输出key 输出value
MapReduce:0.txt 1
is:0.txt 1
simple:0.txt 1
Mapreduce:1.txt 1
is:1.txt 1
powerfull:1.txt 1
is:1.txt 1
simple:1.txt 1
Hello:2.txt 1
MapReduce:2.txt 1
bye:2.txt 1
MapReduce:2.txt 1
2 combine函数:相同key(word:docid)的进行合并操作,然后context.write(word, docid:count),即将word做为输出key,docid:count作为输出value
输入key 输出value 输出key 输出value
MapReduce:0.txt 1 => MapReduce 0.txt:1
is:0.txt 1 => is 0.txt:1
simple:0.txt 1 => simple 0.txt:1
Mapreduce:1.txt 1 => Mapreduce 1.txt:1
is:1.txt 2 => is 1.txt:2
powerfull:1.txt 1 => powerfull 1.txt:1
simple:1.txt 1 => simple 1.txt:1
Hello:2.txt 1 => Hello 2.txt:1
MapReduce:2.txt 2 => MapReduce 2.txt:2
bye:2.txt 1 => bye 2.txt:1
3 Partitioner函数:HashPartitioner
略,根据combine的输出key进行分区
4 Reducer函数:仅仅是组合字符串了
输出key 输出value
MapReduce 0.txt:1,1.txt:1 2.txt:2
is 0.txt:1,is 1.txt:2
simple 0.txt:1,1.txt:1
powerfull 1.txt:1
Hello 2.txt:1
bye 2.txt:1
//感觉这个地方是 有问题的,Combiner相当于一个本地的reduce,万一如果某个文件大于64M(hadoop 2.x 是128M) 怎么办呢?会不会一个文件分到两个split中呢 那样在这里统计<word_docid, count>是不是会出现问题呢?
//为了确保不出问题,可以采用两个mapreduce 任务实现。http://www.cnblogs.com/i80386/p/3600174.html
combiner是把同一个机器上的多个map的结果先聚合一次
2 代码如下:
package proj; import java.io.IOException;
import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser; public class InvertedIndex { public static class InvertedIndexMapper extends
Mapper<Object, Text, Text, Text> { private Text keyInfo = new Text();
private Text valueInfo = new Text();
private FileSplit split; public void map(Object key, Text value, Context context)
throws IOException, InterruptedException { split = (FileSplit) context.getInputSplit(); StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
keyInfo.set(itr.nextToken() + ":" + split.getPath().toString());
valueInfo.set("1");
context.write(keyInfo, valueInfo);
}
}
}
//感觉这个地方是有问题的,Combiner相当于一个本地的reduce,万一如果某个文件大于64M(hadoop 2.x 是128M) 怎么办呢?会不会一个文件分到两个split中呢 那样在这里统计<word_docid, count>是不是会出现问题呢?
//为了确保不出问题,可以采用两个mapreduce 任务实现。http://www.cnblogs.com/i80386/p/3600174.html
public static class InvertedIndexCombiner extends
Reducer<Text, Text, Text, Text> { private Text info = new Text(); public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (Text value : values) {
sum += Integer.parseInt(value.toString());
}
int splitIndex = key.toString().indexOf(":");
info.set(key.toString().substring(splitIndex + 1) + ":" + sum);
key.set(key.toString().substring(0, splitIndex));
context.write(key, info);
}
} public static class InvertedIndexReducer extends
Reducer<Text, Text, Text, Text> {
private Text result = new Text(); public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
StringBuffer buff = new StringBuffer();
for (Text val : values) {
buff.append(val.toString() + ";");
}
result.set(buff.toString());
context.write(key, result);
} } public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args)
.getRemainingArgs();
Job job = new Job(conf, "InvertedIndex");
job.setJarByClass(InvertedIndex.class);
job.setMapperClass(InvertedIndexMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setCombinerClass(InvertedIndexCombiner.class);
job.setReducerClass(InvertedIndexReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
} }
运行结果如下:
Hello hdfs://localhost:9000/user/root/in/2.txt:1;
MapReduce hdfs://localhost:9000/user/root/in/2.txt:2;hdfs://localhost:9000/user/root/in/0.txt:1;hdfs://localhost:9000/user/root/in/1.txt:1;
bye hdfs://localhost:9000/user/root/in/2.txt:1;
is hdfs://localhost:9000/user/root/in/0.txt:1;hdfs://localhost:9000/user/root/in/1.txt:2;
powerfull hdfs://localhost:9000/user/root/in/1.txt:1;
simple hdfs://localhost:9000/user/root/in/1.txt:1;hdfs://localhost:9000/user/root/in/0.txt:1; 0.txt MapReduce is simple
1.txt MapReduce is powerfull is simple
2.txt Hello MapReduce bye MapReduce