package hadoop42_006_hbase01;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.ResourceBundle;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.util.ResourceBundles;
public class HBaseUtil {
private static Configuration conf;
private static Connection con;
private static Admin admin;
private static ExecutorService pool;
static{
conf=HBaseConfiguration.create();
ResourceBundle rb=ResourceBundles.getBundle("hbase");
Enumeration<String> kvs=rb.getKeys();
while (kvs.hasMoreElements()) {
String key=kvs.nextElement();
String value=rb.getString(key);
conf.set(key,value);
}
}
public static Connection getConn(){
pool=Executors.newCachedThreadPool();
try {
con=ConnectionFactory.createConnection(conf,pool);
return con;
} catch (IOException e) {
throw new RuntimeException("数据库连接失败",e);
}
}
public static void close(){
try {
getAdmin();
if (admin!=null) {
admin.close();
}
} catch (IOException e) {
throw new RuntimeException("关闭管理者失败。。。",e);
}
try {
if (con!=null && !con.isClosed()) {
con.close();
}
} catch (IOException e) {
throw new RuntimeException("关闭连接失败",e);
}
try {
if (con!=null&&!pool.isShutdown()) {
pool.shutdown();
}
} catch (Exception e) {
throw new RuntimeException("关闭线程池失败",e);
}
}
public static boolean createTable(String tableName,String... columnFamilies) throws TableExistsException{
getAdmin();
TableName tn=TableName.valueOf(tableName);
try {
if(!admin.tableExists(tn)){
HTableDescriptor htd=new HTableDescriptor(tn);
for (String cf : columnFamilies) {
HColumnDescriptor hcd=new HColumnDescriptor(Bytes.toBytes(cf));
htd.addFamily(hcd);
}
admin.createTable(htd);
return true;
}else{
throw new TableExistsException("表已经存在。。。。");
}
}catch (TableExistsException e) {
throw new TableExistsException("表已经存在!!");
}catch (IOException e) {
throw new RuntimeException("创建数据库表失败!!",e);
}finally{
close();
}
}
/**
* 对表的删除
* @param tableName 表名
* @return
*/
public static boolean delTable(String tableName){
try {
getAdmin();
TableName delIN=TableName.valueOf(tableName);
if(admin.tableExists(delIN)){
admin.disableTable(delIN);
admin.deleteTable(delIN);
return true;
}else{
throw new RuntimeException("表已经不存在。。。");
}
} catch (IOException e) {
throw new RuntimeException(e);
}finally {
close();
}
}
public static Admin getAdmin(){
try {
getConn();
admin=con.getAdmin();
return admin;
} catch (IOException e) {
throw new RuntimeException("取到表的管理者失败!!",e);
}
}
/**
* 对表中数据的增 删 改
* @param tableName 表名
* @param mt 数据库对表的操作类型 是进行PUT 还是 DELETE
* @param rowkey 行键值
* @param params 3个参数是列族名 单元格修饰名 单元格的值
* 2个参数时 列族名 单元格修饰名
* 1个参数时 列族名
*/
public static void doUpdate(String tableName,MutationType mt,String rowkey,String...params){
try {
getAdmin();
TableName tn=TableName.valueOf(tableName);
if(admin.tableExists(tn)){
Table t=con.getTable(tn,pool);
switch(mt){
case PUT:
Put put=null;
if (params.length==3) {
put = new Put(Bytes.toBytes(rowkey)).addColumn(Bytes.toBytes(params[0]),
Bytes.toBytes(params[1]), Bytes.toBytes(params[2]));
}else{
throw new RuntimeException("参数个数为3个!!!!");
}
t.put(put);
break;
case DELETE:
Delete del=new Delete(Bytes.toBytes(rowkey));
while (params!=null && params.length!=0) {
System.out.println(params.length);
switch (params.length) {
case 1:
del.addFamily(Bytes.toBytes(params[0]));
break;
case 2:
del.addColumn(Bytes.toBytes(params[0]), Bytes.toBytes(params[1]));
default:
throw new RuntimeException("最多两个参数");
}
}
t.delete(del);
break;
default:
throw new RuntimeException("只能进行增删改操作");
}
}
}catch (IOException e) {
e.printStackTrace();
}finally {
close();
}
}
/**
* 对表中数据的增 删 改
* @param tableName 表名
* @param mt 数据库对表的操作类型 是进行PUT 还是 DELETE
* @param rowkey 行键值
* @param params 多columnfamily String[] ...params 中string[]的参数为:
* 3个参数是列族名 单元格修饰名 单元格的值
* 2个参数时 列族名 单元格修饰名
* 1个参数时 列族名
*/
public static void doUpdate(String tableName,MutationType mt,String rowkey,String[]...params){
try {
getAdmin();
TableName tn=TableName.valueOf(tableName);
if(admin.tableExists(tn)){
Table t=con.getTable(tn,pool);
switch(mt){
case PUT:
Put put=null;
put = new Put(Bytes.toBytes(rowkey));
for (String[] ps : params) {
if (params.length==3) {
put.addColumn(Bytes.toBytes(ps[0]),Bytes.toBytes(ps[1]), Bytes.toBytes(ps[2]));
}else{
throw new RuntimeException("参数个数为3个!!!!");
}
}
t.put(put);
break;
case DELETE:
Delete del=new Delete(Bytes.toBytes(rowkey));
for (String[] ps : params) {
if(params !=null && ps.length !=0) {
switch (ps.length) {
case 1:
del.addFamily(Bytes.toBytes(ps[0]));
break;
case 2:
del.addColumn(Bytes.toBytes(ps[0]), Bytes.toBytes(ps[1]));
break;
default:
throw new RuntimeException("最多两个参数");
}
}
}
t.delete(del);
break;
default:
throw new RuntimeException("只能进行增删改操作");
}
}
}catch (IOException e) {
e.printStackTrace();
}finally {
close();
}
}
/**
* 对表中数据的增 删 改
* @param tableName 表名
* @param mt 数据库对表的操作类型 是进行PUT 还是 DELETE
* @param params 多rowkey Map<String,List<String[]>> 中String的参数为:
* 3个参数是列族名 单元格修饰名 单元格的值
* 2个参数时 列族名 单元格修饰名
* 1个参数时 列族名
*/
public static void doUpdate(String tableName,MutationType mt,Map<String,List<String[]>>params){
try {
getAdmin();
TableName tn=TableName.valueOf(tableName);
if(admin.tableExists(tn)){
Table t=con.getTable(tn,pool);
switch(mt){
case PUT:
List<Put> puts=new ArrayList<Put>();
for (Entry<String,List<String[]>> entry:params.entrySet()) {
Put put = new Put(Bytes.toBytes(entry.getKey()));
if(entry.getValue()!=null){
for (String[] ps : entry.getValue()) {
if (ps.length==3) {
put.addColumn(Bytes.toBytes(ps[0]),Bytes.toBytes(ps[1]), Bytes.toBytes(ps[2]));
}else{
throw new RuntimeException("参数个数为3个!!!!");
}
}
puts.add(put);
}
}
t.put(puts);
break;
case DELETE:
List<Delete> dels=new ArrayList<Delete>();
for (Entry<String,List<String[]>> entry:params.entrySet()) {
Delete del=new Delete(Bytes.toBytes(entry.getKey()));
if(entry.getValue()!=null){
for (String[] ps : entry.getValue()) {
if(ps!=null && ps.length!=0) {
switch(ps.length) {
case 1:
del.addFamily(Bytes.toBytes(ps[0]));
break;
case 2:
del.addColumn(Bytes.toBytes(ps[0]), Bytes.toBytes(ps[1]));
break;
default:
throw new RuntimeException("最多两个参数");
}
}
}
}
dels.add(del);
}
t.delete(dels);
break;
default:
throw new RuntimeException("只能进行增删改操作");
}
}
}catch (IOException e) {
throw new RuntimeException("进行增删改操作失败");
}finally {
close();
}
}
public static String get(String tableName,String rowKey,String columnFamily,String qualifier){
getAdmin();
try {
Table t=con.getTable(TableName.valueOf(tableName));
Get get=new Get(Bytes.toBytes(rowKey));
get.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier));
Result r=t.get(get);
List<Cell> cells=r.listCells();
return Bytes.toString(CellUtil.cloneValue(cells.get(0)));
} catch (IOException e) {
throw new RuntimeException("获得表对象失败",e);
}
}
public static Map<String,String> get(String tableName,String rowKey,String columnFamily,String ... qualifiers){
getAdmin();
try {
Table t=con.getTable(TableName.valueOf(tableName));
Get get=new Get(Bytes.toBytes(rowKey));
if(qualifiers!=null &&qualifiers.length!=0){
for (String qualifier : qualifiers) {
get.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier));
}
}else if(columnFamily!=null){
get.addFamily(Bytes.toBytes(columnFamily));
}
Result r=t.get(get);
List<Cell> cells=r.listCells();
Map<String,String> results=null;
if(cells!=null && cells.size()!=0){
results=new HashMap<String,String>();
for (Cell cell : cells) {
results.put(Bytes.toString(CellUtil.cloneQualifier(cell)), Bytes.toString(CellUtil.cloneValue(cell)));
}
}
return results;
} catch (IOException e) {
throw new RuntimeException("获得表对象失败",e);
}
}
public static <T> T get(String tableName,String rowKey,String columnFamily,Class<T>clazz){
getAdmin();
try {
Table t=con.getTable(TableName.valueOf(tableName));
Get get=new Get(Bytes.toBytes(rowKey));
Field[] fs=clazz.getDeclaredFields();
for (Field f : fs) {
get.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(f.getName()));
}
Result r=t.get(get);
List<Cell>cells=r.listCells();
T tobj=clazz.newInstance();
if(cells!=null &&cells.size()!=0){
for (Cell cell : cells) {
for (int i = 0; i < fs.length; i++) {
String valueStr=Bytes.toString(CellUtil.cloneValue(cell));
if(Bytes.toString(CellUtil.cloneQualifier(cell)).intern()==fs[i].getName().intern()){
Object value=null;
if(fs[i].getType().getName().intern()=="int" || fs[i].getType().getName().intern()=="java.lang.Integer"){
value=Integer.parseInt(valueStr);
}else if(fs[i].getType().getName().intern()=="double" || fs[i].getType().getName().intern()=="java.lang.Double"){
value=Double.parseDouble(valueStr);
}
fs[i].setAccessible(true);
fs[i].set(tobj, value);
}
}
}
}
return tobj;
} catch (IOException e) {
throw new RuntimeException("获得表对象失败",e);
}catch (Exception e) {
throw new RuntimeException("获得对象失败",e);
}
}
public static Map<String,Map<String,String>> scan(String tableName,String[]... params){
getAdmin();
try {
Table t=con.getTable(TableName.valueOf(tableName));
Scan scan=new Scan();
if(params!=null &¶ms.length!=0){
for (String[] param : params) {
switch (param.length) {
case 1:
scan.addFamily(Bytes.toBytes(param[0]));
break;
case 2:
scan.addColumn(Bytes.toBytes(param[0]), Bytes.toBytes(param[1]));
default:
throw new RuntimeException("参数只能是一个或两个");
}
}
}
ResultScanner rScan=t.getScanner(scan);
Map<String,Map<String,String>> results=new HashMap<String,Map<String,String>>();
for (Result r : rScan) {
List<Cell> cells=r.listCells();
Map<String,String> rs=null;
if(cells!=null &&cells.size()!=0){
rs=new HashMap<String,String>();
for (Cell cell : cells) {
rs.put(Bytes.toString(CellUtil.cloneFamily(cell))+":"
+Bytes.toString(CellUtil.cloneQualifier(cell)),
Bytes.toString(CellUtil.cloneValue(cell)));
}
}
results.put(Bytes.toString(r.getRow()), rs);
}
return results;
} catch (IOException e) {
throw new RuntimeException("获得表对象失败",e);
}
}
}