package ceshi;
import java.io.IOException;
import java.util.Scanner;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
import java.sql.ResultSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
/**
 * Command-line utility that copies the rows of the MySQL table {@code five}
 * (database {@code test} on 127.0.0.1) into a newly created HBase table, plus a
 * few small static helpers for basic HBase operations (create, put, get, scan,
 * delete).
 *
 * <p>All HBase handles opened by the helpers are closed before the helper
 * returns. Console input is read through one shared {@link Scanner}.
 */
public class ceshi {

    /** HBase configuration shared by every helper in this class. */
    static Configuration cfg = HBaseConfiguration.create();

    /**
     * Single reader for {@code System.in}. A {@code Scanner} reads ahead of the
     * tokens it hands out, so wrapping the same stream in several scanners can
     * lose input (notably when stdin is piped or redirected).
     */
    private static final Scanner STDIN = new Scanner(System.in);

    /** 1-based index of the first result-set column copied as a value cell. */
    private static final int FIRST_VALUE_COLUMN = 5;

    /** 1-based index of the last result-set column copied as a value cell. */
    private static final int LAST_VALUE_COLUMN = 28;

    /**
     * Creates an HBase table with a single column family.
     *
     * <p>If the table already exists, prints {@code "table Exists!"} and
     * terminates the JVM with exit status 0.
     *
     * @param tablename    name of the table to create
     * @param columnFamily name of the only column family of the new table
     * @throws Exception if the existence check or the table creation fails
     */
    @SuppressWarnings("deprecation")
    public static void create(String tablename, String columnFamily) throws Exception {
        HBaseAdmin admin = new HBaseAdmin(cfg);
        boolean exists;
        try {
            // Check for an existing table before trying to create it.
            exists = admin.tableExists(tablename);
            if (!exists) {
                HTableDescriptor tableDesc = new HTableDescriptor(tablename);
                tableDesc.addFamily(new HColumnDescriptor(columnFamily));
                admin.createTable(tableDesc);
            }
        } finally {
            // Release the admin handle before any System.exit below; exit does
            // not run finally blocks, so it must happen outside this try.
            admin.close();
        }
        if (exists) {
            System.out.println("table Exists!");
            System.exit(0);
        }
        System.out.println("create table success!");
    }

    /**
     * Writes one cell into an HBase table and echoes the operation to stdout.
     *
     * @param tablename    table to write to
     * @param row          row key
     * @param columnFamily column family of the cell
     * @param column       column qualifier of the cell
     * @param data         cell value
     * @throws IOException if the table cannot be opened, written or closed
     */
    @SuppressWarnings("deprecation")
    public static void put(String tablename, String row, String columnFamily, String column, String data)
            throws IOException {
        HTable table = new HTable(cfg, tablename);
        try {
            Put p1 = new Put(Bytes.toBytes(row));
            p1.add(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(data));
            table.put(p1);
        } finally {
            table.close();
        }
        System.out.println("put'" + row + "','" + columnFamily + ":" + column + "','" + data + "'");
    }

    /**
     * Fetches the row stored under the given key and prints it to stdout.
     *
     * @param tablename table to read from
     * @param row       row key to look up
     * @throws IOException if the table cannot be opened, read or closed
     */
    public static void get(String tablename, String row) throws IOException {
        HTable table = new HTable(cfg, tablename);
        try {
            Get g = new Get(Bytes.toBytes(row));
            Result result = table.get(g);
            System.out.println("Get:" + result);
        } finally {
            table.close();
        }
    }

    /**
     * Scans the whole table and prints every returned row to stdout.
     *
     * @param tablename table to scan
     * @throws Exception if the table cannot be opened, scanned or closed
     */
    public static void scan(String tablename) throws Exception {
        HTable table = new HTable(cfg, tablename);
        try {
            ResultScanner rs = table.getScanner(new Scan());
            try {
                for (Result r : rs) {
                    System.out.println("Scan:" + r);
                }
            } finally {
                rs.close();
            }
        } finally {
            table.close();
        }
    }

    /**
     * Disables and deletes an HBase table if it exists.
     *
     * @param tablename table to delete
     * @return {@code false} if disabling or deleting the table threw an
     *         exception (its stack trace is printed); {@code true} otherwise,
     *         including when the table does not exist
     * @throws IOException if the admin handle cannot be opened or closed, or
     *                     the existence check fails
     */
    public static boolean delete(String tablename) throws IOException {
        HBaseAdmin admin = new HBaseAdmin(cfg);
        try {
            if (admin.tableExists(tablename)) {
                try {
                    // The table is disabled first, then deleted.
                    admin.disableTable(tablename);
                    admin.deleteTable(tablename);
                } catch (Exception ex) {
                    ex.printStackTrace();
                    return false;
                }
            }
            return true;
        } finally {
            admin.close();
        }
    }

    /**
     * Prompts on the console for a MySQL user name and password and opens a
     * JDBC connection to {@code jdbc:mysql://127.0.0.1/test}.
     *
     * @return the open connection; the caller is responsible for closing it
     * @throws Exception if the connection cannot be established
     */
    private static Connection connectDB() throws Exception {
        System.out.println("请输入MySQL数据库用户名:");
        String userName = STDIN.next();
        System.out.println("请输入MySQL数据库密码:");
        String password = STDIN.next();
        String url = "jdbc:mysql://127.0.0.1/test";
        return DriverManager.getConnection(url, userName, password);
    }

    /**
     * Entry point: asks for MySQL credentials, a target HBase table name and a
     * column family name, creates the HBase table, then copies every row
     * returned by {@code select * from five} into it and prints the elapsed
     * time in seconds.
     *
     * <p>For each source row the row key is the concatenation of the
     * {@code stid}, {@code month} and {@code day} columns, and result-set
     * columns 5 through 28 are written as qualifiers {@code v1} .. {@code v24}.
     * All cells share one timestamp taken just before the copy loop starts.
     *
     * @param args unused
     */
    @SuppressWarnings("deprecation")
    public static void main(String[] args) {
        Connection dbConn = null;
        HTable htable = null;
        Statement stmt = null;
        ResultSet rs = null;
        String query = "select * from five";
        try {
            dbConn = connectDB();
            System.out.println("请输入迁移后Hbase数据库中表名:");
            String table = STDIN.next();
            System.out.println("请输入迁移后Hbase数据库中表的列族名:");
            String n = STDIN.next();

            // The measured time includes table creation.
            long start = System.currentTimeMillis();
            ceshi.create(table, n);
            byte[] family = Bytes.toBytes(n);

            stmt = dbConn.createStatement();
            rs = stmt.executeQuery(query);
            long ts = System.currentTimeMillis();
            // Assigned to the variable that the finally block closes.
            htable = new HTable(cfg, table);

            while (rs.next()) {
                String stid = rs.getString("stid");
                String month = rs.getString("month");
                String day = rs.getString("day");
                // NOTE(review): the key parts are joined without a delimiter, so
                // unpadded values such as month=1,day=11 and month=11,day=1 give
                // the same key. Kept as-is to preserve the stored key format.
                String rowkey = stid + month + day;

                Put p = new Put(Bytes.toBytes(rowkey));
                boolean hasCell = false;
                for (int i = FIRST_VALUE_COLUMN; i <= LAST_VALUE_COLUMN; i++) {
                    String valueI = rs.getString(i);
                    if (valueI == null) {
                        // SQL NULL: there is no value to encode, so the cell
                        // is simply left absent.
                        continue;
                    }
                    String columnI = "v" + (i - FIRST_VALUE_COLUMN + 1);
                    p.add(family, Bytes.toBytes(columnI), ts, Bytes.toBytes(valueI));
                    hasCell = true;
                }

                if (hasCell) {
                    htable.put(p);
                } else {
                    // A Put without any cell has nothing to store.
                    System.err.println("skip row '" + rowkey + "': all value columns are NULL");
                }
            }

            long end = System.currentTimeMillis();
            double seconds = (end - start) / 1000.00;
            System.out.println("迁移所用的总时间为:" + seconds + "s");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Close each resource independently so that one failing close()
            // does not prevent the others from being released.
            if (rs != null) {
                try {
                    rs.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            if (stmt != null) {
                try {
                    stmt.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            if (dbConn != null) {
                try {
                    dbConn.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            if (htable != null) {
                try {
                    htable.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}