map function: for each element m(ij) of matrix M, emit a series of key-value pairs <(i,k), (M, j, m(ij))>, where k = 1, 2, ... up to the total number of columns of N; for each element n(jk) of matrix N, emit a series of key-value pairs <(i,k), (N, j, n(jk))>, where i = 1, 2, ... up to the total number of rows of M.
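To make this concrete, suppose M and N are both 2x2 (so rowM = columnN = 2). The element m(11) is emitted once per column of N, as <(1,1), (M, 1, m(11))> and <(1,2), (M, 1, m(11))>; likewise n(12) is emitted once per row of M, as <(1,2), (N, 1, n(12))> and <(2,2), (N, 1, n(12))>. Every key (i,k) therefore ends up grouping exactly the entries needed to compute one cell of the product.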
map
package com.cb.matrix;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class MatrixMapper extends Mapper<Object, Text, Text, Text> {
    private Text mapKey = new Text();
    private Text mapValue = new Text();
    private int columnN;
    private int rowM;

    /**
     * Before map() runs, fetch the variables that main() put into the
     * Configuration via conf.get(), i.e. the matrix dimensions parsed
     * from the input file names.
     */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        Configuration config = context.getConfiguration();
        columnN = Integer.parseInt(config.get("columnN"));
        rowM = Integer.parseInt(config.get("rowM"));
    }

    @Override
    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        // Use the file name to tell the input matrices M and N apart.
        FileSplit fileSplit = (FileSplit) context.getInputSplit();
        String fileName = fileSplit.getPath().getName();
        if (fileName.contains("M")) {
            // A line of M has the form "i,j\tm(ij)".
            String[] tuple = value.toString().split(",");
            int i = Integer.parseInt(tuple[0]);
            String[] tuples = tuple[1].split("\t");
            int j = Integer.parseInt(tuples[0]);
            int Mij = Integer.parseInt(tuples[1]);
            // Emit <(i,k), (M,j,m(ij))> for every column k of N.
            for (int k = 1; k < columnN + 1; k++) {
                mapKey.set(i + "," + k);
                mapValue.set("M," + j + "," + Mij);
                context.write(mapKey, mapValue);
            }
        } else if (fileName.contains("N")) {
            // A line of N has the form "j,k\tn(jk)".
            String[] tuple = value.toString().split(",");
            int j = Integer.parseInt(tuple[0]);
            String[] tuples = tuple[1].split("\t");
            int k = Integer.parseInt(tuples[0]);
            int Njk = Integer.parseInt(tuples[1]);
            // Emit <(i,k), (N,j,n(jk))> for every row i of M.
            for (int i = 1; i < rowM + 1; i++) {
                mapKey.set(i + "," + k);
                mapValue.set("N," + j + "," + Njk);
                context.write(mapKey, mapValue);
            }
        }
    }
}
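The post does not include the driver, but the setup() methods expect "rowM", "columnM", and "columnN" to be present in the job Configuration. Below is a minimal sketch of such a driver; the class name MatrixDriver and the hard-coded dimensions are illustrative assumptions (the original obtains the dimensions from the input file names instead):

package com.cb.matrix;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MatrixDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Hypothetical dimensions for M (300 x 500) and N (500 x 700);
        // the original post parses these from the input file names.
        conf.set("rowM", "300");
        conf.set("columnM", "500");
        conf.set("columnN", "700");

        Job job = Job.getInstance(conf, "matrix multiplication");
        job.setJarByClass(MatrixDriver.class);
        job.setMapperClass(MatrixMapper.class);
        job.setReducerClass(MatrixReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // args[0]: directory containing the M and N files; args[1]: output dir.
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}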
reduce function: for each key (i,k), take the associated values (M, j, m(ij)) and (N, j, n(jk)) and store m(ij) and n(jk) into two separate arrays indexed by the shared j; then multiply the j-th elements of the two arrays pairwise and sum the products, which yields the value of p(ik) = Σ_j m(ij) · n(jk).
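As a quick check, suppose columnM = 2 and the reducer receives key (1,2) with values (M,1,2), (M,2,4), (N,1,3), (N,2,5). The arrays become M[1]=2, M[2]=4 and N[1]=3, N[2]=5, so p(1,2) = 2*3 + 4*5 = 26.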
reducer
package com.cb.matrix;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MatrixReducer extends Reducer<Text, Text, Text, Text> {
    private int columnM;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        Configuration conf = context.getConfiguration();
        columnM = Integer.parseInt(conf.get("columnM"));
    }

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // 1-based arrays indexed by j: M[j] holds m(ij), N[j] holds n(jk).
        int[] M = new int[columnM + 1];
        int[] N = new int[columnM + 1];
        for (Text val : values) {
            String[] tuple = val.toString().split(",");
            if (tuple[0].equals("M")) {
                M[Integer.parseInt(tuple[1])] = Integer.parseInt(tuple[2]);
            } else {
                N[Integer.parseInt(tuple[1])] = Integer.parseInt(tuple[2]);
            }
        }
        // Once all values for key (i,k) are collected, compute
        // p(ik) = sum over j of m(ij) * n(jk) and emit it once.
        int sum = 0;
        for (int j = 1; j < columnM + 1; j++) {
            sum += M[j] * N[j];
        }
        context.write(key, new Text(Integer.toString(sum)));
    }
}
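For reference, the split(",") followed by split("\t") in the mapper implies one nonzero entry per line in the form "row,col<TAB>value". A hypothetical run on two 2x2 matrices (file names chosen to contain "M" and "N", as the mapper requires) could look like this:

Input file M (e.g. M_2_2):
1,1	1
1,2	2
2,1	3
2,2	4

Input file N (e.g. N_2_2):
1,1	5
1,2	6
2,1	7
2,2	8

Job output (key "i,k", tab, value p(ik)):
1,1	19
1,2	22
2,1	43
2,2	50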
Thanks for reading; I hope this helps, and thank you for supporting this site!
Original article: https://my.oschina.net/u/3264690/blog/909239