Implementing Data Deduplication with MapReduce

Posted: 2023-03-09 08:32:25
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Dedpu extends Configured implements Tool {
    /**
     * Data deduplication.
     *
     * Sample input:
     *   2006-6-9  a
     *   2006-6-10 b
     *   2006-6-9  a
     * Expected output:
     *   2006-6-9  a
     *   2006-6-10 b
     *
     * Design:
     *   Map phase:    emit <date, value> pairs; the code below simply uses the
     *                 whole line as the key with an empty value.
     *   Reduce phase: input is <key, list<value>>; duplicate records share the
     *                 same key, so writing each key once removes them.
     */
    public static class Map extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Use the whole input line as the key; the value carries no information.
            String line = value.toString();
            context.write(new Text(line), new Text(""));
            // Alternative: split each line into a date and a data field and emit
            // <date, data>, letting the reducer de-duplicate values per date
            // (needs java.util.StringTokenizer).
            // StringTokenizer tokenizer = new StringTokenizer(line);
            // while (tokenizer.hasMoreTokens()) {
            //     String datestr = tokenizer.nextToken();
            //     String datastr = tokenizer.nextToken();
            //     context.write(new Text(datestr), new Text(datastr));
            // }
        }
    }

    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // All duplicate lines share the same key, so writing the key once
            // per group removes the duplicates.
            context.write(key, new Text(""));
            // Alternative (pairs with the alternative map above): keep only the
            // distinct values for this key and write each one out
            // (needs java.util.ArrayList).
            // ArrayList<String> arr = new ArrayList<String>();
            // for (Text txt : values) {
            //     if (!arr.contains(txt.toString())) {
            //         arr.add(txt.toString());
            //     }
            // }
            // for (int i = 0; i < arr.size(); i++) {
            //     context.write(key, new Text(arr.get(i)));
            // }
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        // Use the configuration injected by ToolRunner so -D options are honored.
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "Data Deduplication");
        job.setJarByClass(Dedpu.class);
        job.setMapperClass(Map.class);
        // De-duplication is idempotent, so the reducer can safely double as the combiner.
        job.setCombinerClass(Reduce.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        boolean success = job.waitForCompletion(true);
        return success ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int ret = ToolRunner.run(new Dedpu(), args);
        System.exit(ret);
    }
}
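Once the class is packaged into a jar, the job is typically launched with "hadoop jar <jarfile> Dedpu <input dir> <output dir>", and the de-duplicated records are written to the output directory.

For comparison, the commented-out lines above hint at a per-date variant that keys each record by its date and de-duplicates the data values for each date. The sketch below shows one way that variant might look; the class and method names are hypothetical, and it assumes each input line holds a whitespace-separated date and data field:

import java.io.IOException;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.StringTokenizer;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Hypothetical per-date variant: key = date, value = data field.
public class DedupByDate {

    public static class DateMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Expect lines of the form "<date> <data>", whitespace-separated.
            StringTokenizer tokenizer = new StringTokenizer(value.toString());
            if (tokenizer.countTokens() >= 2) {
                String date = tokenizer.nextToken();
                String data = tokenizer.nextToken();
                context.write(new Text(date), new Text(data));
            }
        }
    }

    public static class DistinctReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // A set keeps only the distinct data values seen for this date.
            Set<String> distinct = new LinkedHashSet<String>();
            for (Text v : values) {
                distinct.add(v.toString());
            }
            for (String data : distinct) {
                context.write(key, new Text(data));
            }
        }
    }
}

Wiring these classes into a driver is the same as in the job above; a LinkedHashSet is used here so each value is kept once in first-seen order, rather than repeating the linear contains() scan over an ArrayList as in the commented-out code.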
