HBase--客户端API(基础知识)二

时间:2021-09-20 08:33:59

批量操作

实际上,许多基于列表的操作,如delete(List deletes),get(Listgets),都是基于batch实现的,都是为了方便用户而保留的方法

下面是客户端API提供的批量操作的方法

void batch(List<Row> actions,Object[] results)throws IOException,InterruptedException
Objec[] batch(List<Row> actions)throws IOException,InterruptedException

ROW类是Put,Get,Delete类的父类,使用父类在List中实现多态
不可以将正对同一行的Put和Delete操作放在同一个批量处理请求中,为了保证最好的性能,这些操作的顺序可不能同步,但是这样会产生不可预料的结果.由于资源竞争,某些情况下,用户会看到波动的结果

使用批量操作的应用示例

    /**
* 使用批量操作的应用示例
*
* @throws IOException
*/

public void batchOperate() throws IOException {
HTable table = new HTable(cfg, "testtable");
byte[] row1 = Bytes.toBytes("row1");
byte[] row2 = Bytes.toBytes("row2");
byte[] cf1 = Bytes.toBytes("cf1");
byte[] cf2 = Bytes.toBytes("cf2");
byte[] cq1 = Bytes.toBytes("cq1");
byte[] cq2 = Bytes.toBytes("cq2");
List<Row> batch = Lists.newArrayList();
Put put = new Put(row2);
put.add(cf2, cq1, Bytes.toBytes("val5"));
batch.add(put);
Get get1 = new Get(row1);
get1.addColumn(cf1, cq1);
batch.add(get1);
Delete delete = new Delete(row1);
delete.deleteColumns(cf1, cq2);
batch.add(delete);
Get get2 = new Get(row2);
get2.addFamily(Bytes.toBytes("BOGUS"));
batch.add(get2);
Object[] results = new Object[batch.size()];
try {
table.batch(batch, results);
} catch (InterruptedException e) {
System.err.println("Error:" + e);
}
int i = 0;
for (Object result : results) {
System.out.println("result[" + i + "]:" + results[i]);
i++;
}
}

当使用batch()功能时,Put实例不会被写入客户端写缓冲区.batch请求是同步的,会把操作直接发送到服务器,这个动作没有延迟或者中间其他操作.这和Put()调用明显不同.


两种方法的不同点

这两中批量处理的方法的不同之处在于,一个需要用户输入包含返回结果的Object数组,而另一个函数帮组用户创建这个数组.他们的语义不同在于用户提供Object数组的方法可以访问部分结果,而由函数返回的结果的方法如果抛出异常 的话,不会有任何的返回结果,因为新结果数组返回之前,控制流就中断了.

两种方法的相同点

  1. get,put,delete都支持.在执行时出现问题,客户端抛出异常并报告问题.他们都不使用客户端写缓冲区
  2. 能够访问成功操作的结果,同时也能获取远程失败的异常

行锁

    private static final byte[] ROW1 = Bytes.toBytes("row1");
private static final byte[] CF1 = Bytes.toBytes("cf1");
private static final byte[] CQ1 = Bytes.toBytes("cq1");
private static final Configuration cfg = HBaseConfiguration.create();
private static final Logger logger = LoggerFactory.getLogger(HBaseBlock.class);

public static void main(String[] args) throws IOException {
System.out.println("Taking out lock ...");
HTable table = new HTable(cfg, "testtable");
RowLock rowLock = table.lockRow(ROW1);
System.out.println("lock ID:" + rowLock.getLockId());
Thread thread = new Thread(new UnlockedPut());
thread.start();
System.out.println("sleep 5secs in main()...");
try {
Thread.sleep(5000);

} catch (InterruptedException e) {
logger.error("error:{}", e);
}

try {
Put put1 = new Put(ROW1, rowLock);
put1.add(CF1, CQ1, Bytes.toBytes("val1"));
table.put(put1);

Put put2 = new Put(ROW1, rowLock);
put2.add(CF1, CQ1, Bytes.toBytes("val2"));
table.put(put2);
} catch (IOException e) {
logger.error("ERROR:{}", e);
} finally {
System.out.println("Releasing lock...");
table.unlockRow(rowLock);
}


}


static class UnlockedPut implements Runnable {
@Override
public void run() {
try {
HTable table = new HTable(cfg, "testtable");
Put put1 = new Put(ROW1);
put1.add(CF1, CQ1, Bytes.toBytes("val3"));
long time = System.currentTimeMillis();
System.out.println("Thread trying to put same row now ...");
table.put(put1);
System.out.println("Wait time:" + (System.currentTimeMillis() - time) + "ms");
} catch (IOException e) {
logger.error("error:{}", e);
}
}
}

从这个例子可以看出一个显示锁是如何阻塞另一个使用隐式锁的线程的.主题线程休眠5秒,醒来之后,执行两次put操作,分别将统一列设置为不同的值.随后主线程释放锁,阻塞线程的run()方法就继续执行调用第三个put,虽然这个put操作表面上是最后执行的,但是从结果中我们可以看到这个put操作的时间戳是最小的.这是因为这个线程中额put操作是在主线程中的put操作之前执行的,这之后主线程休眠了5秒.当put被发送到服务器时,如果他的时间戳没有被显示的设定,服务器会帮它设定时间戳,同事试图获取这一行的锁.但是示例代码中主线程已经获得了改行的锁,因此服务端的处理多等待了5秒,锁被释放才得以继续

get操作是不需要锁的,而是应用一个多版本的并发控制机制来保证行级读操作

Scan(扫描)

  1. HTable的getScanner()方法,此方法返回真正的扫瞄器,并且可以使用它来进行迭代操作

    ResultScanner getScanner(Scan scan)throws IOException
    ResultScanner getScanner(byte[] family) throws IOException;
    ResultScanner getScanner(byte[]family,byte[] qualifier) throws IOException
  2. Scan 构造器
Scan()
Scan(byte[] startRow,Filter filter)
Scan(byte[] startRow,byte[] stopRow)

用户可以选择性的 提供startRow参数,来定义扫描读取HBase表的起始行键,即行键不是必须指定的,同时可以选择stopRow来限定读取到何处停止

ResultScanner类

扫描操作不会通过一次RPC请求返回所有的行,而是以行为单位进行返回.行的数目过大,可能有上千条甚至更多,同时在一次请求中返回大量的数据,会占用大量的系统资源并且也是非常耗时的.
ResultScanner把扫描操作转换为类似的get操作,它将每一行数据封装成一个Result实例,并将所有的Result实例放入一个迭代器中

scan操作实例

public void scannData() {
HTable table = null;
ResultScanner scanner = null;

try {
table = new HTable(cfg, "testtable");
} catch (IOException e) {

}

try {
Scan scan = new Scan();
scanner = table.getScanner(scan);
for (Result result : scanner) {
System.out.println(result);
}
} catch (Exception e) {
logger.error("Error:{}", e);

} finally {
if (scanner != null) {
scanner.close();
}
}

ResultScanner scanner2 = null;
try {
Scan scan2 = new Scan();
scan2.addFamily(Bytes.toBytes("cf1"));
scanner2 = table.getScanner(scan2);
for (Result result : scanner2) {
System.out.println(result);
}
} catch (IOException e) {
logger.error("Error:{}", e);
} finally {
scanner2.close();
}

ResultScanner scanner3 = null;
try {
Scan scan3 = new Scan();
scan3.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("col-5")).
addColumn(Bytes.toBytes("cf2"), Bytes.toBytes("col-33")).
setStartRow(Bytes.toBytes("row-10")).
setStopRow(Bytes.toBytes("row-20"));
scanner3 = table.getScanner(scan3);
for (Result result : scanner3) {
System.out.println(result);
}
} catch (IOException e) {
logger.error("Error:{}", e);
} finally {
scanner3.close();
}

}