二阶矩阵相乘公式
上例中的C11=A11*B11+A12*B21+A13*B31=1*3+0*2+2*1=5、C12=A11*B12+A12*B22+A13*B32=1*1+0*1+2*0=1
分析
因为分布式计算的特点,需要找到相互独立的计算过程,以便能够在不同的节点上进行计算而不会彼此影响。根据矩
阵乘法的公式,C中各个元素的计算都是相互独立的,即各个cij在计算过程中彼此不影响。这样的话,在Map阶段可
以把计算所需要的元素都集中到同一个key中,然后,在Reduce阶段就可以从中解析出各个元素来计算cij。 另外,
以a11为例,它将会在c11、c12...c1p的计算中使用,以b11为例,它将会在c11、c21...cm1的计算中使用,也就是说,在Map阶段,当我们从HDFS取出一行记录时,如
果该记录是A的元素,则需要存储成p个<key, value>对,并且这p个key互不相同;如果该记录是B的元素,则需要存
储成m个<key, value>对,同样的,m个key也应互不相同;但同时,用于存放计算cij的ai1、ai2……ain和b1j、
b2j……bnj的<key, value>对的key应该都是相同的,这样才能被传递到同一个Reduce中。
设计
普遍有一个共识是:数据结构+算法=程序,所以在编写代码之前需要先理清数据存储结构和处理数据的算法。
Map阶段
在Map阶段,需要做的是进行数据准备。把来自矩阵A的元素aij,标识成p条<key, value>的形式,key="i,k",(其中
k=1,2,...,p),value="A,j,Aij";把来自矩阵B的元素bij,标识成m条<key, value>形式,key="k,j"(其中
k=1,2,...,m),value="B,i,Bij"。 经过处理,用于计算cij需要的a、b就转变为有相同key("i,j")的数据对,通过value
中"A"、"B"能区分元素是来自矩阵A还是矩阵B,以及具体的位置(在矩阵A的第几列,在矩阵B的第几行)。
Shuffle阶段
这个阶段是Hadoop自动完成的阶段,具有相同key的value被分到同一个list中,形成<key,list(value)>对,再传递给Reduce。
Reduce阶段
在Reduce阶段,有两个问题需要解决:
a. 当前的<key, list(value)>对是为了计算矩阵C的哪个元素?因为map阶段对数据的处理,key(i,j)中的数据对,就
是其在矩阵C中的位置,第i行j列。
b. list中的每个value是来自矩阵A或矩阵B的哪个位置?这个也在map阶段进行了标记,对于value(x,y,z),只需要找
到y相同的来自不同矩阵(即x分别为A和B)的两个元素,取z相乘,然后加和即可。
矩阵的两种表示方式
矩阵常用的两种表示方式,第一种是原始的表示方式,第二种是稀疏矩阵(只存储非0的元素)的表示方式。
第一种:使用最原始的表示方式,相同行内不同列数据通过","分割,不同行通过换行分割;
第二种:通过行列表示法,即文件中的每行数据有三个元素通过分隔符分割,第一个元素表示行,第二个元素表示
列,第三个元素表示数据。这种方式对于可以不列出为0的元素,即可以减少稀疏矩阵的数据量。
编写代码:
第一种数据结构
查看源数据:
[hadoop@master ~]$ hadoop fs -cat /user/hdfs/matrix/B 10,15 0,2 11,9 [hadoop@master ~]$ hadoop fs -cat /user/hdfs/matrix/A 1,2,3 4,5,0 7,8,9 10,11,12
MartrixMultiply:
package com.oner.mr.matrix; import java.io.IOException; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import com.oner.mr.util.HdfsDAO; public class MartrixMultiply { public static class MatrixMapper extends Mapper<LongWritable, Text, Text, Text> { private String flag;// A or B; private int m = 4;// 矩阵A的行数 private int p = 2;// 矩阵B的列数 private int rowIndexA = 1; // 矩阵A,当前在第几行 private int rowIndexB = 1; // 矩阵B,当前在第几行 @Override protected void setup(Context context) throws IOException, InterruptedException { FileSplit split = (FileSplit) context.getInputSplit(); flag = split.getPath().getName();// // 得到读取的矩阵名称 } @Override protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException { // 切分每行数据 String[] fields = MainRun.DELIMITER.split(v1.toString()); if (flag.equals("A")) {// 如果读的是矩阵A,则fields格式为{1,2,3},数组长度为3 for (int k = 1; k <= p; k++) {// p表示矩阵B的列数 Text key = new Text(rowIndexA + "," + k);// for (int j = 0; j < fields.length; j++) {// j代表矩阵A的当前列,fields.length表示矩阵A的列数,等于矩阵B的行数 Text value = new Text("A," + (j + 1) + "," + fields[j]);// v的值为 context.write(key, value);// 输出的数据格式key为(i,k),value为(A,j,Aij)。 System.out.println(key.toString() + " " + value.toString()); } } rowIndexA++; // 每执行一次map方法,矩阵向下移动一行 } else if (flag.equals("B")) {// 如果读的是B,fields的格式为{10,15},数组长度为2 for (int k = 1; k <= m; k++) {// m表示矩阵A的行数 for (int j = 0; j < fields.length; j++) {// fields.length表示矩阵B的列数 Text key = new Text(k + "," + (j + 1)); Text value = new Text("B:" + rowIndexB + "," + fields[j]); context.write(key, value);// 输出的数据格式key为(k,j),value为(B,i,Bij)。 System.out.println(key.toString() + " " + value.toString()); } } rowIndexB++;// 每执行一次map方法,矩阵向下移动一行 } } } public static class MatrixReducer extends Reducer<Text, Text, Text, LongWritable> { @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { Map<String, String> mapA = new HashMap<String, String>(); Map<String, String> mapB = new HashMap<String, String>(); System.out.print(key.toString() + ":"); for (Text value : values) { String val = value.toString(); System.out.print("(" + val + ")"); if (val.startsWith("A")) { String[] kv = MainRun.DELIMITER.split(val.substring(2));// 得到A,j,Aij中的j,Aij mapA.put(kv[0], kv[1]);// 将j作为key,Aij作为value存入mapA // System.out.println("A:" + kv[0] + "," + kv[1]); } else if (val.startsWith("B")) { String[] kv = MainRun.DELIMITER.split(val.substring(2));// 得到B,j,Bij中的i,Bij mapB.put(kv[0], kv[1]);// 将i作为key,Bij作为value存入mapB // System.out.println("B:" + kv[0] + "," + kv[1]); } } long result = 0; Iterator<String> mkeys = mapA.keySet().iterator();// 得到mapA所有的键集合 while (mkeys.hasNext()) { String mkey = mkeys.next(); if (mapB.get(mkey) == null) {// 因为mkey取的是mapA的key集合,所以只需要判断mapB是否存在即可。 continue; } result += Long.parseLong(mapA.get(mkey)) * Long.parseLong(mapB.get(mkey)); } context.write(key, new LongWritable(result)); System.out.println(); // System.out.println("C:" + key.toString() + "," + result); } } public static void run(Map<String, String> path) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); String input = path.get("input"); String input1 = path.get("input1"); String input2 = path.get("input2"); String output = path.get("output"); HdfsDAO hdfs = new HdfsDAO(MainRun.HDFS, conf); hdfs.rmr(input); hdfs.mkdirs(input); hdfs.copyFile(path.get("A"), input1); hdfs.copyFile(path.get("B"), input2); Job job = Job.getInstance(conf); job.setJarByClass(MainRun.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.setMapperClass(MatrixMapper.class); job.setReducerClass(MatrixReducer.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); FileInputFormat.setInputPaths(job, new Path(input1), new Path(input2));// 加载2个输入数据集 FileOutputFormat.setOutputPath(job, new Path(output)); job.waitForCompletion(true); } }
MainRun:
package com.oner.mr.matrix; import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.regex.Pattern; /* * 驱动程序 */ public class MainRun { public static final String HDFS = "hdfs://master:9000"; public static final Pattern DELIMITER = Pattern.compile("[\t,]"); public static void main(String[] args) throws ClassNotFoundException, IOException, InterruptedException { martrixMultiply(); } private static void martrixMultiply() throws ClassNotFoundException, IOException, InterruptedException { Map<String, String> path = new HashMap<String, String>(); path.put("A", "/home/hadoop/logfile/matrix/A.csv");// 本地的数据文件 path.put("B", "/home/hadoop/logfile/matrix/B.csv"); path.put("input", HDFS + "/user/hdfs/matrix");// HDFS的目录 path.put("input1", HDFS + "/user/hdfs/matrix/A"); path.put("input2", HDFS + "/user/hdfs/matrix/B"); path.put("output", HDFS + "/user/hdfs/matrix/output"); MartrixMultiply.run(path); } }
打成jar包后运行:hadoop jar matrix.jar com.oner.mr.matrix.MainRun
查看结果:
[hadoop@master ~]$ hadoop fs -cat /user/hdfs/matrix/output/part-r-00000 1,1 43 1,2 46 2,1 40 2,2 70 3,1 169 3,2 202 4,1 232 4,2 280绘图演示结果:
第二种数据结构
MainRun:
package com.oner.mr.sparsematrix; import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.regex.Pattern; /* * 驱动程序 */ public class MainRun { public static final String HDFS = "hdfs://master:9000"; public static final Pattern DELIMITER = Pattern.compile("[\t,]"); public static void main(String[] args) throws ClassNotFoundException, IOException, InterruptedException { sparseMartrixMultiply(); } private static void sparseMartrixMultiply() throws ClassNotFoundException, IOException, InterruptedException { Map<String, String> path = new HashMap<String, String>(); path.put("A", "/home/hadoop/logfile/matrix2/A.csv");// 本地的数据文件 path.put("B", "/home/hadoop/logfile/matrix2/B.csv"); path.put("input", HDFS + "/user/hdfs/matrix2");// HDFS的目录 path.put("input1", HDFS + "/user/hdfs/matrix2/A"); path.put("input2", HDFS + "/user/hdfs/matrix2/B"); path.put("output", HDFS + "/user/hdfs/matrix2/output"); MartrixMultiply.run(path); } }
MartrixMultiply:
package com.oner.mr.sparsematrix; import java.io.IOException; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import com.oner.mr.util.HdfsDAO; public class MartrixMultiply { public static class SparseMatrixMapper extends Mapper<LongWritable, Text, Text, Text> { private String flag;// A or B; private int m = 4;// 矩阵A的行数 private int p = 2;// 矩阵B的列数 @Override protected void setup(Context context) throws IOException, InterruptedException { FileSplit split = (FileSplit) context.getInputSplit(); flag = split.getPath().getName();// // 得到读取的矩阵名称 } @Override protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException { // 切分每行数据 String[] fields = MainRun.DELIMITER.split(v1.toString()); if ("A".equals(flag)) { for (int i = 1; i <= p; i++) { context.write(new Text(fields[0] + "," + i), new Text("A," + fields[1] + "," + fields[2])); } } else if ("B".equals(flag)) { for (int i = 1; i <= m; i++) { context.write(new Text(i + "," + fields[1]), new Text("B," + fields[0] + "," + fields[2])); } } } } public static class SparseMatrixReducer extends Reducer<Text, Text, Text, LongWritable> { @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { Map<String, String> mapA = new HashMap<String, String>(); Map<String, String> mapB = new HashMap<String, String>(); for (Text value : values) { String val = value.toString(); if (val.startsWith("A")) { String[] kv = MainRun.DELIMITER.split(val.substring(2));// 得到A,j,Aij中的j,Aij mapA.put(kv[0], kv[1]);// 将j作为key,Aij作为value存入mapA } else if (val.startsWith("B")) { String[] kv = MainRun.DELIMITER.split(val.substring(2));// 得到B,j,Bij中的i,Bij mapB.put(kv[0], kv[1]);// 将i作为key,Bij作为value存入mapB } } long result = 0; // 可能在mapA中存在在mapB中不存在的key,或相反情况 // 因为,数据定义的时候使用的是稀疏矩阵的定义 // 所以,这种只存在于一个map中的key,说明其对应元素为0,不影响结果 Iterator<String> mkeys = mapA.keySet().iterator();// 得到mapA所有的键集合 while (mkeys.hasNext()) { String mkey = mkeys.next(); if (mapB.get(mkey) == null) {// 因为mkey取的是mapA的key集合,所以只需要判断mapB是否存在即可。 continue; } result += Long.parseLong(mapA.get(mkey)) * Long.parseLong(mapB.get(mkey)); } context.write(key, new LongWritable(result)); } } public static void run(Map<String, String> path) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); String input = path.get("input"); String input1 = path.get("input1"); String input2 = path.get("input2"); String output = path.get("output"); HdfsDAO hdfs = new HdfsDAO(MainRun.HDFS, conf); hdfs.rmr(input); hdfs.mkdirs(input); hdfs.copyFile(path.get("A"), input1); hdfs.copyFile(path.get("B"), input2); Job job = Job.getInstance(conf); job.setJarByClass(MainRun.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.setMapperClass(SparseMatrixMapper.class); job.setReducerClass(SparseMatrixReducer.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); FileInputFormat.setInputPaths(job, new Path(input1), new Path(input2));// 加载2个输入数据集 FileOutputFormat.setOutputPath(job, new Path(output)); job.waitForCompletion(true); } }
打成jar包运行:hadoop jar matrix
查看结果:
[hadoop@master matrix2]$ hadoop fs -cat /user/hdfs/matrix2/output/part-r-00000 1,1 43 1,2 46 2,1 40 2,2 70 3,1 169 3,2 202 4,1 232 4,2 280