HBase源码学习 客户端scan过程

时间:2020-12-03 08:37:18


HTable tb = new HTable(conf,"test");
Scan scan = new Scan();

ResultScanner rs = tb.getScanner(scan);
Result r = null;

    byte[] val = r.getValue(("colfam").getBytes(),("col").getBytes());
    System.out.println("Get value: "+(new String(val)));



  /**
   * Returns a scanner over this table for the given {@link Scan}.
   *
   * <p>Excerpt: several closing braces and the body of the caching branch are omitted here,
   * so this fragment does not compile as shown. The concrete scanner type is chosen from two
   * flags of the scan, numbered as in the trailing comments:
   * (1) reversed + small -> ClientSmallReversedScanner,
   * (2) reversed -> ReversedClientScanner,
   * (3) small -> ClientSmallScanner,
   * (4) otherwise -> ClientScanner.
   * All four receive the same configuration, table name, connection, RPC factories, pool and
   * replica-call timeout.
   *
   * @param scan the scan to execute
   * @return a scanner whose concrete type depends on scan.isReversed() and scan.isSmall()
   * @throws IllegalArgumentException if the scan is small and also has batching (batch > 0)
   * @throws IOException as declared by the method signature
   */
  public ResultScanner getScanner(final Scan scan) throws IOException {
    // A small scan must not be combined with batching.
    if (scan.getBatch() > 0 && scan.isSmall()) {
          throw new IllegalArgumentException("Small scan should not be used with batching");
    if (scan.getCaching() <= 0) {
      // NOTE(review): the statement(s) in this branch are not shown in the excerpt;
      // presumably a default caching value is applied here — confirm against HBase source.

    if (scan.isReversed()) {
      if (scan.isSmall()) {
        // Reversed + small scan.
        return new ClientSmallReversedScanner(getConfiguration(), scan, getName(),
            this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
            pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan());//1
      } else {
        // Reversed, regular-size scan.
        return new ReversedClientScanner(getConfiguration(), scan, getName(),
            this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
            pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan());//2

    if (scan.isSmall()) {
      // Forward small scan.
      return new ClientSmallScanner(getConfiguration(), scan, getName(),
          this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
          pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan());//3
    } else {
      // Default: forward, regular-size scan.
      return new ClientScanner(getConfiguration(), scan, getName(), this.connection,
          this.rpcCallerFactory, this.rpcControllerFactory,
          pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan());//4


    /**
     * Returns the next Result of the scan, or null when no more Results are available.
     *
     * <p>Excerpt: closing braces and the body of the empty-cache branch are omitted here.
     * Results are handed out by polling the client-side {@code cache}.
     *
     * @return the next Result, or null if the scanner is closed with an empty cache or the
     *     cache is still empty after the refill branch
     * @throws IOException as declared by the method signature
     */
    public Result next() throws IOException {
      // If the scanner is closed and there's nothing left in the cache, next is a no-op.
      if (cache.size() == 0 && this.closed) {
        return null;
      if (cache.size() == 0) {
        // NOTE(review): body not shown in the excerpt; presumably it refills the cache
        // from the server (e.g. via loadCache()) — confirm against HBase source.

      // Serve one buffered Result if the cache has any.
      if (cache.size() > 0) {
        return cache.poll();

      // if we exhausted this scanner before calling close, write out the scan metrics
      // (NOTE(review): the metrics-writing statement itself is not shown in this excerpt)
      return null;



   /**
    * Fetches the next batch of Results from the region server(s) for this scanner.
    *
    * <p>Excerpt: many closing braces and some statements are not shown, so this fragment does
    * not compile as-is. The omitted statements presumably include the ones that add Results to
    * the client cache and decrement {@code countdown}; confirm this against the HBase source.
    *
    * <p>Visible flow:
    * <ul>
    *   <li>Each iteration fetches via {@code call(scan, callable, caller, scannerTimeout)}.</li>
    *   <li>On a tolerated {@link DoNotRetryIOException} the scanner state is reset: both
    *       {@code currentRegion} and {@code callable} are cleared.</li>
    *   <li>If a row was already returned ({@code lastResult != null}), {@code skipFirst} is set.
    *       The re-fetch of that already-delivered row is then skipped.</li>
    * </ul>
    *
    * <p>The do/while loop continues only while all of these hold:
    * <ul>
    *   <li>the result-size budget is positive;</li>
    *   <li>{@code countdown} is positive;</li>
    *   <li>the server did not report more results;</li>
    *   <li>{@code possiblyNextScanner(...)} returns true.</li>
    * </ul>
    *
    * @throws ScannerTimeoutException if an UnknownScannerException arrives when more than
    *     {@code scannerTimeout} ms have passed since {@code lastNext}
    * @throws DoNotRetryIOException if a non-tolerated failure occurs, or if an
    *     OutOfOrderScannerNextException occurs twice without a successful call in between
    * @throws IOException as declared by the method signature
    */
   protected void loadCache() throws IOException {
    Result[] values = null; 
    // Remaining result-size budget; each returned cell's estimated heap size is subtracted.
    long remainingResultSize = maxScannerResultSize; //remaining cache size budget
    // Number of Results this call may still load (its decrement is not shown in the excerpt).
    int countdown = this.caching; //remaining number of Results the cache may hold


    // Whether the first row of the next server response must be discarded because it was
    // already delivered to the caller before a scanner reset.
    boolean skipFirst = false; //whether one row needs to be skipped
    // Allows exactly one retry after OutOfOrderScannerNextException; re-armed after a success.
    boolean retryAfterOutOfOrderException = true; 

    // Set from the server's "more results" flag; while true the loop does not advance
    // to the next region via possiblyNextScanner.
    boolean serverHasMoreResults = false; //whether the region has more data
    do {
      try {
          // A retry must re-position the start row, but some rows were already read
          // successfully before, so what happens here is effectively skipping lastResult.
        if (skipFirst) {

          callable.setCaching(1); //only one record needs skipping, so fetch just 1
          values = call(scan, callable, caller, scannerTimeout);

          // The read from the current RS (region server) failed and the call switched to a
          // different replica RS, so the skip has to be done again.
          if (values == null && callable.switchedToADifferentReplica()) {
            if (this.lastResult != null) { 
              skipFirst = true;
            this.currentRegion = callable.getHRegionInfo();
          skipFirst = false;
        // Server returns a null values if scanning is to stop. Else,
        // returns an empty array if scanning is to go on and we've just
        // exhausted current region.
        values = call(scan,  callable, caller, scannerTimeout);
        if (skipFirst && values != null && values.length == 1) {
          skipFirst = false; // Already skipped, unset it before scanning again
          values = call(scan, callable, caller, scannerTimeout);

        // Same replica-switch handling as above for the main fetch.
        if (values == null && callable.switchedToADifferentReplica()) { 
          if (this.lastResult != null) { 
            skipFirst = true;
          this.currentRegion = callable.getHRegionInfo();
        // A call completed without exception, so one OutOfOrder retry is allowed again.
        retryAfterOutOfOrderException = true;
      } catch (DoNotRetryIOException e) {

        if (e instanceof UnknownScannerException) {
          long timeout = lastNext + scannerTimeout;

          // The server no longer knows this scanner; if the client waited longer than the
          // scanner timeout since the last call, report it as a timeout.
          if (timeout < System.currentTimeMillis()) {
            long elapsed = System.currentTimeMillis() - lastNext;
            ScannerTimeoutException ex =
                new ScannerTimeoutException(elapsed + "ms passed since the last invocation, "
                    + "timeout is currently set to " + scannerTimeout);
            throw ex;
        } else {

          Throwable cause = e.getCause();
          if ((cause != null && cause instanceof NotServingRegionException) ||
              (cause != null && cause instanceof RegionServerStoppedException) ||
              e instanceof OutOfOrderScannerNextException) {
            // Pass
            // It is easier writing the if loop test as list of what is allowed rather than
            // as a list of what is not allowed... so if in here, it means we do not throw.
          } else {
            throw e;
        // Else, its signal from depths of ScannerCallable that we need to reset the scanner.
        // An exception occurred and the scan has to restart, so the start row must be
        // re-positioned.
        if (this.lastResult != null) {

          // The start row is set to the last successfully read result, but that row will be
          // skipped, so the effective start row is the one after it (the statement that sets
          // the start row is not shown in this excerpt).

          // Skip first row returned. We already let it out on previous
          // invocation.
          skipFirst = true;
        if (e instanceof OutOfOrderScannerNextException) {
          if (retryAfterOutOfOrderException) {
            retryAfterOutOfOrderException = false;
          } else {
            // TODO: Why wrap this in a DNRIOE when it already is a DNRIOE?
            throw new DoNotRetryIOException("Failed after retry of " +
                "OutOfOrderScannerNextException: was there a rpc timeout?", e);
        // Clear region.
        this.currentRegion = null;
        // Drop the callable (set to null) so we don't try and do an rpc and close on remote
        // server when the exception we got was UnknownScanner or the Server is going down.
        callable = null;
        // This continue will take us to while at end of loop where we will set up new scanner.
        // (the continue statement itself is not shown in this excerpt)
      long currentTime = System.currentTimeMillis();
      if (this.scanMetrics != null) {
        this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime - lastNext);
      lastNext = currentTime;
      if (values != null && values.length > 0) {
        for (Result rs : values) {
          // We don't make Iterator here
          // Charge every returned cell against the size budget.
          for (Cell cell : rs.rawCells()) {
            remainingResultSize -= CellUtil.estimatedHeapSizeOf(cell);
          // Remember the last delivered row so a later reset can skip past it.
          this.lastResult = rs;

      if (null != values && values.length > 0 && callable.hasMoreResultsContext()) {
        // Only adhere to more server results when we don't have any partialResults
        // as it keeps the outer loop logic the same.
        serverHasMoreResults = callable.getServerHasMoreResults();

    } while (remainingResultSize > 0 && countdown > 0 && !serverHasMoreResults
        && possiblyNextScanner(countdown, values == null));

整个过程的主线是:调用call获取Result,如果发生异常则进行重试;重试时通过跳过lastResult来重新确定start row。

1. call如果失败,会自动切换到replica RS,也就是改到备份(replica)region所在的region server上去读取。这只有在scan的Consistency设为Consistency.TIMELINE的情况下才会发生。
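2. OutOfOrderScannerNextException异常的原因是客户端和服务端各自维护的一个序列值(nextCallSeq)不一致。每次读取收到结果后,客户端的nextCallSeq++。假如某次读取请求服务端已正确收到并处理,但在返回结果给客户端时客户端已经等待超时,那么服务端的seq就会比客户端多1。客户端随后用旧的seq重试,服务端发现序列值不匹配,就会抛出OutOfOrderScannerNextException。loadCache对此只容忍一次:它清空callable和currentRegion以重新建立scanner,并跳过lastResult继续读取。如果在两次成功调用之间再次出现该异常,就包装成DoNotRetryIOException抛出。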
