weather案例, 简单分析每年的前三个月的最高温即可, 使用自定义的分组和排序
设计分析
- 设定多个reduce
- 每年的数据都很多,如果按照默认情况处理,统计性能是非常慢(因为默认只有一个reduce),所以我们需要重新分配reduceTask,将一年的数据交给一个reduceTask处理,
- 分区
- 那个数据交给哪个reduceTask处理是有Patitioner决定(patition对每个map输出的数据分配置一个分区号这个分区号决定map输出数据送到那个reudeTask),
- 自定义分区
- 由于我们是将一年的数据交给一个reduce处理,但是默认分区是按照key.hashCode()的值 模 reduceTask数量得到分区号,所以我们需要重写分区,
-
自定义排序
- 由于我们是要每月最该的三个温度,所以需要对温度进行排序,所以在洗牌(shuffler)过程中自定义sort,
-
自定义分组
- 分组的目的:是按照实际的需求,将数据分成一个个组, 传给reduceTask,我们的需求是统计每年每月温度最高的三个,如果一组数据就是这一年的数据,我们对着一年的数据进行统计,是很复杂的,如果我们将每月的数据分成一个组,这样就会方便多了, 默认的分组是按照key是否相同进行分组,所以我们要自定义分组
- 自定义key
- 默认的partition是根据key的hashcode模reduceTask数量,得到分区号
- 默认的排序是根据key的字典排序
- 默认的分组是根据key相同,进行比较进行分组
- 这几个都与key与联系, 所以我们需要影响这些步骤的因素添加到key中,
- 根据上面分析,partition与年有关,sort与温度有关,分组和月份有关
- 总结:所以key中需要包含year, month, T
1, MyKey,
因为对温度进行分组, 排序, pardition操作, 所以默认的字典顺序不能满足需求
*** 自定义的key中的数据, 必须在构造中进行初始化, 否则报 NullpointException
package com.wenbronk.weather; import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException; import org.apache.hadoop.io.WritableComparable; /**
* 自定义key, 对key进行分组
* 实现writableComparble方法, 可序列化并比较是否同一个对象
* @author root
*
*/
public class MyKey implements WritableComparable<MyKey> { private int year;
private int month;
private double hot; public int getYear() {
return year;
}
public void setYear(int year) {
this.year = year;
}
public int getMonth() {
return month;
}
public void setMonth(int month) {
this.month = month;
}
public double getHot() {
return hot;
}
public void setHot(double hot) {
this.hot = hot;
} /**
* 反序列化
*/
@Override
public void readFields(DataInput arg0) throws IOException {
this.year = arg0.readInt();
this.month = arg0.readInt();
this.hot = arg0.readDouble();
} /**
* 序列化
*/
@Override
public void write(DataOutput arg0) throws IOException {
arg0.writeInt(year);
arg0.writeInt(month);
arg0.writeDouble(hot);
} /**
* 比较, 判断是否同一个对象, 当对象作为key时
*/
@Override
public int compareTo(MyKey o) {
int c1 = Integer.compare(this.year, o.getYear());
if (c1 == ) {
int c2 = Integer.compare(this.month, o.getMonth());
if (c2 == ) {
return Double.compare(this.hot, o.getHot());
}
}
return ;
} }
2, sort
package com.wenbronk.weather; import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator; /**
* 自定义排序
* @author root
*/
public class MySort extends WritableComparator { /**
* 在构造方法中, 通过调用父类构造创建MyKey
* MyKey.class : 比较的对象
* true : 创建这个对象
*/
public MySort() {
super(MyKey.class, true);
} /**
* 自定义排序方法
* 传入的比较对象为 map 输出的key
*
* 年相同比较月, 月相同, 温度降序
*/
@Override
public int compare(WritableComparable a, WritableComparable b) {
MyKey key1 = (MyKey) a;
MyKey key2 = (MyKey) b; int r1 = Integer.compare(key1.getYear(), key2.getYear());
if (r1 == ) {
int r2 = Integer.compare(key1.getMonth(), key2.getMonth()); if (r2 == ) {
// 温度降序
return - Double.compare(key1.getHot(), key2.getHot());
}else {
return r2;
}
}
return r1;
} }
3, group
package com.wenbronk.weather; import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator; /**
* 自定义分组
* @author root
*
*/
public class MyGroup extends WritableComparator { public MyGroup() {
super(MyKey.class, true);
} /**
* 年, 月相同, 则为一组
*/
@Override
public int compare(WritableComparable a, WritableComparable b) {
MyKey key1 = (MyKey) a;
MyKey key2 = (MyKey) b; int r1 = Integer.compare(key1.getYear(), key2.getYear());
if (r1 == ) {
return Integer.compare(key1.getMonth(), key2.getMonth());
}
return r1;
} }
4, parditon
package com.wenbronk.weather; import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner; /**
* 自定义partition, 保证一年一个reducer进行处理
* 从map接收值
* @author root
*
*/
public class MyPartition extends HashPartitioner<MyKey, DoubleWritable> { /**
* maptask每输出一个数据, 调用一次此方法
* 执行时间越短越好
* 年的数量是确定的, 可以传递reduceTask数量, 在配置文件可设置, 在程序执行时也可设置
*
*/
@Override
public int getPartition(MyKey key, DoubleWritable value, int numReduceTasks) {
// 减去最小的, 更精确
return (key.getYear() - ) % numReduceTasks;
} }
5, 执行类
package com.wenbronk.weather; import java.io.IOException;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /**
* 执行mapreduce 统计每年温度的前三个
*
* @author wenbronk
*
*/
public class RunMapReduce { public static void main(String[] args) throws Exception {
// 初始化时加载src或classpath下所有的配置文件
Configuration configuration = new Configuration(); // 本地执行
configuration.set("fs.default", "hdfs://wenbronk.hdfs.com:8020 ");
configuration.set("yarn.resourcemanager", "hdfs://192.168.208.106"); // 服务器执行
// configuration.set("mapred.jar", "C:/Users/wenbr/Desktop/weather.jar");
// configuration.set("mapred.jar", "E:\\sxt\\target\\weather.jar");
// configuration.set("mapreduce.app-submission.cross-platform", "true");
//
// configuration.set("mapreduce.framework.name", "yarn");
// configuration.set("yarn.resourcemanager.address", "192.168.208.106:"+8030);
// configuration.set("yarn.resourcemanager.scheduler.address", "192.168.208.106:"+8032); // 得到执行的任务
Job job = Job.getInstance();
// 入口类
job.setJarByClass(RunMapReduce.class); // job名字
job.setJobName("weather"); // job执行是map执行的类
job.setMapperClass(WeatherMapper.class);
job.setReducerClass(WeatherReduce.class);
job.setMapOutputKeyClass(MyKey.class);
job.setMapOutputValueClass(DoubleWritable.class); // 使用自定义的排序, 分组
job.setPartitionerClass(MyPartition.class);
job.setSortComparatorClass(MySort.class);
job.setGroupingComparatorClass(MyGroup.class);
// job.setJar("E:\\sxt\\target\\weather.jar"); //设置 分区数量
job.setNumReduceTasks(); // **** 使用插件上传data.txt到hdfs/root/usr/data.txt //****使得左边为key, 右边为value, 此类默认为 "\t" 可以自定义
// 或者 config.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", "\t");
job.setInputFormatClass(KeyValueTextInputFormat.class); // 使用文件
FileInputFormat.addInputPath(job, new Path("E:\\sxt\\1-MapReduce\\data\\weather.txt"));
// FileInputFormat.addInputPath(job, new Path("/root/usr/weather.txt")); // 使用一个不存在的目录进行
Path path = new Path("/root/usr/weather");
// 如果存在删除
FileSystem fs = FileSystem.get(configuration);
if (fs.exists(path)) {
fs.delete(path, true);
} // 输出
FileOutputFormat.setOutputPath(job, path); boolean forCompletion = job.waitForCompletion(true); if (forCompletion) {
System.out.println("success");
}
} /**
* key: 将 LongWritalbe 改成 Text类型的
*
* 将输入更改为需要的 key, value, mapper所做的事情
*
* @author wenbronk
*/
static class WeatherMapper extends Mapper<Text, Text, MyKey, DoubleWritable> {
/**
* 转换字符串为日期对象
*/
DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); /**
* 将键值取出来, 封装为key 每行第一个分隔符"\t"左侧为key, 右侧有value, 传递过来的数据已经切割好了
*/
@Override
protected void map(Text key, Text value, Mapper<Text, Text, MyKey, DoubleWritable>.Context context)
throws IOException, InterruptedException {
try {
Date date = formatter.parse(key.toString());
Calendar calendar = Calendar.getInstance();
calendar.setTime(date);
int year = calendar.get(Calendar.YEAR);
int month = calendar.get(Calendar.MONTH); double hot = Double.parseDouble(value.toString().substring(, value.toString().lastIndexOf("c"))); MyKey mykey = new MyKey();
mykey.setYear(year);
mykey.setMonth(month);
mykey.setHot(hot); context.write(mykey, new DoubleWritable(hot));
} catch (ParseException e) {
e.printStackTrace();
}
}
} /**
* 经过partition, 分组, 排序, 传递数据给reducer 需要自定义partition, 保证一年一个reduce 自定义排序,
* 保证按照年, 月, 温度 自定义分组, 年月相同, 一个组
* 传进来的温度, 为已经排好序的
* @author root
*/
static class WeatherReduce extends Reducer<MyKey, DoubleWritable, Text, NullWritable> {
NullWritable nullWritable = NullWritable.get();
@Override
protected void reduce(MyKey arg0, Iterable<DoubleWritable> arg1,
Reducer<MyKey, DoubleWritable, Text, NullWritable>.Context arg2)
throws IOException, InterruptedException { int i = ;
for (DoubleWritable doubleWritable : arg1) {
i++;
String msg = arg0.getYear() + "\t" + arg0.getMonth() + "\t" + doubleWritable.get();
// key中已经包含需要的结果了
arg2.write(new Text(msg), NullWritable.get());
// 每个月的前三个
if (i == ) {
break;
}
} }
} }
初始文档
-- :: 34c
-- :: 36c
-- :: 32c
-- :: 37c
-- :: 23c
-- :: 41c
-- :: 27c
-- :: 45c
-- :: 46c
-- :: 47c
系列来自尚学堂视频
https://blog.csdn.net/wuxintdrh/article/details/54917232