当mapreduce没有自定义分组时,map中所有的key被分为一组,其分组操作默认是走的HashPartitioner:
public class HashPartitioner<K, V> extends Partitioner<K, V> { /** Use {@link Object#hashCode()} to partition. */ public int getPartition(K key, V value, int numReduceTasks) { return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; } }默认numReduceTasks为1,上述程序对于任何key(map输出的key)都是返回0,因此所有的key被分到一个组。但是在某些业务场景中,我们可能需要对key进行分组。分组的实现包括两步:
1.编写partition组件
如HashPartitioner,自定义的分组类应当继承抽象类Partitioner,并实现其抽象方法getPartition。例如,对手机号进行分组(map中key的值封装的是手机号):
import java.util.HashMap; import org.apache.hadoop.mapreduce.Partitioner; /** * 怎么使得这个partition组件生效 * @author ASUS * * @param <KEY> * @param <VALUE> */ public class AreaPartitioner<KEY, VALUE> extends Partitioner<KEY, VALUE>{ private static HashMap<String,Integer> areaMap=new HashMap<String,Integer>(); static{ areaMap.put("135", 0); areaMap.put("136", 1); areaMap.put("137", 2); areaMap.put("138", 3); areaMap.put("139", 4); } public int getPartition(KEY key, VALUE value, int numPartitions) { // 从key中拿到手机号,查询手机号归属地词典,不同省份返回不同的归属地号 int areaCode=areaMap.get(key.toString().substring(0, 3))==null?5:areaMap.get(key.toString().substring(0, 3)); return areaCode; } }分组后mapreduce的处理结果就会根据不同的分组给出,一个分组一个结果文件。
2.注册自定义的partition组件
在启动hadoop的main方法中进行注册:
//设置自定义分组逻辑 job.setPartitionerClass(AreaPartitioner.class); //设置reducer数量 应当和AreaPartitioner设置的分组数目一致,,或者多于。少于的时候会报错 job.setNumReduceTasks(5);
这里需要注意的是numReduceTasks数目应当和分组数目一致。