Implementing an Inverted Index with MapReduce

Date: 2022-05-17 14:59:37

Reposted from: http://www.cnblogs.com/aijianiula/p/3870664.html

Simply put, an inverted index answers: given a word, which files does it appear in, and how often in each? This is like searching on Baidu: you type a keyword, the search engine quickly locates the documents on its servers that contain it, and returns results ranked by frequency and other signals (such as page click/vote rates). The inverted index plays a key role in that process.
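Before distributing the work, the same idea fits in a few lines of plain Java: walk over every (file, word) pair and keep a map from each word to its per-file counts. The sketch below is only an illustration (the file names and contents are made up); the two MapReduce jobs that follow build the same structure at scale.

import java.util.HashMap;
import java.util.Map;

public class TinyInvertedIndex {
    public static void main(String[] args) {
        // Made-up sample documents: file name -> file content
        Map<String, String> docs = new HashMap<>();
        docs.put("doc1.txt", "to be or not to be");
        docs.put("doc2.txt", "not to worry");

        // The inverted index: word -> (file name -> occurrence count)
        Map<String, Map<String, Long>> index = new HashMap<>();
        for (Map.Entry<String, String> doc : docs.entrySet()) {
            for (String word : doc.getValue().split(" ")) {
                index.computeIfAbsent(word, w -> new HashMap<>())
                     .merge(doc.getKey(), 1L, Long::sum);
            }
        }

        // Prints lines such as: to {doc1.txt=2, doc2.txt=1}
        index.forEach((word, postings) -> System.out.println(word + " " + postings));
    }
}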


The index is built with two chained MapReduce jobs: step one groups by word+file to get per-file counts, and a second shuffle is then needed to regroup those counts by word alone. The data flow for the word "hello" looks like this:

--------------------------------- Step One: mapper
// context.write("hello-->a.txt", 1)
// context.write("hello-->a.txt", 1)
// context.write("hello-->a.txt", 1)

--------------------------------- Step One: reducer
// input:  <"hello-->a.txt", {1,1,1}>
// output: context.write("hello-->a.txt", 3)

--------------------------------- Step Two: mapper
// context.write("hello", "a.txt-->3")
// context.write("hello", "b.txt-->2")
// context.write("hello", "c.txt-->2")

--------------------------------- Step Two: reducer
// input:  <"hello", {"a.txt-->3", "b.txt-->2", "c.txt-->2"}>
// output: context.write("hello", "a.txt-->3 b.txt-->2 c.txt-->2")

Final output:

hello   a.txt-->3 b.txt-->2 c.txt-->2
jerry   a.txt-->1 b.txt-->3 c.txt-->1
tom     a.txt-->2 b.txt-->1 c.txt-->1
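For reference, one possible set of input files under D:/srcData/ii that would produce the output above (the original post does not show its input, so these contents are only a reconstruction):

a.txt: hello hello hello jerry tom tom
b.txt: hello hello jerry jerry jerry tom
c.txt: hello hello jerry tom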


Step One: count how many times each word occurs in each file.

package com.test.hadoop.mr.ii;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Step one of the inverted index: count how many times each word occurs in each file.
 *
 * @author Administrator com.test.hadoop.mr.ii.InverseIndexStepOne
 */
public class InverseIndexStepOne {

    public static class InverseIndexStepOneMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

        private Text k = new Text();
        private LongWritable v = new LongWritable();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] words = StringUtils.split(line, " ");

            // To find out which file this record came from, first get the input split it belongs to
            FileSplit inputSplit = (FileSplit) context.getInputSplit();
            // Get the file path and name from the split
            String fileName = inputSplit.getPath().getName();

            // Emit one kv pair per word: <hello-->a.txt, 1>
            for (String word : words) {
                k.set(word + "-->" + fileName);
                v.set(1);
                context.write(k, v);
            }
        }
    }

    public static class InverseIndexStepOneReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

        private LongWritable v = new LongWritable();

        // input: <hello-->a.txt, {1,1,1,1}>
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            // Iterate over the values and accumulate the count for this word/file pair
            long count = 0;
            for (LongWritable value : values) {
                count += value.get();
            }
            v.set(count);
            context.write(key, v);
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();

        Job job_stepOne = Job.getInstance(conf);

        job_stepOne.setJarByClass(InverseIndexStepOne.class);

        job_stepOne.setMapperClass(InverseIndexStepOneMapper.class);
        job_stepOne.setReducerClass(InverseIndexStepOneReducer.class);

        job_stepOne.setOutputKeyClass(Text.class);
        job_stepOne.setOutputValueClass(LongWritable.class);

        FileInputFormat.setInputPaths(job_stepOne, new Path("D:/srcData/ii"));
        FileOutputFormat.setOutputPath(job_stepOne, new Path("D:/out"));

        job_stepOne.waitForCompletion(true);
    }
}
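With the default TextOutputFormat, step one writes each key and its count separated by a tab, so the file under D:/out (part-r-00000) looks roughly like this; step two then splits each line on the tab and on "-->":

hello-->a.txt    3
hello-->b.txt    2
hello-->c.txt    2
jerry-->a.txt    1
jerry-->b.txt    3
jerry-->c.txt    1
tom-->a.txt      2
tom-->b.txt      1
tom-->c.txt      1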

Step Two: regroup the step-one output by word and merge the per-file counts into one line.

package com.test.hadoop.mr.ii;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Step two of the inverted index: regroup by word and merge the per-file counts.
 *
 * @author Administrator com.test.hadoop.mr.ii.InverseIndexStepTwo
 */
public class InverseIndexStepTwo {

    public static class InverseIndexStepTwoMapper extends Mapper<LongWritable, Text, Text, Text> {

        private Text k = new Text();
        private Text v = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();

            // Split the step-one output line into its fields: "hello-->a.txt" and the count
            String[] fields = StringUtils.split(line, "\t");
            long count = Long.parseLong(fields[1]);
            String wordAndFile = fields[0];

            // Note: Commons Lang StringUtils.split treats "-->" as a set of separator
            // characters ('-' and '>'), which still splits "hello-->a.txt" into [hello, a.txt]
            String[] wordAndFileName = StringUtils.split(wordAndFile, "-->");
            String word = wordAndFileName[0];
            String fileName = wordAndFileName[1];

            // Emit the word as the key and "file-->count" as the value
            k.set(word);
            v.set(fileName + "-->" + count);

            context.write(k, v);
        }
    }

    public static class InverseIndexStepTwoReducer extends Reducer<Text, Text, Text, Text> {

        private Text v = new Text();

        // key: hello    values: [a.txt-->3, b.txt-->2, c.txt-->1]
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {

            // Concatenate all "file-->count" entries for this word into one line
            String result = "";
            for (Text value : values) {
                result += value + " ";
            }
            v.set(result);
            context.write(key, v);
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();

        Job job_stepTwo = Job.getInstance(conf);

        job_stepTwo.setJarByClass(InverseIndexStepTwo.class);

        job_stepTwo.setMapperClass(InverseIndexStepTwoMapper.class);
        job_stepTwo.setReducerClass(InverseIndexStepTwoReducer.class);

        job_stepTwo.setOutputKeyClass(Text.class);
        job_stepTwo.setOutputValueClass(Text.class);

        // Step two reads step one's output directory
        FileInputFormat.setInputPaths(job_stepTwo, new Path("D:/out"));
        FileOutputFormat.setOutputPath(job_stepTwo, new Path("D:/out2"));

        job_stepTwo.waitForCompletion(true);
    }
}
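The post runs the two classes one after the other by hand. A small driver like the one below (my own sketch, not part of the original; the class name InverseIndexDriver and the job names are assumptions) chains them so that step two only starts after step one has finished successfully, since D:/out is both step one's output and step two's input:

package com.test.hadoop.mr.ii;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class InverseIndexDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Step one: per word/file counts
        Job stepOne = Job.getInstance(conf, "inverse-index-step-one");
        stepOne.setJarByClass(InverseIndexStepOne.class);
        stepOne.setMapperClass(InverseIndexStepOne.InverseIndexStepOneMapper.class);
        stepOne.setReducerClass(InverseIndexStepOne.InverseIndexStepOneReducer.class);
        stepOne.setOutputKeyClass(Text.class);
        stepOne.setOutputValueClass(LongWritable.class);
        FileInputFormat.setInputPaths(stepOne, new Path("D:/srcData/ii"));
        FileOutputFormat.setOutputPath(stepOne, new Path("D:/out"));

        // Abort if step one fails; its output directory feeds step two
        if (!stepOne.waitForCompletion(true)) {
            System.exit(1);
        }

        // Step two: regroup the counts by word
        Job stepTwo = Job.getInstance(conf, "inverse-index-step-two");
        stepTwo.setJarByClass(InverseIndexStepTwo.class);
        stepTwo.setMapperClass(InverseIndexStepTwo.InverseIndexStepTwoMapper.class);
        stepTwo.setReducerClass(InverseIndexStepTwo.InverseIndexStepTwoReducer.class);
        stepTwo.setOutputKeyClass(Text.class);
        stepTwo.setOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(stepTwo, new Path("D:/out"));
        FileOutputFormat.setOutputPath(stepTwo, new Path("D:/out2"));

        System.exit(stepTwo.waitForCompletion(true) ? 0 : 1);
    }
}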