1 概述
倒排索引是搜索引擎中不可或缺的数据结构,利用倒排索引可以快速搜索到包涵搜索关键词的一系列文章。倒排索引的常见结构如下图:
在倒排索引中,每个term与一系列的postings相关联,每个postings由文章的id以及payload组成,而payload常见的是该词在该文章中的词频,有的也加上了位置信息。
2 算法机理
使用MapReduce进行倒排索引的构造是目前大多数大数据公司所采用的方式,因为其可以很好地处理大规模的数据。最基本的倒排索引MapReduce算法如下:
在该算法中,mapper端的输入是doc id以及doc content,每篇文章都倍分配给一个mapper,在mapper端先对文章进行分词,统计词频等,然后组成postings,最后将每个词以及其对应的postings发送出去; reducer端接收一个term,以及其对应的一系列postings,将每一个postings加入一个列表中,然后对该列表按照doc id排序,最后发送term和其对应的postings列表。 处理流程可见下面的示意图:
然而,你估计也已经发现了,上述的算法存在一个很明显的可扩展性瓶颈,就是在reducer端它假设有足够大的内存来储存每一个term对应的一系列postings,因为后续还要对这些postings进行排序。因而,当postings变得很大时,reducer端将会耗尽内存。
一个很简单的解决方案是:我们改变mapper端发送出的key的格式,将key的格式变为
3 程序实现
从上部分可以看出,使用MapReduce实现倒排索引的构建原理其实并不是很复杂,然而在实现中还是有一些问题需要注意的。
- 从算法中我们可以看到,mapper的输入是key是doc id,value为doc的内容,那么如何保证在分片时能恰好按每篇文章分成各个分片传给mapper?
- 中文文章的分词如何处理?
第二个问题比较容易,我们可以使用第三方分词包来进行文章的分词处理,我这里使用的是ansj中文分词。
第一个问题比较复杂一点,我们需要重定义一个FileInputFormat,使用该FileInputFormat mapper可以把整个文件当作一条记录处理。下面代码实现了一个WholeFileInputFormat。
package mp.invertedIndexing;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
/**
*
* @author liupenghe
* Date: 2016-02-17
*
*/
public class WholeFileInputFormat extends FileInputFormat<NullWritable, Text>{
@Override
public RecordReader<NullWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
return new WholeFileRecordReader((FileSplit)split, context);
}
@Override
protected boolean isSplitable(JobContext context, Path filename) {
return false;
}
}
在WholeFileInputFormat中,没有使用键,此处表示为NullWritabble,值为文件内容,表示成Text实例。它定义了两个方法:一个是重载isSplitable()方法返回false值,来指定输入文件不被分片;另一个是实现了createRecordReader()来返回一个定制的RecordReader实现,如下面代码所示。
package mp.invertedIndexing;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
/**
* {@link WholeFileRecordReader} is used for transforming the filesplit into a record, and the
* content of a record is the whole text.
* @author liupenghe
* Date: 2016-02-17
*/
public class WholeFileRecordReader extends RecordReader<NullWritable, Text>{
private FileSplit fileSplit;
private Configuration conf;
private boolean processed = false;
private NullWritable key;
private Text value;
public WholeFileRecordReader(FileSplit split, TaskAttemptContext context) {
// TODO Auto-generated constructor stub
fileSplit = split;
conf = context.getConfiguration();
key = NullWritable.get();
value = new Text();
}
@Override
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
// TODO Auto-generated method stub
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
// TODO Auto-generated method stub
if(!processed) {
Path filePath = fileSplit.getPath();
FileSystem fs = filePath.getFileSystem(conf);
FSDataInputStream in = null;
byte[] contents = new byte[(int)fileSplit.getLength()];
try {
in = fs.open(filePath);
in.readFully(contents, 0, contents.length);
value.set(new String(contents));
} finally {
in.close();
fs.close();
}
processed = true;
return true;
}
return false;
}
@Override
public NullWritable getCurrentKey() throws IOException, InterruptedException {
// TODO Auto-generated method stub
return key;
}
@Override
public Text getCurrentValue() throws IOException, InterruptedException {
// TODO Auto-generated method stub
return value;
}
@Override
public float getProgress() throws IOException, InterruptedException {
// TODO Auto-generated method stub
return processed ? 1.0f : 0.0f;
}
@Override
public void close() throws IOException {
// TODO Auto-generated method stub
}
}
WholeFileRecordReader负责将FileSplit转换为一条记录,该记录的键是null,值是这个文件的内容。因为只有一条记录,WholeFileRecordReader要么处理这条记录,要么不处理,所以它维护一个名称为processed的布尔变量来表示记录是否被处理过。如果nextKeyValue()方法被调用,文件没有被处理,就打开文件,声明一个长度为文件长度的数组,然后将文件内容放入该数组中,然后实例化value,返回true则表示成功读取记录。还有一些其他的方法,来获取当前的key, value以及读文件的进度。
这部分内容具体请参考《Hadoop 权威指南》第7章-MapReduce类型与格式。
完整的代码地址请点击此处
4 遇到的问题
代码写完后放到集群上运行时遇到了一些问题,这里也给出我遇到的一些问题。
- 由于分词功能使用了第三方jar包,将程序打包为jar包放到集群上运行时总是报java.lang.classnotfoundexception. 网上查阅资料得出解决方案
job.addFileToClassPath(new Path("hdfs://10.10.108.52:9000/lph/tool_libs/ansj_seg-3.0-all-in-one.jar"));
还有其他几种加载第三方jar包的方式,请参考Hadoop应用引用第三方jar的几种方式 - 运行时也遇到了这么一个错误
Error: java.lang.NullPointerException
at mp.invertedIndexing.Turple.readFields(Turple.java:42)
查阅资料找出原因是我的自定义Writable类型Turple缺少一个默认的构造函数。《Hadoop 权威指南》4.3节说明:所有的Writable实现都必须有一个默认的构造函数以便MapReduce框架可以对它们进行实例化,然后再调用readFielsds函数填充各个字段的值。