MapReduce自定义分组实现

时间:2023-01-27 09:34:27

当mapreduce没有自定义分组时,map中所有的key被分为一组,其分组操作默认是走的HashPartitioner:

public class HashPartitioner<K, V> extends Partitioner<K, V> {

  /** Use {@link Object#hashCode()} to partition. */
  public int getPartition(K key, V value,
                          int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }

}
默认numReduceTasks为1,上述程序对于任何key(map输出的key)都是返回0,因此所有的key被分到一个组。但是在某些业务场景中,我们可能需要对key进行分组。分组的实现包括两步:

1.编写partition组件

如HashPartitioner,自定义的分组类应当继承抽象类Partitioner,并实现其抽象方法getPartition。例如,对手机号进行分组(map中key的值封装的是手机号):

import java.util.HashMap;

import org.apache.hadoop.mapreduce.Partitioner;
/**
 * 怎么使得这个partition组件生效
 * @author ASUS
 *
 * @param <KEY>
 * @param <VALUE>
 */
public class AreaPartitioner<KEY, VALUE> extends Partitioner<KEY, VALUE>{
	private static HashMap<String,Integer> areaMap=new HashMap<String,Integer>();
	static{
		areaMap.put("135", 0);
		areaMap.put("136", 1);
		areaMap.put("137", 2);
		areaMap.put("138", 3);
		areaMap.put("139", 4);
	}
	
	public int getPartition(KEY key, VALUE value, int numPartitions) {
		// 从key中拿到手机号,查询手机号归属地词典,不同省份返回不同的归属地号
		int areaCode=areaMap.get(key.toString().substring(0, 3))==null?5:areaMap.get(key.toString().substring(0, 3));
		return areaCode;
	}

}
分组后mapreduce的处理结果就会根据不同的分组给出,一个分组一个结果文件。

2.注册自定义的partition组件

在启动hadoop的main方法中进行注册:

	//设置自定义分组逻辑
	job.setPartitionerClass(AreaPartitioner.class);
	//设置reducer数量 应当和AreaPartitioner设置的分组数目一致,,或者多于。少于的时候会报错
	job.setNumReduceTasks(5);

这里需要注意的是numReduceTasks数目应当和分组数目一致。