mapreduce 实现pagerank

时间:2022-05-05 09:25:46
输入格式:
A  1  B,C,D
   B  1  C,D
map:
   B  A  1/3
   C  A  1/3
   D  A  1/3
   A  |B,C,D
   C  B  1/2
   D  B  1/2
   B  |C,D
reduce:
   B  (1-0.85)+0.85*1/3  C,D    C  (1-0.85)+0.85*5/6  
     D  (1-0.85)+0.85*5/6
A (1-0.85)+0.85*0  B,C,D import java.io.IOException; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class PageRankIter {
private static final double damping = 0.85; public static class PRIterMapper extends
Mapper<LongWritable, Text, Text, Text> {
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String[] tuple = line.split("\t");
String pageKey = tuple[0];
double pr = Double.parseDouble(tuple[1]); if (tuple.length > 2) {
String[] linkPages = tuple[2].split(",");
for (String linkPage : linkPages) {
String prValue =
pageKey + "\t" + String.valueOf(pr / linkPages.length);
context.write(new Text(linkPage), new Text(prValue));
}
context.write(new Text(pageKey), new Text("|" + tuple[2]));
}
}
} public static class PRIterReducer extends Reducer<Text, Text, Text, Text> {
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
String links = "";
double pagerank = 0;
for (Text value : values) {
String tmp = value.toString(); if (tmp.startsWith("|")) {
links = "\t" + tmp.substring(tmp.indexOf("|") + 1);// index从0开始
continue;
} String[] tuple = tmp.split("\t");
if (tuple.length > 1)
pagerank += Double.parseDouble(tuple[1]);
}
pagerank = (double) (1 - damping) + damping * pagerank; // PageRank的计算迭代公式
context.write(new Text(key), new Text(String.valueOf(pagerank) + links));
} } public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job2 = new Job(conf, "PageRankIter");
job2.setJarByClass(PageRankIter.class);
job2.setOutputKeyClass(Text.class);
job2.setOutputValueClass(Text.class);
job2.setMapperClass(PRIterMapper.class);
job2.setReducerClass(PRIterReducer.class);
FileInputFormat.addInputPath(job2, new Path(args[0]));
FileOutputFormat.setOutputPath(job2, new Path(args[1]));
job2.waitForCompletion(true);
}
}
输入为上述的输出
输入格式为:
A  pr
B  pr
... import java.io.IOException; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class PageRankViewer {
public static class PageRankViewerMapper extends
Mapper<LongWritable, Text, FloatWritable, Text> {
private Text outPage = new Text();
private FloatWritable outPr = new FloatWritable(); public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String[] line = value.toString().split("\t");
String page = line[0];
float pr = Float.parseFloat(line[1]);
outPage.set(page);
outPr.set(pr);
context.write(outPr, outPage);
}
} /**重载key的比较函数,使其经过shuffle和sort后反序(从大到小)输出**/
public static class DescFloatComparator extends FloatWritable.Comparator {
// @Override
public float compare(WritableComparator a,
WritableComparable<FloatWritable> b) {
return -super.compare(a, b);
} public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
return -super.compare(b1, s1, l1, b2, s2, l2);
}
} public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job3 = new Job(conf, "PageRankViewer");
job3.setJarByClass(PageRankViewer.class);
job3.setOutputKeyClass(FloatWritable.class);
job3.setSortComparatorClass(DescFloatComparator.class);
job3.setOutputValueClass(Text.class);
job3.setMapperClass(PageRankViewerMapper.class);
FileInputFormat.addInputPath(job3, new Path(args[0]));
FileOutputFormat.setOutputPath(job3, new Path(args[1]));
job3.waitForCompletion(true);
}
}