申明:以下代码均来自HBase-1.0.1.1
HTable tb = new HTable(conf,"test");
Scan scan = new Scan();
scan.addColumn(("colfam").getBytes(),("col").getBytes());
ResultScanner rs = tb.getScanner(scan);
Result r = null;
while((r=rs.next())!=null){
byte[] val = r.getValue(("colfam").getBytes(),("col").getBytes());
System.out.println("Get value: "+(new String(val)));
}
上面是一段非常简单的扫描test表的代码。首先它构造了一个scan
,这是扫描的描述。然后调用HTable.getScanner
获取ResultScanner
。最后调用ResultScanner.next
获取数据结果。整个扫描过程简单的来说就是这样,下面我们具体来看下这个过程是如何工作的。
首先是getScanner
,它实际上返回的是ClientScanner
或其子类。根据scan设置的不同会返回4种不同的Scanner。
public ResultScanner getScanner(final Scan scan) throws IOException {
if (scan.getBatch() > 0 && scan.isSmall()) {
throw new IllegalArgumentException("Small scan should not be used with batching");
}
if (scan.getCaching() <= 0) {
scan.setCaching(getScannerCaching());
}
if (scan.isReversed()) {
if (scan.isSmall()) {
return new ClientSmallReversedScanner(getConfiguration(), scan, getName(),
this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan());//1
} else {
return new ReversedClientScanner(getConfiguration(), scan, getName(),
this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan());//2
}
}
if (scan.isSmall()) {
return new ClientSmallScanner(getConfiguration(), scan, getName(),
this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan());//3
} else {
return new ClientScanner(getConfiguration(), scan, getName(), this.connection,
this.rpcCallerFactory, this.rpcControllerFactory,
pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan());//4
}
}
ClientScanner
实现了ResultScanner
接口,我们来看看ClientScanner
的next
是如何工作的。
public Result next() throws IOException {
// If the scanner is closed and there's nothing left in the cache, next is a no-op.
if (cache.size() == 0 && this.closed) {
return null;
}
if (cache.size() == 0) {
loadCache();
}
if (cache.size() > 0) {
return cache.poll();
}
// if we exhausted this scanner before calling close, write out the scan metrics
writeScanMetrics();
return null;
}
这段代码的过程是这样:首先判断cache的size是否为0,如果为0则加载一些数据进cache,然后如果cache的size大于0,说明有数据,那么poll第一个数据并返回。从这里我们可以看到,扫描并不是每次next都去server拿数据,而是预先加载一些进来,不够了再取。
然后我们看看如果cache里没有数据,loadCache
是如何工作的。loadCache
的代码很长,去掉了一些因为注释,把自己的理解注释在里面。
protected void loadCache() throws IOException {
Result[] values = null;
long remainingResultSize = maxScannerResultSize; //剩余cache容量
int countdown = this.caching; //剩余cache能保存的Result数量
//设置cache大小,server将会一次返回这么多条数据回来,如果有的话
callable.setCaching(this.caching);
boolean skipFirst = false; //是否需要跳过一行
//发生OutOfOrderException时候,是否需要重试
boolean retryAfterOutOfOrderException = true;
boolean serverHasMoreResults = false; //region是否还有更多的数据
do {
try {
//跳过一行数据,为什么?
//因为当发生异常的时候,我们可能需要重试
//重试需要重新定位start row,而之前已经成功读取了一些记录
//我们只能通过lastResult记录最后一次成功读取的记录,然后从下一条开始
//所以这里其实就是skip lastRsult
if (skipFirst) {
callable.setCaching(1); //跳过一条记录,所以设为1
values = call(scan, callable, caller, scannerTimeout);
//当前向RS (region server)读取失败,切换到了新的replicaRS,我们需要重做一次skip
//switchedToADifferentReplica用来判断是否做了切换
if (values == null && callable.switchedToADifferentReplica()) {
if (this.lastResult != null) {
skipFirst = true;
}
//设置新的region信息
this.currentRegion = callable.getHRegionInfo();
continue;
}
callable.setCaching(this.caching);
skipFirst = false;
}
// Server returns a null values if scanning is to stop. Else,
// returns an empty array if scanning is to go on and we've just
// exhausted current region.
//读取数据
//这里最终调用的是callable的call
//callable是ScannerCallableWithReplicas的实例
values = call(scan, callable, caller, scannerTimeout);
//我认为这个if不会发生,要么skip成功skipFirst置为false,要么失败执行continue
//这里不重要可以忽略
if (skipFirst && values != null && values.length == 1) {
skipFirst = false; // Already skipped, unset it before scanning again
values = call(scan, callable, caller, scannerTimeout);
}
//这里和上面一样,在失败的时候做一些处理
//多说明一些东西:
//value何时为null?server停止scan,或者第一次调用call
//因为第一次执行openScanner并不返回数据
if (values == null && callable.switchedToADifferentReplica()) {
if (this.lastResult != null) {
skipFirst = true;
}
this.currentRegion = callable.getHRegionInfo();
continue;
}
//重置为true
retryAfterOutOfOrderException = true;
} catch (DoNotRetryIOException e) {
//这里是一些异常处理
if (e instanceof UnknownScannerException) {
long timeout = lastNext + scannerTimeout;
if (timeout < System.currentTimeMillis()) {
long elapsed = System.currentTimeMillis() - lastNext;
ScannerTimeoutException ex =
new ScannerTimeoutException(elapsed + "ms passed since the last invocation, "
+ "timeout is currently set to " + scannerTimeout);
ex.initCause(e);
throw ex;
}
} else {
Throwable cause = e.getCause();
if ((cause != null && cause instanceof NotServingRegionException) ||
(cause != null && cause instanceof RegionServerStoppedException) ||
e instanceof OutOfOrderScannerNextException) {
// Pass
// It is easier writing the if loop test as list of what is allowed rather than
// as a list of what is not allowed... so if in here, it means we do not throw.
} else {
throw e;
}
}
// Else, its signal from depths of ScannerCallable that we need to reset the scanner.
//这里就是上面提到的为什么要skip
//发生异常,我们需要重新开始,所以必须定位start row
if (this.lastResult != null) {
//设置start row为最后一次成功读取的结果,但是我们会跳过它
//真实的start row在下一行
this.scan.setStartRow(this.lastResult.getRow());
// Skip first row returned. We already let it out on previous
// invocation.
skipFirst = true;
}
//这个异常在第一出现的时候
//因为retryAfterOutOfOrderException=ture,所以会重试一次
//但是如果连续出现第二次
//因为retryAfterOutOfOrderException还没被重置为true
//所以会直接抛出异常,不再重试
if (e instanceof OutOfOrderScannerNextException) {
if (retryAfterOutOfOrderException) {
retryAfterOutOfOrderException = false;
} else {
// TODO: Why wrap this in a DNRIOE when it already is a DNRIOE?
throw new DoNotRetryIOException("Failed after retry of " +
"OutOfOrderScannerNextException: was there a rpc timeout?", e);
}
}
//发生异常我们重置region信息和callable
// Clear region.
this.currentRegion = null;
// Set this to zero so we don't try and do an rpc and close on remote server when
// the exception we got was UnknownScanner or the Server is going down.
callable = null;
// This continue will take us to while at end of loop where we will set up new scanner.
continue;
}
long currentTime = System.currentTimeMillis();
if (this.scanMetrics != null) {
this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime - lastNext);
}
lastNext = currentTime;
//把结果加入cache
if (values != null && values.length > 0) {
for (Result rs : values) {
cache.add(rs);
// We don't make Iterator here
for (Cell cell : rs.rawCells()) {
remainingResultSize -= CellUtil.estimatedHeapSizeOf(cell);
}
countdown--;
this.lastResult = rs;
}
}
//设置serverHasMoreResults,true表示这个region还有数据,否则
if (null != values && values.length > 0 && callable.hasMoreResultsContext()) {
// Only adhere to more server results when we don't have any partialResults
// as it keeps the outer loop logic the same.
serverHasMoreResults = callable.getServerHasMoreResults();
}
//这个while的逻辑是这样:
//假如serverHasMoreResults=true,还有数据
//!serverHasMoreResults=false,整个逻辑false,循环终止
//因server一次返回剩余容量的数据,cache已经load满了,当然要终止
//反之,!serverHasMoreResults=true,那么将会执行possiblyNextScanner
//possiblyNextScanner用来寻找下一个region,并构造新的callable
//如果成功找到,那么返回true,这样循环继续进行
} while (remainingResultSize > 0 && countdown > 0 && !serverHasMoreResults
&& possiblyNextScanner(countdown, values == null));
}
整个过程的主线是:调用call获取Result,如果发生异常进行重试。重试重新确定start row通过跳过lastResult完成。
注意:
1. call如果失败会自动切换replicaRS,也就是换到备份的region server上去读取,这个只有在scan的Consistency设为Consistency.TIMELINE
情况下才会发生。
2. OutOfOrderScannerNextException异常的原因是客户端和服务端维护的一个序列值不一致。每次读取收到结果,客户端的nextCallSeq++。假如某次读取请求服务端正确收到,返回结果给客户端的时候,客户端等待超时了,那么服务端的seq就会比客户端多1。
3.第一次调用call实则是在服务端上打开scan服务,并获取scannerId为之后做准备。如果你继续跟踪possiblyNextScanner
,你会发现定位了新的region并构造了对应的ScannerCallableWithReplicas
后主动调了一次call,就是这个原因。
更详细的过程可以继续跟踪call,其实是调用ScannerCallableWithReplicas.call
,而后者维护了线程池执行任务,最终调用的是ScannerCallable.call
。如果任务失败,那么ScannerCallableWithReplicas
会为每一个replicaRS构造各自的ScannerCallable
,然后各自起任务运行。只有第一个成功返回的replicaRS会被选中,之后的读取会被切换到这个replicaRS上。