HBase的协处理器及应用实战

时间:2024-03-31 16:14:43

HBase协处理器

http://hbase.apache.org/book.html#cp

 1、 起源
Hbase 作为列族数据库最经常被人诟病的特性包括:无法轻易建立“二级索引”,难以执 行求和、计数、排序等操作。比如,在旧版本的(<0.92)Hbase 中,统计数据表的总行数,需 要使用 Counter 方法,执行一次 MapReduce Job 才能得到。虽然 HBase 在数据存储层中集成
了 MapReduce,能够有效用于数据表的分布式计算。然而在很多情况下,做一些简单的相 加或者聚合计算的时候, 如果直接将计算过程放置在 server 端,能够减少通讯开销,从而获 得很好的性能提升。于是, HBase 在 0.92 之后引入了协处理器(coprocessors),实现一些激动
人心的新特性:能够轻易建立二次索引、复杂过滤器(谓词下推)以及访问控制等。

2、协处理器有两种: observer 和 endpoint

  (1) Observer 类似于传统数据库中的触发器,当发生某些事件的时候这类协处理器会被 Server 端调用。Observer Coprocessor 就是一些散布在 HBase Server 端代码中的 hook 钩子, 在固定的事件发生时被调用。比如: put 操作之前有钩子函数 prePut,该函数在 put 操作
执行前会被 Region Server 调用;在 put 操作之后则有 postPut 钩子函数

 

以 Hbase2.0.0 版本为例,它提供了三种观察者接口:
● RegionObserver:提供客户端的数据操纵事件钩子: Get、 Put、 Delete、 Scan 等。
● WALObserver:提供 WAL 相关操作钩子。
● MasterObserver:提供 DDL-类型的操作钩子。如创建、删除、修改数据表等。
到 0.96 版本又新增一个 RegionServerObserver

下图是以 RegionObserver 为例子讲解 Observer 这种协处理器的原理:

HBase的协处理器及应用实战

 

   (2) Endpoint 协处理器类似传统数据库中的存储过程,客户端可以调用这些 Endpoint 协处 理器执行一段 Server 端代码,并将 Server 端代码的结果返回给客户端进一步处理,最常 见的用法就是进行聚集操作。如果没有协处理器,当用户需要找出一张表中的最大数据,即

max 聚合操作,就必须进行全表扫描,在客户端代码内遍历扫描结果,并执行求最大值的 操作。这样的方法无法利用底层集群的并发能力,而将所有计算都集中到 Client 端统一执 行,势必效率低下。利用 Coprocessor,用户可以将求最大值的代码部署到 HBase Server 端,
HBase 将利用底层 cluster 的多个节点并发执行求最大值的操作。即在每个 Region 范围内 执行求最大值的代码,将每个 Region 的最大值在 Region Server 端计算出,仅仅将该 max 值返回给客户端。在客户端进一步将多个 Region 的最大值进一步处理而找到其中的最大值。
这样整体的执行效率就会提高很多
下图是 EndPoint 的工作原理:

HBase的协处理器及应用实战

 

  (3)总结

Observer 允许集群在正常的客户端操作过程中可以有不同的行为表现
Endpoint 允许扩展集群的能力,对客户端应用开放新的运算命令
observer 类似于 RDBMS 中的触发器,主要在服务端工作
endpoint 类似于 RDBMS 中的存储过程,主要在 client 端工作
observer 可以实现权限管理、优先级设置、监控、 ddl 控制、 二级索引等功能
endpoint 可以实现 min、 max、 avg、 sum、 distinct、 group by 等功能

3协处理器加载方式  

     协处理器的加载方式有两种,我们称之为静态加载方式( Static Load) 和动态加载方式 ( Dynamic Load)。 静态加载的协处理器称之为 System Coprocessor,动态加载的协处理器称 之为 Table Coprocessor
     1、静态加载 

通过修改 hbase-site.xml 这个文件来实现, 启动全局 aggregation,能过操纵所有的表上 的数据。只需要添加如下代码:

<property>

    <name>hbase.coprocessor.user.region.classes</name>

    <value>org.apache.hadoop.hbase.coprocessor.AggregateImplementation</value>

</property>

  为所有 table 加载了一个 cp class,可以用” ,”分割加载多个 class

    2、动态加载

启用表 aggregation,只对特定的表生效。通过 HBase Shell 来实现。
disable 指定表。 hbase> disable 'mytable'
添加 aggregation
hbase> alter 'mytable', METHOD => 'table_att','coprocessor'=>
'|org.apache.Hadoop.hbase.coprocessor.AggregateImplementation||'
重启指定表 hbase> enable 'mytable'

协处理器卸载

HBase的协处理器及应用实战

 

4、协处理器Observer应用实战

 

通过协处理器Observer实现hbase当中一张表插入数据,然后通过协处理器,将数据复制一份保存到另外一张表当中去,但是只取当第一张表当中的部分列数据保存到第二张表当中去

第一步:HBase当中创建一张表proc1

在HBase当中创建一张表,表名user2,并只有一个列族info

cd /export/servers/hbase-2.0.0/

bin/hbase shell

hbase(main):053:0> create 'proc1','info'

 

第二步:Hbase当中创建第二表proc2

创建第二张表'proc2,作为目标表,将第一张表当中插入数据的部分列,使用协处理器,复制到'proc2表当中来

hbase(main):054:0> create 'proc2','info'

第三步:开发HBase的协处理器

开发HBase的协处理器Copo

public class MyProcessor implements RegionObserver,RegionCoprocessor {

    static Connection connection = null;
    static Table table = null;
    static{
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum","node01:2181");
        try {
            connection = ConnectionFactory.createConnection(conf);
            table = connection.getTable(TableName.valueOf("proc2"));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    private RegionCoprocessorEnvironment env = null;
    private static final String FAMAILLY_NAME = "info";
    private static final String QUALIFIER_NAME = "name";
    //2.0加入该方法,否则无法生效
    @Override
    public Optional<RegionObserver> getRegionObserver() {
        // Extremely important to be sure that the coprocessor is invoked as a RegionObserver
        return Optional.of(this);
    }
    @Override
    public void start(CoprocessorEnvironment e) throws IOException {
        env = (RegionCoprocessorEnvironment) e;
    }
    @Override
    public void stop(CoprocessorEnvironment e) throws IOException {
        // nothing to do here
    }
    /**
     * 覆写prePut方法,在我们数据插入之前进行拦截,
     * @param e
     * @param put  put对象里面封装了我们需要插入到目标表的数据
     * @param edit
     * @param durability
     * @throws IOException
     */
    @Override
    public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e,
                       final Put put, final WALEdit edit, final Durability durability)
            throws IOException {
        try {
            //通过put对象获取插入数据的rowkey
            byte[] rowBytes = put.getRow();
            String rowkey = Bytes.toString(rowBytes);
            //获取我们插入数据的name字段的值
            List<Cell> list = put.get(Bytes.toBytes(FAMAILLY_NAME), Bytes.toBytes(QUALIFIER_NAME));
            if (list == null || list.size() == 0) {
                return;
            }
            //获取到info列族,name列对应的cell
            Cell cell2 = list.get(0);

            //通过cell获取数据值
            String nameValue = Bytes.toString(CellUtil.cloneValue(cell2));
            //创建put对象,将数据插入到proc2表里面去
            Put put2 = new Put(rowkey.getBytes());
            put2.addColumn(Bytes.toBytes(FAMAILLY_NAME), Bytes.toBytes(QUALIFIER_NAME),  nameValue.getBytes());
            table.put(put2);
            table.close();
        } catch (Exception e1) {
            return ;
        }
    }
}

 

第四步:将项目打成jar包,并上传到HDFS上面

将我们的协处理器打成一个jar包,此处不需要用任何的打包插件即可,然后上传到hdfs

HBase的协处理器及应用实战

 

将打好的jar包上传到linux的/export/servers路径下

cd /export/servers

mv original-hbase-1.0-SNAPSHOT.jar  processor.jar

hdfs dfs -mkdir -p /processor

hdfs dfs -put processor.jar /processor

第五步:将打好的jar包挂载到proc1当中去

hbase(main):056:0> describe 'proc1'

hbase(main):055:0> alter 'proc1',METHOD => 'table_att','Coprocessor'=>'hdfs://node01:8020/processor/processor.jar|cn.itcast.hbasemr.demo4.MyProcessor|1001|'

再次查看'proc1'表,

hbase(main):043:0> describe 'proc1'

可以查看到我们的卸载器已经加载了

 

第六步:proc1当中添加数据

进入hbase-shell客户端,然后直接执行以下命令向proc1表当中添加数据

put 'proc1','0001','info:name','zhangsan'

put 'proc1','0001','info:age','28'

put 'proc1','0002','info:name','lisi'

put 'proc1','0002','info:age','25'

向proc1表当中添加数据,然后通过

scan  'proc2'

我们会发现,proc2表当中也插入了数据,并且只有info列族,name列

 

注意如果需要卸载我们的协处理器,那么进入hbase的shell命令行,执行以下命令即可

disable 'proc1'

alter 'proc1',METHOD=>'table_att_unset',NAME=>'coprocessor$1'

enable 'proc1'