前面提到的scan
操作支持设置过滤器,筛选需要返回的结果。下面看一个简单的例子:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.RegexStringComparator;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.util.Bytes;
import java.util.ArrayList;
import java.util.List;
public class TestHBase {
public static void main(String[] argv) throws Exception {
// 建立连接
System.out.println("connecting...");
Configuration config = HBaseConfiguration.create();
Connection connection = ConnectionFactory.createConnection(config);
Table table = connection.getTable(TableName.valueOf("test"));
System.out.println("connect ok");
// 添加数据
List<Put> puts = new ArrayList<Put>();
puts.add(put("A", "Alice", "20"));
puts.add(put("B", "Bob", "21"));
puts.add(put("C", "Charley", "22"));
table.put(puts);
System.out.println("put ok");
// 用正则作为过滤器进行扫描
Scan scan = new Scan();
scan.setFilter(new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator("[AC]")));
ResultScanner scanner = table.getScanner(scan);
for(Result e : scanner) {
System.out.println(Bytes.toString(e.getRow()));
}
// 断开连接
table.close();
connection.close();
}
public static Put put(String row, String name, String age) {
Put result = new Put(Bytes.toBytes(row));
result.addColumn(Bytes.toBytes("test_family"), Bytes.toBytes("name"), Bytes.toBytes(name));
result.addColumn(Bytes.toBytes("test_family"), Bytes.toBytes("age"), Bytes.toBytes(age));
return result;
}
}
从例子中可以看到在Scan
请求中通过setFilter
设置过滤器。这个例子中用了RowFilter
表示对行健进行过滤,CompareFilter.CompareOp.EQUAL
表示相等,跟后面的正则表达式比较器组合起来表示“匹配正则表达式”。RegexStringComparator
表示用正则表达式进行匹配,[AC]
表示行健包含A
或者C
的正则表达式。
过滤器在服务端完成过滤,然后将符合结果的数据返回给客户端,因此过滤能够减少网络开销提升性能。
过滤器主要有以下几种:
-
RowFilter
表示对行健进行过滤。 -
FamilyFilter
表示对列族进行过滤。过滤方式是比较列族的名称。 -
QualifierFilter
表示对列名进行过滤。 -
ValueFilter
表示对单元格的值进行过滤。 -
DependentColumnFilter
可以指定一个参考列,只返回时间戳与参考列相同的列。(不清楚哪种场景适用) -
SingleColumnValueFilter
可以指定一个参考列,当参考列满足指定条件时才返回一整行。 -
SingleColumnValueExcludeFilter
当参考列不需要返回给客户端时可以用这个过滤器,性能稍微好一些。 -
PrefixFilter
只返回行键符合指定前缀的行。 -
PageFilter(x)
表示限定返回的结果数量最多为x。但不能保证返回的结果数量一定小于等于x,因为跨Region分别执行得到的结果合并之后就多出来了。配合Scan.setStartRow
可以做出分页的效果。 -
KeyOnlyFilter
表示只需要返回KeyValue
中的键,不需要具体内容。前面讲过KeyValue
中包含行键、列族、列名、版本(时间戳)、类型(Put、Delete、DeleteFamily、DeleteColumn)、值的长度。 -
FirstKeyOnlyFilter
表示只需要返回一行中第一个KeyValue
的键即可。适合行数统计场景。 -
InclusiveStopFilter
表示扫描结果包含结束行。默认扫描区间是左闭右开的区间,通过这个过滤器实现左右都闭的区间。 -
TimestampsFilter
表示精确的时间戳过滤,匹配指定的时间戳时才返回结果。 -
ColumnCountGetFilter
对于get
操作,表示最多返回指定的列数。对于扫描操作,当其中一行数据的列数超过指定数量时停止扫描。所以不太适合扫描操作使用。 -
ColumnPaginationFilter
用于对列进行分页。可以指定起始列和结束咧。 -
ColumnPrefixFilter
对列名称进行前缀匹配过滤。 -
RandomRowFilter
对返回的结果进行随机取样,可以指定取样比例。
以下几种是Filter
装饰器,可以将Filter
进行组合做出更复杂的条件:
-
SkipFilter
接受一个Filter
参数,表示被包装的过滤器。当被包装的过滤器判定一个单元格不符合条件时,整行数据都不返回。 -
WhileMatchFilter
接受一个Filter
参数。当被包装的过滤器判定一个单元格不符合条件时,立即停止扫描。 -
FilterList
可以组合多个Filter
,通过and逻辑或者or逻辑将条件进行组合。
自定义过滤器。通过集成Filter
类,可以实现自定义过滤器。实现完成后需要部署到服务端才能使用。以下是过滤器执行的顺序:
-
filterRowKey
对行键进行过滤 -
filterKeyValue
对单元格中的数据进行过滤 -
filterRow
根据整行的数据对此行进行过滤 -
reset
一行结束后调用此方法 -
filterAllRemaining
一行结束后调用此方法,返回true表示结束整个扫描操作
计数器
HBase支持increment
操作,实现计数器的功能。计数器在HBase中用二进制的格式储存。因此客户端读取值之后需要通过Bytes.getInt
取值。
协处理器
可以在服务端运行代码,将运行结果返回给客户端。使用场景有辅助索引、鉴权、数据统计。
自定义的协处理器可以监听region的变化事件、WAL数据事件、Master集群级别的事件,可以设置endpoint支持自定义的数据计算流程。
有三个重要的类用于协处理器:Coprocessor
、CoprocessorEnvironment
、CoprocessorHost
。
Coprocessor
是协处理器的主体,里面有:
- start
、stop
方法,启动、关闭时会被框架调用
- State枚举,定义了协处理器的状态
CoprocessorEnvironment
提供了协处理器的环境,它有以下方法:
-
getHBaseVersion
获取HBase版本 -
getInstance
返回加载的协处理器实例 -
getPriority
获取优先级 -
getLoadSequence
返回协处理器的执行顺序 -
getTable
获取HTable接口,用于访问数据
CoprocessorHost
维护协处理器的实例和他们专用的环境。在新版中已经没有了。
加载协处理器
在hbase-site.xml
中可以添加协处理器,以下是配置内容:
<property><!-- 监听region变化事件 -->
<name>hbase.coprocessor.region.classes</name>
<value>coprocessor.RegionObserverExample,coprocessor.AnotherCoprocessor</value>
</property>
<property><!-- 监听集群级别的集群管理事件 -->
<name>hbase.coprocessor.master.classes</name>
<value>coprocessor.MasterObserverExample</value>
</property>
<property><!-- 监听WAL的数据变化事件 -->
<name>hbase.coprocessor.wal.classes</name>
<value>coprocessor.WALObserverExample</value>
</property>
RegionObserver
Region协处理器可用于处理region的声明周期事件,有以下一些生命周期:
- pending open状态会触发事件:
preOpen
postOpen
- open状态会触发事件:
preWALRestore
postWALRestore
preFlush
postFlush
preCompact
postCompact
preSplit
postSplit
- pending close状态会触发事件:
preClose
postClose
Region协处理器也可用于处理客户端的请求:
-
preGet
postGet
-
prePut
postPut
- 还有delete checkAndPut getClosestRowBefore exists incrementColumnValue invrement scannerOpen scannerNex scannerClose事件
RegionObserver
继承了RegionCoprocessorEnvironment
,因此有以下方法可用:
-
getRegion
档获取当前监听的region -
getRegionServerServices
获取共享的服务
ObserverContext
是所有协处理器实例共享的,它提供以下方法:
-
getEnvironment
获取当前的环境引用 -
bypass
跳过HBase框架的处理流程,只用自己实现的流程 -
complete
跳过后续所有的协处理器 -
shouldBypass
框架调用此方法用于检查 -
shouldComplete
框架调用此方法用于检查
比如以下例子停止region的自动拆分:
public void preSplit(ObserverContext<RegionCoprocessorEnvironment> e) {
e.bypass();
}
MasterObserver
Master协处理器用于处理集群事件,它能监听如下事件:
createTable
deleteTable
modifyTable
addColumn
modifyColumn
deleteColumn
enableTable
disableTable
move
assign
unassign
balance
-
balanceSwitch
:切换自动负载平衡的标志位前后调用 shutdown
stopMaster
endpoint
可以给表增加一些原本不存在的操作,比如统计表的行数。