利用Solr建立HBase的二级索引

时间:2021-05-11 16:45:27

利用Solr建立HBase的二级索引:

1、编写协处理器,代码如下:

package com;

import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CloudSolrServer;
import org.apache.solr.common.SolrInputDocument;

public class SolrCoprocessor extends BaseRegionObserver {

 @Override
 public void postPut(ObserverContext<RegionCoprocessorEnvironment> e,
   Put put, WALEdit edit, Durability durability) throws IOException {
  // TODO Auto-generated method stub
  insertSolr(put);
  super.postPut(e, put, edit, durability);
 }

 public void insertSolr(Put put) {
  CloudSolrServer cloudSolrServer;
  final String zkHost = "IP:2181,IP:2181,IP:2181";
  final int zkConnectTimeout = 1;
  cloudSolrServer = new CloudSolrServer(zkHost);
  cloudSolrServer.setZkConnectTimeout(zkConnectTimeout);
  cloudSolrServer.connect();
  cloudSolrServer.setDefaultCollection("student");
  SolrInputDocument doc = new SolrInputDocument();
  doc.addField("id", Bytes.toString(put.getRow()));
  for (KeyValue c : put.getFamilyMap().get(Bytes.toBytes("info"))) {
   String key = Bytes.toString(c.getQualifier());
   String value = Bytes.toString(c.getValue());
   System.out.println("key = " + key);
   System.out.println("value = " + value);
   if (key.equalsIgnoreCase("name") || key.equalsIgnoreCase("address")) {
    Map<String, Object> oper = new HashMap<String, Object>();
    oper.put("set", value);
    doc.addField(key, oper);
   }
  }
  try {
   cloudSolrServer.add(doc);
   cloudSolrServer.commit(true, true, true);
   cloudSolrServer.shutdown();
  } catch (Exception e) {
   // TODO Auto-generated catch block
   System.out.println("11111");
   cloudSolrServer.shutdown();
   putData("temp_student", put);
   // e.printStackTrace();
  }
 }

 // 插入Solr出错,将数据放入备份表中
 public void putData(String tablename, Put put) {
  Configuration hb_conf = HBaseConfiguration.create();
  HTable table;
  String content = "";
  long time = System.currentTimeMillis();
  String row = time + "";

  String operate = "put";
  content = operate;
  String rowkey = Bytes.toString(put.getRow());
  content = content + "\001" + rowkey;
  content = content + "\001";
  for (KeyValue c : put.getFamilyMap().get(Bytes.toBytes("info"))) {
   String key = Bytes.toString(c.getQualifier());
   String value = Bytes.toString(c.getValue());
   System.out.println("key = " + key);
   System.out.println("value = " + value);
   if (key.equalsIgnoreCase("name") || key.equalsIgnoreCase("address")) {
    content = content + key + "\003" + value + "\002";
   }
  }
  content = content.substring(0, content.length() - 1);
  try {
   table = new HTable(hb_conf, tablename);
   Put newPut = new Put(Bytes.toBytes(row));
   newPut.add(Bytes.toBytes("info"), Bytes.toBytes("content"),
     Bytes.toBytes(content));
   table.put(newPut);
  } catch (IOException e) {
   // TODO Auto-generated catch block
   e.printStackTrace();
  }
 }

 @Override
 public void postDelete(ObserverContext<RegionCoprocessorEnvironment> e,
   Delete delete, WALEdit edit, Durability durability)
   throws IOException {
  // TODO Auto-generated method stub
  deleteSolr(delete);
  super.postDelete(e, delete, edit, durability);
 }

 public void deleteSolr(Delete delete) {
  CloudSolrServer cloudSolrServer;
  final String zkHost = "IP:2181,IP:2181,IP:2181";
  final int zkConnectTimeout = 1;
  cloudSolrServer = new CloudSolrServer(zkHost);
  cloudSolrServer.setZkConnectTimeout(zkConnectTimeout);
  cloudSolrServer.connect();
  cloudSolrServer.setDefaultCollection("student");
  Configuration hb_conf = HBaseConfiguration.create();
  HTable table;
  try {
   table = new HTable(hb_conf, "student");
   Get get = new Get(delete.getRow());
   Result result = table.get(get);
   if (result.size() == 0) {

    try {
     cloudSolrServer.deleteById(Bytes.toString(delete.getRow()));
     cloudSolrServer.commit(true, true, true);
     cloudSolrServer.shutdown();
    } catch (Exception e) {
     // TODO Auto-generated catch block
     cloudSolrServer.shutdown();
     deleteData("temp_student", get);
    }

   } else {
    SolrInputDocument doc = new SolrInputDocument();
    doc.addField("id", Bytes.toString(delete.getRow()));
    List<Cell> list_cell = result.listCells();
    for (int i = 0; i < list_cell.size(); i++) {
     Cell cell = list_cell.get(i);
     String key = Bytes.toString(cell.getQualifier());
     String value = Bytes.toString(cell.getValue());
     System.out.println("key = " + key + " value = " + value);
     doc.addField(key, value);
    }
    try {
     cloudSolrServer.add(doc);
     cloudSolrServer.commit(true, true, true);
     cloudSolrServer.shutdown();
    } catch (Exception e) {
     // TODO Auto-generated catch block
     cloudSolrServer.shutdown();
     addData("temp_student", result);
    }

   }
  } catch (IOException e) {
   // TODO Auto-generated catch block
  }

 }

 // 删除Solr出错,将数据放入备份表中
 public void deleteData(String tablename, Get get) {
  Configuration hb_conf = HBaseConfiguration.create();
  HTable table;
  long time = System.currentTimeMillis();
  String row = time + "";

  String operate = "delete";
  String content = operate;
  String rowkey = Bytes.toString(get.getRow());
  content = content + "\001" + rowkey;
  try {
   table = new HTable(hb_conf, tablename);
   Put newPut = new Put(Bytes.toBytes(row));
   newPut.add(Bytes.toBytes("info"), Bytes.toBytes("content"),
     Bytes.toBytes(content));
   table.put(newPut);
  } catch (IOException e) {
   // TODO Auto-generated catch block
   e.printStackTrace();
  }
 }

 // 更新Solr出错,将数据放入备份表中
 public void addData(String tablename, Result result) {
  Configuration hb_conf = HBaseConfiguration.create();
  HTable table;
  long time = System.currentTimeMillis();
  String row = time + "";

  String operate = "add";
  String content = operate;
  String rowkey = Bytes.toString(result.getRow());
  content = content + "\001" + rowkey;
  content=content+"\001";
  try {
   List<Cell> list_cell = result.listCells();
   for (int i = 0; i < list_cell.size(); i++) {
    Cell cell = list_cell.get(i);
    String key = Bytes.toString(cell.getQualifier());
    String value = Bytes.toString(cell.getValue());
    System.out.println("key = " + key + " value = " + value);
    content = content + key + "\003" + value + "\002";
   }
   content = content.substring(0, content.length() - 1);
   table = new HTable(hb_conf, tablename);
   Put newPut = new Put(Bytes.toBytes(row));
   newPut.add(Bytes.toBytes("info"), Bytes.toBytes("content"),
     Bytes.toBytes(content));
   table.put(newPut);
  } catch (IOException e) {
   // TODO Auto-generated catch block
   e.printStackTrace();
  }
 }

}

 

2、将协处理器的代码打包成jar包(Test_1.jar),然后放到HDFS上,或者放到每一台HBase主机的同一目录下(本例为:/home/hadoop/Test_1.jar)。

3、将Solr的相关jar包(SolrJ及其依赖)放到每一台HBase主机的HBase lib目录中。

4、为HBase的相关表添加协处理器。注意:写入Solr失败时,协处理器会把数据写入备份表temp_student(列族info),因此需要事先创建该表:create 'temp_student','info'

(1)disable 'student'

(2)alter 'student','coprocessor'=>'hdfs://IP:9000/Test_1.jar|com.SolrCoprocessor|1001|'

(注意:整个协处理器参数字符串必须写在同一行,中间不能换行。)

 或者:

 alter 'student','coprocessor'=>'file:///home/hadoop/Test_1.jar|com.SolrCoprocessor|1001|'

(3)enable 'student'

5、附:删除表上协处理器的命令如下:

alter 'student', METHOD => 'table_att_unset', NAME => 'coprocessor$1'