简介
前一段时间做项目需要将服务器上MySql数据库中的数据迁移到HBase数据库,没有用工具,直接用JDBC和HBase API解决。
碰到一些问题,服务器上的HBase数据库不知道为什么远程连接不上,于是写了xx.java文件在服务器上跑。
前提配置:
- java jdk
- hbase、hadoop
- mysql
- 一些hbase和jdbc的jar包
- jar包以及样例链接如下,不日上传
- 截图如下
迁移思路
- 根据mysql数据库表名,用describe语句获取表字段信息,根据结果的第四列(Key列,值为PRI的即主键)找到主键,如图
- mysql表与hbase表对应,主键对应行键(rowkey),其他属性对应列族(info)下的列,如上图News表的“url”对应News_veetah表的“url”(rowkey),“news_article_id”对应“info:news_article_id”
- jdbc从mysql一条条读取表中数据,同时一行行添加到hbase对应表
- hbase api java操作(不详细解释,基本和jdbc原理类似)
代码
注释不详细解释了,注意装hbase服务器的IP,装mysql服务器的ip,mysql用户名密码等
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
/**
 * Copies MySQL tables into HBase using plain JDBC and the HBase client API.
 *
 * <p>For every MySQL table a matching HBase table with a single column family
 * ({@code info}) is created. The MySQL primary key becomes the HBase row key and
 * every other column is written as {@code info:<column name>}.
 *
 * <p>The ZooKeeper quorum below and the MySQL settings in {@code DBManager} are
 * hard-coded and must be edited before running.
 */
public class SqlToHBase {
    /** The single column family that receives every non-key MySQL column. */
    private static final String INFO_FAMILY = "info";

    /** MySQL table name -> names of its primary-key columns; filled by getSqlTableColumnByName. */
    Map<String, List<String>> map = new HashMap<String, List<String>>();

    /** Shared HBase client configuration. */
    public static Configuration config = null;

    static {
        config = HBaseConfiguration.create();
        config.set("hbase.zookeeper.quorum", "47.92.123.230:2181");
    }

    /**
     * Creates an HBase table with the given column families; does nothing but log
     * when the table already exists. I/O failures are printed, not thrown.
     *
     * @param tableName    name of the HBase table to create
     * @param columnfamily names of the column families to add
     */
    public static void createTable(String tableName, String[] columnfamily) {
        TableName t_name = TableName.valueOf(tableName);
        Connection connection = null;
        Admin admin = null;
        try {
            System.out.println("connection...");
            connection = ConnectionFactory.createConnection(config);
            admin = connection.getAdmin();
            HTableDescriptor table = new HTableDescriptor(t_name);
            System.out.println("table.addFamily...");
            for (String family : columnfamily) {
                table.addFamily(new HColumnDescriptor(family));
            }
            if (admin.tableExists(t_name)) {
                System.out.println("Table " + tableName + " Exists!!");
                return;
            }
            System.out.println("admin.createTable...");
            admin.createTable(table);
            System.out.println("Create Table Success!!! Table Name :[ " + tableName + " ]");
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // Explicit finally (rather than try-with-resources) keeps the progress
            // messages; Admin is now released on every path, including the early return.
            System.out.println("admin.close...");
            closeQuietly(admin);
            System.out.println("connection.close...");
            closeQuietly(connection);
        }
    }

    /**
     * Disables and deletes an HBase table. A missing table is reported and left
     * alone; a table that is already disabled is deleted without being disabled
     * again. I/O failures are printed, not thrown.
     *
     * @param tableName name of the HBase table to delete
     */
    public static void deleteTable(String tableName) {
        TableName name = TableName.valueOf(tableName);
        try (Connection connection = ConnectionFactory.createConnection(config);
             Admin admin = connection.getAdmin()) {
            if (!admin.tableExists(name)) {
                System.out.println("Table " + tableName + " does not exist, nothing to delete");
                return;
            }
            if (admin.isTableEnabled(name)) {
                admin.disableTable(name);
            }
            admin.deleteTable(name);
            System.out.println(tableName + " is deleted!!");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Writes one row into the {@code info} family of an HBase table, opening and
     * closing its own connection. Null values are skipped. I/O failures are
     * printed, not thrown.
     *
     * @param tableName HBase table to write to
     * @param rowKey    row key of the new row
     * @param column    column qualifiers
     * @param value     values, parallel to {@code column}; null entries are not written
     */
    public static void addData(String tableName, String rowKey, String[] column, String[] value) {
        try (Connection connection = ConnectionFactory.createConnection(config);
             Table table = connection.getTable(TableName.valueOf(tableName))) {
            writeRow(table, tableName, rowKey, column, value, hasInfoFamily(table));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Reads the column layout of a MySQL table with {@code describe}.
     * Primary-key columns are recorded in {@link #map} under the table name; all
     * other columns are returned. On failure the error is printed and whatever was
     * collected so far is returned.
     *
     * @param sqltablename MySQL table name; concatenated into the statement, so it
     *                     must come from a trusted source
     * @return names of the non-primary-key columns, in table order
     */
    public List<String> getSqlTableColumnByName(String sqltablename) {
        List<String> list = new ArrayList<String>();
        java.sql.Connection connection = null;
        PreparedStatement state = null;
        ResultSet result = null;
        try {
            connection = DBManager.getConnection();
            if (connection == null) {
                System.out.println("No MySQL connection, cannot describe [" + sqltablename + "]");
                return list;
            }
            String sql = "describe " + sqltablename;
            state = connection.prepareStatement(sql);
            result = state.executeQuery();
            List<String> keylist = new ArrayList<String>();
            while (result.next()) {
                // describe output: column 1 is the field name, column 4 the key marker.
                if ("PRI".equals(result.getString(4))) {
                    keylist.add(result.getString(1));
                } else {
                    list.add(result.getString(1));
                }
            }
            map.put(sqltablename, keylist);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            DBManager.closeAll(connection, state, result);
        }
        return list;
    }

    /**
     * Migrates every listed MySQL table into an HBase table named
     * {@code <name>_veetah222}. Tables whose primary key cannot be determined are
     * skipped, because no row key can be built for them.
     *
     * @param names MySQL table names to migrate
     */
    public void doSqlToHBase(String[] names) {
        for (String name : names) {
            String tablename = name + "_veetah222";
            String[] columnfamily = {INFO_FAMILY};
            createTable(tablename, columnfamily);
            List<String> list = getSqlTableColumnByName(name);
            String[] columns = list.toArray(new String[list.size()]);
            List<String> keys = map.get(name);
            if (keys == null || keys.isEmpty()) {
                System.out.println("Skip [" + name + "]: no primary key found, cannot build a rowkey");
                continue;
            }
            migrateRows(name, tablename, columns, keys);
        }
        System.out.println("---------迁移成功!----------");
    }

    public static void main(String[] args) throws IOException {
        SqlToHBase s = new SqlToHBase();
        // Full table list of the source database, kept for reference:
        // "Account", "Admin", "Blog", "Blog_of_xinjin", "Blog_xinjin", "Forum", "Forum_of_xinjin",
        // "Material", "News", "News_modif", "News_of_xinjin", "News_xinjin", "Result", "Topic", "VPS",
        // "backupTest", "blog_crawler_reply", "News", "city", "facebook_account", "map", "model",
        // "news_crawler_reply", "post_job", "reply_job", "site", "test", "view"
        // ("Topic" has a two-column primary key.)
        String[] names = {"Admin"};
        s.doSqlToHBase(names);
    }

    /** Streams all rows of one MySQL table into one HBase table over a single HBase connection. */
    private void migrateRows(String sqlTable, String hbaseTable, String[] columns, List<String> keys) {
        java.sql.Connection sqlConnection = null;
        PreparedStatement state = null;
        ResultSet result = null;
        // One HBase connection per table instead of one per row: creating a
        // connection is expensive, a Table handle is cheap to reuse.
        try (Connection hbaseConnection = ConnectionFactory.createConnection(config);
             Table table = hbaseConnection.getTable(TableName.valueOf(hbaseTable))) {
            boolean infoFamilyPresent = hasInfoFamily(table);
            sqlConnection = DBManager.getConnection();
            if (sqlConnection == null) {
                System.out.println("No MySQL connection, cannot migrate [" + sqlTable + "]");
                return;
            }
            String sql = "select * from " + sqlTable;
            state = sqlConnection.prepareStatement(sql);
            result = state.executeQuery();
            String[] values = new String[columns.length];
            while (result.next()) {
                for (int c = 0; c < columns.length; c++) {
                    values[c] = result.getString(columns[c]);
                }
                String rowkey = buildRowKey(result, keys);
                try {
                    writeRow(table, hbaseTable, rowkey, columns, values, infoFamilyPresent);
                } catch (IOException e) {
                    // A failed row is reported and the migration continues with the next one.
                    e.printStackTrace();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            DBManager.closeAll(sqlConnection, state, result);
        }
    }

    /**
     * Builds the HBase row key from the primary-key columns of the current row.
     * A single key is used as-is; a composite key is joined as {@code :k1:k2}.
     * The leading colon is kept so keys match rows written by earlier runs.
     */
    private static String buildRowKey(ResultSet row, List<String> keys) throws java.sql.SQLException {
        if (keys.size() == 1) {
            return row.getString(keys.get(0));
        }
        StringBuilder rowkey = new StringBuilder();
        for (String key : keys) {
            rowkey.append(':').append(row.getString(key));
        }
        return rowkey.toString();
    }

    /** Returns true when the table declares the {@code info} column family. */
    private static boolean hasInfoFamily(Table table) throws IOException {
        for (HColumnDescriptor family : table.getTableDescriptor().getColumnFamilies()) {
            if (INFO_FAMILY.equals(family.getNameAsString())) {
                return true;
            }
        }
        return false;
    }

    /**
     * Sends exactly one Put for the row. Rows that end up with no cell (all values
     * null, or no {@code info} family) are skipped, since the client rejects an
     * empty Put.
     */
    private static void writeRow(Table table, String tableName, String rowKey, String[] column,
            String[] value, boolean infoFamilyPresent) throws IOException {
        Put put = new Put(Bytes.toBytes(rowKey));
        if (infoFamilyPresent) {
            byte[] family = Bytes.toBytes(INFO_FAMILY);
            for (int j = 0; j < column.length; j++) {
                if (value[j] != null) {
                    put.addColumn(family, Bytes.toBytes(column[j]), Bytes.toBytes(value[j]));
                }
            }
        }
        if (put.isEmpty()) {
            System.out.println("Skip row [" + rowKey + "] of [" + tableName + "]: no non-null column to write");
            return;
        }
        table.put(put);
        System.out.println("Add Data to [" + tableName + "] Success!!! Rowkey:" + rowKey);
    }

    /** Closes a resource if present, printing (not propagating) any failure. */
    private static void closeQuietly(java.io.Closeable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
JDBC连接类
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
/**
 * Minimal JDBC helper: opens connections to the configured MySQL database and
 * releases JDBC resources. The connection settings below are hard-coded and must
 * be edited before running.
 */
public class DBManager {
    private static String host = "12.12.12.12";
    private static String port = "3306";
    private static String username = "root";
    private static String password = "xxxxxxx";
    private static String database = "veetah";

    public DBManager() {
    }

    // Load the MySQL driver class once; a failure is printed and surfaces later
    // as a failed getConnection().
    static {
        try {
            Class.forName("com.mysql.jdbc.Driver");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Opens a new connection to the configured database.
     *
     * @return the connection, or {@code null} when it could not be opened (the
     *         cause is printed)
     */
    public static java.sql.Connection getConnection() {
        java.sql.Connection connection = null;
        String url = "jdbc:mysql://" + host + ":" + port + "/" + database;
        try {
            connection = DriverManager.getConnection(url, username, password);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return connection;
    }

    /**
     * Closes the result set, the statement and the connection, in that order.
     * Each one is closed independently, so a failure while closing one no longer
     * leaves the others open. Null arguments are ignored.
     *
     * @param connection connection to close, may be null
     * @param state      statement to close, may be null
     * @param result     result set to close, may be null
     */
    public static void closeAll(java.sql.Connection connection, Statement state, ResultSet result) {
        closeQuietly(result);
        closeQuietly(state);
        closeQuietly(connection);
    }

    /** Closes one resource if present, printing (not propagating) any failure. */
    private static void closeQuietly(AutoCloseable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
编译运行命令
- 在服务器创建一个文件夹mkdir test
- 进入文件夹cd test
- 创建lib文件夹 mkdir lib
- 将必要jar包导入到lib下
- 创建两个类vi SqlToHBase.java,vi DBManager.java复制代码到里面
- 编译运行,依次执行以下三条命令:`javac -Djava.ext.dirs=./lib DBManager.java`;`javac -Djava.ext.dirs=./lib SqlToHBase.java`;`java -Djava.ext.dirs=./lib SqlToHBase`
- 如图
运行结果截图
这是将MySQL的veetah数据库中的view表迁移到HBase的view_veetah表后的截图示例