使用 Hadoop MapReduce 求最大值和最小值
1、数据准备
[root@x00 hd]# cat eightteen_a.txt
102
10
39
109
200
11
3
90
28
[root@x00 hd]# cat eightteen_b.txt
5
2
30
838
10005
预期结果
Max 10005
Min 2
/**
 * Mapper that parses every non-blank input line as a whole number and emits it
 * under one constant key.
 *
 * <p>Because every record is written with the same key, all parsed numbers end up
 * grouped together on the reduce side.
 */
public class MaxMinMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    /** Constant output key; reused for every record instead of allocating a new Text. */
    private final Text textkey = new Text("textkey");

    /** Reusable holder for the parsed number; its content is overwritten per record. */
    private final LongWritable number = new LongWritable();

    /**
     * Emits {@code ("textkey", n)} for a line containing the number {@code n}.
     * Blank lines (empty after trimming) are skipped.
     *
     * @param key     input key supplied by the input format (not used)
     * @param value   one line of input text
     * @param context sink for the emitted pair
     * @throws IOException           if writing the output pair fails
     * @throws InterruptedException  if the task is interrupted while writing
     * @throws NumberFormatException if a non-blank line is not a valid decimal long
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String trimmed = value.toString().trim();
        if (trimmed.isEmpty()) {
            return;
        }
        // Parse as long: the output value type is LongWritable, so the int range
        // would reject inputs that the job is otherwise able to carry.
        number.set(Long.parseLong(trimmed));
        context.write(textkey, number);
    }
}
/**
 * Reducer that scans all values of a key group and writes the largest and the
 * smallest one as two output records keyed {@code "max"} and {@code "min"}.
 */
public class MaxMinReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

    /**
     * Computes the maximum and minimum of {@code values} and emits them.
     *
     * <p>The running extremes are primitive locals, so nothing is boxed inside the
     * loop and no state leaks from one {@code reduce} call into the next.
     *
     * @param key     the group key (not used for the computation)
     * @param values  the numbers belonging to this group
     * @param context sink for the two emitted records
     * @throws IOException          if writing an output record fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        long max = Long.MIN_VALUE; // running maximum, starts below every possible value
        long min = Long.MAX_VALUE; // running minimum, starts above every possible value
        for (LongWritable val : values) {
            long current = val.get();
            if (current > max) {
                max = current;
            }
            if (current < min) {
                min = current;
            }
        }
        context.write(new Text("max"), new LongWritable(max));
        context.write(new Text("min"), new LongWritable(min));
    }
}
package com.hadoop.five;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Driver that configures and submits the max/min MapReduce job.
 *
 * <p>Usage: {@code JobMain <input path> <output path>}
 */
public class JobMain {

    /**
     * Builds the job from {@link MaxMinMapper} and {@link MaxMinReducer}, removes a
     * pre-existing output directory, runs the job and exits with 0 on success or 1
     * on failure. Exits with 2 when fewer than two arguments are given.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     * @throws Exception if job setup, file-system access or job execution fails
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: JobMain <input path> <output path>");
            System.exit(2);
        }

        Configuration configuration = new Configuration();
        // Job.getInstance replaces the deprecated Job(Configuration, String) constructor.
        Job job = Job.getInstance(configuration, "max_min_job");
        job.setJarByClass(JobMain.class);

        job.setMapperClass(MaxMinMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        job.setReducerClass(MaxMinReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));

        Path path = new Path(args[1]);
        // Resolve the file system from the path itself so an explicit scheme in the
        // output argument is honoured rather than always using the default one.
        FileSystem fs = path.getFileSystem(configuration);
        // The job refuses to start if the output directory exists, so clear it first.
        if (fs.exists(path)) {
            fs.delete(path, true);
        }
        FileOutputFormat.setOutputPath(job, path);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}