通过MapReduce实现 TF-IDF值的统计
输入数据格式(每行一篇文章,字段之间以制表符分隔):文章ID 文章内容
今天约了姐妹去逛街吃美食,周末玩得很开心啊!
......
......
结果数据:
开心:0.28558719539400335 吃:0.21277211221173534 了:0.1159152517783012 美食:0.29174432675350614 去:0.18044286652763497 玩:0.27205714412756765 啊:0.26272169358877784 姐妹:0.3983823545319593 逛街:0.33320559604063593 得很:0.45170136842118586 周末:0.2672478858982343 今天:0.16923426566752778 约:0.0946874743049455
......
......
整个处理过程分两步完成:
第一步主要生成三种格式的文件
1、使用分词工具将文章内容拆分成多个词条,并记录每个词条在该文章中出现的次数(A)以及该文章的总词条数(B)。关于分词工具的使用请参考 TF-IDF
第一步处理后结果:
今天_3823890378201539 A:,B:,
周末_3823890378201539 A:,B:,
得很_3823890378201539 A:,B:,
约_3823890378201539 B:,A:,
......
2、记录词条在多少篇文章中出现过
处理后结果:
今天
周末
约
......
3、记录文章总数
处理后结果:
counter
第二步将文件2,3的内容加载到缓存,利用2,3文件的内容对文件1的内容通过mapreduce进行计算
这种做法只适用于文件2、3数据量不大的情况;如果数据量过大,则不宜采用加载到缓存的方式。
源码
Step1.java:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.wltea.analyzer.core.IKSegmenter;
import org.wltea.analyzer.core.Lexeme; import java.io.IOException;
import java.io.StringReader;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry; /**
* Created by Edward on 2016/7/21.
*/
public class Step1 {

    /*
     * First MapReduce pass of the TF-IDF pipeline.
     *
     * For every input line "<articleId>\t<content>" the mapper tokenizes the
     * content and emits three kinds of records:
     *   - "<term>_<articleId>" -> "A:<occurrences of term in the article>"
     *   - "<term>_<articleId>" -> "B:<total number of tokens in the article>"
     *   - "<term>"             -> "1"  (one per article containing the term)
     * plus one "counter" -> "1" record per article. The reducer concatenates
     * the A/B values for "<term>_<articleId>" keys and sums everything else.
     *
     * NOTE(review): a term that itself contains '_' cannot be told apart from
     * a "<term>_<articleId>" key by the reducer; confirm whether the tokenizer
     * can produce such terms for the expected input.
     */

    /** Key under which the total number of articles is accumulated. */
    private static final String COUNTER_KEY = "counter";

    /**
     * Configures and runs the job: reads /test/tfidf/input, writes
     * /test/tfidf/output (deleting it first if it already exists) and prints
     * "OK" when the job completes successfully.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // To access HDFS as a specific user, uncomment:
        // System.setProperty("HADOOP_USER_NAME","root");
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://node1:8020");

        try {
            FileSystem fs = FileSystem.get(conf);

            Job job = Job.getInstance(conf);
            // Locate the job jar through this class instead of an unrelated one.
            job.setJarByClass(Step1.class);
            job.setMapperClass(MyMapper.class);
            job.setReducerClass(MyReducer.class);
            job.setPartitionerClass(FilterPartition.class);

            // Key/value types; they apply to the map output as well.
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            // Number of reduce tasks (one output file per reducer).
            job.setNumReduceTasks(4);

            FileInputFormat.addInputPath(job, new Path("/test/tfidf/input"));

            Path path = new Path("/test/tfidf/output");
            // Remove a previous output directory, otherwise the job refuses to start.
            if (fs.exists(path)) {
                fs.delete(path, true);
            }
            FileOutputFormat.setOutputPath(job, path);

            boolean b = job.waitForCompletion(true);
            if (b) {
                System.out.println("OK");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Tokenizes one article per input line and emits per-article term counts,
     * per-term article markers and the article counter record.
     */
    public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {

        /**
         * @param key     byte offset of the line (unused)
         * @param value   one line of the form "<articleId>\t<content>"
         * @param context sink for the emitted records
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Limit of 2 keeps tabs inside the article content intact.
            String[] str = value.toString().split("\t", 2);
            if (str.length < 2) {
                // Blank or malformed line: there is no article to process or count.
                return;
            }
            String articleId = str[0];

            Map<String, Integer> termCounts = new HashMap<String, Integer>();
            IKSegmenter ikSegmenter = new IKSegmenter(new StringReader(str[1]), true);
            long totalTerms = 0L;
            Lexeme lexeme;
            while ((lexeme = ikSegmenter.next()) != null) {
                String word = lexeme.getLexemeText();
                Integer seen = termCounts.get(word);
                termCounts.put(word, seen == null ? 1 : seen + 1);
                totalTerms++;
            }

            for (Entry<String, Integer> entry : termCounts.entrySet()) {
                Text termInArticle = new Text(entry.getKey() + "_" + articleId);
                // Number of occurrences of the term in this article.
                context.write(termInArticle, new Text("A:" + entry.getValue()));
                // Total number of tokens in this article.
                context.write(termInArticle, new Text("B:" + totalTerms));
                // Marks that the term occurs in this article (summed into the document frequency).
                context.write(new Text(entry.getKey()), new Text("1"));
            }
            // One record per article; summed into the total article count.
            context.write(new Text(COUNTER_KEY), new Text("1"));
        }
    }

    /**
     * Joins the A/B values of "<term>_<articleId>" keys into one
     * comma-terminated string and sums the values of all other keys
     * (the article counter and the per-term article markers).
     */
    public static class MyReducer extends Reducer<Text, Text, Text, Text> {

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            String k = key.toString();

            if (!COUNTER_KEY.equals(k) && k.contains("_")) {
                // Per-article term record: keep both values, e.g. "A:1,B:13,".
                StringBuilder joined = new StringBuilder();
                for (Text v : values) {
                    joined.append(v.toString()).append(",");
                }
                context.write(key, new Text(joined.toString()));
                return;
            }

            // Article counter or document-frequency key: every value is a count.
            long sum = 0L;
            for (Text v : values) {
                sum += Long.parseLong(v.toString());
            }
            context.write(key, new Text(Long.toString(sum)));
        }
    }
}
FilterPartition.java
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner; /**
* Created by Edward on 2016/7/22.
*/
public class FilterPartition extends HashPartitioner<Text, Text> {

    /*
     * Routes the three kinds of keys to separate groups of reducers so each
     * kind ends up in its own output file(s):
     *   - the key "counter"              -> last partition
     *   - keys without '_'               -> second-to-last partition
     *   - keys containing '_'            -> hashed over the remaining partitions
     */

    /**
     * @param key            map output key
     * @param value          map output value (not used for routing)
     * @param numReduceTasks number of reduce tasks configured for the job
     * @return partition index in the range [0, numReduceTasks)
     */
    @Override
    public int getPartition(Text key, Text value, int numReduceTasks) {
        if (numReduceTasks < 3) {
            // The dedicated layout needs two reserved partitions plus at least
            // one hashed partition; with fewer reducers fall back to plain
            // hashing instead of dividing by zero or returning -1.
            return super.getPartition(key, value, numReduceTasks);
        }

        String k = key.toString();
        // Exact match: a key that merely contains "counter" is not the counter record.
        if ("counter".equals(k)) {
            return numReduceTasks - 1;
        }
        if (k.contains("_")) {
            return super.getPartition(key, value, numReduceTasks - 2);
        }
        return numReduceTasks - 2;
    }
}
Step2.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map; /**
* Created by Edward on 2016/7/22.
*/
public class Step2 {
public static void main(String[] args)
{
//access hdfs's user
//System.setProperty("HADOOP_USER_NAME","root"); Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://node1:8020"); try {
FileSystem fs = FileSystem.get(conf); Job job = Job.getInstance(conf);
job.setJarByClass(RunJob.class);
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class); //需要指定 map out 的 key 和 value
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class); //分布式缓存,每个slave都能读到数据
//词条在多少文章中出现过
job.addCacheFile(new Path("/test/tfidf/output/part-r-00002").toUri());
//文章的总数
job.addCacheFile(new Path("/test/tfidf/output/part-r-00003").toUri()); FileInputFormat.addInputPath(job, new Path("/test/tfidf/output")); Path path = new Path("/test/tfidf/output1");
if(fs.exists(path))//如果目录存在,则删除目录
{
fs.delete(path,true);
}
FileOutputFormat.setOutputPath(job, path); boolean b = job.waitForCompletion(true);
if(b)
{
System.out.println("OK");
}
} catch (Exception e) {
e.printStackTrace();
}
} public static class MyMapper extends Mapper<LongWritable, Text, Text, Text > { public static Map<String, Double> dfmap = new HashMap<String, Double>(); public static Map<String, Double> totalmap = new HashMap<String, Double>(); @Override
protected void setup(Context context) throws IOException, InterruptedException {
URI[] cacheFiles = context.getCacheFiles();
Path pArtNum = new Path(cacheFiles[0].getPath());
Path pArtTotal = new Path(cacheFiles[1].getPath()); //加载词条在多少篇文章中出现过
BufferedReader buffer = new BufferedReader(new FileReader(pArtNum.getName()));
String line = null;
while((line = buffer.readLine()) != null){
String[] str = line.split("\t");
dfmap.put(str[0], Double.parseDouble(str[1]));
} //加载文章总数
buffer = new BufferedReader(new FileReader(pArtTotal.getName()));
line = null;
while((line = buffer.readLine()) != null){
String[] str = line.split("\t");
totalmap.put(str[0], Double.parseDouble(str[1]));
}
} @Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] strings = value.toString().split("\t");
String k = strings[0]; if(k.contains("counter")) {
//过滤掉 文章总数
}
else if(k.contains("_")){
String word = k.split("_")[0];
String[] info = strings[1].split(",");
String n=null;
String num=null;
if(info[0].contains("A")){
n = info[0].substring(info[0].indexOf(":")+1);
num = info[1].substring(info[0].indexOf(":")+1);
}
if(info[0].contains("B")){
num = info[0].substring(info[0].indexOf(":")+1);
n = info[1].substring(info[0].indexOf(":")+1);
}
double result = 0l; result = (Double.parseDouble(n)/Double.parseDouble(num)) * Math.log( totalmap.get("counter")/dfmap.get(word));
System.out.println("n=" + Double.parseDouble(n));
System.out.println("num=" + Double.parseDouble(num));
System.out.println("counter=" + totalmap.get("counter"));
System.out.println("wordnum=" + dfmap.get(word));
context.write(new Text(k.split("_")[1]), new Text(word+":"+result));
}
else{
//过滤掉 词条在多少篇文章中出现过
}
}
} public static class MyReducer extends Reducer<Text, Text, Text, Text> {
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { StringBuilder stringBuilder = new StringBuilder();
for(Text t: values){
stringBuilder.append(t.toString());
stringBuilder.append("\t");
}
context.write(key, new Text(stringBuilder.toString()) );
}
}
}