HBase Source Code Study (3) ------- Get Is Ultimately Handled as a Scan (Part 2)

Date: 2021-09-04 08:24:19

The previous post stopped right after Scan scan = new Scan(get). This post picks up from there.

First, let's look at how the Scan is constructed:

public Scan(Get get) {
  this.startRow = get.getRow();
  this.stopRow = get.getRow();
  this.filter = get.getFilter();
  this.cacheBlocks = get.getCacheBlocks();
  this.maxVersions = get.getMaxVersions();
  this.tr = get.getTimeRange();
  this.familyMap = get.getFamilyMap();
}

startRow and stopRow are both set to the Get's row, so they are equal.
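Equal start and stop rows matter because of range conventions. Under the usual half-open convention [startRow, stopRow), equal bounds select nothing. That is exactly the problem the comment in RegionScannerImpl (further down) deals with. Here is a small sketch in which java.util.TreeMap stands in for the sorted row space. It uses String rows instead of HBase's byte[] keys, and the class and method names are made up for illustration:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Illustrative only: String rows stand in for HBase's byte[] row keys.
final class RowRangeDemo {
  // Rows in [startRow, stopRow) when inclusiveStop is false,
  // or in [startRow, stopRow] when inclusiveStop is true.
  static List<String> rowsInRange(NavigableMap<String, String> table,
      String startRow, String stopRow, boolean inclusiveStop) {
    return new ArrayList<>(
        table.subMap(startRow, true, stopRow, inclusiveStop).keySet());
  }

  public static void main(String[] args) {
    NavigableMap<String, String> table = new TreeMap<>();
    table.put("row1", "a");
    table.put("row2", "b");
    table.put("row3", "c");
    // A Get on "row2" becomes a scan with startRow == stopRow == "row2".
    System.out.println(rowsInRange(table, "row2", "row2", false)); // prints []
    System.out.println(rowsInRange(table, "row2", "row2", true));  // prints [row2]
  }
}
```

A get-scan therefore needs the stop row to be treated as inclusive. The isScan field below is what switches that on.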

Next, look at getScanner:

protected RegionScanner getScanner(Scan scan,
    List<KeyValueScanner> additionalScanners) throws IOException {
  startRegionOperation();
  this.readRequestsCount.increment();
  try {
    // Verify families are all valid
    prepareScanner(scan);
    if (scan.hasFamilies()) {
      for (byte[] family : scan.getFamilyMap().keySet()) {
        checkFamily(family);
      }
    }
    return instantiateRegionScanner(scan, additionalScanners);
  } finally {
    closeRegionOperation();
  }
}
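getScanner brackets its work between startRegionOperation() and closeRegionOperation() in a try/finally. Their bodies are not shown here. My assumption is that they take and release the read side of a region-level read/write lock, so many reads can run at once while closing the region has to wait for them. Below is a hypothetical sketch of that pattern using ReentrantReadWriteLock (not HBase code; all names are illustrative):

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch of the start/closeRegionOperation bracket; not HBase code.
final class RegionOperationDemo {
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  private final AtomicBoolean closed = new AtomicBoolean(false);

  // Any number of reads can hold the read lock concurrently.
  // (The real getScanner declares IOException; an unchecked exception keeps this short.)
  void startRegionOperation() {
    lock.readLock().lock();
    if (closed.get()) {
      lock.readLock().unlock();
      throw new IllegalStateException("region is closed");
    }
  }

  void closeRegionOperation() {
    lock.readLock().unlock();
  }

  // Closing takes the write lock, so it waits for in-flight operations to finish.
  void close() {
    lock.writeLock().lock();
    try {
      closed.set(true);
    } finally {
      lock.writeLock().unlock();
    }
  }

  // Same shape as getScanner: start, do the work inside try, always release in finally.
  String read(String row) {
    startRegionOperation();
    try {
      return "value-of-" + row;
    } finally {
      closeRegionOperation();
    }
  }
}
```

The finally block is the important part. Even if prepareScanner or checkFamily throws, the lock is released and a later close() is not blocked forever.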

 

The method to focus on is instantiateRegionScanner, which consists of a single line:

protected RegionScanner instantiateRegionScanner(Scan scan,
    List<KeyValueScanner> additionalScanners) throws IOException {
  return new RegionScannerImpl(scan, additionalScanners);
}


Now look at RegionScannerImpl (what a roundabout path to get here!):

RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners) throws IOException {
  //DebugPrint.println("HRegionScanner.<init>");

  this.filter = scan.getFilter();
  this.batch = scan.getBatch();
  if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) {
    this.stopRow = null;
  } else {
    this.stopRow = scan.getStopRow();
  }
  // If we are doing a get, we want to be [startRow,endRow] normally
  // it is [startRow,endRow) and if startRow=endRow we get nothing.
  this.isScan = scan.isGetScan() ? -1 : 0;

  // synchronize on scannerReadPoints so that nobody calculates
  // getSmallestReadPoint, before scannerReadPoints is updated.
  synchronized (scannerReadPoints) {
    this.readPt = MultiVersionConsistencyControl.resetThreadReadPoint(mvcc);
    scannerReadPoints.put(this, this.readPt);
  }

  List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>();
  if (additionalScanners != null) {
    scanners.addAll(additionalScanners);
  }

  for (Map.Entry<byte[], NavigableSet<byte[]>> entry :
      scan.getFamilyMap().entrySet()) {
    Store store = stores.get(entry.getKey());
    StoreScanner scanner = store.getScanner(scan, entry.getValue());
    scanners.add(scanner);
  }
  this.storeHeap = new KeyValueHeap(scanners, comparator);
}


The comments in HBase are really good. The important parts are all explained, which makes the code a pleasure to read:

// If we are doing a get, we want to be [startRow,endRow] normally
// it is [startRow,endRow) and if startRow=endRow we get nothing.
this.isScan = scan.isGetScan() ? -1 : 0;


There is an explicit check for whether this is a get-scan, which confirms once again that a Get is ultimately handled as a Scan.
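The snippet above only sets isScan; the place where it is consumed is not shown. As I recall, RegionScannerImpl's stop-row check compares stopRow with the current row and stops once the result is <= isScan. Treat that check as an assumption. The sketch below (hypothetical class name) shows how -1 versus 0 turns [startRow, stopRow) into [startRow, stopRow]:

```java
// Hypothetical sketch of a stop-row check driven by isScan; not copied from HBase.
final class StopRowDemo {
  // Lexicographic comparison of row keys as unsigned bytes.
  static int compareRows(byte[] a, byte[] b) {
    int n = Math.min(a.length, b.length);
    for (int i = 0; i < n; i++) {
      int x = a[i] & 0xff;
      int y = b[i] & 0xff;
      if (x != y) {
        return x - y;
      }
    }
    return a.length - b.length;
  }

  // isScan == 0 (normal scan): stop when stopRow <= currentRow, so stopRow is excluded.
  // isScan == -1 (get-scan): stop only when stopRow < currentRow, so stopRow is included.
  static boolean isStopRow(byte[] stopRow, byte[] currentRow, int isScan) {
    return currentRow == null
        || (stopRow != null && compareRows(stopRow, currentRow) <= isScan);
  }
}
```

Using a single integer threshold avoids a separate branch for Get versus Scan in the hot path: the same comparison serves both cases.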

       

// synchronize on scannerReadPoints so that nobody calculates
// getSmallestReadPoint, before scannerReadPoints is updated.
synchronized (scannerReadPoints) {
  this.readPt = MultiVersionConsistencyControl.resetThreadReadPoint(mvcc);
  scannerReadPoints.put(this, this.readPt);
}
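The comment explains why the read point is taken and registered inside one synchronized block. Here is a simplified, hypothetical sketch of that bookkeeping. Every open scanner records its read point, and getSmallestReadPoint returns the oldest point still in use. The class and method names are illustrative, not HBase's:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of per-scanner read-point bookkeeping; not HBase code.
final class ReadPointRegistry {
  private final Map<Object, Long> scannerReadPoints = new HashMap<>();
  private long mvccReadPoint; // latest committed write, guarded by scannerReadPoints

  // Writers publish a new read point when they commit.
  void advanceReadPoint(long readPoint) {
    synchronized (scannerReadPoints) {
      mvccReadPoint = readPoint;
    }
  }

  // Like the constructor above: take the read point and register it atomically.
  long openScanner(Object scanner) {
    synchronized (scannerReadPoints) {
      long readPt = mvccReadPoint;
      scannerReadPoints.put(scanner, readPt);
      return readPt;
    }
  }

  void closeScanner(Object scanner) {
    synchronized (scannerReadPoints) {
      scannerReadPoints.remove(scanner);
    }
  }

  // Oldest read point any open scanner still depends on; everything at or
  // below it is already visible to every open scanner.
  long getSmallestReadPoint() {
    synchronized (scannerReadPoints) {
      long min = mvccReadPoint;
      for (long pt : scannerReadPoints.values()) {
        min = Math.min(min, pt);
      }
      return min;
    }
  }
}
```

Without the shared lock, a race is possible. Suppose a scanner reads point 5 and a commit then moves the global point to 9. If getSmallestReadPoint ran before the scanner registered, it would answer 9 and miss a scanner that still needs the view at 5.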


This involves a class called MultiVersionConsistencyControl. Here is how its own Javadoc describes it:

 

/**
 * Manages the read/write consistency within memstore. This provides
 * an interface for readers to determine what entries to ignore, and
 * a mechanism for writers to obtain new write numbers, then "commit"
 * the new writes for readers to read (thus forming atomic transactions).
 */


In other words, it controls read/write consistency of the data in the memstore.
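The following toy model illustrates the idea in that Javadoc. It is not HBase's actual MultiVersionConsistencyControl, and the class and method names are hypothetical. Each write gets a number and becomes visible only once committed, and a reader skips every entry newer than its read point:

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the Javadoc's idea; not HBase's MultiVersionConsistencyControl.
final class ToyMvcc {
  static final class Cell {
    final String value;
    final long writeNumber;

    Cell(String value, long writeNumber) {
      this.value = value;
      this.writeNumber = writeNumber;
    }
  }

  private long nextWriteNumber = 1;
  private long readPoint = 0;
  private final List<Cell> memstore = new ArrayList<>();

  // A writer obtains a new write number and stages its cell; readers can't see it yet.
  synchronized long beginWrite(String value) {
    long wn = nextWriteNumber++;
    memstore.add(new Cell(value, wn));
    return wn;
  }

  // "Commit" publishes the write by advancing the read point.
  // Simplification: assumes writes complete in the order they began.
  synchronized void completeWrite(long writeNumber) {
    readPoint = Math.max(readPoint, writeNumber);
  }

  synchronized long getReadPoint() {
    return readPoint;
  }

  // A reader ignores every cell whose write number is above its read point.
  synchronized List<String> read(long readPt) {
    List<String> visible = new ArrayList<>();
    for (Cell c : memstore) {
      if (c.writeNumber <= readPt) {
        visible.add(c.value);
      }
    }
    return visible;
  }
}
```

This is why RegionScannerImpl records readPt once when it is constructed. Writes committed after that point stay invisible to this scanner, so a single Get sees a consistent snapshot.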

Next, the StoreScanners are initialized:

for (Map.Entry<byte[], NavigableSet<byte[]>> entry :
    scan.getFamilyMap().entrySet()) {
  Store store = stores.get(entry.getKey());
  StoreScanner scanner = store.getScanner(scan, entry.getValue());
  scanners.add(scanner);
}


For each column family, store.getScanner essentially creates a new StoreScanner. Here is the StoreScanner constructor:

StoreScanner(Store store, Scan scan, final NavigableSet<byte[]> columns)
    throws IOException {
  this.store = store;
  this.cacheBlocks = scan.getCacheBlocks();
  matcher = new ScanQueryMatcher(scan, store.getFamily().getName(),
      columns, store.ttl, store.comparator.getRawComparator(),
      store.minVersions, store.versionsToReturn(scan.getMaxVersions()),
      false, Long.MAX_VALUE);

  this.isGet = scan.isGetScan();
  // pass columns = try to filter out unnecessary ScanFiles
  List<KeyValueScanner> scanners = getScanners(scan, columns);

  // Seek all scanners to the start of the Row (or if the exact matching row
  // key does not exist, then to the start of the next matching Row).
  if (matcher.isExactColumnQuery()) {
    for (KeyValueScanner scanner : scanners)
      scanner.seekExactly(matcher.getStartKey(), false);
  } else {
    for (KeyValueScanner scanner : scanners)
      scanner.seek(matcher.getStartKey());
  }

  // Combine all seeked scanners with a heap
  heap = new KeyValueHeap(scanners, store.comparator);

  this.store.addChangedReaderObserver(this);
}


It first creates a ScanQueryMatcher, which decides later on whether each KeyValue matches the query:

matcher = new ScanQueryMatcher(scan, store.getFamily().getName(),
    columns, store.ttl, store.comparator.getRawComparator(),
    store.minVersions, store.versionsToReturn(scan.getMaxVersions()),
    false, Long.MAX_VALUE);
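Two of the matcher's inputs deserve a closer look. The body of store.versionsToReturn(scan.getMaxVersions()) is not quoted here. My understanding is that it caps the number of versions the Scan asks for at the column family's configured maximum. store.ttl is presumably the family's time-to-live, used to skip expired cells. Below is a hypothetical sketch of both rules. The parameter names are mine, and I assume the TTL is in milliseconds with Long.MAX_VALUE meaning "never expires":

```java
// Hypothetical helpers illustrating the matcher's inputs; not the HBase implementations.
final class MatcherInputsDemo {
  // Cap the versions requested by the Scan at the family's configured maximum.
  static int versionsToReturn(int wantedVersions, int familyMaxVersions) {
    if (wantedVersions <= 0) {
      throw new IllegalArgumentException("Number of versions must be > 0");
    }
    return Math.min(wantedVersions, familyMaxVersions);
  }

  // Assumed TTL rule: a cell older than (now - ttl) is expired;
  // Long.MAX_VALUE disables expiry.
  static boolean isExpired(long cellTimestamp, long nowMillis, long ttlMillis) {
    if (ttlMillis == Long.MAX_VALUE) {
      return false;
    }
    return cellTimestamp < nowMillis - ttlMillis;
  }
}
```

For example, a Get asking for 10 versions on a family configured to keep 3 would receive at most 3.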


Next, it needs KeyValueScanners to do the actual lookup:

// pass columns = try to filter out unnecessary ScanFiles
List<KeyValueScanner> scanners = getScanners(scan, columns);


Here is how getScanners is written:

/*
 * @return List of scanners to seek, possibly filtered by StoreFile.
 */
private List<KeyValueScanner> getScanners(Scan scan,
    final NavigableSet<byte[]> columns) throws IOException {
  boolean memOnly;
  boolean filesOnly;
  if (scan instanceof InternalScan) {
    InternalScan iscan = (InternalScan) scan;
    memOnly = iscan.isCheckOnlyMemStore();
    filesOnly = iscan.isCheckOnlyStoreFiles();
  } else {
    memOnly = false;
    filesOnly = false;
  }
  List<KeyValueScanner> allStoreScanners =
      this.store.getScanners(cacheBlocks, isGet, false);

  List<KeyValueScanner> scanners =
      new ArrayList<KeyValueScanner>(allStoreScanners.size());

  // include only those scan files which pass all filters
  for (KeyValueScanner kvs : allStoreScanners) {
    if (kvs instanceof StoreFileScanner) {
      if (memOnly == false
          && ((StoreFileScanner) kvs).shouldSeek(scan, columns)) {
        scanners.add(kvs);
      }
    } else {
      // kvs is a MemStoreScanner
      if (filesOnly == false && this.store.memstore.shouldSeek(scan)) {
        scanners.add(kvs);
      }
    }
  }

  return scanners;
}


The scanners come from this.store.getScanners, so let's look at that method next:

protected List<KeyValueScanner> getScanners(boolean cacheBlocks,
    boolean isGet,
    boolean isCompaction) throws IOException {
  List<StoreFile> storeFiles;
  List<KeyValueScanner> memStoreScanners;
  this.lock.readLock().lock();
  try {
    storeFiles = this.getStorefiles();
    memStoreScanners = this.memstore.getScanners();
  } finally {
    this.lock.readLock().unlock();
  }

  // First the store file scanners

  // TODO this used to get the store files in descending order,
  // but now we get them in ascending order, which I think is
  // actually more correct, since memstore get put at the end.
  List<StoreFileScanner> sfScanners = StoreFileScanner
      .getScannersForStoreFiles(storeFiles, cacheBlocks, isGet, isCompaction);
  List<KeyValueScanner> scanners =
      new ArrayList<KeyValueScanner>(sfScanners.size() + 1);
  scanners.addAll(sfScanners);
  // Then the memstore scanners
  scanners.addAll(memStoreScanners);
  return scanners;
}

 

The lookup has two parts: one over the memstore and one over the store files. Two sets of scanners are therefore obtained, and we will look at each in turn.

The memstore scanner comes from this.memstore.getScanners():

/**
 * @return scanner on memstore and snapshot in this order.
 */
List<KeyValueScanner> getScanners() {
  this.lock.readLock().lock();
  try {
    return Collections.<KeyValueScanner>singletonList(
        new MemStoreScanner());
  } finally {
    this.lock.readLock().unlock();
  }
}

The details of how the MemStoreScanner is created are left for a later post.

Now let's look at how the StoreFileScanners are created:

List<StoreFileScanner> sfScanners = StoreFileScanner
    .getScannersForStoreFiles(storeFiles, cacheBlocks, isGet, isCompaction);


Here is that method:

/**
 * Return an array of scanners corresponding to the given set of store files.
 */
public static List<StoreFileScanner> getScannersForStoreFiles(
    Collection<StoreFile> files, boolean cacheBlocks, boolean usePread,
    boolean isCompaction) throws IOException {
  List<StoreFileScanner> scanners = new ArrayList<StoreFileScanner>(
      files.size());
  for (StoreFile file : files) {
    StoreFile.Reader r = file.createReader();
    scanners.add(r.getStoreFileScanner(cacheBlocks, usePread, isCompaction));
  }
  return scanners;
}


Each scanner is created by StoreFile.Reader's getStoreFileScanner method. Following it further:

/**
 * Get a scanner to scan over this StoreFile.
 *
 * @param cacheBlocks should this scanner cache blocks?
 * @param pread use pread (for highly concurrent small readers)
 * @param isCompaction is scanner being used for compaction?
 * @return a scanner
 */
public StoreFileScanner getStoreFileScanner(boolean cacheBlocks,
    boolean pread, boolean isCompaction) {
  return new StoreFileScanner(this, getScanner(cacheBlocks, pread,
      isCompaction), !isCompaction);
}


It simply constructs a new StoreFileScanner:

/**
 * Implements a {@link KeyValueScanner} on top of the specified {@link HFileScanner}
 * @param hfs HFile scanner
 */
public StoreFileScanner(StoreFile.Reader reader, HFileScanner hfs, boolean useMVCC) {
  this.reader = reader;
  this.hfs = hfs;
  this.enforceMVCC = useMVCC;
}


The HFileScanner comes from getScanner. Here is getScanner:

@Deprecated
public HFileScanner getScanner(boolean cacheBlocks, boolean pread,
    boolean isCompaction) {
  return reader.getScanner(cacheBlocks, pread, isCompaction);
}


It delegates to the underlying HFile reader's getScanner. So many layers! One level deeper, we reach HFileReaderV2's implementation of getScanner:

/**
 * Create a Scanner on this file. No seeks or reads are done on creation. Call
 * {@link HFileScanner#seekTo(byte[])} to position an start the read. There is
 * nothing to clean up in a Scanner. Letting go of your references to the
 * scanner is sufficient.
 *
 * @param cacheBlocks True if we should cache blocks read in by this scanner.
 * @param pread Use positional read rather than seek+read if true (pread is
 * better for random reads, seek+read is better scanning).
 * @param isCompaction is scanner being used for a compaction?
 * @return Scanner on this file.
 */
@Override
public HFileScanner getScanner(boolean cacheBlocks, final boolean pread,
    final boolean isCompaction) {
  return new ScannerV2(this, cacheBlocks, pread, isCompaction);
}
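Note that StoreScanner passes isGet down as the usePread argument of getScannersForStoreFiles. A Get therefore reads store files with positional reads, which the Javadoc above says suits random reads. HBase actually reads through HDFS streams. As a local stand-in, this sketch contrasts the two access styles with java.nio.channels.FileChannel (the class and method names are hypothetical):

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

// Local-file illustration of pread vs. seek+read; HBase reads via HDFS, not FileChannel.
final class PreadDemo {
  // Positional read: reads at an absolute offset without touching the channel's
  // shared position, so concurrent readers don't need to coordinate a seek.
  static int pread(FileChannel ch, long offset, ByteBuffer dst) throws IOException {
    return ch.read(dst, offset);
  }

  // Seek + read: moves the shared position, then reads from it. Readers sharing
  // the channel must serialize the two steps (a class-wide lock here, for simplicity).
  static synchronized int seekAndRead(FileChannel ch, long offset, ByteBuffer dst)
      throws IOException {
    ch.position(offset);
    return ch.read(dst);
  }
}
```

For a Get, which typically touches one small block per file, a stateless positional read avoids contention on a shared stream position. A long sequential scan benefits more from a stream that just keeps reading forward.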


 

public ScannerV2(HFileReaderV2 r, boolean cacheBlocks,
    final boolean pread, final boolean isCompaction) {
  super(cacheBlocks, pread, isCompaction);
  this.reader = r;
}


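That finally completes the detour, so let's go back up the call chain. The store-file scanners and the memstore scanner are combined into one list (store-file scanners first, then the memstore scanner) and returned to StoreScanner.getScanners, which filters them: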

// include only those scan files which pass all filters
for (KeyValueScanner kvs : allStoreScanners) {
  if (kvs instanceof StoreFileScanner) {
    if (memOnly == false
        && ((StoreFileScanner) kvs).shouldSeek(scan, columns)) {
      scanners.add(kvs);
    }
  } else {
    // kvs is a MemStoreScanner
    if (filesOnly == false && this.store.memstore.shouldSeek(scan)) {
      scanners.add(kvs);
    }
  }
}


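Scanners that fail these checks (the memOnly/filesOnly flags, StoreFileScanner.shouldSeek, or memstore.shouldSeek) are dropped, and the result is returned one more level up, to the StoreScanner constructor.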
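The body of StoreFileScanner.shouldSeek(scan, columns) is not shown. As far as I know, one of its checks is whether the file's range of timestamps overlaps the Scan's TimeRange; a Bloom-filter check on the row is another. Below is a hypothetical sketch of just the time-range part. It assumes the Scan's range is half-open [min, max) and that each file records the oldest and newest timestamp it contains:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a time-range pre-check; not StoreFileScanner.shouldSeek itself.
final class TimeRangeFilterDemo {
  // Assumed per-file metadata: oldest and newest cell timestamp in the file (inclusive).
  static final class FileInfo {
    final String name;
    final long minTs;
    final long maxTs;

    FileInfo(String name, long minTs, long maxTs) {
      this.name = name;
      this.minTs = minTs;
      this.maxTs = maxTs;
    }
  }

  // Scan range assumed half-open [scanMin, scanMax); the file is worth seeking
  // only if some timestamp could fall in both ranges.
  static boolean passesTimeRange(long scanMin, long scanMax, FileInfo f) {
    return f.maxTs >= scanMin && f.minTs < scanMax;
  }

  // Keep only the files whose time range overlaps the scan's.
  static List<String> filesToSeek(List<FileInfo> files, long scanMin, long scanMax) {
    List<String> result = new ArrayList<>();
    for (FileInfo f : files) {
      if (passesTimeRange(scanMin, scanMax, f)) {
        result.add(f.name);
      }
    }
    return result;
  }
}
```

Consider files covering [100, 199], [200, 299] and [300, 399]. A Scan limited to [250, 300) would only need to seek the second file, so the other two are never opened for this read.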

// Seek all scanners to the start of the Row (or if the exact matching row
// key does not exist, then to the start of the next matching Row).
if (matcher.isExactColumnQuery()) {
  for (KeyValueScanner scanner : scanners)
    scanner.seekExactly(matcher.getStartKey(), false);
} else {
  for (KeyValueScanner scanner : scanners)
    scanner.seek(matcher.getStartKey());
}


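Each scanner is positioned with seekExactly or seek, and the seeked scanners are combined into the StoreScanner's own KeyValueHeap. Then we return up one more level, to RegionScannerImpl: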
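The comment on that loop says seek lands on the start of the requested row or, if that row doesn't exist, on the start of the next row. In other words, it is a ceiling lookup on a sorted set. Here is a toy scanner over String keys to illustrate. HBase seeks by full KeyValue, seekExactly is not modeled here, and all names are hypothetical:

```java
import java.util.NavigableSet;

// Toy KeyValueScanner-like class over sorted String keys; not HBase code.
final class ToySeekScanner {
  private final NavigableSet<String> keys;
  private String current; // what peek() returns; null when exhausted

  ToySeekScanner(NavigableSet<String> keys) {
    this.keys = keys;
  }

  // Position at the first key >= target: the target itself if present,
  // otherwise the next key after it. Returns false if nothing is left.
  boolean seek(String target) {
    current = keys.ceiling(target);
    return current != null;
  }

  String peek() {
    return current;
  }

  // Return the current key and advance to the next one.
  String next() {
    String result = current;
    if (current != null) {
      current = keys.higher(current);
    }
    return result;
  }
}
```

Seeking to "row2" in {row1, row3, row5} leaves the scanner on "row3". That is why every scanner can be seeked to the same start key even if it holds no data for that exact row.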

this.storeHeap = new KeyValueHeap(scanners, comparator);


 

/**
 * Constructor. This KeyValueHeap will handle closing of passed in
 * KeyValueScanners.
 * @param scanners
 * @param comparator
 */
public KeyValueHeap(List<? extends KeyValueScanner> scanners,
    KVComparator comparator) {
  this.comparator = new KVScannerComparator(comparator);
  if (!scanners.isEmpty()) {
    this.heap = new PriorityQueue<KeyValueScanner>(scanners.size(),
        this.comparator);
    for (KeyValueScanner scanner : scanners) {
      if (scanner.peek() != null) {
        this.heap.add(scanner);
      } else {
        scanner.close();
      }
    }
    this.current = heap.poll();
  }
}


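All the scanners are put into a priority queue (scanners with nothing to return are closed instead), and current is set to the scanner at the head. At this level the scanners are the StoreScanners, one per column family. Each StoreScanner has its own heap over its memstore and store-file scanners, built the same way at the end of its constructor.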
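The heap works because each scanner (memstore or store file) is already sorted, so merging them only requires taking the smallest head at each step. Below is a simplified, hypothetical version of that k-way merge. It mirrors the constructor above: empty scanners are dropped, and current is the scanner with the smallest key. After each next(), this sketch puts the scanner back into the queue. The real next() is not shown here and may avoid some of those re-insertions.

```java
import java.util.List;
import java.util.PriorityQueue;

// Simplified k-way merge in the spirit of KeyValueHeap; not HBase code.
final class ToyKeyValueHeap {
  // Minimal sorted scanner: a list plus a cursor; peek() is null when exhausted.
  static final class ListScanner {
    private final List<String> keys;
    private int pos;

    ListScanner(List<String> keys) {
      this.keys = keys;
    }

    String peek() {
      return pos < keys.size() ? keys.get(pos) : null;
    }

    String next() {
      return keys.get(pos++);
    }
  }

  private final PriorityQueue<ListScanner> heap;
  private ListScanner current;

  ToyKeyValueHeap(List<ListScanner> scanners) {
    // Order scanners by their current head key; only non-empty scanners are added.
    heap = new PriorityQueue<>(Math.max(1, scanners.size()),
        (a, b) -> a.peek().compareTo(b.peek()));
    for (ListScanner s : scanners) {
      if (s.peek() != null) {
        heap.add(s); // empty scanners are skipped (HBase closes them)
      }
    }
    current = heap.poll(); // scanner holding the smallest key
  }

  String peek() {
    return current == null ? null : current.peek();
  }

  // Return the smallest key overall, then re-rank the scanner it came from.
  String next() {
    if (current == null) {
      return null;
    }
    String key = current.next();
    if (current.peek() != null) {
      heap.add(current);
    }
    current = heap.poll();
    return key;
  }
}
```

Merging ["a", "d"], ["b", "c", "e"] and an empty scanner yields a, b, c, d, e. Each step costs O(log k) for k scanners, no matter how many store files hold the data.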

Finally, peace and quiet.

That completes the analysis of the getScanner method in the HRegion class.

Next time I'll continue with the step that follows: scanner.next(results).