1、概述
(1)、HBase 所有修改数据的操作保证行级别的原子性。
(2)、用户应该尽量使用批处理(batch)更新,减少单独操作同一行数据的次数。
(3)、创建表有代价,所以只创建一个HTable,一般在应用程序开始时创建
(4)、使用HTablePool,复用多个实例,例子
int maxSize = 20; HTablePool pool = new HTablePool(conf,maxSize); HTableInterface tableInterface = pool.getTable(strTableName); Put put = new Put(Bytes.toBytes("row-11")); put.add(Bytes.toBytes("cf1"), Bytes.toBytes("a"), Bytes.toBytes("value-a")); put.add(Bytes.toBytes("cf2"), Bytes.toBytes("b"), Bytes.toBytes("value-b")); tableInterface.put(put);
2、PUT操作
(1) . 多个版本
查看多版本:scan 'testtable', { VERSIONS => 3 }
(2). KeyValue 类,元数据内容
row-key / family:qualifier / version / type / value-length
(3). 客户端的写缓冲区
每一个put操作都是一个RPC操作,启用写缓冲区可将多个put一次RPC送往服务器:
table.setAutoFlush(false);
若需要将缓冲区数据强制写入服务端:
table.flushCommits()
设置写缓冲区大小
table.setWriteBufferSize(long writeBufferSize) // 默认为 2 M
也可以设置hbase-site.xml 中的 hbase.client.write.buffer
(4). 错误处理
如果Put 一个列表,里面有一个错误的(使用了不存在的列族);
Put put1 = new Put(Bytes.toBytes("11111"), System.currentTimeMillis()); put1.add(Bytes.toBytes("cf1"), Bytes.toBytes("a"), Bytes.toBytes("value-x11")); Put put2 = new Put(Bytes.toBytes("22222"), System.currentTimeMillis()); put2.add(Bytes.toBytes("noExist"), Bytes.toBytes("a"), Bytes.toBytes("value-y22")); Put put3 = new Put(Bytes.toBytes("33333"), System.currentTimeMillis()); put3.add(Bytes.toBytes("cf1"), Bytes.toBytes("a"), Bytes.toBytes("value-x33")); list.add(put1); list.add(put2); list.add(put3); tableInterface.put(list);抛出以下异常:
org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException: Failed 1 action: org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException: Column family noExist does not exist in region testtable
但是其余两个Put成功:
hbase(main):005:0> scan 'testtable'
ROW COLUMN+CELL
11111 column=cf1:a, timestamp=1416194410866, value=value-x11
33333 column=cf1:a, timestamp=1416194410866, value=value-x33
如果Put 一个列表,里面有一个为空 ;
Put put1 = new Put(Bytes.toBytes("11111"), System.currentTimeMillis()); put1.add(Bytes.toBytes("cf1"), Bytes.toBytes("a"), Bytes.toBytes("value-x11")); Put put2 = new Put(Bytes.toBytes("22222"), System.currentTimeMillis()); put2.add(Bytes.toBytes("cf1"), Bytes.toBytes("a"), Bytes.toBytes("value-y22")); Put put3 = new Put(Bytes.toBytes("33333"), System.currentTimeMillis()); put3.add(Bytes.toBytes("cf1"), Bytes.toBytes("a"), Bytes.toBytes("value-x33")); Put put4 = new Put(Bytes.toBytes("44444"), System.currentTimeMillis()); list.add(put1); list.add(put2); list.add(put3); list.add(put4); tableInterface.put(list);将抛出异常:Exception in thread "main" java.lang.IllegalArgumentException: No columns to insert
因为这个错误由客户端检查发现,因此将没有内容写入到数据库。
(5). Put 写入List, 不保证写入的顺序。
(6). 原子操作
Put put1 = new Put(Bytes.toBytes("11111"), System.currentTimeMillis()); put1.add(Bytes.toBytes("cf1"), Bytes.toBytes("a"), Bytes.toBytes("value-x11")); tableInterface.checkAndPut(Bytes.toBytes("11111"), Bytes.toBytes("cf1"), Bytes.toBytes("a"), Bytes.toBytes("value-x33"), put1);
3. Get 操作
只获取一个
Get get = new Get(Bytes.toBytes("row-11")); get.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("a")); get.addColumn(Bytes.toBytes("cf2"), Bytes.toBytes("b")); Result result = table.get(get); byte[] value1 = result.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("a")); byte[] value2 = result.getValue(Bytes.toBytes("cf2"), Bytes.toBytes("b")); System.out.println("cf1:a=" + Bytes.toString(value1)); System.out.println("cf2:b=" + Bytes.toString(value2));
获取多个
Get get1 = new Get(Bytes.toBytes("row-11")); get1.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("a")); get1.addColumn(Bytes.toBytes("cf2"), Bytes.toBytes("b")); Get get2 = new Get(Bytes.toBytes("row-22")); get2.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("a")); get2.addColumn(Bytes.toBytes("cf2"), Bytes.toBytes("b")); List<Get> listGet = new ArrayList<Get>(); listGet.add(get1); listGet.add(get2); Result[] resutls = table.get(listGet); for (Result result : resutls) { byte[] row = result.getRow(); byte[] value1 = result.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("a")); byte[] value2 = result.getValue(Bytes.toBytes("cf2"), Bytes.toBytes("b")); System.out.println("rowkey=" + Bytes.toString(row)+", cf1:a=" + Bytes.toString(value1)); System.out.println("rowkey=" + Bytes.toString(row)+", cf2:b=" + Bytes.toString(value2)); }
如果 table.get(listGet) 操作中, listGet 之中有一个错误,则整个操作终止并抛出异常。
查找某行或者某行之前的一行:
Result rowOrBefore = table.getRowOrBefore(Bytes.toBytes("row-22"), Bytes.toBytes("cf1"));
4、DELETE 操作
同样也可以delete某1行或者多行,删除特定版本或者多个版本,整个列族或者某个列
也有原子操作:
Delete delete = new Delete(Bytes.toBytes("row-11")); table.checkAndDelete(Bytes.toBytes("row-11"),Bytes.toBytes("cf1"), Bytes.toBytes("a"), Bytes.toBytes("value-x33"), delete);
5、批量处理 Batch
可以同时进行Put、Get、Delete 操作
List<Row> list = new ArrayList<Row>(); Put put1 = new Put(Bytes.toBytes("11111"), System.currentTimeMillis()); put1.add(Bytes.toBytes("cf1"), Bytes.toBytes("a"), Bytes.toBytes("value-x11")); put1.add(Bytes.toBytes("cf2"), Bytes.toBytes("b"), Bytes.toBytes("value-y11")); Get get1 = new Get(Bytes.toBytes("11111")); get1.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("a")); Get get2 = new Get(Bytes.toBytes("33333")); get1.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("a")); list.add(put1); list.add(get1); list.add(get2); Object[] results = new Object[list.size()]; try { tableInterface.batch(list, results); } catch (InterruptedException ex) { Logger.getLogger(BatchTest.class.getName()).log(Level.SEVERE, null, ex); } for(Object result:results) { System.out.println(result); }
输出:
keyvalues=NONE
keyvalues={11111/cf1:a/1416199179786/Put/vlen=9/mvcc=0}
keyvalues={33333/cf1:a/1416194410866/Put/vlen=9/mvcc=0}
Object[] results = new Object[list.size()];
tableInterface.batch(list, results); // 如果出错,此方法可以访问部分结果
Object[] results2 = tableInterface.batch(list); // 如果出错,此方法不会有任何结果
6、行锁
设置 hbase.regionserver.lease.period 可修改锁超时时间。
在行上创建一个锁,该锁阻塞索引的并发读取。
7、扫描scan
设置 hbase.regionserver.lease.period 可修改扫描器超时时间。
Scan 例子:
Scan scan = new Scan(); scan.setStartRow(Bytes.toBytes("11111")); scan.setStopRow(Bytes.toBytes("333333")); scan.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("a")); scan.addColumn(Bytes.toBytes("cf2"), Bytes.toBytes("b")); ResultScanner scanner = htable.getScanner(scan); for(Result result : scanner) { System.out.println(result); } scanner.close();
以上的Scan方法,获取每一个Result 都会生成一个单独的RPC 请求,性能将不会很高。
可用两种方法打开扫描器缓存:
(1). 修改 hbase.client.scanner.caching
(2). 代码中设置
Scan scan = new Scan(); scan.setCaching(10); scan.setStartRow(Bytes.toBytes("11111")); scan.setStopRow(Bytes.toBytes("333333")); scan.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("a")); scan.addColumn(Bytes.toBytes("cf2"), Bytes.toBytes("b")); ResultScanner scanner = htable.getScanner(scan); for(Result result : scanner) { System.out.println(result); } scanner.close();注意:当数据量非常大的行,这些行有可能超过客户算进程的内存容量,可以如下控制批量获取:
Scan scan = new Scan(); scan.setCaching(10); scan.setBatch(5); scan.setStartRow(Bytes.toBytes("11111")); scan.setStopRow(Bytes.toBytes("333333")); scan.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("a")); scan.addColumn(Bytes.toBytes("cf2"), Bytes.toBytes("b")); ResultScanner scanner = htable.getScanner(scan); for(Result result : scanner) { System.out.println(result); } scanner.close();
setBatch(5):每次返回的 Result 包含5 个列