下面是四个文件及其内容。
代码实现:
Mapper:
package cn.tedu.invert; import java.io.IOException; import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit; public class InvertMapper extends Mapper<LongWritable, Text, Text, Text> { @Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 获取文件名
FileSplit fileSplit = (FileSplit)context.getInputSplit();
String pathName = fileSplit.getPath().getName(); // 将文件中的内容提取
String[] words = value.toString().split(" "); // 每一个单词都对应着自己所在文件的文件名
for(String word:words){
context.write(new Text(word), new Text(pathName));
}
}
}
Reducer:
package cn.tedu.invert; import java.io.IOException;
import java.util.HashSet; import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer; public class InvertReducer extends Reducer<Text, Text, Text, Text> { public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { // 哈希表不存重复元素,将重复的文件名去掉
HashSet<String> set = new HashSet<>();
for (Text text : values) {
set.add(text.toString());
} StringBuilder sb = new StringBuilder();
for (String str : set) {
sb.append(str.toString()).append(" ");
} context.write(key, new Text(sb.toString()));
}
}
Driver:
package cn.tedu.invert; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class InvertDriver { public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "JobName");
job.setJarByClass(cn.tedu.invert.InvertDriver.class);
job.setMapperClass(InvertMapper.class);
job.setReducerClass(InvertReducer.class); job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class); FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.74.129:9000/text/invert"));
FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.74.129:9000/result/invert_result")); if (!job.waitForCompletion(true))
return;
}
}
结果: