HBase--客户端API(基础知识)

时间:2021-10-09 08:31:41

CRUD操作

put方法

put方法被分为两类:一类用于单行,另一类操作用户多行

单行put

void put(Put put ) throws Exception;

向HBase中插入数据示列

public static void put(String tablename, String row, String columnFamily, String cloumn, String data) throws IOException {
HTable table = new HTable(cfg, tablename);
Put p1 = new Put(Bytes.toBytes(row));
p1.add(Bytes.toBytes(columnFamily), Bytes.toBytes(cloumn), Bytes.toBytes(data));
table.put(p1);
System.out.println("put" + row + ",cloumnFamily" + ":" + cloumn + "," + data);
}

通过客户端代码访问配置文件

应用程序需要通过默认位置(classpath)下的hbase-site.xml文件来获知要访问的集群,同样也可以在代码中指定要访问的集群地址


无论哪种方式都需要HBaseConfiguration 类来处理配置属性 ,可以用一下方法来创建Configuration实列

static Configuration create();
static Configuration create(Configuration that);

当你调用任意一个静态方法创建Configuration实例时,代码会尝试使用当前的java classpath来加载两个配置文件:hbase-default.xml hbase-site.xml
如果使用create(Configuration that)创建一个已存在的配置,那么用户指定的配置优先级高于任何从classpath加载进来的配置


可以简单的忽略 任务外部的客户端配置文件,而直接在代码中设置hbase.zookeeper.quorum属性.这样就创建一个不需要额外配置的客户端

客户端写缓冲区

每一个put都是一个RPC操作,它将客户端额数据传送到服务器然后返回.这只适合少量数据的要求,试想没秒钟有上千行数据需要插入到HBase中,这样的数据就不能满足要求了.

减少独立RPC的调用关键是减少往返时间,另外一个重要因素就是消息的大小.如果传送的消息很小,比如一个计数器之类的,那么用户把多次修改的数据批量提交给服务器并减少请求次数,这样就能一定程度上就会有相应提升

HBase的API配置了一个客户端的写缓冲区,缓冲区负责收集Put操作,然后调用RPC一次性将数据提交到服务器

void setAutoFlush(boolean autoFlush)
boolean isAutoFlush()

默认情况下,客户端是禁用缓冲区的.可以将自动刷写autoFlush设置为false,来激活缓冲区.激活缓冲区之后,数据存储到HBase,此时操作不会产生RPC调用,因为存储的Put实例保存在客户端进程的内存中.当需要强制把 数据写到服务器端的时,可以调用以下函数:

void flushCommits() throws IOException

使用客户端写缓冲区

    /**
* 使用客户端写缓冲区
*/

public void putWithBuffer() throws IOException {
HTable table = new HTable(cfg, "testtable");
System.out.println("Auto Flush:" + table.isAutoFlush());
table.setAutoFlush(false);

Put put1 = new Put(Bytes.toBytes("row1"));
put1.add(Bytes.toBytes("cf1"), Bytes.toBytes("cq1"), Bytes.toBytes("val1"));
table.put(put1);

Put put2 = new Put(Bytes.toBytes("row2"));
put2.add(Bytes.toBytes("cf2"), Bytes.toBytes("cq2"), Bytes.toBytes("val2"));
table.put(put2);

Put put3 = new Put(Bytes.toBytes("row3"));
put3.add(Bytes.toBytes("cf3"), Bytes.toBytes("cq3"), Bytes.toBytes("val3"));
table.put(put3);

Get get = new Get(Bytes.toBytes("row1"));
Result res1 = table.get(get);
System.out.println("Result1:" + res1); // 不能得到任何返回值

table.flushCommits();

Result res2 = table.get(get);
System.out.println("Result1:" + res2);//将会得到row1的值

}

第一个get()返回的NONE,客户端的写缓冲区是一个内存结构,存储了所有没有刷写的记录,这些记录尚没有发送到服务器,因此用户无法访问它

put列表

void put(List<Put> puts) throws IOException;

Put列表操作

/**
* 使用列表想HBase中添加数据
*/

public void addListPuts() throws IOException {
HTable table = new HTable(cfg, "testtable");
List<Put> puts = Lists.newArrayList();
Put put1 = new Put(Bytes.toBytes("row1"));
put1.add(Bytes.toBytes("cf1"), Bytes.toBytes("cq1"), Bytes.toBytes("val1"));
Put put2 = new Put(Bytes.toBytes("row2"));
put2.add(Bytes.toBytes("cf2"), Bytes.toBytes("cq2"), Bytes.toBytes("val2"));
Put put3 = new Put(Bytes.toBytes("row3"));
put3.add(Bytes.toBytes("cf3"), Bytes.toBytes("cq3"), Bytes.toBytes("val3"));
puts.add(put1);
puts.add(put2);
puts.add(put3);

try {
table.put(puts);
} catch (IOException e) {
System.err.println("Error:" + e);
table.flushCommits();//如果失败自动提交客户端缓冲区的内容
}

}

原子性操作compare-and-set

有一种特别有用的操作,能保证自身操作的原子性,检查写(check and put) 该方法的签名如下:

public boolean checkAndPut(final byte [] row,
final byte [] family, final byte [] qualifier, final byte [] value,
final Put put)

如果检查成功就执行put操作,,否则就彻底放弃修改操作,这种方法用户,需要检查现有的相关值,决定是否需要进行修改操作.
一旦你想把处理好的结果写回HBase,并保证没有其他客户端已经做了同样的事情,就可以使用这个原子性的操作,先比较原值,再进行修改.

示例

/**
* compareAndSet原子性操作
* @throws IOException
*/

public void cmpAndSet() throws IOException {
HTable table = new HTable(cfg, "testtable");
Put put1 = new Put(Bytes.toBytes("row1"));
put1.add(Bytes.toBytes("cf1"), Bytes.toBytes("cq1"), Bytes.toBytes("val1"));
boolean res1 = table.checkAndPut(Bytes.toBytes("row1"), Bytes.toBytes("cf1"), Bytes.toBytes("cq1"), null, put1);
System.out.println("put applied:" + res1);//不存在,true
boolean res2 = table.checkAndPut(Bytes.toBytes("row1"), Bytes.toBytes("cf1"), Bytes.toBytes("cq1"), null, put1);
System.out.println("put applied:" + res2);//第二次存在,false

Put put2 = new Put(Bytes.toBytes("row1"));
put1.add(Bytes.toBytes("cf1"), Bytes.toBytes("cq2"), Bytes.toBytes("val2"));

boolean res3 = table.checkAndPut(Bytes.toBytes("row1"), Bytes.toBytes("cf1"), Bytes.toBytes("cq1"), Bytes.toBytes("val1"), put2);
System.out.println("put applied:" + res2);//上一次put值存在,true

Put put3 = new Put(Bytes.toBytes("row2"));//创建一个put ,使用不同的行键
put1.add(Bytes.toBytes("cf1"), Bytes.toBytes("cq1"), Bytes.toBytes("val3"));
boolean res4 = table.checkAndPut(Bytes.toBytes("row1"), Bytes.toBytes("cf1"), Bytes.toBytes("cq1"), Bytes.toBytes("val1"), put3);//检查不同的行的值是否相等,然后写入另一行
System.out.println("Put applied:" + res4);//会在上面的 checkAndPut 抛异常
}

get方法

单行get

操作示意

    /**
* 从HBase中获取数据 单行Get
*/

public void GetARow() throws IOException {
HTable htable = new HTable(cfg, "testtable");
Get get = new Get(Bytes.toBytes("row1"));
get.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("cq1"));
Result result = htable.get(get);
byte[] val = result.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("cq1"));
System.out.println("Value:" + Bytes.toString(val));
}

get列表

使用列表参数额get方法和使用列表参数的put方法类似,用户可以一次请求多行数据.对应的API

Result[] get(List<Get>gets)throws IOException

使用Get列表从HBase中获取数据

    /**
* 使用Get列表参数获取数据
*/

public void getList() throws IOException {
HTable hTable = new HTable(cfg, "testtable");
List<Get> gets = Lists.newArrayList();
Get get1 = new Get(Bytes.toBytes("row1"));
get1.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("cq1"));
gets.add(get1);

Get get2 = new Get(Bytes.toBytes("row2"));
get1.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("cq1"));
gets.add(get2);

Get get3 = new Get(Bytes.toBytes("row2"));
get1.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("cq2"));
gets.add(get3);

Result[] results = hTable.get(gets);
for (Result result : results) {
String row = Bytes.toString(result.getRow());
System.out.println("Row:" + row + " ");
byte[] val ;
if (result.containsColumn(Bytes.toBytes("cf1"), Bytes.toBytes("cq1"))) {
val = result.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("cq1"));
System.out.println("Value:" + val);
}
}
System.out.println("Second iteration...");
for (Result result : results) {
for (KeyValue kv : result.raw()) {
System.out.println("Row:" + Bytes.toString(kv.getRow()) + "Value:" + Bytes.toString(kv.getValue()));
}
}
}

get()方法要么返回与给定列表大小一致的Result数组,要么抛出一个异常.对于批量操作的局部错误,有一种更为精细的处理方法,即使用batch()方法

使用特殊方法检索数据

    /**
* 使用特殊方法检索数据
*/

public void querData() throws IOException {
HTable table = new HTable(cfg, "testtable");
Result result1 = table.getRowOrBefore(Bytes.toBytes("row1"), Bytes.toBytes("cf1"));
System.out.println("Found:" + Bytes.toString(result1.getRow()));//尝试查找已存在的行

Result result2 = table.getRowOrBefore(Bytes.toBytes("row99"), Bytes.toBytes("cf1"));
System.out.println("Found:" + Bytes.toString(result2.getRow()));//将返回最后一行数据

for (KeyValue kv : result2.raw()) {
System.out.println("col1:" + Bytes.toString(kv.getFamily()) + "/" + Bytes.toString(kv.getQualifier()) + ",Value:" + Bytes.toString(kv.getValue()));
}

//查找abc 或上一行 没有匹配到返回null
Result result3 = table.getRowOrBefore(Bytes.toBytes("abc"), Bytes.toBytes("cf1"));
System.out.println("Found:" + result3);
}

delete

delete操作示意

    /**
* 从hbase中删除数据的应用示例
*/

public void deleteData() throws IOException {
HTable table = new HTable(cfg, "testtable");
Delete delete = new Delete(Bytes.toBytes("row1"));
delete.setTimestamp(1);//设置时间戳
delete.deleteColumn(Bytes.toBytes("cf1"), Bytes.toBytes("cq1"), 1);//删除一列中的特定版本
delete.deleteColumns(Bytes.toBytes("cf2"), Bytes.toBytes("cq1"));//删除一列中的全部版本
delete.deleteColumns(Bytes.toBytes("cf2"), Bytes.toBytes("cq3"), 15);//删除一列中给定的版本和所有更旧的版本
delete.deleteFamily(Bytes.toBytes("cf3"));
delete.deleteFamily(Bytes.toBytes("cf3"), 3);
table.delete(delete);
table.close();
}

如果尝试删除未设置时间戳的单元格,什么都不发生.列如,某一列有两个版本,版本10,版本20,删除版本15将不会影响现存的任何版本