1、概述
(1)、HBase 所有修改数据的操作保证行级别的原子性。
(2)、用户应该尽量使用批处理(batch)更新,减少单独操作同一行数据的次数。
(3)、创建表有代价,所以只创建一个HTable,一般在应用程序开始时创建
(4)、使用HTablePool,复用多个实例,例子
int maxSize = 20;
HTablePool pool = new HTablePool(conf,maxSize);
HTableInterface tableInterface = pool.getTable(strTableName);
Put put = new Put(Bytes.toBytes("row-11"));
put.add(Bytes.toBytes("cf1"), Bytes.toBytes("a"), Bytes.toBytes("value-a"));
put.add(Bytes.toBytes("cf2"), Bytes.toBytes("b"), Bytes.toBytes("value-b"));
tableInterface.put(put);
2、PUT操作
(1) . 多个版本
查看多版本:scan 'testtable', { VERSIONS => 3 }
(2). KeyValue 类,元数据内容
row-key / family:qualifier / version / type / value-length
(3). 客户端的写缓冲区
每一个put操作都是一个RPC操作,启用写缓冲区可将多个put一次RPC送往服务器:
table.setAutoFlush(false);
若需要将缓冲区数据强制写入服务端:
table.flushCommits()
设置写缓冲区大小
table.setWriteBufferSize(long writeBufferSize) // 默认为 2 M
也可以设置hbase-site.xml 中的 hbase.client.write.buffer
(4). 错误处理
如果Put 一个列表,里面有一个错误的(使用了不存在的列族);
Put put1 = new Put(Bytes.toBytes("11111"), System.currentTimeMillis());抛出以下异常:
put1.add(Bytes.toBytes("cf1"), Bytes.toBytes("a"), Bytes.toBytes("value-x11"));
Put put2 = new Put(Bytes.toBytes("22222"), System.currentTimeMillis());
put2.add(Bytes.toBytes("noExist"), Bytes.toBytes("a"), Bytes.toBytes("value-y22"));
Put put3 = new Put(Bytes.toBytes("33333"), System.currentTimeMillis());
put3.add(Bytes.toBytes("cf1"), Bytes.toBytes("a"), Bytes.toBytes("value-x33"));
list.add(put1);
list.add(put2);
list.add(put3);
tableInterface.put(list);
org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException: Failed 1 action: org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException: Column family noExist does not exist in region testtable
但是其余两个Put成功:
hbase(main):005:0> scan 'testtable'
ROW COLUMN+CELL
11111 column=cf1:a, timestamp=1416194410866, value=value-x11
33333 column=cf1:a, timestamp=1416194410866, value=value-x33
如果Put 一个列表,里面有一个为空 ;
Put put1 = new Put(Bytes.toBytes("11111"), System.currentTimeMillis());将抛出异常:Exception in thread "main" java.lang.IllegalArgumentException: No columns to insert
put1.add(Bytes.toBytes("cf1"), Bytes.toBytes("a"), Bytes.toBytes("value-x11"));
Put put2 = new Put(Bytes.toBytes("22222"), System.currentTimeMillis());
put2.add(Bytes.toBytes("cf1"), Bytes.toBytes("a"), Bytes.toBytes("value-y22"));
Put put3 = new Put(Bytes.toBytes("33333"), System.currentTimeMillis());
put3.add(Bytes.toBytes("cf1"), Bytes.toBytes("a"), Bytes.toBytes("value-x33"));
Put put4 = new Put(Bytes.toBytes("44444"), System.currentTimeMillis());
list.add(put1);
list.add(put2);
list.add(put3);
list.add(put4);
tableInterface.put(list);
因为这个错误由客户端检查发现,因此将没有内容写入到数据库。
(5). Put 写入List, 不保证写入的顺序。
(6). 原子操作
Put put1 = new Put(Bytes.toBytes("11111"), System.currentTimeMillis());
put1.add(Bytes.toBytes("cf1"), Bytes.toBytes("a"), Bytes.toBytes("value-x11"));
tableInterface.checkAndPut(Bytes.toBytes("11111"),
Bytes.toBytes("cf1"), Bytes.toBytes("a"), Bytes.toBytes("value-x33"), put1);
3. Get 操作
只获取一个
Get get = new Get(Bytes.toBytes("row-11"));
get.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("a"));
get.addColumn(Bytes.toBytes("cf2"), Bytes.toBytes("b"));
Result result = table.get(get);
byte[] value1 = result.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("a"));
byte[] value2 = result.getValue(Bytes.toBytes("cf2"), Bytes.toBytes("b"));
System.out.println("cf1:a=" + Bytes.toString(value1));
System.out.println("cf2:b=" + Bytes.toString(value2));
获取多个
Get get1 = new Get(Bytes.toBytes("row-11"));
get1.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("a"));
get1.addColumn(Bytes.toBytes("cf2"), Bytes.toBytes("b"));
Get get2 = new Get(Bytes.toBytes("row-22"));
get2.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("a"));
get2.addColumn(Bytes.toBytes("cf2"), Bytes.toBytes("b"));
List<Get> listGet = new ArrayList<Get>();
listGet.add(get1);
listGet.add(get2);
Result[] resutls = table.get(listGet);
for (Result result : resutls) {
byte[] row = result.getRow();
byte[] value1 = result.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("a"));
byte[] value2 = result.getValue(Bytes.toBytes("cf2"), Bytes.toBytes("b"));
System.out.println("rowkey=" + Bytes.toString(row)+", cf1:a=" + Bytes.toString(value1));
System.out.println("rowkey=" + Bytes.toString(row)+", cf2:b=" + Bytes.toString(value2));
}
如果 table.get(listGet) 操作中, listGet 之中有一个错误,则整个操作终止并抛出异常。
查找某行或者某行之前的一行:
Result rowOrBefore = table.getRowOrBefore(Bytes.toBytes("row-22"), Bytes.toBytes("cf1"));
4、DELETE 操作
同样也可以delete某1行或者多行,删除特定版本或者多个版本,整个列族或者某个列
也有原子操作:
Delete delete = new Delete(Bytes.toBytes("row-11"));
table.checkAndDelete(Bytes.toBytes("row-11"),Bytes.toBytes("cf1"),
Bytes.toBytes("a"), Bytes.toBytes("value-x33"), delete);
5、批量处理 Batch
可以同时进行Put、Get、Delete 操作
List<Row> list = new ArrayList<Row>();
Put put1 = new Put(Bytes.toBytes("11111"), System.currentTimeMillis());
put1.add(Bytes.toBytes("cf1"), Bytes.toBytes("a"), Bytes.toBytes("value-x11"));
put1.add(Bytes.toBytes("cf2"), Bytes.toBytes("b"), Bytes.toBytes("value-y11"));
Get get1 = new Get(Bytes.toBytes("11111"));
get1.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("a"));
Get get2 = new Get(Bytes.toBytes("33333"));
get1.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("a"));
list.add(put1);
list.add(get1);
list.add(get2);
Object[] results = new Object[list.size()];
try {
tableInterface.batch(list, results);
} catch (InterruptedException ex) {
Logger.getLogger(BatchTest.class.getName()).log(Level.SEVERE, null, ex);
}
for(Object result:results)
{
System.out.println(result);
}
输出:
keyvalues=NONE
keyvalues={11111/cf1:a/1416199179786/Put/vlen=9/mvcc=0}
keyvalues={33333/cf1:a/1416194410866/Put/vlen=9/mvcc=0}
Object[] results = new Object[list.size()];
tableInterface.batch(list, results); // 如果出错,此方法可以访问部分结果
Object[] results2 = tableInterface.batch(list); // 如果出错,此方法不会有任何结果
6、行锁
设置 hbase.regionserver.lease.period 可修改锁超时时间。
在行上创建一个锁,该锁阻塞索引的并发读取。
7、扫描scan
设置 hbase.regionserver.lease.period 可修改扫描器超时时间。
Scan 例子:
Scan scan = new Scan();
scan.setStartRow(Bytes.toBytes("11111"));
scan.setStopRow(Bytes.toBytes("333333"));
scan.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("a"));
scan.addColumn(Bytes.toBytes("cf2"), Bytes.toBytes("b"));
ResultScanner scanner = htable.getScanner(scan);
for(Result result : scanner)
{
System.out.println(result);
}
scanner.close();
以上的Scan方法,获取每一个Result 都会生成一个单独的RPC 请求,性能将不会很高。
可用两种方法打开扫描器缓存:
(1). 修改 hbase.client.scanner.caching
(2). 代码中设置
Scan scan = new Scan();注意:当数据量非常大的行,这些行有可能超过客户算进程的内存容量,可以如下控制批量获取:
scan.setCaching(10);
scan.setStartRow(Bytes.toBytes("11111"));
scan.setStopRow(Bytes.toBytes("333333"));
scan.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("a"));
scan.addColumn(Bytes.toBytes("cf2"), Bytes.toBytes("b"));
ResultScanner scanner = htable.getScanner(scan);
for(Result result : scanner)
{
System.out.println(result);
}
scanner.close();
Scan scan = new Scan();
scan.setCaching(10);
scan.setBatch(5);
scan.setStartRow(Bytes.toBytes("11111"));
scan.setStopRow(Bytes.toBytes("333333"));
scan.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("a"));
scan.addColumn(Bytes.toBytes("cf2"), Bytes.toBytes("b"));
ResultScanner scanner = htable.getScanner(scan);
for(Result result : scanner)
{
System.out.println(result);
}
scanner.close();
setBatch(5):每次返回的 Result 包含5 个列