HBase源码学习 客户端scan过程

时间:2020-12-03 08:37:18

申明:以下代码均来自HBase-1.0.1.1

HTable tb = new HTable(conf,"test");
Scan scan = new Scan();
scan.addColumn(("colfam").getBytes(),("col").getBytes());

ResultScanner rs = tb.getScanner(scan);
Result r = null;

while((r=rs.next())!=null){
    byte[] val = r.getValue(("colfam").getBytes(),("col").getBytes());
    System.out.println("Get value: "+(new String(val)));
}

上面是一段非常简单的扫描test表的代码。首先它构造了一个scan,这是扫描的描述。然后调用HTable.getScanner获取ResultScanner。最后调用ResultScanner.next获取数据结果。整个扫描过程简单的来说就是这样,下面我们具体来看下这个过程是如何工作的。

首先是getScanner,它实际上返回的是ClientScanner或其子类。根据scan设置的不同会返回4种不同的Scanner。

  public ResultScanner getScanner(final Scan scan) throws IOException {
    if (scan.getBatch() > 0 && scan.isSmall()) {
          throw new IllegalArgumentException("Small scan should not be used with batching");
    }
    if (scan.getCaching() <= 0) {
      scan.setCaching(getScannerCaching());
    }

    if (scan.isReversed()) {
      if (scan.isSmall()) {
        return new ClientSmallReversedScanner(getConfiguration(), scan, getName(),
            this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
            pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan());//1
      } else {
        return new ReversedClientScanner(getConfiguration(), scan, getName(),
            this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
            pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan());//2
      }
    }

    if (scan.isSmall()) {
      return new ClientSmallScanner(getConfiguration(), scan, getName(),
          this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
          pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan());//3
    } else {
      return new ClientScanner(getConfiguration(), scan, getName(), this.connection,
          this.rpcCallerFactory, this.rpcControllerFactory,
          pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan());//4
    }
  }

ClientScanner实现了ResultScanner接口,我们来看看ClientScannernext是如何工作的。

    public Result next() throws IOException {
      // If the scanner is closed and there's nothing left in the cache, next is a no-op.
      if (cache.size() == 0 && this.closed) {
        return null;
      }
      if (cache.size() == 0) {
        loadCache();
      }

      if (cache.size() > 0) {
        return cache.poll();
      }

      // if we exhausted this scanner before calling close, write out the scan metrics
      writeScanMetrics();
      return null;
    }

这段代码的过程是这样:首先判断cache的size是否为0,如果为0则加载一些数据进cache,然后如果cache的size大于0,说明有数据,那么poll第一个数据并返回。从这里我们可以看到,扫描并不是每次next都去server拿数据,而是预先加载一些进来,不够了再取。

然后我们看看如果cache里没有数据,loadCache是如何工作的。loadCache的代码很长,去掉了一些因为注释,把自己的理解注释在里面。

   protected void loadCache() throws IOException {
    Result[] values = null; 
    long remainingResultSize = maxScannerResultSize; //剩余cache容量
    int countdown = this.caching; //剩余cache能保存的Result数量

    //设置cache大小,server将会一次返回这么多条数据回来,如果有的话
    callable.setCaching(this.caching); 

    boolean skipFirst = false; //是否需要跳过一行
    //发生OutOfOrderException时候,是否需要重试
    boolean retryAfterOutOfOrderException = true; 

    boolean serverHasMoreResults = false; //region是否还有更多的数据
    do {
      try {
          //跳过一行数据,为什么?
          //因为当发生异常的时候,我们可能需要重试
          //重试需要重新定位start row,而之前已经成功读取了一些记录
          //我们只能通过lastResult记录最后一次成功读取的记录,然后从下一条开始
          //所以这里其实就是skip lastRsult
        if (skipFirst) {

          callable.setCaching(1); //跳过一条记录,所以设为1
          values = call(scan, callable, caller, scannerTimeout);

          //当前向RS (region server)读取失败,切换到了新的replicaRS,我们需要重做一次skip
          //switchedToADifferentReplica用来判断是否做了切换
          if (values == null && callable.switchedToADifferentReplica()) {
            if (this.lastResult != null) { 
              skipFirst = true;
            }
            //设置新的region信息
            this.currentRegion = callable.getHRegionInfo();
            continue;
          }
          callable.setCaching(this.caching);
          skipFirst = false;
        }
        // Server returns a null values if scanning is to stop. Else,
        // returns an empty array if scanning is to go on and we've just
        // exhausted current region.
        //读取数据
        //这里最终调用的是callable的call
        //callable是ScannerCallableWithReplicas的实例
        values = call(scan,  callable, caller, scannerTimeout);
        //我认为这个if不会发生,要么skip成功skipFirst置为false,要么失败执行continue
        //这里不重要可以忽略
        if (skipFirst && values != null && values.length == 1) {
          skipFirst = false; // Already skipped, unset it before scanning again
          values = call(scan, callable, caller, scannerTimeout);
        }

        //这里和上面一样,在失败的时候做一些处理
        //多说明一些东西:
        //value何时为null?server停止scan,或者第一次调用call
        //因为第一次执行openScanner并不返回数据
        if (values == null && callable.switchedToADifferentReplica()) { 
          if (this.lastResult != null) { 
            skipFirst = true;
          }
          this.currentRegion = callable.getHRegionInfo();
          continue;
        }
        //重置为true
        retryAfterOutOfOrderException = true;
      } catch (DoNotRetryIOException e) {
         //这里是一些异常处理

        if (e instanceof UnknownScannerException) {
          long timeout = lastNext + scannerTimeout;

          if (timeout < System.currentTimeMillis()) {
            long elapsed = System.currentTimeMillis() - lastNext;
            ScannerTimeoutException ex =
                new ScannerTimeoutException(elapsed + "ms passed since the last invocation, "
                    + "timeout is currently set to " + scannerTimeout);
            ex.initCause(e);
            throw ex;
          }
        } else {

          Throwable cause = e.getCause();
          if ((cause != null && cause instanceof NotServingRegionException) ||
              (cause != null && cause instanceof RegionServerStoppedException) ||
              e instanceof OutOfOrderScannerNextException) {
            // Pass
            // It is easier writing the if loop test as list of what is allowed rather than
            // as a list of what is not allowed... so if in here, it means we do not throw.
          } else {
            throw e;
          }
        }
        // Else, its signal from depths of ScannerCallable that we need to reset the scanner.
        //这里就是上面提到的为什么要skip
        //发生异常,我们需要重新开始,所以必须定位start row
        if (this.lastResult != null) {

          //设置start row为最后一次成功读取的结果,但是我们会跳过它
          //真实的start row在下一行
          this.scan.setStartRow(this.lastResult.getRow());

          // Skip first row returned. We already let it out on previous
          // invocation.
          skipFirst = true;
        }
        //这个异常在第一出现的时候
        //因为retryAfterOutOfOrderException=ture,所以会重试一次
        //但是如果连续出现第二次
        //因为retryAfterOutOfOrderException还没被重置为true
        //所以会直接抛出异常,不再重试
        if (e instanceof OutOfOrderScannerNextException) {
          if (retryAfterOutOfOrderException) {
            retryAfterOutOfOrderException = false;
          } else {
            // TODO: Why wrap this in a DNRIOE when it already is a DNRIOE?
            throw new DoNotRetryIOException("Failed after retry of " +
                "OutOfOrderScannerNextException: was there a rpc timeout?", e);
          }
        }
        //发生异常我们重置region信息和callable
        // Clear region.
        this.currentRegion = null;
        // Set this to zero so we don't try and do an rpc and close on remote server when
        // the exception we got was UnknownScanner or the Server is going down.
        callable = null;
        // This continue will take us to while at end of loop where we will set up new scanner.
        continue;
      }
      long currentTime = System.currentTimeMillis();
      if (this.scanMetrics != null) {
        this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime - lastNext);
      }
      lastNext = currentTime;
      //把结果加入cache
      if (values != null && values.length > 0) {
        for (Result rs : values) {
          cache.add(rs);
          // We don't make Iterator here
          for (Cell cell : rs.rawCells()) {
            remainingResultSize -= CellUtil.estimatedHeapSizeOf(cell);
          }
          countdown--;
          this.lastResult = rs;
        }
      }

      //设置serverHasMoreResults,true表示这个region还有数据,否则
      if (null != values && values.length > 0 && callable.hasMoreResultsContext()) {
        // Only adhere to more server results when we don't have any partialResults
        // as it keeps the outer loop logic the same.
        serverHasMoreResults = callable.getServerHasMoreResults();
      }

      //这个while的逻辑是这样:
      //假如serverHasMoreResults=true,还有数据
      //!serverHasMoreResults=false,整个逻辑false,循环终止
      //因server一次返回剩余容量的数据,cache已经load满了,当然要终止
      //反之,!serverHasMoreResults=true,那么将会执行possiblyNextScanner
      //possiblyNextScanner用来寻找下一个region,并构造新的callable
      //如果成功找到,那么返回true,这样循环继续进行
    } while (remainingResultSize > 0 && countdown > 0 && !serverHasMoreResults
        && possiblyNextScanner(countdown, values == null));
  }

整个过程的主线是:调用call获取Result,如果发生异常进行重试。重试重新确定start row通过跳过lastResult完成。

注意:
1. call如果失败会自动切换replicaRS,也就是换到备份的region server上去读取,这个只有在scan的Consistency设为Consistency.TIMELINE情况下才会发生。
2. OutOfOrderScannerNextException异常的原因是客户端和服务端维护的一个序列值不一致。每次读取收到结果,客户端的nextCallSeq++。假如某次读取请求服务端正确收到,返回结果给客户端的时候,客户端等待超时了,那么服务端的seq就会比客户端多1。
3.第一次调用call实则是在服务端上打开scan服务,并获取scannerId为之后做准备。如果你继续跟踪possiblyNextScanner,你会发现定位了新的region并构造了对应的ScannerCallableWithReplicas后主动调了一次call,就是这个原因。

更详细的过程可以继续跟踪call,其实是调用ScannerCallableWithReplicas.call,而后者维护了线程池执行任务,最终调用的是ScannerCallable.call。如果任务失败,那么ScannerCallableWithReplicas会为每一个replicaRS构造各自的ScannerCallable,然后各自起任务运行。只有第一个成功返回的replicaRS会被选中,之后的读取会被切换到这个replicaRS上。