
案例描述
找出每个月气温最高的2天
数据集
每行一条记录,格式为 `yyyy-MM-dd HH:mm:ss<TAB>温度c`(例如 `1949-10-01 14:21:02<TAB>34c`,日期与温度之间以制表符分隔)。原文中各条记录的日期时间数字在提取时丢失,仅保留温度值,按原顺序依次为:34c 38c 36c 32c 37c 23c 41c 27c 45c 46c 47c
代码
MyTQ.class
package com.hadoop.mr.tq;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Job driver (client) for the "two hottest days of every month" MapReduce job.
 *
 * <p>Wires together the mapper, partitioner, sort comparator, grouping
 * comparator and reducer, then submits the job and waits for it to finish.
 *
 * @author Lindsey
 */
public class MyTQ {

    /**
     * Configures and runs the job, then terminates the JVM with exit status
     * 0 when the job succeeded and 1 when it failed.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if job setup, output cleanup or submission fails
     */
    public static void main(String args[]) throws Exception {
        // Load the configuration (true = also load the default resources).
        Configuration conf = new Configuration(true);

        // Create the job client.
        Job job = Job.getInstance(conf);
        job.setJarByClass(MyTQ.class);

        // Map side: Tq key, IntWritable value.
        job.setMapperClass(TMapper.class);
        job.setMapOutputKeyClass(Tq.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Partitioner: decides which reduce task receives each key.
        job.setPartitionerClass(TPartitioner.class);

        // Sort comparator: orders the map output keys.
        job.setSortComparatorClass(TSortComparator.class);

        // Reduce side.
        job.setNumReduceTasks(2);
        job.setReducerClass(TReducer.class);

        // Grouping comparator: keys with the same year and month form one group.
        job.setGroupingComparatorClass(TGroupingComparator.class);

        // Input and output locations.
        Path input = new Path("/user/hadoop/input/weather.txt");
        FileInputFormat.addInputPath(job, input);

        Path output = new Path("/user/hadoop/output/weather");
        // Remove a stale output directory so the job can be re-run;
        // resolve the file system once instead of once per call.
        FileSystem fs = output.getFileSystem(conf);
        if (fs.exists(output)) {
            fs.delete(output, true);
        }
        FileOutputFormat.setOutputPath(job, output);

        // Submit, wait, and report the outcome through the process exit status
        // so that a failed job is not mistaken for a successful one.
        boolean succeeded = job.waitForCompletion(true);
        System.exit(succeeded ? 0 : 1);
    }
}
TMapper.class
package com.hadoop.mr.tq;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.StringUtils;

/**
 * Maps one input line to a (Tq, IntWritable) pair.
 *
 * <p>An input line is expected to look like
 * {@code 1949-10-01 14:21:02<TAB>34c}: a timestamp, a tab, and a temperature
 * followed by a one-character unit suffix. The key carries year, month, day
 * and temperature; the value carries the temperature.
 *
 * <p>Lines that do not match this layout are skipped and counted in the
 * {@code TMapper / MALFORMED_RECORDS} job counter instead of failing the task.
 */
public class TMapper extends Mapper<LongWritable, Text, Tq, IntWritable> {

    // Output key and value objects, reused for every record.
    Tq mkey = new Tq();
    IntWritable mval = new IntWritable();

    // Created once per mapper instance instead of once per record.
    // Note the upper-case MM: month, not minutes.
    private final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
    private final Calendar calendar = Calendar.getInstance();

    /**
     * Parses one line and emits its (Tq, temperature) pair.
     *
     * @param key     input key supplied by the input format (not used)
     * @param value   one line of input text
     * @param context task context used to emit output and update counters
     * @throws IOException          if writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Split into [timestamp, temperature].
        String[] strs = StringUtils.split(value.toString(), '\t');

        // Need both fields, and the temperature needs at least one digit plus the unit suffix.
        if (strs == null || strs.length < 2 || strs[1].length() < 2) {
            countMalformed(context);
            return;
        }

        try {
            // Only the leading yyyy-MM-dd part of the timestamp is consumed by the parser.
            Date date = dateFormat.parse(strs[0]);
            calendar.setTime(date);

            // Drop the trailing unit character ("34c" -> "34").
            int temperature = Integer.parseInt(strs[1].substring(0, strs[1].length() - 1));

            // Key
            mkey.setYear(calendar.get(Calendar.YEAR));
            mkey.setMonth(calendar.get(Calendar.MONTH) + 1); // Calendar months are 0-based
            mkey.setDay(calendar.get(Calendar.DAY_OF_MONTH));
            mkey.setTemperature(temperature);

            // Value
            mval.set(temperature);

            context.write(mkey, mval);
        } catch (ParseException | NumberFormatException e) {
            // Unparseable date or temperature: skip the record but keep it visible in the counters.
            countMalformed(context);
        }
    }

    /** Records one skipped input line in the job counters. */
    private void countMalformed(Context context) {
        context.getCounter("TMapper", "MALFORMED_RECORDS").increment(1);
    }
}
Tq.class
package com.hadoop.mr.tq;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * Composite map-output key: a calendar date plus the temperature recorded on it.
 *
 * <p>The serialized form is four ints written in the order
 * year, month, day, temperature.
 */
public class Tq implements WritableComparable<Tq> {

    private int year;
    private int month;
    private int day;
    private int temperature;

    /** @return the year */
    public int getYear() {
        return this.year;
    }

    /** @param year the year to store */
    public void setYear(int year) {
        this.year = year;
    }

    /** @return the month */
    public int getMonth() {
        return this.month;
    }

    /** @param month the month to store */
    public void setMonth(int month) {
        this.month = month;
    }

    /** @return the day of month */
    public int getDay() {
        return this.day;
    }

    /** @param day the day of month to store */
    public void setDay(int day) {
        this.day = day;
    }

    /** @return the temperature */
    public int getTemperature() {
        return this.temperature;
    }

    /** @param temperature the temperature to store */
    public void setTemperature(int temperature) {
        this.temperature = temperature;
    }

    /**
     * Serializes the four fields; the order must mirror {@link #readFields}.
     *
     * @param out sink to write to
     * @throws IOException if writing fails
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(this.year);
        out.writeInt(this.month);
        out.writeInt(this.day);
        out.writeInt(this.temperature);
    }

    /**
     * Restores the four fields in the order {@link #write} produced them.
     *
     * @param in source to read from
     * @throws IOException if reading fails
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.year = in.readInt();
        this.month = in.readInt();
        this.day = in.readInt();
        this.temperature = in.readInt();
    }

    /**
     * Natural order: ascending by date (year, then month, then day).
     * The temperature does not take part in this comparison.
     *
     * @param that the key to compare against
     * @return negative, zero or positive as this date is before, equal to or after {@code that}
     */
    @Override
    public int compareTo(Tq that) {
        int byYear = Integer.compare(this.year, that.getYear());
        if (byYear != 0) {
            return byYear;
        }
        int byMonth = Integer.compare(this.month, that.getMonth());
        if (byMonth != 0) {
            return byMonth;
        }
        return Integer.compare(this.day, that.getDay());
    }
}
TPartitioner.class
package com.hadoop.mr.tq;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * Assigns map output to reduce tasks by year, so that all records of one
 * year (and therefore of one year-month group) reach the same reducer.
 *
 * @author Lindsey
 */
public class TPartitioner extends Partitioner<Tq, IntWritable> {

    /**
     * Picks the partition for a record.
     *
     * @param key           map output key; only its year is consulted
     * @param value         map output value (not used)
     * @param numPartitions number of partitions to distribute over
     * @return a partition index in {@code [0, numPartitions)}
     */
    @Override
    public int getPartition(Tq key, IntWritable value, int numPartitions) {
        // Clear the sign bit before taking the remainder: Java's % keeps the
        // sign of the dividend, so a negative year would otherwise produce a
        // negative (illegal) partition index. Non-negative years are unaffected.
        return (key.getYear() & Integer.MAX_VALUE) % numPartitions;
    }
}
TSortComparator.class
package com.hadoop.mr.tq; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; public class TSortComparator extends WritableComparator{ //对字节数据中map排序 需要先将Key反序列化为对象再比较 public TSortComparator(){ super(Tq.class,true); //true是将Tq实例化 } /* 时间正序 、温度倒序 */ @Override public int compare(WritableComparable a, WritableComparable b) { Tq t1 = (Tq) a; Tq t2 = (Tq) b; int y = Integer.compare(t1.getYear(),t2.getYear()); if(y == 0){ int m = Integer.compare(t1.getMonth(),t2.getMonth()); if(m == 0){ //加上负号实现倒序 return -Integer.compare(t1.getTemperature(),t2.getTemperature()); } return m; } return y; } }
TReducer.class
package com.hadoop.mr.tq;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
// NOTE(review): the import below is not referenced anywhere in this class and
// looks like an accidental IDE auto-import; consider removing it.
import org.apache.hadoop.shaded.org.glassfish.grizzly.compression.lzma.impl.lz.InWindow;

/**
 * Emits up to two records per group: the first record of the group, and the
 * first later record whose day differs from it.
 *
 * <p>Output lines have the form {@code year-month-day} mapped to the temperature.
 */
public class TReducer extends Reducer<Tq, IntWritable, Text, IntWritable> {

    // Output key and value objects, reused for every record written.
    Text rkey = new Text();
    IntWritable rval = new IntWritable();

    /**
     * Walks the values of one group and writes at most two records.
     *
     * <p>The loop reads {@code key} on every iteration rather than the value
     * itself; this relies on the framework updating the key object in place
     * as the value iterator advances.
     *
     * @param key     the group's key; re-read on each iteration
     * @param values  the values of the group (iterated only to advance the key)
     * @param context task context used to emit output
     * @throws IOException          if writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(Tq key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        boolean firstWritten = false; // whether the group's first record has been emitted
        int firstDay = 0;             // day of that first record

        for (IntWritable unused : values) {
            if (!firstWritten) {
                // First record of the group: always emitted.
                firstDay = key.getDay();
                emit(key, context);
                firstWritten = true;
                continue;
            }
            if (firstDay != key.getDay()) {
                // First record falling on a different day: emit it and stop.
                emit(key, context);
                break;
            }
        }
    }

    /** Writes {@code year-month-day -> temperature} for the key's current state. */
    private void emit(Tq key, Context context) throws IOException, InterruptedException {
        rkey.set(key.getYear() + "-" + key.getMonth() + "-" + key.getDay());
        rval.set(key.getTemperature());
        context.write(rkey, rval);
    }
}
TGroupingComparator.class
package com.hadoop.mr.tq;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Grouping comparator for the reduce side: two {@link Tq} keys belong to the
 * same group when their year and month are equal.
 */
public class TGroupingComparator extends WritableComparator {

    /**
     * Registers {@code Tq} as the key class and asks the base class to create
     * key instances for object-level comparison.
     */
    public TGroupingComparator() {
        super(Tq.class, true);
    }

    /**
     * Compares two keys by year, then by month; day and temperature are ignored.
     *
     * @param a first key, expected to be a {@code Tq}
     * @param b second key, expected to be a {@code Tq}
     * @return 0 when both keys share year and month (same group), otherwise
     *         the sign of the year or month difference
     */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        Tq left = (Tq) a;
        Tq right = (Tq) b;

        int yearOrder = Integer.compare(left.getYear(), right.getYear());
        if (yearOrder != 0) {
            return yearOrder;
        }
        return Integer.compare(left.getMonth(), right.getMonth());
    }
}
运行结果
part-r-00000
part-r-00001