Translating to MapReduce: computing matrix multiplication via partial products

Date: 2021-09-20 11:28:14

The code is divided into four parts.
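Before the listings, a quick sketch of the partial-products idea (notation mine, not from the original post): a user's recommendation vector r is the co-occurrence matrix A applied to the user's preference vector u, and that product can be accumulated one column at a time:

$$ r = A\,u = \sum_{i} u_i\, a_i $$

Here a_i is the i-th column of the co-occurrence matrix A and u_i is the user's preference for item i; each term u_i a_i is one "partial product". The mappers below key these pieces by item index, and the combiner sums the partial products per user.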


/**
 * @author YangXin
 * @info Wraps each co-occurrence matrix column as a VectorOrPrefWritable
 */
package unitSix;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.cf.taste.hadoop.item.VectorOrPrefWritable;
import org.apache.mahout.math.VectorWritable;

public class CooccurrenceColumnWrapperMapper extends Mapper<IntWritable, VectorWritable, IntWritable, VectorOrPrefWritable> {
    @Override
    public void map(IntWritable key, VectorWritable value, Context context) throws IOException, InterruptedException {
        // Re-emit the column keyed by item index, wrapped so it can later be
        // co-grouped with the user preferences for the same item.
        context.write(key, new VectorOrPrefWritable(value.get()));
    }
}

/**
 * @author YangXin
 * @info Splits each user vector into per-item preference records
 */
package unitSix;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.cf.taste.hadoop.item.VectorOrPrefWritable;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;

public class UserVectorSplitterMapper extends Mapper<VarLongWritable, VectorWritable, IntWritable, VectorOrPrefWritable> {
    @Override
    public void map(VarLongWritable key, VectorWritable value, Context context) throws IOException, InterruptedException {
        long userID = key.get();
        Vector userVector = value.get();
        // Emit one (itemIndex, userID + preference) record per non-zero entry,
        // so each record lands on the same key as that item's co-occurrence column.
        Iterator<Vector.Element> it = userVector.nonZeroes().iterator();
        IntWritable itemIndexWritable = new IntWritable();
        while (it.hasNext()) {
            Vector.Element e = it.next();
            int itemIndex = e.index();
            float preferenceValue = (float) e.get();
            itemIndexWritable.set(itemIndex);
            context.write(itemIndexWritable, new VectorOrPrefWritable(userID, preferenceValue));
        }
    }
}
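Both mappers above emit VectorOrPrefWritable values keyed by item index, so a single reduce can attach every user's preference for an item to that item's co-occurrence column. The original post does not include that reducer (Mahout's own implementation of this step is, to my knowledge, ToVectorAndPrefReducer). The following is only a rough sketch; it assumes VectorOrPrefWritable exposes getVector()/getUserID()/getValue() and that VectorAndPrefsWritable has a (Vector, List<Long>, List<Float>) constructor:

package unitSix;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.mahout.cf.taste.hadoop.item.VectorAndPrefsWritable;
import org.apache.mahout.cf.taste.hadoop.item.VectorOrPrefWritable;
import org.apache.mahout.math.Vector;

public class ToVectorAndPrefsReducerSketch extends Reducer<IntWritable, VectorOrPrefWritable, IntWritable, VectorAndPrefsWritable> {
    @Override
    public void reduce(IntWritable key, Iterable<VectorOrPrefWritable> values, Context context) throws IOException, InterruptedException {
        Vector cooccurrenceColumn = null;
        List<Long> userIDs = new ArrayList<Long>();
        List<Float> prefValues = new ArrayList<Float>();
        for (VectorOrPrefWritable value : values) {
            if (value.getVector() != null) {
                // Record from CooccurrenceColumnWrapperMapper: this item's column.
                cooccurrenceColumn = value.getVector();
            } else {
                // Record from UserVectorSplitterMapper: one user's preference for this item.
                userIDs.add(value.getUserID());
                prefValues.add(value.getValue());
            }
        }
        context.write(key, new VectorAndPrefsWritable(cooccurrenceColumn, userIDs, prefValues));
    }
}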


/**
 * @author YangXin
 * @info Computes partial recommendation vectors
 */
package unitSix;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.cf.taste.hadoop.item.VectorAndPrefsWritable;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;

public class PartialMultiplyMapper extends Mapper<IntWritable, VectorAndPrefsWritable, VarLongWritable, VectorWritable> {
    @Override
    public void map(IntWritable key, VectorAndPrefsWritable vectorAndPrefsWritable, Context context) throws IOException, InterruptedException {
        Vector cooccurrenceColumn = vectorAndPrefsWritable.getVector();
        List<Long> userIDs = vectorAndPrefsWritable.getUserIDs();
        List<Float> prefValues = vectorAndPrefsWritable.getValues();
        // For every user who expressed a preference for this item, emit the
        // co-occurrence column scaled by that preference: one partial product.
        for (int i = 0; i < userIDs.size(); i++) {
            long userID = userIDs.get(i);
            float prefValue = prefValues.get(i);
            Vector partialProduct = cooccurrenceColumn.times(prefValue);
            context.write(new VarLongWritable(userID), new VectorWritable(partialProduct));
        }
    }
}


/**
 * @author YangXin
 * @info Implements the combiner for partial products
 */
package unitSix;

import java.io.IOException;

import org.apache.hadoop.mapreduce.Reducer;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;

public class AggregateCombiner extends Reducer<VarLongWritable, VectorWritable, VarLongWritable, VectorWritable> {
    @Override
    public void reduce(VarLongWritable key, Iterable<VectorWritable> values, Context context) throws IOException, InterruptedException {
        // Sum the partial products seen locally before the shuffle,
        // reducing the amount of data sent to the final reducer.
        Vector partial = null;
        for (VectorWritable vectorWritable : values) {
            partial = partial == null ? vectorWritable.get() : partial.plus(vectorWritable.get());
        }
        context.write(key, new VectorWritable(partial));
    }
}
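To show how these classes could be wired together, here is a minimal two-job driver sketch. The input/output paths, job names, and the ToVectorAndPrefsReducerSketch join reducer above are my own assumptions, not part of the original post; the final reducer that turns the summed vectors into top-N recommendations is also omitted, so the combiner is reused as the reducer of the second job purely for illustration:

package unitSix;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.mahout.cf.taste.hadoop.item.VectorAndPrefsWritable;
import org.apache.mahout.cf.taste.hadoop.item.VectorOrPrefWritable;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.VectorWritable;

public class PartialMultiplyDriverSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Job 1: co-group each co-occurrence column with the preferences for that item.
        Job joinJob = Job.getInstance(conf, "joinColumnsAndPrefs");
        joinJob.setJarByClass(PartialMultiplyDriverSketch.class);
        // Hypothetical paths holding the co-occurrence columns and user vectors from earlier jobs.
        MultipleInputs.addInputPath(joinJob, new Path("cooccurrenceColumns"),
                SequenceFileInputFormat.class, CooccurrenceColumnWrapperMapper.class);
        MultipleInputs.addInputPath(joinJob, new Path("userVectors"),
                SequenceFileInputFormat.class, UserVectorSplitterMapper.class);
        joinJob.setReducerClass(ToVectorAndPrefsReducerSketch.class);
        joinJob.setMapOutputKeyClass(IntWritable.class);
        joinJob.setMapOutputValueClass(VectorOrPrefWritable.class);
        joinJob.setOutputKeyClass(IntWritable.class);
        joinJob.setOutputValueClass(VectorAndPrefsWritable.class);
        joinJob.setOutputFormatClass(SequenceFileOutputFormat.class);
        FileOutputFormat.setOutputPath(joinJob, new Path("vectorAndPrefs"));
        if (!joinJob.waitForCompletion(true)) {
            System.exit(1);
        }

        // Job 2: emit the partial products and pre-aggregate them per user.
        Job multiplyJob = Job.getInstance(conf, "partialMultiply");
        multiplyJob.setJarByClass(PartialMultiplyDriverSketch.class);
        multiplyJob.setInputFormatClass(SequenceFileInputFormat.class);
        FileInputFormat.addInputPath(multiplyJob, new Path("vectorAndPrefs"));
        multiplyJob.setMapperClass(PartialMultiplyMapper.class);
        multiplyJob.setCombinerClass(AggregateCombiner.class);
        // A real final reducer would sum the remaining partials and extract top-N items;
        // here the combiner class stands in so the job produces summed vectors per user.
        multiplyJob.setReducerClass(AggregateCombiner.class);
        multiplyJob.setOutputKeyClass(VarLongWritable.class);
        multiplyJob.setOutputValueClass(VectorWritable.class);
        multiplyJob.setOutputFormatClass(SequenceFileOutputFormat.class);
        FileOutputFormat.setOutputPath(multiplyJob, new Path("partialProducts"));
        System.exit(multiplyJob.waitForCompletion(true) ? 0 : 1);
    }
}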