利用Solr建立HBase的二级索引:
1、编写的协处理器代码为
package com;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CloudSolrServer;
import org.apache.solr.common.SolrInputDocument;
/**
 * Region observer that mirrors writes on the HBase {@code student} table into the
 * SolrCloud {@code student} collection, so that Solr can serve as a secondary index.
 *
 * <p>After every Put the {@code info:name} / {@code info:address} cells are pushed to
 * Solr as atomic "set" updates; after every Delete the row is re-read and the Solr
 * document is either removed (row gone) or rebuilt from the remaining cells.
 *
 * <p>When Solr cannot be updated, the operation is serialised into one cell of the
 * backup table {@code temp_student} (row key = current time in milliseconds, column
 * {@code info:content}). The record layout is
 * {@code <op> \001 <rowkey> [\001 <key> \003 <value> (\002 <key> \003 <value>)*]}.
 *
 * <p>NOTE(review): a new SolrCloud client (and ZooKeeper session) is opened per
 * operation, and a new HTable per backup write; both are costly on a hot write path.
 * Sharing them across calls would need lifecycle hooks that this class does not
 * currently override.
 *
 * <p>NOTE(review): backup rows are keyed only by the millisecond timestamp, so two
 * failures in the same millisecond overwrite each other — confirm whether the replay
 * job depends on this key format before changing it.
 */
public class SolrCoprocessor extends BaseRegionObserver {

    /** ZooKeeper ensemble of the SolrCloud cluster (placeholder hosts). */
    private static final String ZK_HOST = "IP:2181,IP:2181,IP:2181";

    /**
     * ZooKeeper connect timeout in milliseconds. The previous value of 1 ms made the
     * connect time out almost unconditionally.
     */
    private static final int ZK_CONNECT_TIMEOUT_MS = 10000;

    private static final String SOLR_COLLECTION = "student";
    private static final String SOURCE_TABLE = "student";
    private static final String BACKUP_TABLE = "temp_student";

    private static final byte[] INFO_FAMILY = Bytes.toBytes("info");
    private static final byte[] CONTENT_QUALIFIER = Bytes.toBytes("content");

    /** Separates operation, row key and field list inside a backup record. */
    private static final String RECORD_SEP = "\001";
    /** Separates one key/value pair from the next inside a backup record. */
    private static final String FIELD_SEP = "\002";
    /** Separates a key from its value inside a backup record. */
    private static final String KEY_VALUE_SEP = "\003";

    /**
     * Indexes the Put into Solr once HBase has applied it, then delegates to the
     * default observer behaviour.
     */
    @Override
    public void postPut(ObserverContext<RegionCoprocessorEnvironment> e,
            Put put, WALEdit edit, Durability durability) throws IOException {
        insertSolr(put);
        super.postPut(e, put, edit, durability);
    }

    /**
     * Sends the {@code name} / {@code address} cells of the Put to Solr as atomic
     * "set" updates on the document whose id is the row key.
     *
     * <p>Nothing is sent when the Put carries none of the indexed columns: a document
     * holding only the id is not an atomic update, so Solr would replace the existing
     * document and drop its indexed fields.
     *
     * <p>Any failure while talking to Solr is recorded in the backup table instead of
     * being propagated to the HBase write path.
     *
     * @param put the mutation that was just applied
     */
    public void insertSolr(Put put) {
        SolrInputDocument doc = new SolrInputDocument();
        doc.addField("id", Bytes.toString(put.getRow()));

        boolean hasIndexedField = false;
        // The family entry is absent when the Put does not touch "info" at all.
        List<KeyValue> cells = put.getFamilyMap().get(INFO_FAMILY);
        if (cells != null) {
            for (KeyValue kv : cells) {
                String key = Bytes.toString(kv.getQualifier());
                if (isIndexedField(key)) {
                    Map<String, Object> oper = new HashMap<String, Object>();
                    oper.put("set", Bytes.toString(kv.getValue()));
                    doc.addField(key, oper);
                    hasIndexedField = true;
                }
            }
        }
        if (!hasIndexedField) {
            return;
        }

        CloudSolrServer solr = null;
        try {
            solr = openSolr();
            solr.add(doc);
            solr.commit(true, true, true);
        } catch (Exception ex) {
            // Boundary catch: a Solr outage must not fail the HBase write.
            ex.printStackTrace();
            putData(BACKUP_TABLE, put);
        } finally {
            shutdownQuietly(solr);
        }
    }

    /**
     * Records a failed Solr insert in the backup table as a {@code put} record
     * containing the row key and the indexed cells of the Put.
     *
     * @param tablename name of the backup table
     * @param put the mutation that could not be indexed
     */
    public void putData(String tablename, Put put) {
        StringBuilder fields = new StringBuilder();
        List<KeyValue> cells = put.getFamilyMap().get(INFO_FAMILY);
        if (cells != null) {
            for (KeyValue kv : cells) {
                String key = Bytes.toString(kv.getQualifier());
                if (isIndexedField(key)) {
                    appendField(fields, key, Bytes.toString(kv.getValue()));
                }
            }
        }
        writeBackup(tablename, buildRecord("put", Bytes.toString(put.getRow()), fields));
    }

    /**
     * Synchronises Solr once HBase has applied the Delete, then delegates to the
     * default observer behaviour.
     */
    @Override
    public void postDelete(ObserverContext<RegionCoprocessorEnvironment> e,
            Delete delete, WALEdit edit, Durability durability)
            throws IOException {
        deleteSolr(delete);
        super.postDelete(e, delete, edit, durability);
    }

    /**
     * Brings the Solr document in line with the row after a Delete. The row is read
     * back from the {@code student} table: if nothing is left the document is deleted
     * by id, otherwise it is rebuilt from every remaining cell (qualifier as field
     * name).
     *
     * <p>Solr failures are recorded in the backup table. If the row cannot be read
     * back, the error is reported and the index is left untouched, because the
     * correct action (delete vs. rebuild) is unknown.
     *
     * @param delete the mutation that was just applied
     */
    public void deleteSolr(Delete delete) {
        Get get = new Get(delete.getRow());
        Result result = fetchRow(get);
        if (result == null) {
            return;
        }
        String id = Bytes.toString(delete.getRow());

        CloudSolrServer solr = null;
        if (result.size() == 0) {
            try {
                solr = openSolr();
                solr.deleteById(id);
                solr.commit(true, true, true);
            } catch (Exception ex) {
                ex.printStackTrace();
                deleteData(BACKUP_TABLE, get);
            } finally {
                shutdownQuietly(solr);
            }
            return;
        }

        SolrInputDocument doc = new SolrInputDocument();
        doc.addField("id", id);
        for (Cell cell : result.listCells()) {
            doc.addField(Bytes.toString(cell.getQualifier()),
                    Bytes.toString(cell.getValue()));
        }
        try {
            solr = openSolr();
            solr.add(doc);
            solr.commit(true, true, true);
        } catch (Exception ex) {
            ex.printStackTrace();
            addData(BACKUP_TABLE, result);
        } finally {
            shutdownQuietly(solr);
        }
    }

    /**
     * Records a failed Solr delete in the backup table as a {@code delete} record
     * containing only the row key.
     *
     * @param tablename name of the backup table
     * @param get the Get identifying the deleted row
     */
    public void deleteData(String tablename, Get get) {
        StringBuilder noFields = new StringBuilder();
        writeBackup(tablename, buildRecord("delete", Bytes.toString(get.getRow()), noFields));
    }

    /**
     * Records a failed Solr rebuild in the backup table as an {@code add} record
     * containing the row key and every cell of the row.
     *
     * @param tablename name of the backup table
     * @param result the current content of the row
     */
    public void addData(String tablename, Result result) {
        StringBuilder fields = new StringBuilder();
        // listCells() yields null for an empty Result.
        List<Cell> cells = result.listCells();
        if (cells != null) {
            for (Cell cell : cells) {
                appendField(fields, Bytes.toString(cell.getQualifier()),
                        Bytes.toString(cell.getValue()));
            }
        }
        writeBackup(tablename, buildRecord("add", Bytes.toString(result.getRow()), fields));
    }

    /** Returns whether the qualifier is one of the columns mirrored on Put. */
    private static boolean isIndexedField(String key) {
        return "name".equalsIgnoreCase(key) || "address".equalsIgnoreCase(key);
    }

    /** Appends one key/value pair to a backup field list, inserting the separator. */
    private static void appendField(StringBuilder fields, String key, String value) {
        if (fields.length() > 0) {
            fields.append(FIELD_SEP);
        }
        fields.append(key).append(KEY_VALUE_SEP).append(value);
    }

    /** Assembles a backup record; the field section is omitted when it is empty. */
    private static String buildRecord(String operate, String rowkey, StringBuilder fields) {
        StringBuilder content = new StringBuilder(operate);
        content.append(RECORD_SEP).append(rowkey);
        if (fields.length() > 0) {
            content.append(RECORD_SEP).append(fields);
        }
        return content.toString();
    }

    /** Opens a connected SolrCloud client bound to the student collection. */
    private static CloudSolrServer openSolr() {
        CloudSolrServer solr = new CloudSolrServer(ZK_HOST);
        try {
            solr.setZkConnectTimeout(ZK_CONNECT_TIMEOUT_MS);
            solr.connect();
            solr.setDefaultCollection(SOLR_COLLECTION);
        } catch (RuntimeException ex) {
            // Do not leak the half-initialised client when the connect fails.
            shutdownQuietly(solr);
            throw ex;
        }
        return solr;
    }

    /** Shuts the Solr client down, reporting but not propagating failures. */
    private static void shutdownQuietly(CloudSolrServer solr) {
        if (solr == null) {
            return;
        }
        try {
            solr.shutdown();
        } catch (RuntimeException ex) {
            ex.printStackTrace();
        }
    }

    /** Closes the table, reporting but not propagating failures. */
    private static void closeQuietly(HTable table) {
        if (table == null) {
            return;
        }
        try {
            table.close();
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }

    /** Reads the row from the source table; returns null when the read fails. */
    private static Result fetchRow(Get get) {
        Configuration conf = HBaseConfiguration.create();
        HTable table = null;
        try {
            table = new HTable(conf, SOURCE_TABLE);
            return table.get(get);
        } catch (IOException ex) {
            ex.printStackTrace();
            return null;
        } finally {
            closeQuietly(table);
        }
    }

    /** Writes one backup record under a row key made of the current time in ms. */
    private static void writeBackup(String tablename, String content) {
        Configuration conf = HBaseConfiguration.create();
        HTable table = null;
        try {
            table = new HTable(conf, tablename);
            Put backup = new Put(Bytes.toBytes(String.valueOf(System.currentTimeMillis())));
            backup.add(INFO_FAMILY, CONTENT_QUALIFIER, Bytes.toBytes(content));
            table.put(backup);
        } catch (IOException ex) {
            ex.printStackTrace();
        } finally {
            closeQuietly(table);
        }
    }
}
2、将协处理器的代码打包成jar包(Test_1.jar),然后上传到HDFS,或者放到每一台HBase主机的同一目录下(本例为:/home/hadoop/Test_1.jar)
3、将Solr的相关jar包放到HBase的lib目录中
4、为HBase的相关表添加协处理器
(1)disable 'student'
(2)alter 'student','coprocessor'=>'hdfs://IP:9000/Test_1.jar|com.SolrCoprocessor|1001|'
或者:
alter 'student','coprocessor'=>'file:///home/hadoop/Test_1.jar|com.SolrCoprocessor|1001|'
(3)enable 'student'
5、附带:为表删除协处理器的命令为:
alter 'student',METHOD=>'table_att_unset',NAME=>'coprocessor$1'