Lucene中的堆应用

时间:2022-10-21 03:15:50

Lucene中的堆

 

Lucene打分公式我就不说明了,这部分主要说明Lucene中堆的数据结构。把公式摆在下面,忘记的可以看下:

 Lucene中的堆应用

 

Lucene中多处用到最小堆的数据结构。下面的内容都是基于布尔查询OR场景,即需要将多个倒排表进行合并(取并集)。

 

在BooleanScorer2的构造函数中,创建DisjunctionSumScorer对象countingSumScorer。

之后在score函数中,先利用DisjunctionSumScorer类按照文档ID递增的顺序弹出文档,在这个过程中将分数进行叠加,然后利用InOrderTopScoreDocCollector类将文档构建成基于分数的最小堆。

 

BooleanScorer2代码片段如下:

class BooleanScorer2 extends Scorer {
....
....
/** The scorer to which all scoring will be delegated,
* except for computing and using the coordination factor.
*/
private final Scorer countingSumScorer;
public BooleanScorer2(Weight weight, boolean disableCoord, Similarity similarity, int minNrShouldMatch,
List<Scorer> required, List<Scorer> prohibited, List<Scorer> optional, int maxCoord) throws IOException {
....
countingSumScorer = makeCountingSumScorer(disableCoord, similarity);
}
....
....
/** Scores and collects all matching documents.
* @param collector The collector to which all matching documents are passed through.
*/
@Override
public void score(Collector collector) throws IOException {
collector.setScorer(this);
while ((doc = countingSumScorer.nextDoc()) != NO_MORE_DOCS) {
collector.collect(doc);
}
}
....
....
}

 

 

将多个倒排表按照文档ID的递增合并

 

多个倒排表合并的核心代码在DisjunctionSumScorer类中。这个类中有一个关键的数据结构ScorerDocQueue。代码及注释如下:

package org.apache.lucene.util;

/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/* Derived from org.apache.lucene.util.PriorityQueue of March 2005 */

import java.io.IOException;

import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.Scorer;

/** A ScorerDocQueue maintains a partial ordering of its Scorers such that the
least Scorer can always be found in constant time. Put()'s and pop()'s
require log(size) time. The ordering is by Scorer.doc().
*
* @lucene.internal
*/
public class ScorerDocQueue { // later: SpansQueue for spans with doc and term positions
private final HeapedScorerDoc[] heap;
private final int maxSize;
private int size;

/**
*
* 堆中的节点
* @author sony
*
*/
private class HeapedScorerDoc {
Scorer scorer; //每个节点中包含一个倒排表结构
int doc; //当前遍历到的文档ID

HeapedScorerDoc(Scorer s) { this(s, s.docID()); }

HeapedScorerDoc(Scorer scorer, int doc) {
this.scorer = scorer;
this.doc = doc;
}

void adjust() { doc = scorer.docID(); }
}

//栈顶元素
private HeapedScorerDoc topHSD; // same as heap[1], only for speed

/** Create a ScorerDocQueue with a maximum size. */
public ScorerDocQueue(int maxSize) {
// assert maxSize >= 0;
size = 0;
int heapSize = maxSize + 1;
//预先开辟出来空间
heap = new HeapedScorerDoc[heapSize];
this.maxSize = maxSize;
topHSD = heap[1]; // initially null
}

/**
* Adds a Scorer to a ScorerDocQueue in log(size) time.
* If one tries to add more Scorers than maxSize
* a RuntimeException (ArrayIndexOutOfBound) is thrown.
*/
public final void put(Scorer scorer) {
size++;
heap[size] = new HeapedScorerDoc(scorer);
//插入新的元素时,先放到最小堆的末尾,然后自底向上调整为最小堆。
upHeap();
}

/**
* Adds a Scorer to the ScorerDocQueue in log(size) time if either
* the ScorerDocQueue is not full, or not lessThan(scorer, top()).
* @param scorer
* @return true if scorer is added, false otherwise.
*/
public boolean insert(Scorer scorer){
if (size < maxSize) {
//如果有空间容纳,将新元素插到堆的末尾。
put(scorer);
return true;
} else {
//否则将新元素替换堆顶元素,然后自顶向下调整成最小堆。
int docNr = scorer.docID();
if ((size > 0) && (! (docNr < topHSD.doc))) { // heap[1] is top()
heap[1] = new HeapedScorerDoc(scorer, docNr);
downHeap();
return true;
} else {
return false;
}
}
}

/** Returns the least Scorer of the ScorerDocQueue in constant time.
* Should not be used when the queue is empty.
*/
public final Scorer top() {
// assert size > 0;
return topHSD.scorer;
}

/** Returns document number of the least Scorer of the ScorerDocQueue
* in constant time.
* Should not be used when the queue is empty.
*/
public final int topDoc() {
// assert size > 0;
return topHSD.doc;
}

public final float topScore() throws IOException {
// assert size > 0;
return topHSD.scorer.score();
}

//将堆顶的倒排表的遍历到下一个文档,并且调整最小堆
public final boolean topNextAndAdjustElsePop() throws IOException {
return checkAdjustElsePop(topHSD.scorer.nextDoc() != DocIdSetIterator.NO_MORE_DOCS);
}

public final boolean topSkipToAndAdjustElsePop(int target) throws IOException {
return checkAdjustElsePop(topHSD.scorer.advance(target) != DocIdSetIterator.NO_MORE_DOCS);
}

private boolean checkAdjustElsePop(boolean cond) {
//如果堆顶的倒排表还能遍历下一个文档,就更改topHSD的doc值
if (cond) { // see also adjustTop
topHSD.doc = topHSD.scorer.docID();
} else { // see also popNoResult
//否则,移除堆顶元素,将末尾元素提上来
heap[1] = heap[size]; // move last to first
heap[size] = null;
size--;
}
//最后自顶向下调整成最小堆
downHeap();
return cond;
}

/** Removes and returns the least scorer of the ScorerDocQueue in log(size)
* time.
* Should not be used when the queue is empty.
*/
public final Scorer pop() {
// assert size > 0;
Scorer result = topHSD.scorer;
popNoResult();
return result;
}

/** Removes the least scorer of the ScorerDocQueue in log(size) time.
* Should not be used when the queue is empty.
*/
private final void popNoResult() {
heap[1] = heap[size]; // move last to first
heap[size] = null;
size--;
downHeap();// adjust heap
}

/** Should be called when the scorer at top changes doc() value.
* Still log(n) worst case, but it's at least twice as fast to <pre>
* { pq.top().change(); pq.adjustTop(); }
* </pre> instead of <pre>
* { o = pq.pop(); o.change(); pq.push(o); }
* </pre>
*/
public final void adjustTop() {
// assert size > 0;
topHSD.adjust();
downHeap();
}

/** Returns the number of scorers currently stored in the ScorerDocQueue. */
public final int size() {
return size;
}

/** Removes all entries from the ScorerDocQueue. */
public final void clear() {
for (int i = 0; i <= size; i++) {
heap[i] = null;
}
size = 0;
}

//自底向上调整成最小堆
private final void upHeap() {
int i = size;
HeapedScorerDoc node = heap[i]; // save bottom node
int j = i >>> 1;
while ((j > 0) && (node.doc < heap[j].doc)) {
heap[i] = heap[j]; // shift parents down
i = j;
j = j >>> 1;
}
heap[i] = node; // install saved node
topHSD = heap[1];
}

//自顶向下调整成最小堆
private final void downHeap() {
int i = 1;
HeapedScorerDoc node = heap[i]; // save top node
int j = i << 1; // find smaller child
int k = j + 1;
if ((k <= size) && (heap[k].doc < heap[j].doc)) {
j = k;
}
while ((j <= size) && (heap[j].doc < node.doc)) {
heap[i] = heap[j]; // shift up child
i = j;
j = i << 1;
k = j + 1;
if (k <= size && (heap[k].doc < heap[j].doc)) {
j = k;
}
}
heap[i] = node; // install saved node
topHSD = heap[1];
}
}


 DisjunctionSumScorer关键函数 advanceAfterCurrent完成按顺序所有的遍历文档以及分数叠加的工作。

 

protected boolean advanceAfterCurrent() throws IOException {
do { // repeat until minimum nr of matchers
currentDoc = scorerDocQueue.topDoc();///scorerDocQueue是一个最小堆,每一个元素是每一个term下的倒排表
currentScore = scorerDocQueue.topScore();/////////在这里真正计算了norm和tf因子!
nrMatchers = 1;
do { // Until all subscorers are after currentDoc
if (!scorerDocQueue.topNextAndAdjustElsePop()) {//如果堆顶元素的那个倒排表不可以取下一个元素
if (scorerDocQueue.size() == 0) {
break; // nothing more to advance, check for last match.
}
}
if (scorerDocQueue.topDoc() != currentDoc) {
break; // All remaining subscorers are after currentDoc.
}
//如果堆顶可以取下一个文档,且已经重新调整完毕,最小的仍然是之前的文档号,说明多个Term在同一个文档中出现了
currentScore += scorerDocQueue.topScore();//叠加,需要在q中所有的t,在该文档d下的词频。
nrMatchers++;
} while (true);

if (nrMatchers >= minimumNrMatchers) {//如果这个文档同时满足符合条件的scorer
return true;
} else if (scorerDocQueue.size() < minimumNrMatchers) {
return false;
}
} while (true);
}


 

如果调整后的堆顶元素的文档号依然是当前文档的文档号,说明多个词在同一个文档中出现,这个时候需要将在这个文档中出现的所有词的分数进行叠加,这就是打分公式中叠加的体现

 

利用优先队列获取分数排名前N名

 

接下来进入InOrderTopScoreDocCollector类,它继承于TopScoreDocCollector类,包含一个优先队列的数据结构PriorityQueue类。优先队列的原理与最小堆是一致的。只不过优先队列在初始时,将堆中每一个元素的值都设为“无穷小”。这样,当来了一个文档,如果这个文档的分数大于堆顶,就替换堆顶,然后自顶向下调整成最小堆。这样保证堆中总是保留分数前N大的文档。代码如下:

 @Override
public void collect(int doc) throws IOException {
float score = scorer.score();

// This collector cannot handle these scores:
assert score != Float.NEGATIVE_INFINITY;
assert !Float.isNaN(score);

totalHits++;
//pqTop表示堆顶元素
if (score <= pqTop.score) {//刚开始pgTop设为负无穷,因为pq设为20个大小,在刚开始时都设置成无穷小,如果20个都满了。
//之后再来文档,只需要把比pg中最小的大的留下,替换原先最小的即可。
// Since docs are returned in-order (i.e., increasing doc Id), a document
// with equal score to pqTop.score cannot compete since HitQueue favors
// documents with lower doc Ids. Therefore reject those docs too.
return;
}
pqTop.doc = doc + docBase;//每次向头部插入数据,整理成最小堆
pqTop.score = score;
pqTop = pq.updateTop();
}

@Override
public boolean acceptsDocsOutOfOrder() {
return false;
}
}

 

最后按照需要的数目从堆中弹出文档,需要逆序一下。