MapReduce的自定义排序、分区和分组

时间:2021-03-16 15:12:00

自定义排序(WritableComparable)

当写mr程序来处理文本时,经常会将处理后的信息封装到我们自定义的bean中,并将bean作为map输出的key来传输

而mr程序会在处理数据的过程中(传输到reduce之前)对数据排序(如:map端生成的文件中的内容分区且区内有序)。

操作:

自定义bean来封装处理后的信息,可以自定义排序规则用bean中的某几个属性来作为排序的依据

代码节段:

自定义的bean实现WritableComparable接口,并对compareTo的实现

 //先按照年龄排序,再按性别(年龄小,sex大的在前)

@Override
public int compareTo(Person o) {
  if(o.age==this.age){
    if(o.sex==this.sex){
      return 0;
    }else{
      return o.sex-this.sex;
    }
  }else{
  return this.age-o.age;
  }
}

注意: 1.hadoop开发了一套自己的序列化和反序列化策略(Writable),因为map端的文件要下载到reduce端的话如果不在同一台节点上是会走网络进行传输(hadoop-rpc),所以对象需要序列化。

    2.如果空构造函数被覆盖,一定要显示的定义一下,否则反序列化时会抛异常。

自定义分区(Partitioner)

Mapreduce中会将maptask输出的kv对,默认(HashPartitioner)根据key的hashcode%reducetask数来分区。如果要按照我们自己的需求进行分组,则需要改写数据分发组件Partitioner继承抽象类:Partitioner。 

操作:

在job对象中,设job.setPartitionerClass(自定义分区类.class)

节选代码:

public class CustomPartitioner extends Partitioner<Text,Text>{

  /*
  * numPartitions其实我们可以设置,在job.setNumReduceTasks(n)设置。
  * 假如job.setNumReduceTasks(5),那么这里的numPartitions=5,那么默认的HashPartitioner的机制就是用key的hashcode%numPartitions来决定分区属于哪个分区,所以分区数量就等于我们设置的reduce数量5个。
  */
  @Override
  public int getPartition(Text key, Text value, int numPartitions) {
    Integer hash = numMap.get(key.toString().substring(0, 1));
    //将没有匹配到的数据放入3号分区
    return hash==null?3:hash;
  }
}

自定义分组(GroupingComparator)

假设把bean作为key发送给reduce,而在reduce端我们希望将年龄相同的kv聚合成组,那么就可以如下方式实现。

1.自定义分组要继承WritableComparator,然后重写compare方法。

2.定义完成后要设置job.setGroupingComparatorClass(CustomGroupingComparator.class);
代码节选

public class CustomGroupingComparator extends WritableComparator{
  protected CustomGroupingComparator() {
    super(Person.class, true);
  }
  @Override
  public int compare(WritableComparable a, WritableComparable b) {
    Bean abean = (Bean) a;
    Bean bbean = (Bean) b;
    //将item_id相同的bean都视为相同,从而聚合为一组
    return abean.getAge()-bbean.getAge();
  }
}

 

自定义分组排序(SortGroupingComparator)尽快补充上

 

 

自定义分组排序(SortComparator)尽快补充上

 每个分区内都会调用job.setSortComparatorClass()设置的key比较函数类排序;

如果没有通过job.setSortComparatorClass()设置key比较函数类,则使用key的实现的compareTo方法