import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
/** Writes the MapReduce result data into MySQL. */
public class mysqlmapre {
/**
 * Implements DBWritable (and Writable):
 * TblsWritable is needed to write records into MySQL.
 */
public static class TblsWritable implements Writable, DBWritable {
    String word;
    int count;

    public TblsWritable() {
    }

    public TblsWritable(String word, int count) {
        this.word = word;
        this.count = count;
    }

    @Override
    public void write(PreparedStatement statement) throws SQLException {
        statement.setString(1, this.word);
        statement.setInt(2, this.count);
    }

    @Override
    public void readFields(ResultSet resultSet) throws SQLException {
        this.word = resultSet.getString(1);
        this.count = resultSet.getInt(2);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(this.word);
        out.writeInt(this.count);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.word = in.readUTF();
        this.count = in.readInt();
    }

    @Override
    public String toString() {
        return this.word + " " + this.count;
    }
}
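Note that write(PreparedStatement) fills parameters 1 and 2 of the INSERT statement that DBOutputFormat generates, so the order here must match the column order later passed to DBOutputFormat.setOutput() in the driver (see the sketch further down).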
public static class ConnMysqlMapper extends Mapper<Object, Text, Text, IntWritable> {
    // TblsRecord is the custom type, i.e. the DBWritable class defined above.
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
    // The target words to match. Their declaration was not shown in the
    // original post; "abc" and "x" are taken from the comment below.
    private static final String[] input = {"abc", "x"};

    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        // The input arrives as <offset of first character, line contents>;
        // the value is tokenized, and matches for "abc" and "x" become the
        // map output. The key is of no real use to this program, so it is ignored.
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            String token = itr.nextToken();
            for (int i = 0; i < input.length; i++) {
                if (token.compareToIgnoreCase(input[i]) == 0) {
                    word.set(token);
                    context.write(word, one);
                }
            }
        }
    }
}
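For context, the thread never shows the job driver. To use DBOutputFormat, the job would have to be configured roughly as follows. This is a minimal sketch: the JDBC URL, credentials, table name ("wordcount"), and column names are placeholder assumptions, not details from the post.

// requires org.apache.hadoop.conf.Configuration, org.apache.hadoop.fs.Path,
// org.apache.hadoop.mapreduce.Job, and
// org.apache.hadoop.mapreduce.lib.db.{DBConfiguration, DBOutputFormat}
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Placeholder connection settings; substitute your own.
    DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
            "jdbc:mysql://localhost:3306/test", "user", "password");

    Job job = Job.getInstance(conf, "write word counts to mysql");
    job.setJarByClass(mysqlmapre.class);
    job.setMapperClass(ConnMysqlMapper.class);
    job.setReducerClass(ConnMysqlReducer.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);

    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(DBOutputFormat.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    // Column order must match TblsWritable.write(PreparedStatement):
    // parameter 1 = word, parameter 2 = count.
    DBOutputFormat.setOutput(job, "wordcount", "word", "count");

    System.exit(job.waitForCompletion(true) ? 0 : 1);
}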
public static class ConnMysqlReducer extends Reducer<LongWritable,Text,TblsWritable,TblsWritable>{
The error reported after running is:
java.lang.Exception: java.lang.ClassCastException: org.apache.hadoop.io.Text cannot be cast to org.apache.hadoop.mapreduce.lib.db.DBWritable
at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:529)
Caused by: java.lang.ClassCastException: org.apache.hadoop.io.Text cannot be cast to org.apache.hadoop.mapreduce.lib.db.DBWritable
at org.apache.hadoop.mapreduce.lib.db.DBOutputFormat$DBRecordWriter.write(DBOutputFormat.java:66)
at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.write(ReduceTask.java:558)
at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
at org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context.write(WrappedReducer.java:105)
at org.apache.hadoop.mapreduce.Reducer.reduce(Reducer.java:150)
at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:171)
at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:627)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:389)
at org.apache.hadoop.mapred.LocalJobRunner$Job$ReduceTaskRunnable.run(LocalJobRunner.java:319)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
2017-03-24 15:00:47,361 INFO [org.apache.hadoop.mapreduce.Job] - Job job_local1380292941_0001 failed with state FAILED due to: NA
2017-03-24 15:00:47,424 INFO [org.apache.hadoop.mapreduce.Job] - Counters: 35
public static class ConnMysqlReducer extends Reducer<Text, IntWritable, TblsWritable, TblsWritable>{
public void reduce(Text key, Iterable<IntWritable> values, Context context)throws IOException,InterruptedException{
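The snippet above is cut off. A complete version of this corrected reducer might look like the sketch below; the summing body is an assumption based on the standard word-count pattern, not code from the thread. DBOutputFormat only uses the output key (it calls key.write(statement) on it), so the value can simply be null:

public static class ConnMysqlReducer extends Reducer<Text, IntWritable, TblsWritable, TblsWritable> {
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Sum the 1s emitted by the mapper for this word.
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        // The output key must implement DBWritable; DBOutputFormat
        // ignores the output value, so null is passed here.
        context.write(new TblsWritable(key.toString(), sum), null);
    }
}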
#2
Thanks!! It really works! You're a genius!!
#3
I understood what you meant in your earlier reply, so I changed the logic inside ConnMysqlReducer (I want to sum up the map results in the reducer) and changed the output types on the line

public static class ConnMysqlReducer extends Reducer<Text,IntWritable,Text,IntWritable>

but it still doesn't work; the same exception as above appears again.

public static class ConnMysqlReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}
The exception is:
2017-03-26 21:54:40,092 WARN [org.apache.hadoop.mapred.LocalJobRunner] - job_local1956595032_0001
java.lang.Exception: java.lang.ClassCastException: org.apache.hadoop.io.Text cannot be cast to org.apache.hadoop.mapreduce.lib.db.DBWritable
at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:529)
Caused by: java.lang.ClassCastException: org.apache.hadoop.io.Text cannot be cast to org.apache.hadoop.mapreduce.lib.db.DBWritable
at org.apache.hadoop.mapreduce.lib.db.DBOutputFormat$DBRecordWriter.write(DBOutputFormat.java:66)
at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.write(ReduceTask.java:558)
at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
at org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context.write(WrappedReducer.java:105)
at com.sun.mysql.mysqlmapre$ConnMysqlReducer.reduce(mysqlmapre.java:115)
at com.sun.mysql.mysqlmapre$ConnMysqlReducer.reduce(mysqlmapre.java:1)
at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:171)
at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:627)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:389)
at org.apache.hadoop.mapred.LocalJobRunner$Job$ReduceTaskRunnable.run(LocalJobRunner.java:319)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
2017-03-26 21:54:40,094 INFO [org.apache.hadoop.mapreduce.Job] - Job job_local1956595032_0001 failed with state FAILED due to: NA
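The exception recurs for the same reason as before: DBOutputFormat casts the reducer's output key to DBWritable when writing (the DBOutputFormat$DBRecordWriter.write frame in the trace), and Text does not implement DBWritable. Changing the output types to <Text, IntWritable> therefore reintroduces the problem. Keep the output types as <TblsWritable, TblsWritable> and wrap the sum, e.g. context.write(new TblsWritable(key.toString(), sum), null), as in the sketch under the first answer.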
Delete all the data already in the table -- This should be easy, delete the whole table before you run the MapReduce job.
Delete rows for the same word that already exist in MySQL --
DBOutputFormat has a method
constructQuery(); you may create your own custom DBOutputFormat, override this method, and delete the record before inserting.
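A sketch of that suggestion, assuming MySQL: rather than issuing a literal DELETE first, overriding constructQuery() to emit REPLACE INTO has the same effect, since MySQL deletes any existing row with the same key before inserting the new one. This relies on the word column having a PRIMARY KEY or UNIQUE index; the class name is illustrative.

// requires org.apache.hadoop.mapreduce.lib.db.DBOutputFormat
public static class ReplacingDBOutputFormat<K extends DBWritable, V> extends DBOutputFormat<K, V> {
    @Override
    public String constructQuery(String table, String[] fieldNames) {
        // Same statement the stock DBOutputFormat builds, but with REPLACE
        // instead of INSERT: REPLACE INTO table (f1,f2) VALUES (?,?)
        StringBuilder query = new StringBuilder("REPLACE INTO ").append(table).append(" (");
        for (int i = 0; i < fieldNames.length; i++) {
            query.append(fieldNames[i]);
            if (i != fieldNames.length - 1) {
                query.append(",");
            }
        }
        query.append(") VALUES (");
        for (int i = 0; i < fieldNames.length; i++) {
            query.append("?");
            if (i != fieldNames.length - 1) {
                query.append(",");
            }
        }
        query.append(")");
        return query.toString();
    }
}

Register it in the driver with job.setOutputFormatClass(ReplacingDBOutputFormat.class) in place of DBOutputFormat.class.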