MapReduce——计算温度最大值 (基于全新2.2.0API)

时间:2023-02-06 07:48:27

MapReduce——计算温度最大值 (基于全新2.2.0API)


deprecated: Job类的所有Constructors, 新的API用静态方法getInstance(conf)来去的Job的实例;

Code:

 import java.io.IOException;
 import java.util.Iterator;

 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

 public class getMaxTemperature  extends Configured implements Tool {

   class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
     @Override
     public void map(LongWritable key, Text val, Context context) throws IOException, InterruptedException {
       String line = val.toString();
       String year = line.substring(15, 19);

       int Temperature;
       if (!hasPlus(line)){
         Temperature = Integer.parseInt(line.substring(87, 92));
       } else {
         Temperature = Integer.parseInt(line.substring(88,92));
       }
       String qual = line.substring(92, 93);
       if(!matched(qual)) {
         context.write(new Text(year), new IntWritable(Temperature));
    }

     }

     private boolean hasPlus(String line) {
       return line.charAt(87) == '+' ?  true : false;
     }

     private boolean matched(String line) {
         return line.matches("[01459") ? true : false;
     }

   }

   class MaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
     @Override
     public void reduce(Text key, Iterable<IntWritable> vals, Context context) throws IOException, InterruptedException {
       int maxValue = Integer.MIN_VALUE;
       for( IntWritable value : vals ) {
         maxValue = Math.max(maxValue, value.get());
       }
       context.write(key, new IntWritable(maxValue));
     }
   }

     @Override
     public int run(String[] args) throws Exception {
       Configuration conf = getConf();
       Job job = Job.getInstance(conf);
       job.setJobName("helloRuby");
       job.setJarByClass(getClass());
       FileInputFormat.addInputPath(job, new Path(args[0]));
       FileOutputFormat.setOutputPath(job, new Path(args[1]));

       job.setMapperClass(MaxTemperatureMapper.class);
       job.setCombinerClass(MaxTemperatureReducer.class);
       job.setReducerClass(MaxTemperatureReducer.class);

       job.setOutputKeyClass(Text.class);
       job.setOutputValueClass(IntWritable.class);

       return job.waitForCompletion(true) ? 0 : 1;
     }

   public static void main(String[] args) throws Exception {
     ToolRunner.run(new getMaxTemperature() , args);
   }
 }