import java.io.IOException; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class MapReduceDemo extends Configured implements Tool{ static class DemoMapper extends Mapper<LongWritable,Text,Text,Text>{ private String filename; private int rownum=2; private int colnum=2; private int rowA=1; private int rowB=1; protected void setup(Context context)throws IOException,InterruptedException{ InputSplit split=context.getInputSplit(); filename=((FileSplit)split).getPath().getName(); } protected void map(LongWritable key,Text value,Context context)throws IOException,InterruptedException{ String[] token=value.toString().split(","); if(filename.equals("A")){ for(int i=1;i<=colnum;++i){ Text k=new Text(rowA+","+i); for(int j=1;j<=token.length;++j){ Text v=new Text(filename+":"+j+","+token[j-1]); context.write(k, v); System.out.println(k.toString()+" "+v.toString()); } } ++rowA; } else if(filename.equals("B")){ for(int i=1;i<=rownum;++i){ for(int j=1;j<=token.length;++j){ Text k=new Text(i+","+j); Text v=new Text(filename+":"+rowB+","+token[j-1]); context.write(k, v); System.out.println(k.toString()+" "+v.toString()); } } ++rowB; } } } static class DemoReducer extends Reducer<Text,Text,Text,IntWritable>{ private Map<String,String> mapA=new HashMap<String,String>(); private 
Map<String,String> mapB=new HashMap<String,String>(); protected void reduce(Text key,Iterable<Text> values,Context context)throws IOException,InterruptedException{ for(Text line:values){ String filename=line.toString().substring(0,1); if(filename.equals("A")){ mapA.put(line.toString().substring(2,3), line.toString().substring(4)); }else if(filename.equals("B")){ mapB.put(line.toString().substring(2, 3),line.toString().substring(4)); } } int result=0; Iterator<String> it=mapA.keySet().iterator(); while(it.hasNext()){ String mapk=it.next(); result+=Integer.parseInt(mapA.get(mapk))*Integer.parseInt(mapB.get(mapk)); } context.write(key, new IntWritable(result)); System.out.println(); } } public static void main(String[] args) throws Exception { // TODO Auto-generated method stub int exitcode=ToolRunner.run(new MapReduceDemo(), args); System.exit(exitcode); } @Override public int run(String[] arg0) throws Exception { // TODO Auto-generated method stub Configuration conf=new Configuration(); Job job=new Job(conf); job.setJarByClass(getClass()); FileInputFormat.setInputPaths(job, new Path(arg0[0]),new Path(arg0[1])); FileOutputFormat.setOutputPath(job, new Path(arg0[2])); job.setMapperClass(DemoMapper.class); job.setReducerClass(DemoReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); return job.waitForCompletion(true)? 1:0; } }
// Example input dimensions: matrix A is 2 x 3.
// Matrix B is 3 x 2 (so the result is 2 x 2).