Error when importing MapReduce results into the database

Posted: 2022-06-21 21:42:43
My goal is to read data from a file, process it with MapReduce, and then import the result into a MySQL database, but the error message seems to point to a data type problem.
The code is as follows:

package com.sun.mysql;

import java.io.BufferedReader;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
/** Writes the MapReduce result data into MySQL. */
public class mysqlmapre {
    /**
     * Custom DBWritable implementation.
     * TblsWritable represents one row to be written to MySQL.
     */
    public static class TblsWritable implements Writable, DBWritable {
        String word;
        int count;

        public TblsWritable() {
        }

        public TblsWritable(String word, int count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public void write(PreparedStatement statement) throws SQLException {
            statement.setString(1, this.word);
            statement.setInt(2, this.count);
        }

        @Override
        public void readFields(ResultSet resultSet) throws SQLException {
            this.word = resultSet.getString(1);
            this.count = resultSet.getInt(2);
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(this.word);
            out.writeInt(this.count);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.word = in.readUTF();
            this.count = in.readInt();
        }

        @Override
        public String toString() {
            return this.word + " " + this.count;
        }
    }
    public static class ConnMysqlMapper extends Mapper<Object, Text, Text, IntWritable> {
        // TblsWritable is the custom type defined above (the DBWritable implementation)

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            // Receives <offset of the first character, line contents>, processes value,
            // and emits matching words (e.g. abc and x) as the map output.
            // key is not meaningful for this program and is not used.
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                String token = itr.nextToken();
                for (int i = 0; i < input.length; i++) {
                    if (token.compareToIgnoreCase(input[i]) == 0) {
                        word.set(token);
                        context.write(word, one);
                    }
                }
            }
        }
    }
    public static class ConnMysqlReducer extends Reducer<LongWritable, Text, TblsWritable, TblsWritable> {

        public void reduce(LongWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            // Each key/value pair received here is a row to be written to the database, so in reduce:
            // the first argument of write() has the custom type TblsWritable, assembled from key and value,
            // and then waits to be written to the database;
            // the second argument of write() is unused (the first already carries everything to output),
            // so it is set to null.
            for (Iterator<IntWritable> itr = values.iterator(); itr.hasNext();) {
                context.write(new TblsWritable(key.toString(), itr.next().get()), null);
            }
        }
    }
    public static String input[] = null;

    public static void readTxtFile(String filePath) {
        try {
            File file = new File(filePath);
            if (file.isFile() && file.exists()) { // check that the file exists
                InputStreamReader read = new InputStreamReader(new FileInputStream(file));
                BufferedReader bufferedReader = new BufferedReader(read);
                String lineTxt = null;
                while ((lineTxt = bufferedReader.readLine()) != null) {
                    // note: input is overwritten on every iteration, so only the last line is kept
                    input = lineTxt.split(" ");
                }
                read.close();
            } else {
                System.out.println("The specified file could not be found");
            }
        } catch (Exception e) {
            System.out.println("Error while reading the file");
            e.printStackTrace();
        }
    }


    @SuppressWarnings("deprecation")
    public static void main(String args[]) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        DistributedCache.addFileToClassPath(new Path("/tmp/mysql-connector-java-5.1.40-bin.jar"), conf);
        String filePath = "/usr/local/hadoop/ab.txt";
        readTxtFile(filePath);
        args = new String[]{"hdfs://master:9000/user/hadoop/input/a.txt"};

        DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
                "jdbc:mysql://192.168.1.121:3306/mapreducetest", "root", "123456");

        Job job = Job.getInstance(conf, "test mysql connection");
        job.setJarByClass(mysqlmapre.class);

        job.setMapperClass(ConnMysqlMapper.class);
        job.setReducerClass(ConnMysqlReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(DBOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));

        DBOutputFormat.setOutput(job, "t1", "word", "count");
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}



The error reported after running is:
java.lang.Exception: java.lang.ClassCastException: org.apache.hadoop.io.Text cannot be cast to org.apache.hadoop.mapreduce.lib.db.DBWritable
at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:529)
Caused by: java.lang.ClassCastException: org.apache.hadoop.io.Text cannot be cast to org.apache.hadoop.mapreduce.lib.db.DBWritable
at org.apache.hadoop.mapreduce.lib.db.DBOutputFormat$DBRecordWriter.write(DBOutputFormat.java:66)
at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.write(ReduceTask.java:558)
at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
at org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context.write(WrappedReducer.java:105)
at org.apache.hadoop.mapreduce.Reducer.reduce(Reducer.java:150)
at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:171)
at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:627)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:389)
at org.apache.hadoop.mapred.LocalJobRunner$Job$ReduceTaskRunnable.run(LocalJobRunner.java:319)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
2017-03-24 15:00:47,361 INFO [org.apache.hadoop.mapreduce.Job] - Job job_local1380292941_0001 failed with state FAILED due to: NA
2017-03-24 15:00:47,424 INFO [org.apache.hadoop.mapreduce.Job] - Counters: 35

Could someone please help me solve this, or point out where I went wrong? Thanks a lot!

7 solutions

#1


Lines 157 and 158 (the setOutputKeyClass / setOutputValueClass calls) should be changed to:

        job.setMapOutputKeyClass(Text.class);  
        job.setMapOutputValueClass(IntWritable.class);         
        job.setOutputKeyClass(TblsWritable.class);  
        job.setOutputValueClass(TblsWritable.class);

Lines 102 and 104 (the ConnMysqlReducer declaration and the reduce signature) should be changed to:
 
    public static class ConnMysqlReducer extends Reducer<Text, IntWritable, TblsWritable, TblsWritable>{       
        public void reduce(Text key, Iterable<IntWritable> values, Context context)throws IOException,InterruptedException{  
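Putting the two changes together, the whole reducer would look roughly like this (a sketch that keeps your original one-row-per-value write; TblsWritable is the class you defined above):

    public static class ConnMysqlReducer extends Reducer<Text, IntWritable, TblsWritable, TblsWritable> {
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // build a TblsWritable row from the word and each count, as in the original code
            for (IntWritable val : values) {
                context.write(new TblsWritable(key.toString(), val.get()), null);
            }
        }
    }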
   

         

#2


Thank you!! That really worked! You're a master!!

#3


Quoting reply #1 by moguobiao:
Lines 157 and 158 (the setOutputKeyClass / setOutputValueClass calls) should be changed to:

        job.setMapOutputKeyClass(Text.class);  
        job.setMapOutputValueClass(IntWritable.class);         
        job.setOutputKeyClass(TblsWritable.class);  
        job.setOutputValueClass(TblsWritable.class);

Lines 102 and 104 (the ConnMysqlReducer declaration and the reduce signature) should be changed to:
 
    public static class ConnMysqlReducer extends Reducer<Text, IntWritable, TblsWritable, TblsWritable>{       
        public void reduce(Text key, Iterable<IntWritable> values, Context context)throws IOException,InterruptedException{  
   

         


I understood what you meant in your earlier reply. I changed what the ConnMysqlReducer class does (I want to sum the map results in the reducer), so I changed the output types in this line:
public static class ConnMysqlReducer extends Reducer<Text,IntWritable,Text,IntWritable>
But it still doesn't work; the same exception as above shows up again.
  
public static class ConnMysqlReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}


The exception is:
2017-03-26 21:54:40,092 WARN [org.apache.hadoop.mapred.LocalJobRunner] - job_local1956595032_0001
java.lang.Exception: java.lang.ClassCastException: org.apache.hadoop.io.Text cannot be cast to org.apache.hadoop.mapreduce.lib.db.DBWritable
at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:529)
Caused by: java.lang.ClassCastException: org.apache.hadoop.io.Text cannot be cast to org.apache.hadoop.mapreduce.lib.db.DBWritable
at org.apache.hadoop.mapreduce.lib.db.DBOutputFormat$DBRecordWriter.write(DBOutputFormat.java:66)
at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.write(ReduceTask.java:558)
at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
at org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context.write(WrappedReducer.java:105)
at com.sun.mysql.mysqlmapre$ConnMysqlReducer.reduce(mysqlmapre.java:115)
at com.sun.mysql.mysqlmapre$ConnMysqlReducer.reduce(mysqlmapre.java:1)
at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:171)
at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:627)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:389)
at org.apache.hadoop.mapred.LocalJobRunner$Job$ReduceTaskRunnable.run(LocalJobRunner.java:319)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
2017-03-26 21:54:40,094 INFO [org.apache.hadoop.mapreduce.Job] - Job job_local1956595032_0001 failed with state FAILED due to: NA

Could you please take another look? Thanks…

#4


With this change the Reducer outputs <Text, IntWritable>, no longer TblsWritable, so line 161 has to change accordingly (delete it and use the default).

If you still want to write to the database, replace lines 109-111 with the summing logic:

        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        context.write(new TblsWritable(key.toString(), sum), null);
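
Putting it together, the whole reducer would then look roughly like this (a sketch; one summed row per word goes to MySQL):

    public static class ConnMysqlReducer extends Reducer<Text, IntWritable, TblsWritable, TblsWritable> {
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // sum all the counts for this word, then write a single row to the database
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            context.write(new TblsWritable(key.toString(), sum), null);
        }
    }

Remember to also keep job.setOutputKeyClass(TblsWritable.class) and job.setOutputValueClass(TblsWritable.class) as in reply #1.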
                 

#5


!!! Thank you! Clearly I still lack experience and need pointers from the experts~
So all it took was fixing the types passed to write()~
Haha, thanks a lot~

#6


Quoting reply #4 by moguobiao:
With this change the Reducer outputs <Text, IntWritable>, no longer TblsWritable, so line 161 has to change accordingly (delete it and use the default).

If you still want to write to the database, replace lines 109-111 with the summing logic:

 int sum = 0;
        for (IntWritable val : values) {
          sum += val.get();
        }
 context.write(new TblsWritable(key.toString(),sum),null);
                 


Sorry to bother you again… one more question: I want my code to delete rows already in MySQL with the same word (or just delete all the old data in the table) before writing. How can I do that? I couldn't find a way online to get a Statement from the DBConfiguration class to delete records in the database.

#7


Quoting reply #6 by u013525058:
Quote: reply #4 by moguobiao:

With this change the Reducer outputs <Text, IntWritable>, no longer TblsWritable, so line 161 has to change accordingly (delete it and use the default).

If you still want to write to the database, replace lines 109-111 with the summing logic:

 int sum = 0;
        for (IntWritable val : values) {
          sum += val.get();
        }
 context.write(new TblsWritable(key.toString(),sum),null);
                 


Sorry to bother you again… one more question: I want my code to delete rows already in MySQL with the same word (or just delete all the old data in the table) before writing. How can I do that? I couldn't find a way online to get a Statement from the DBConfiguration class to delete records in the database.


Delete all the old data in the table -- This should be easy: delete (or truncate) the whole table before you run the MapReduce job.
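For that option, a minimal sketch in plain JDBC, run before submitting the job (this assumes the same driver, URL, credentials, and table name t1 used in main; adjust them to your setup):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    public class ClearTable {
        public static void main(String[] args) throws Exception {
            // same driver/URL/credentials as the DBConfiguration.configureDB call in the job
            Class.forName("com.mysql.jdbc.Driver");
            try (Connection conn = DriverManager.getConnection(
                    "jdbc:mysql://192.168.1.121:3306/mapreducetest", "root", "123456");
                 Statement stmt = conn.createStatement()) {
                // remove every existing row from the output table
                // (assumed schema: CREATE TABLE t1 (word VARCHAR(255), count INT))
                stmt.executeUpdate("TRUNCATE TABLE t1");
            }
        }
    }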
Delete rows already in MySQL with the same word -- DBOutputFormat has a method constructQuery(); you can create your own custom DBOutputFormat, override that method, and delete the record before inserting.
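One way to read "delete the record before inserting" is MySQL's REPLACE INTO, which deletes any existing row with the same key before inserting; note this only deduplicates if word has a PRIMARY KEY or UNIQUE index. A sketch of such a custom output format (ReplaceDBOutputFormat is a made-up name):

    import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
    import org.apache.hadoop.mapreduce.lib.db.DBWritable;

    public class ReplaceDBOutputFormat<K extends DBWritable, V> extends DBOutputFormat<K, V> {
        @Override
        public String constructQuery(String table, String[] fieldNames) {
            // same statement the stock DBOutputFormat builds, but REPLACE INTO instead of INSERT INTO
            StringBuilder query = new StringBuilder("REPLACE INTO ").append(table).append(" (");
            for (int i = 0; i < fieldNames.length; i++) {
                query.append(fieldNames[i]);
                if (i != fieldNames.length - 1) {
                    query.append(",");
                }
            }
            query.append(") VALUES (");
            for (int i = 0; i < fieldNames.length; i++) {
                query.append("?");
                if (i != fieldNames.length - 1) {
                    query.append(",");
                }
            }
            query.append(");");
            return query.toString();
        }
    }

Then register it with job.setOutputFormatClass(ReplaceDBOutputFormat.class) instead of DBOutputFormat.class.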
