Hbase 笔记(3) 客户端API基础

时间:2021-12-05 23:19:24

1、概述

(1)、HBase 所有修改数据的操作保证行级别的原子性。

(2)、用户应该尽量使用批处理(batch)更新,减少单独操作同一行数据的次数。

(3)、创建表有代价,所以只创建一个HTable,一般在应用程序开始时创建

(4)、使用HTablePool,复用多个实例,例子

        int maxSize = 20;
HTablePool pool = new HTablePool(conf,maxSize);

HTableInterface tableInterface = pool.getTable(strTableName);
Put put = new Put(Bytes.toBytes("row-11"));
put.add(Bytes.toBytes("cf1"), Bytes.toBytes("a"), Bytes.toBytes("value-a"));
put.add(Bytes.toBytes("cf2"), Bytes.toBytes("b"), Bytes.toBytes("value-b"));
tableInterface.put(put);

2、PUT操作

(1) .  多个版本

查看多版本:scan  'testtable',  { VERSIONS => 3 }

(2). KeyValue 类,元数据内容

row-key / family:qualifier / version / type / value-length

(3). 客户端的写缓冲区

每一个put操作都是一个RPC操作,启用写缓冲区可将多个put一次RPC送往服务器:

table.setAutoFlush(false);

若需要将缓冲区数据强制写入服务端:

table.flushCommits()

设置写缓冲区大小

table.setWriteBufferSize(long writeBufferSize)  // 默认为 2 M

也可以设置hbase-site.xml 中的 hbase.client.write.buffer

(4). 错误处理

 如果Put 一个列表,里面有一个错误的(使用了不存在的列族);

        Put put1 = new Put(Bytes.toBytes("11111"), System.currentTimeMillis());
put1.add(Bytes.toBytes("cf1"), Bytes.toBytes("a"), Bytes.toBytes("value-x11"));

Put put2 = new Put(Bytes.toBytes("22222"), System.currentTimeMillis());
put2.add(Bytes.toBytes("noExist"), Bytes.toBytes("a"), Bytes.toBytes("value-y22"));

Put put3 = new Put(Bytes.toBytes("33333"), System.currentTimeMillis());
put3.add(Bytes.toBytes("cf1"), Bytes.toBytes("a"), Bytes.toBytes("value-x33"));

list.add(put1);
list.add(put2);
list.add(put3);

tableInterface.put(list);
抛出以下异常:

org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException: Failed 1 action: org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException: Column family noExist does not exist in region testtable

但是其余两个Put成功

hbase(main):005:0> scan 'testtable'
ROW                   COLUMN+CELL                                                 
 11111                column=cf1:a, timestamp=1416194410866, value=value-x11      
 33333                column=cf1:a, timestamp=1416194410866, value=value-x33 


 如果Put 一个列表,里面有一个为空  ;

        Put put1 = new Put(Bytes.toBytes("11111"), System.currentTimeMillis());
put1.add(Bytes.toBytes("cf1"), Bytes.toBytes("a"), Bytes.toBytes("value-x11"));

Put put2 = new Put(Bytes.toBytes("22222"), System.currentTimeMillis());
put2.add(Bytes.toBytes("cf1"), Bytes.toBytes("a"), Bytes.toBytes("value-y22"));

Put put3 = new Put(Bytes.toBytes("33333"), System.currentTimeMillis());
put3.add(Bytes.toBytes("cf1"), Bytes.toBytes("a"), Bytes.toBytes("value-x33"));

Put put4 = new Put(Bytes.toBytes("44444"), System.currentTimeMillis());

list.add(put1);
list.add(put2);
list.add(put3);
list.add(put4);

tableInterface.put(list);
将抛出异常:Exception in thread "main" java.lang.IllegalArgumentException: No columns to insert

因为这个错误由客户端检查发现,因此将没有内容写入到数据库。

(5). Put 写入List, 不保证写入的顺序。

(6). 原子操作

Put put1 = new Put(Bytes.toBytes("11111"), System.currentTimeMillis());
put1.add(Bytes.toBytes("cf1"), Bytes.toBytes("a"), Bytes.toBytes("value-x11"));
tableInterface.checkAndPut(Bytes.toBytes("11111"),
Bytes.toBytes("cf1"), Bytes.toBytes("a"), Bytes.toBytes("value-x33"), put1);


3. Get 操作

只获取一个

        Get get = new Get(Bytes.toBytes("row-11"));
get.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("a"));
get.addColumn(Bytes.toBytes("cf2"), Bytes.toBytes("b"));

Result result = table.get(get);

byte[] value1 = result.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("a"));
byte[] value2 = result.getValue(Bytes.toBytes("cf2"), Bytes.toBytes("b"));
System.out.println("cf1:a=" + Bytes.toString(value1));
System.out.println("cf2:b=" + Bytes.toString(value2));

获取多个

        Get get1 = new Get(Bytes.toBytes("row-11"));
get1.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("a"));
get1.addColumn(Bytes.toBytes("cf2"), Bytes.toBytes("b"));

Get get2 = new Get(Bytes.toBytes("row-22"));
get2.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("a"));
get2.addColumn(Bytes.toBytes("cf2"), Bytes.toBytes("b"));

List<Get> listGet = new ArrayList<Get>();
listGet.add(get1);
listGet.add(get2);

Result[] resutls = table.get(listGet);

for (Result result : resutls) {
byte[] row = result.getRow();
byte[] value1 = result.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("a"));
byte[] value2 = result.getValue(Bytes.toBytes("cf2"), Bytes.toBytes("b"));
System.out.println("rowkey=" + Bytes.toString(row)+", cf1:a=" + Bytes.toString(value1));
System.out.println("rowkey=" + Bytes.toString(row)+", cf2:b=" + Bytes.toString(value2));
}

如果  table.get(listGet) 操作中, listGet 之中有一个错误,则整个操作终止并抛出异常。


查找某行或者某行之前的一行:

        Result rowOrBefore = table.getRowOrBefore(Bytes.toBytes("row-22"), Bytes.toBytes("cf1"));


4、DELETE 操作

同样也可以delete某1行或者多行,删除特定版本或者多个版本,整个列族或者某个列

也有原子操作: 

        Delete delete = new Delete(Bytes.toBytes("row-11"));
table.checkAndDelete(Bytes.toBytes("row-11"),Bytes.toBytes("cf1"),
Bytes.toBytes("a"), Bytes.toBytes("value-x33"), delete);

5、批量处理 Batch

可以同时进行Put、Get、Delete 操作

        List<Row> list = new ArrayList<Row>();
Put put1 = new Put(Bytes.toBytes("11111"), System.currentTimeMillis());
put1.add(Bytes.toBytes("cf1"), Bytes.toBytes("a"), Bytes.toBytes("value-x11"));
put1.add(Bytes.toBytes("cf2"), Bytes.toBytes("b"), Bytes.toBytes("value-y11"));
Get get1 = new Get(Bytes.toBytes("11111"));
get1.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("a"));
Get get2 = new Get(Bytes.toBytes("33333"));
get1.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("a"));
list.add(put1);
list.add(get1);
list.add(get2);

Object[] results = new Object[list.size()];
try {
tableInterface.batch(list, results);
} catch (InterruptedException ex) {
Logger.getLogger(BatchTest.class.getName()).log(Level.SEVERE, null, ex);
}
for(Object result:results)
{
System.out.println(result);
}

输出:

keyvalues=NONE
keyvalues={11111/cf1:a/1416199179786/Put/vlen=9/mvcc=0}
keyvalues={33333/cf1:a/1416194410866/Put/vlen=9/mvcc=0}

Object[]  results = new Object[list.size()];
tableInterface.batch(list, results);                      // 如果出错,此方法可以访问部分结果        
Object[]  results2 = tableInterface.batch(list);   // 如果出错,此方法不会有任何结果


6、行锁

设置 hbase.regionserver.lease.period 可修改锁超时时间。

在行上创建一个锁,该锁阻塞索引的并发读取。


7、扫描scan

设置 hbase.regionserver.lease.period 可修改扫描器超时时间。

Scan 例子: 

        Scan scan = new Scan(); 
scan.setStartRow(Bytes.toBytes("11111"));
scan.setStopRow(Bytes.toBytes("333333"));
scan.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("a"));
scan.addColumn(Bytes.toBytes("cf2"), Bytes.toBytes("b"));
ResultScanner scanner = htable.getScanner(scan);
for(Result result : scanner)
{
System.out.println(result);
}
scanner.close();


以上的Scan方法,获取每一个Result 都会生成一个单独的RPC 请求,性能将不会很高。

可用两种方法打开扫描器缓存:

(1). 修改 hbase.client.scanner.caching

(2).  代码中设置 

        Scan scan = new Scan();
scan.setCaching(10);
scan.setStartRow(Bytes.toBytes("11111"));
scan.setStopRow(Bytes.toBytes("333333"));
scan.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("a"));
scan.addColumn(Bytes.toBytes("cf2"), Bytes.toBytes("b"));
ResultScanner scanner = htable.getScanner(scan);
for(Result result : scanner)
{
System.out.println(result);
}
scanner.close();
注意:当数据量非常大的行,这些行有可能超过客户算进程的内存容量,可以如下控制批量获取:

        Scan scan = new Scan();
scan.setCaching(10);
scan.setBatch(5);
scan.setStartRow(Bytes.toBytes("11111"));
scan.setStopRow(Bytes.toBytes("333333"));
scan.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("a"));
scan.addColumn(Bytes.toBytes("cf2"), Bytes.toBytes("b"));
ResultScanner scanner = htable.getScanner(scan);
for(Result result : scanner)
{
System.out.println(result);
}
scanner.close();
 


setCaching(10): 每次服务器端向客户端传输 10 行 Result 

setBatch(5):每次返回的 Result  包含5 个列