Today, while working through a Canopy algorithm example, I ran into this problem, so I am writing it down here. The source code is as follows:
package czx.com.mahout;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.common.AbstractJob;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.VectorWritable;

public class TextVecWrite extends AbstractJob {

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new Configuration(), new TextVecWrite(), args);
    }

    /**
     * TextVecWriterMapper
     * @author czx
     */
    public static class TextVecWriterMapper
            extends Mapper<LongWritable, Text, LongWritable, VectorWritable> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Split the input line on whitespace and build a Mahout vector from the values
            String[] split = value.toString().split("\\s{1,}");
            RandomAccessSparseVector vector = new RandomAccessSparseVector(split.length);
            for (int i = 0; i < split.length; ++i) {
                vector.set(i, Double.parseDouble(split[i]));
            }
            VectorWritable vectorWritable = new VectorWritable(vector);
            context.write(key, vectorWritable);
        }
    }

    /**
     * TextVectorWritableReducer
     * @author czx
     */
    public static class TextVectorWritableReducer
            extends Reducer<LongWritable, VectorWritable, LongWritable, VectorWritable> {

        @Override
        protected void reduce(LongWritable key, Iterable<VectorWritable> values, Context context)
                throws IOException, InterruptedException {
            // Pass each vector through unchanged
            for (VectorWritable v : values) {
                context.write(key, v);
            }
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        addInputOption();
        addOutputOption();
        if (parseArguments(args) == null) {
            return -1;
        }
        Path input = getInputPath();
        Path output = getOutputPath();
        Configuration conf = getConf();

        Job job = new Job(conf, "textvectorWritable with input: " + input.getName());
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setMapperClass(TextVecWriterMapper.class);
        job.setReducerClass(TextVectorWritableReducer.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(VectorWritable.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(VectorWritable.class);
        job.setJarByClass(TextVecWrite.class);

        FileInputFormat.addInputPath(job, input);
        SequenceFileOutputFormat.setOutputPath(job, output);

        if (!job.waitForCompletion(true)) {
            throw new InterruptedException("Canopy Job failed processing " + input);
        }
        return 0;
    }
}
Compile the program, package it into a JAR, and run it as follows:
hadoop jar ClusteringUtils.jar czx.com.mahout.TextVecWrite -i /user/hadoop/testdata/synthetic_control.data -o /home/czx/1
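For reference, the compile-and-package step mentioned above might look roughly like this. This is only a minimal sketch: the source layout, the use of $MAHOUT_HOME, and the classpath entries are assumptions and will differ depending on your installation.

# Minimal packaging sketch (assumed paths; adjust to your own layout)
mkdir -p classes
javac -cp "$(hadoop classpath):$MAHOUT_HOME/mahout-core-0.9-job.jar" \
      -d classes src/czx/com/mahout/TextVecWrite.java
jar cf ClusteringUtils.jar -C classes .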
However, the following error appeared:
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/commons/cli2/Option
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:270)
        at org.apache.hadoop.util.RunJar.main(RunJar.java:205)
Caused by: java.lang.ClassNotFoundException: org.apache.commons.cli2.Option
        at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
        ... 3 more
In the end, it turned out that when I copied the relevant JARs from the Mahout root directory into the hadoop-2.4.1/share/hadoop/common/lib folder, I had missed mahout-core-0.9-job.jar, the uber JAR that bundles Mahout's dependencies, including the missing org.apache.commons.cli2.Option class. After copying mahout-core-0.9-job.jar over as well and restarting Hadoop, the job ran correctly.
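In concrete terms, the fix was along these lines. This is a sketch with assumed paths; $MAHOUT_HOME and $HADOOP_HOME stand in for the actual Mahout and Hadoop installation directories on your machine.

# Assumed install locations; substitute your own paths
cp $MAHOUT_HOME/mahout-core-0.9-job.jar $HADOOP_HOME/share/hadoop/common/lib/
# Restart Hadoop so the daemons pick up the new JAR
$HADOOP_HOME/sbin/stop-all.sh
$HADOOP_HOME/sbin/start-all.sh

An alternative that avoids copying JARs into Hadoop's lib directory is to add mahout-core-0.9-job.jar to HADOOP_CLASSPATH before running the job, but the copy-and-restart approach above is what worked in this case.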