hbase数据迁移put方法java代码

时间:2021-03-30 06:34:34

package ceshi;

import java.io.IOException;

import java.util.Scanner;

import java.sql.Connection;

import java.sql.DriverManager;

import java.sql.Statement; 

import java.sql.ResultSet;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.hbase.HBaseConfiguration; 

import org.apache.hadoop.hbase.HColumnDescriptor;

import org.apache.hadoop.hbase.HTableDescriptor;

import org.apache.hadoop.hbase.client.HBaseAdmin;

import org.apache.hadoop.hbase.client.HTable;

import org.apache.hadoop.hbase.client.Get;

import org.apache.hadoop.hbase.client.Put;

import org.apache.hadoop.hbase.client.Result;

import org.apache.hadoop.hbase.client.ResultScanner;

import org.apache.hadoop.hbase.client.Scan;

import org.apache.hadoop.hbase.util.Bytes;

public class ceshi {

static Configuration cfg=HBaseConfiguration.create();

public static void create(String tablename,String columnFamily)throws Exception{

HBaseAdmin admin=new HBaseAdmin(cfg);

 if(admin.tableExists(tablename)){ //在新建hBasetable之前先判断该表是否已经存在

 System.out.println("table Exists!");

 System.exit(0);

 }

 else{

 @SuppressWarnings("deprecation")

HTableDescriptor tableDesc =new HTableDescriptor(tablename);//若不存在在创建该表

 tableDesc.addFamily(new HColumnDescriptor(columnFamily));

 admin.createTable(tableDesc);

 System.out.println("create table success!");

 }

 }

 public static void put(String tablename,String row,String columnFamily,String column,String data)

 throws IOException{ //实现插入函数

 HTable table=new HTable(cfg,tablename);

Put p1=new Put(Bytes.toBytes(row));

p1.add(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(data));

table.put(p1);

System.out.println("put'"+row+"','"+columnFamily+":"+column+"','"+data+"'");

 }

 public static void get(String tablename,String row)throws IOException{

 HTable table=new HTable(cfg,tablename); //查询HBase的表的特定key的内容

 Get g=new Get(Bytes.toBytes(row));

 Result result=table.get(g);

 System.out.println("Get:"+result);

 }

 public static void scan(String tablename) throws Exception{//Hbase的整个表进行scan

 HTable table=new HTable(cfg,tablename);

 Scan s=new Scan();

 ResultScanner rs=table.getScanner(s);

 for(Result r:rs){

 System.out.println("Scan:"+r);

 }

 }

 public static boolean delete(String tablename) throws IOException{//删除HBase的某张表

 HBaseAdmin admin=new HBaseAdmin(cfg);

 if(admin.tableExists(tablename)){

 try{

 admin.disableTable(tablename);

 admin.deleteTable(tablename);

 }catch(Exception ex){

 ex.printStackTrace();

 return false;

 }

 }

 return true;

 }

/*private static HTable connectHBase(String tablename) throws IOException{

HTable table=null;

Configuration conf=HBaseConfiguration.create();

table=new HTable(conf,tablename);

return table;

}*/

private static Connection connectDB() throws Exception {

Scanner reader=new Scanner(System.in);

System.out.println("请输入MySQL数据库用户名:");

String userName=reader.next();

System.out.println("请输入MySQL数据库密码:");

String password=reader.next();

//System.out.println("请输入MySQL数据库中要迁移的表名:");

String url="jdbc:mysql://127.0.0.1/test";

//String userName="root";

//String password="12345";

//String url="jdbc:mysql://127.0.0.1/test";

Connection conn=DriverManager.getConnection(url,userName,password);

return conn;

}

@SuppressWarnings("deprecation")

public static void main(String[] args){

Connection dbConn=null;

HTable htable=null;

Statement  stmt=null;

Scanner reader=new Scanner(System.in);

String query="select * from five";

try{

dbConn=connectDB();

System.out.println("请输入迁移后Hbase数据库中表名:");

String table=reader.next();

System.out.println("请输入迁移后Hbase数据库中表的列族名:");

String n=reader.next();

long start =System.currentTimeMillis();///////////

ceshi.create(table, n);

byte[] family=Bytes.toBytes(n);

stmt=dbConn.createStatement();

ResultSet rs=stmt.executeQuery(query);

long ts=System.currentTimeMillis();

 HTable table1=new HTable(cfg,table);

while(rs.next()){

String stid=rs.getString("stid");

String month=rs.getString("month");

String day=rs.getString("day");

String rowkey=stid+month+day;

Put p=new Put(Bytes.toBytes(rowkey));

for(int i=5;i<=28;i++){

int l=i-4;

String columnI="v"+l;

String valueI=rs.getString(i);

p.add(family,Bytes.toBytes(columnI),ts,Bytes.toBytes(valueI));

}

table1.put(p);

}

long end=System.currentTimeMillis();

long Sumtime=end- start;

    double Sumtime1=(double)(Sumtime/1000.00); 

System.out.println("迁移所用的总时间为:" + Sumtime1+"s");

}catch(Exception e){

e.printStackTrace();//用户名

}finally{

try{

if(stmt !=null){

stmt.close();

}

if(dbConn !=null){

dbConn.close();

}

if(htable!=null){

htable.close();

}

}catch(Exception e){

}

}

}

}