mr微博内容推荐

时间:2022-12-10 14:41:17
 
 

 第一次迭代
package com.laoxiao.mr.weibo; import java.io.StringReader; import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.wltea.analyzer.core.IKSegmenter;
import org.wltea.analyzer.core.Lexeme;

/**
 * Mapper of the first MapReduce pass: produces the raw material for TF (term frequency)
 * and for N (the total number of posts).
 *
 * <p>Each input line is split on tabs; the first field is used as the post id and the
 * second as the post text. For every token the IK segmenter yields, the pair
 * {@code ("<token>_<id>", 1)} is written, and one extra {@code ("count", 1)} pair is
 * written per accepted line.
 *
 * @author root
 */
public class firstMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    /**
     * Tokenizes one input line and emits its per-token and per-post counters.
     *
     * @param key     input key supplied by the framework; not used here
     * @param value   one input line, expected as {@code id<TAB>text}
     * @param context sink for the emitted pairs
     * @throws java.io.IOException  if segmentation or writing fails
     * @throws InterruptedException if writing is interrupted
     */
    protected void map(LongWritable key, Text value, Context context)
            throws java.io.IOException, InterruptedException {
        String[] fields = StringUtils.split(value.toString(), "\t");

        // Lines without both an id and a text field are reported and skipped.
        if (fields.length < 2) {
            System.out.println("value is error:" + value.toString());
            return;
        }

        String postId = fields[0].trim();
        String content = fields[1].trim();

        // NOTE(review): the boolean flag presumably selects IK's "smart" segmentation mode — confirm.
        IKSegmenter segmenter = new IKSegmenter(new StringReader(content), true);
        for (Lexeme token = segmenter.next(); token != null; token = segmenter.next()) {
            String term = token.getLexemeText();
            context.write(new Text(term + "_" + postId), new IntWritable(1));
        }

        // One marker per post; summing these gives the total number of posts.
        context.write(new Text("count"), new IntWritable(1));
    }
}
package com.laoxiao.mr.weibo; import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import sun.management.resources.agent;

/**
 * Reducer of the first pass: adds up all integer values received for a key and
 * writes {@code (key, sum)}.
 */
public class firstReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    /**
     * Sums the values of one key.
     *
     * @param arg0 the key being reduced
     * @param arg1 all values emitted for that key
     * @param arg2 sink for the {@code (key, sum)} pair
     * @throws java.io.IOException  if writing fails
     * @throws InterruptedException if writing is interrupted
     */
    protected void reduce(Text arg0, java.lang.Iterable<IntWritable> arg1, Context arg2)
            throws java.io.IOException, InterruptedException {
        int total = 0;
        java.util.Iterator<IntWritable> values = arg1.iterator();
        while (values.hasNext()) {
            total += values.next().get();
        }
        arg2.write(arg0, new IntWritable(total));
    }
}

package com.laoxiao.mr.weibo;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

/**
 * Partitioner of the first pass: isolates the special {@code "count"} key in the last
 * partition and hash-distributes every other key over the remaining partitions.
 *
 * <p>With the 4 reduce tasks configured by {@code firstJob} the {@code "count"} key lands in
 * partition 3, i.e. in the {@code part-r-00003} file that the later mappers look for by name.
 */
public class firstRepartition extends HashPartitioner<Text, IntWritable> {

    /**
     * Chooses the partition for a key.
     *
     * @param key            the map output key
     * @param value          the map output value
     * @param numReduceTasks number of partitions configured for the job
     * @return {@code numReduceTasks - 1} for the {@code "count"} key, otherwise a hash
     *         partition in {@code [0, numReduceTasks - 2]}; always 0 when there is at most
     *         one partition
     */
    @Override
    public int getPartition(Text key, IntWritable value, int numReduceTasks) {
        // With a single partition there is nothing to separate; this also avoids hashing
        // modulo zero below.
        if (numReduceTasks <= 1) {
            return 0;
        }

        // Reserve the last partition for the counter instead of a hard-coded index, so the
        // returned value is always within the legal range [0, numReduceTasks).
        int lastPartition = numReduceTasks - 1;
        if (key.toString().equals("count")) {
            return lastPartition;
        }
        return super.getPartition(key, value, lastPartition);
    }
}

package com.laoxiao.mr.weibo;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Driver of the first pass. Reads {@code /root/input/data/weibo.txt}, runs
 * {@code firstMapper}/{@code firstReducer} with {@code firstRepartition} over 4 reduce tasks
 * and writes the result to {@code /usr/output/weibo1}, replacing any previous output.
 */
public class firstJob {

    /**
     * Configures and runs the job, then terminates the JVM with a non-zero status when the
     * job fails or cannot be submitted, so that a calling script does not start the next
     * pass on missing or partial output.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        Configuration config = new Configuration();
        config.set("fs.defaultFS", "hdfs://node1:8020");
        config.set("yarn.resourcemanager.hostname", "node1");

        boolean succeeded = false;
        try {
            FileSystem fs = FileSystem.get(config);
            Job job = Job.getInstance(config);
            job.setJarByClass(firstJob.class);
            job.setJobName("weibo1");

            job.setMapperClass(firstMapper.class);
            job.setReducerClass(firstReducer.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            job.setPartitionerClass(firstRepartition.class);
            // Four reducers: the partitioner dedicates the last one to the "count" key.
            job.setNumReduceTasks(4);

            FileInputFormat.addInputPath(job, new Path("/root/input/data/weibo.txt"));

            // The output directory must not exist when the job starts.
            Path path = new Path("/usr/output/weibo1");
            if (fs.exists(path)) {
                fs.delete(path, true);
            }
            FileOutputFormat.setOutputPath(job, path);

            succeeded = job.waitForCompletion(true);
            if (succeeded) {
                System.out.println("first job run finished!!");
            } else {
                System.err.println("first job failed");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

        if (!succeeded) {
            System.exit(1);
        }
    }
}

第二次迭代

 package com.laoxiao.mr.weibo;

 import java.io.IOException;

 import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
/**
 * Mapper of the second pass: prepares DF (document frequency), i.e. in how many posts each
 * term appears.
 *
 * <p>Input lines are expected as {@code <term>_<id><TAB><number>}. For each such line the
 * pair {@code (term, 1)} is written. Input coming from a file whose name contains
 * {@code part-r-00003} is ignored entirely.
 */
public class secondMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    /**
     * Emits {@code (term, 1)} for one input line.
     *
     * @param key     input key supplied by the framework; not used here
     * @param value   one input line
     * @param context gives access to the current input split and receives the output
     * @throws IOException          if writing fails
     * @throws InterruptedException if writing is interrupted
     */
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // The split tells us which file this record was read from.
        FileSplit split = (FileSplit) context.getInputSplit();
        String fileName = split.getPath().getName();
        if (fileName.contains("part-r-00003")) {
            return;
        }

        String[] columns = value.toString().trim().split("\t");
        if (columns.length < 2) {
            System.out.println(value.toString() + "-------------");
            return;
        }

        String[] keyParts = columns[0].split("_");
        if (keyParts.length >= 2) {
            context.write(new Text(keyParts[0]), new IntWritable(1));
        }
    }
}
package com.laoxiao.mr.weibo; import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Reducer of the second pass: totals the values received for each term and writes
 * {@code (term, total)}, which is the term's document frequency.
 */
public class secondReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    /**
     * Sums the values of one term.
     *
     * <p>The values are added rather than merely counted, so the result stays correct even
     * if partial sums are delivered (for example when a combiner is enabled). With the
     * mapper's constant value of 1 the result equals the number of values.
     *
     * @param arg0    the term
     * @param arg1    the values emitted for the term
     * @param context sink for the {@code (term, total)} pair
     * @throws java.io.IOException  if writing fails
     * @throws InterruptedException if writing is interrupted
     */
    protected void reduce(Text arg0, java.lang.Iterable<IntWritable> arg1, Context context)
            throws java.io.IOException, InterruptedException {
        int sum = 0;
        for (IntWritable i : arg1) {
            sum += i.get();
        }
        context.write(arg0, new IntWritable(sum));
    }
}
package com.laoxiao.mr.weibo; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Driver of the second pass. Reads the first pass's output directory
 * {@code /usr/output/weibo1}, runs {@code secondMapper}/{@code secondReducer} and writes
 * the document frequencies to {@code /usr/output/weibo2}, replacing any previous output.
 */
public class secondJob {

    /**
     * Configures and runs the job, then terminates the JVM with a non-zero status when the
     * job fails or cannot be submitted.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        Configuration config = new Configuration();
        config.set("fs.defaultFS", "hdfs://node1:8020");
        config.set("yarn.resourcemanager.hostname", "node1");

        boolean succeeded = false;
        try {
            FileSystem fs = FileSystem.get(config);
            Job job = Job.getInstance(config);
            job.setJarByClass(secondJob.class);
            job.setJobName("weibo2");

            job.setMapperClass(secondMapper.class);
            job.setReducerClass(secondReducer.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            // The number of reduce tasks is deliberately not set here: the final pass reads
            // only /usr/output/weibo2/part-r-00000, so all terms must end up in that file.
            // NOTE(review): relies on the configured default of one reducer — confirm.

            FileInputFormat.addInputPath(job, new Path("/usr/output/weibo1"));

            // The output directory must not exist when the job starts.
            Path path = new Path("/usr/output/weibo2");
            if (fs.exists(path)) {
                fs.delete(path, true);
            }
            FileOutputFormat.setOutputPath(job, path);

            succeeded = job.waitForCompletion(true);
            if (succeeded) {
                System.out.println("second job run finished!!");
            } else {
                System.err.println("second job failed");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

        if (!succeeded) {
            System.exit(1);
        }
    }
}

第三次迭代

package com.laoxiao.mr.weibo;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.StringReader;
import java.net.URI;
import java.text.NumberFormat;
import java.util.HashMap;
import java.util.Map; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.wltea.analyzer.core.IKSegmenter;
import org.wltea.analyzer.core.Lexeme;

/**
 * Mapper of the final pass: combines TF, DF and N into a weight per (post, term).
 *
 * <p>{@link #setup} loads two cached files into static maps: the total number of posts
 * (from the cache file ending in {@code part-r-00003}) and the document frequency of every
 * term (from the cache file ending in {@code part-r-00000}). {@link #map} then reads lines
 * of the form {@code <term>_<id><TAB><tf>} and writes
 * {@code (id, "<term>:<tf * ln(N / df)>")}.
 *
 * @author root
 */
public class LastMapper extends Mapper<LongWritable, Text, Text, Text> {
    // Holds the total number of posts under the key "count".
    public static Map<String, Integer> cmap = null;
    // Holds the document frequency of each term.
    public static Map<String, Integer> df = null;

    /**
     * Runs before any call to {@code map}; fills {@link #cmap} and {@link #df} from the
     * job's cache files unless both are already populated.
     *
     * @param context gives access to the cache file URIs
     * @throws IOException          if a cache file cannot be read
     * @throws InterruptedException declared by the overridden method
     */
    protected void setup(Context context) throws IOException,
            InterruptedException {
        System.out.println("******************");
        boolean alreadyLoaded = cmap != null && cmap.size() != 0 && df != null && df.size() != 0;
        if (alreadyLoaded) {
            return;
        }

        URI[] ss = context.getCacheFiles();
        if (ss == null) {
            return;
        }
        for (int i = 0; i < ss.length; i++) {
            String cachePath = ss[i].getPath();
            // The cached file is opened by its bare name, relative to the working directory.
            String localName = new Path(cachePath).getName();
            if (cachePath.endsWith("part-r-00003")) { // total number of posts
                loadTotal(localName);
            } else if (cachePath.endsWith("part-r-00000")) { // DF of every term
                loadDocumentFrequencies(localName);
            }
        }
    }

    /**
     * Opens a local file as UTF-8 text. An explicit charset is used so that non-ASCII terms
     * are decoded the same way regardless of the platform default.
     */
    private static BufferedReader openUtf8(String localName) throws IOException {
        return new BufferedReader(new InputStreamReader(new FileInputStream(localName), "UTF-8"));
    }

    /** Reads the first line of the count file and stores it in {@link #cmap} if it is a "count" record. */
    private static void loadTotal(String localName) throws IOException {
        BufferedReader br = openUtf8(localName);
        try {
            String line = br.readLine();
            // An empty file yields null; leave cmap untouched in that case.
            if (line != null && line.startsWith("count")) {
                String[] ls = line.split("\t");
                if (ls.length >= 2) {
                    Map<String, Integer> total = new HashMap<String, Integer>();
                    total.put(ls[0], Integer.parseInt(ls[1].trim()));
                    cmap = total;
                }
            }
        } finally {
            br.close();
        }
    }

    /** Reads every {@code term<TAB>df} line of the DF file into a fresh {@link #df} map. */
    private static void loadDocumentFrequencies(String localName) throws IOException {
        Map<String, Integer> loaded = new HashMap<String, Integer>();
        BufferedReader br = openUtf8(localName);
        try {
            String line;
            while ((line = br.readLine()) != null) {
                String[] ls = line.split("\t");
                // Lines without a value column are ignored instead of failing the task.
                if (ls.length >= 2) {
                    loaded.put(ls[0], Integer.parseInt(ls[1].trim()));
                }
            }
        } finally {
            br.close();
        }
        df = loaded;
    }

    /**
     * Computes the weight of one (term, post) record and writes it keyed by post id.
     * Records read from a file whose name contains {@code part-r-00003} are ignored.
     *
     * @param key     input key supplied by the framework; not used here
     * @param value   one input line, expected as {@code <term>_<id><TAB><tf>}
     * @param context gives access to the current input split and receives the output
     * @throws IOException           if writing fails
     * @throws InterruptedException  if writing is interrupted
     * @throws IllegalStateException if the total post count or the DF table was not loaded
     */
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        FileSplit fs = (FileSplit) context.getInputSplit();
        if (fs.getPath().getName().contains("part-r-00003")) {
            return;
        }

        String[] v = value.toString().trim().split("\t");
        if (v.length < 2) {
            System.out.println(value.toString() + "-------------");
            return;
        }

        int tf = Integer.parseInt(v[1].trim()); // term frequency
        String[] ss = v[0].split("_");
        if (ss.length < 2) {
            return;
        }
        String w = ss[0];
        String id = ss[1];

        Integer total = (cmap == null) ? null : cmap.get("count");
        if (total == null) {
            throw new IllegalStateException("total post count was not loaded from the cache file");
        }
        if (df == null) {
            throw new IllegalStateException("document frequencies were not loaded from the cache file");
        }
        Integer docFreq = df.get(w);
        if (docFreq == null || docFreq.intValue() == 0) {
            System.out.println("no document frequency for term: " + w);
            return;
        }

        // Divide as double: integer division would truncate N/df before taking the logarithm.
        double s = tf * Math.log((double) total.intValue() / docFreq.intValue());
        NumberFormat nf = NumberFormat.getInstance();
        nf.setMaximumFractionDigits(5);
        context.write(new Text(id), new Text(w + ":" + nf.format(s)));
    }
}
package com.laoxiao.mr.weibo; import java.io.IOException; import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Reducer of the final pass: joins all values received for a key into a single line,
 * each value followed by a tab character, and writes {@code (key, joinedValues)}.
 */
public class LastReduce extends Reducer<Text, Text, Text, Text> {

    /**
     * Concatenates the values of one key.
     *
     * @param key     the key being reduced
     * @param arg1    the values emitted for that key
     * @param context sink for the joined line
     * @throws IOException          if writing fails
     * @throws InterruptedException if writing is interrupted
     */
    protected void reduce(Text key, Iterable<Text> arg1, Context context)
            throws IOException, InterruptedException {
        StringBuilder joined = new StringBuilder();
        for (Text entry : arg1) {
            joined.append(entry.toString()).append("\t");
        }
        context.write(key, new Text(joined.toString()));
    }
}
package com.laoxiao.mr.weibo; import java.io.IOException; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Driver of the final pass. Registers the post-count file and the DF file as cache files,
 * reads {@code /usr/output/weibo1}, runs {@code LastMapper}/{@code LastReduce} and writes
 * the result to {@code /usr/output/weibo3}, replacing any previous output.
 */
public class LastJob {

    /**
     * Configures and runs the job, then terminates the JVM with a non-zero status when the
     * job fails or cannot be submitted.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        Configuration config = new Configuration();
        config.set("fs.defaultFS", "hdfs://node1:8020");
        config.set("yarn.resourcemanager.hostname", "node1");
        //config.set("mapred.jar", "C:\\Users\\Administrator\\Desktop\\weibo3.jar");

        boolean succeeded = false;
        try {
            FileSystem fs = FileSystem.get(config);
            Job job = Job.getInstance(config);
            job.setJarByClass(LastJob.class);
            job.setJobName("weibo3");

            // Make the total post count available to every mapper.
            job.addCacheFile(new Path("/usr/output/weibo1/part-r-00003").toUri());
            // Make the DF table available to every mapper.
            job.addCacheFile(new Path("/usr/output/weibo2/part-r-00000").toUri());

            // Output key/value types of the job.
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            job.setMapperClass(LastMapper.class);
            job.setReducerClass(LastReduce.class);

            // HDFS directory the job reads its input from.
            FileInputFormat.addInputPath(job, new Path("/usr/output/weibo1"));

            // The output directory must not exist when the job starts.
            Path outpath = new Path("/usr/output/weibo3");
            if (fs.exists(outpath)) {
                fs.delete(outpath, true);
            }
            FileOutputFormat.setOutputPath(job, outpath);

            succeeded = job.waitForCompletion(true);
            if (succeeded) {
                System.out.println("执行job成功");
            } else {
                System.err.println("weibo3 job failed");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

        if (!succeeded) {
            System.exit(1);
        }
    }
}