通过mapreduce把mysql的一张表的数据导到另外一张表中

时间:2024-07-28 09:07:50

怎么安装hadoop集群我在这里就不多说了,我这里安装的是三节点的集群

先在主节点安装mysql

启动mysql

通过mapreduce把mysql的一张表的数据导到另外一张表中

登录mysql

通过mapreduce把mysql的一张表的数据导到另外一张表中

创建数据库,创建表格,先把数据加载到表格 t ,表格t2是空的

通过mapreduce把mysql的一张表的数据导到另外一张表中

mysql> create database mrtest;
Query OK, 1 row affected (0.05 sec) mysql> use mrtest;
Database changed
mysql> CREATE TABLE `t` (
-> `id` int DEFAULT NULL,
-> `name` varchar(10) DEFAULT NULL
-> ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
Query OK, 0 rows affected (0.07 sec) mysql> CREATE TABLE `t2` (
-> `id` int DEFAULT NULL,
-> `name` varchar(10) DEFAULT NULL
-> ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
Query OK, 0 rows affected (0.01 sec) mysql> insert into t values (1,"june"),(2,"decli"),(3,"hello"),
-> (4,"june"),(5,"decli"),(6,"hello"),(7,"june"),
-> (8,"decli"),(9,"hello"),(10,"june"),
-> (11,"june"),(12,"decli"),(13,"hello");
Query OK, 13 rows affected (0.01 sec)
Records: 13 Duplicates: 0 Warnings: 0

配置一下mysql数据库

通过mapreduce把mysql的一张表的数据导到另外一张表中

通过mapreduce把mysql的一张表的数据导到另外一张表中

通过mapreduce把mysql的一张表的数据导到另外一张表中

通过mapreduce把mysql的一张表的数据导到另外一张表中

通过mapreduce把mysql的一张表的数据导到另外一张表中

通过mapreduce把mysql的一张表的数据导到另外一张表中

通过mapreduce把mysql的一张表的数据导到另外一张表中

通过mapreduce把mysql的一张表的数据导到另外一张表中

mysql>  select user,host,password from mysql.user;
+------+------------+-------------------------------------------+
| user | host | password |
+------+------------+-------------------------------------------+
| root | localhost | *6865AFFB6CE8FA9ED6A74985497DDD53FF3B8BAA |
| root | cdh-master | |
| root | 127.0.0.1 | |
| | localhost | |
| | cdh-master | |
| hive | % | *4DF1D66463C18D44E3B001A8FB1BBFBEA13E27FC |
| hive | cdh-master | *4DF1D66463C18D44E3B001A8FB1BBFBEA13E27FC |
| hive | localhost | *4DF1D66463C18D44E3B001A8FB1BBFBEA13E27FC |
+------+------------+-------------------------------------------+
8 rows in set (0.04 sec) mysql> delete from user where user=' ';
ERROR 1146 (42S02): Table 'mrtest.user' doesn't exist
mysql> use mysql;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A Database changed
mysql> delete from user where user=' ';
Query OK, 2 rows affected (0.05 sec) mysql> select user,host,password from mysql.user;
+------+------------+-------------------------------------------+
| user | host | password |
+------+------------+-------------------------------------------+
| root | localhost | *6865AFFB6CE8FA9ED6A74985497DDD53FF3B8BAA |
| root | cdh-master | |
| root | 127.0.0.1 | |
| hive | % | *4DF1D66463C18D44E3B001A8FB1BBFBEA13E27FC |
| hive | cdh-master | *4DF1D66463C18D44E3B001A8FB1BBFBEA13E27FC |
| hive | localhost | *4DF1D66463C18D44E3B001A8FB1BBFBEA13E27FC |
+------+------------+-------------------------------------------+
6 rows in set (0.00 sec) mysql> commit;
Query OK, 0 rows affected (0.00 sec) mysql> select user,host,password from mysql.user;
+------+------------+-------------------------------------------+
| user | host | password |
+------+------------+-------------------------------------------+
| root | localhost | *6865AFFB6CE8FA9ED6A74985497DDD53FF3B8BAA |
| root | cdh-master | |
| root | 127.0.0.1 | |
| hive | % | *4DF1D66463C18D44E3B001A8FB1BBFBEA13E27FC |
| hive | cdh-master | *4DF1D66463C18D44E3B001A8FB1BBFBEA13E27FC |
| hive | localhost | *4DF1D66463C18D44E3B001A8FB1BBFBEA13E27FC |
+------+------------+-------------------------------------------+
6 rows in set (0.00 sec) mysql> delete from user where host='127.0.0.1';
Query OK, 1 row affected (0.03 sec) mysql> delete from user where host='localhost';
Query OK, 2 rows affected (0.00 sec) mysql> commit;
Query OK, 0 rows affected (0.00 sec) mysql> select user,host,password from mysql.user;
+------+------------+-------------------------------------------+
| user | host | password |
+------+------------+-------------------------------------------+
| root | cdh-master | |
| hive | % | *4DF1D66463C18D44E3B001A8FB1BBFBEA13E27FC |
| hive | cdh-master | *4DF1D66463C18D44E3B001A8FB1BBFBEA13E27FC |
+------+------------+-------------------------------------------+
3 rows in set (0.01 sec) mysql> delete from user where user='hive';
Query OK, 2 rows affected (0.00 sec) mysql> select user,host,password from mysql.user;
+------+------------+----------+
| user | host | password |
+------+------------+----------+
| root | cdh-master | |
+------+------------+----------+
1 row in set (0.00 sec) mysql> grant all privileges on hive.* to hive@'%' identified by 'hive' with grant option;
Query OK, 0 rows affected (0.06 sec) mysql> grant all privileges on hive.* to hive@'master' identified by 'hive' with grant option;
Query OK, 0 rows affected (0.00 sec) mysql> grant all privileges on hive.* to hive@'localhost' identified by 'hive' with grant option;
Query OK, 0 rows affected (0.00 sec) mysql> select user,host,password from mysql.user;
+------+------------+-------------------------------------------+
| user | host | password |
+------+------------+-------------------------------------------+
| root | cdh-master | |
| hive | localhost | *4DF1D66463C18D44E3B001A8FB1BBFBEA13E27FC |
| hive | % | *4DF1D66463C18D44E3B001A8FB1BBFBEA13E27FC |
| hive | master | *4DF1D66463C18D44E3B001A8FB1BBFBEA13E27FC |
+------+------------+-------------------------------------------+
4 rows in set (0.00 sec) mysql> grant all privileges on *.* to root@'%' identified by 'root';
Query OK, 0 rows affected (0.02 sec) mysql> select user,host,password from mysql.user;
+------+------------+-------------------------------------------+
| user | host | password |
+------+------------+-------------------------------------------+
| root | % | *81F5E21E35407D884A6CD4A731AEBFB6AF209E1B |
| root | cdh-master | |
| hive | localhost | *4DF1D66463C18D44E3B001A8FB1BBFBEA13E27FC |
| hive | % | *4DF1D66463C18D44E3B001A8FB1BBFBEA13E27FC |
| hive | master | *4DF1D66463C18D44E3B001A8FB1BBFBEA13E27FC |
+------+------------+-------------------------------------------+
5 rows in set (0.00 sec) mysql> grant all privileges on *.* to root@'%' identified by '543116';
Query OK, 0 rows affected (0.00 sec) mysql> select user,host,password from mysql.user;
+------+------------+-------------------------------------------+
| user | host | password |
+------+------------+-------------------------------------------+
| root | % | *6865AFFB6CE8FA9ED6A74985497DDD53FF3B8BAA |
| root | cdh-master | |
| hive | localhost | *4DF1D66463C18D44E3B001A8FB1BBFBEA13E27FC |
| hive | % | *4DF1D66463C18D44E3B001A8FB1BBFBEA13E27FC |
| hive | master | *4DF1D66463C18D44E3B001A8FB1BBFBEA13E27FC |
+------+------------+-------------------------------------------+
5 rows in set (0.00 sec) mysql> GRANT ALL PRIVILEGES on *.* to 'root'@'localhost' identified by '543116';
Query OK, 0 rows affected (0.00 sec) mysql> select user,host,password from mysql.user;
+------+------------+-------------------------------------------+
| user | host | password |
+------+------------+-------------------------------------------+
| root | % | *6865AFFB6CE8FA9ED6A74985497DDD53FF3B8BAA |
| root | cdh-master | |
| root | localhost | *6865AFFB6CE8FA9ED6A74985497DDD53FF3B8BAA |
| hive | localhost | *4DF1D66463C18D44E3B001A8FB1BBFBEA13E27FC |
| hive | % | *4DF1D66463C18D44E3B001A8FB1BBFBEA13E27FC |
| hive | master | *4DF1D66463C18D44E3B001A8FB1BBFBEA13E27FC |
+------+------------+-------------------------------------------+
6 rows in set (0.00 sec) mysql> use mrtest;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A Database changed
mysql> show tables;
+------------------+
| Tables_in_mrtest |
+------------------+
| t |
| t2 |
+------------------+
2 rows in set (0.00 sec)

在eclipse创建mapreduce项目

通过mapreduce把mysql的一张表的数据导到另外一张表中

通过mapreduce把mysql的一张表的数据导到另外一张表中

通过mapreduce把mysql的一张表的数据导到另外一张表中

在这里说一下我这里是安装的是hadoop2.6.0版本的

通过mapreduce把mysql的一张表的数据导到另外一张表中

如果没有添加对应的hadoop插件的话就这样添加

现在你本地安装的eclipse的dropins文件夹了放入这个插件,然后重新启动eclipse

通过mapreduce把mysql的一张表的数据导到另外一张表中

重启之后

通过mapreduce把mysql的一张表的数据导到另外一张表中

我们可以看到多了这么一项

通过mapreduce把mysql的一张表的数据导到另外一张表中

在这里选定本地安装的hadoop

通过mapreduce把mysql的一张表的数据导到另外一张表中

接下来是加载mysql的驱动包

通过mapreduce把mysql的一张表的数据导到另外一张表中

通过mapreduce把mysql的一张表的数据导到另外一张表中

这个是我本地的mysql驱动包

通过mapreduce把mysql的一张表的数据导到另外一张表中

通过mapreduce把mysql的一张表的数据导到另外一张表中

在eclipse加载驱动包之后还需要在集群里加载

把驱动包上传的每个节点的hadoop安装目录的lib目录下,是所有节点

通过mapreduce把mysql的一张表的数据导到另外一张表中

把集群启动一下,我这里只是搭建了分布式的3节点没有搭建HA

通过mapreduce把mysql的一张表的数据导到另外一张表中

通过mapreduce把mysql的一张表的数据导到另外一张表中

我们现在hdfs上创建一个目录来存放mysql的驱动包

通过mapreduce把mysql的一张表的数据导到另外一张表中

把本地的驱动包上传的hdfs上

通过mapreduce把mysql的一张表的数据导到另外一张表中

通过mapreduce把mysql的一张表的数据导到另外一张表中

在代码里面要加上这句来实现

DistributedCache.addFileToClassPath(new Path("hdfs://192.168.241.13:9000/mysqlconnector/mysql-connector-java-5.1.38-bin.jar"), conf);

下面是运行代码

通过mapreduce把mysql的一张表的数据导到另外一张表中

package com.gong.mrmysql;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Iterator; import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.mapred.lib.db.DBConfiguration;
import org.apache.hadoop.mapred.lib.db.DBInputFormat;
import org.apache.hadoop.mapred.lib.db.DBOutputFormat;
import org.apache.hadoop.mapred.lib.db.DBWritable; /**
* Function: 测试 mr 与 mysql 的数据交互,此测试用例将一个表中的数据复制到另一张表中
* 实际当中,可能只需要从 mysql 读,或者写到 mysql 中。
* date: 2013-7-29 上午2:34:04 <br/>
* @author june
*/
public class Mysql2Mr {
// DROP TABLE IF EXISTS `hadoop`.`studentinfo`;
// CREATE TABLE studentinfo (
// id INTEGER NOT NULL PRIMARY KEY,
// name VARCHAR(32) NOT NULL); public static class StudentinfoRecord implements Writable, DBWritable {
int id;
String name; public StudentinfoRecord() { } public void readFields(DataInput in) throws IOException {
this.id = in.readInt();
this.name = Text.readString(in);
} public String toString() {
return new String(this.id + " " + this.name);
} @Override
public void write(PreparedStatement stmt) throws SQLException {
stmt.setInt(1, this.id);
stmt.setString(2, this.name);
} @Override
public void readFields(ResultSet result) throws SQLException {
this.id = result.getInt(1);
this.name = result.getString(2);
} @Override
public void write(DataOutput out) throws IOException {
out.writeInt(this.id);
Text.writeString(out, this.name);
}
} // 记住此处是静态内部类,要不然你自己实现无参构造器,或者等着抛异常:
// Caused by: java.lang.NoSuchMethodException: DBInputMapper.<init>()
// http://*.com/questions/7154125/custom-mapreduce-input-format-cant-find-constructor
// 网上脑残式的转帖,没见到一个写对的。。。
public static class DBInputMapper extends MapReduceBase implements
Mapper<LongWritable, StudentinfoRecord, LongWritable, Text> {
public void map(LongWritable key, StudentinfoRecord value,
OutputCollector<LongWritable, Text> collector, Reporter reporter) throws IOException {
collector.collect(new LongWritable(value.id), new Text(value.toString()));
}
} public static class MyReducer extends MapReduceBase implements
Reducer<LongWritable, Text, StudentinfoRecord, Text> {
@Override
public void reduce(LongWritable key, Iterator<Text> values,
OutputCollector<StudentinfoRecord, Text> output, Reporter reporter) throws IOException {
String[] splits = values.next().toString().split(" ");
StudentinfoRecord r = new StudentinfoRecord();
r.id = Integer.parseInt(splits[0]);
r.name = splits[1];
output.collect(r, new Text(r.name));
}
} public static void main(String[] args) throws IOException {
JobConf conf = new JobConf(Mysql2Mr.class);
DistributedCache.addFileToClassPath(new Path("hdfs://192.168.241.13:9000/mysqlconnector/mysql-connector-java-5.1.38-bin.jar"), conf); conf.setMapOutputKeyClass(LongWritable.class);
conf.setMapOutputValueClass(Text.class);
conf.setOutputKeyClass(LongWritable.class);
conf.setOutputValueClass(Text.class); conf.setOutputFormat(DBOutputFormat.class);
conf.setInputFormat(DBInputFormat.class);
// // mysql to hdfs
// conf.setReducerClass(IdentityReducer.class);
// Path outPath = new Path("/tmp/1");
// FileSystem.get(conf).delete(outPath, true);
// FileOutputFormat.setOutputPath(conf, outPath); DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver", "jdbc:mysql://192.168.241.13:3306/mrtest",
"root", "543116");
String[] fields = { "id", "name" };
// 从 t 表读数据
DBInputFormat.setInput(conf, StudentinfoRecord.class, "t", null, "id", fields);
// mapreduce 将数据输出到 t2 表
DBOutputFormat.setOutput(conf, "t2", "id", "name");
// conf.setMapperClass(org.apache.hadoop.mapred.lib.IdentityMapper.class);
conf.setMapperClass(DBInputMapper.class);
conf.setReducerClass(MyReducer.class); JobClient.runJob(conf);
}
}

我们运行一下

通过mapreduce把mysql的一张表的数据导到另外一张表中

通过mysql查看t2表看看有没有数据

通过mapreduce把mysql的一张表的数据导到另外一张表中

再运行一次,可以看到t2表又一次被加载进数据了

通过mapreduce把mysql的一张表的数据导到另外一张表中

通过mapreduce把mysql的一张表的数据导到另外一张表中

这里我们就实现了怎么用mapreduce把mysql的一张表的数据加载到另外一张表去了。