在这之前要先说一下WritableComparable接口。Writable接口大家可能都知道,它是一个实现了序列化协议的序列化对象。在Hadoop中定义一个结构化对象都要实现Writable接口,使得该结构化对象可以序列化为字节流,字节流也可以反序列化为结构化对象。那WritableComparable接口是可序列化并且可比较的接口。MapReduce中所有的key值类型都必须实现这个接口,既然是可序列化的那就必须得实现readFiels()和write()这两个序列化和反序列化函数,既然也是可比较的那就必须得实现compareTo()函数,该函数即是比较和排序规则的实现。这样MR中的key值就既能可序列化又是可比较的。下面几符图是API中对WritableComparable接口的解释及其方法,还有一个实现了该接口的对象的列子:
图 一 WritableComparable 接口解释
图 二 WritableComparable 接口方法
图 三 自定义对象实现WritableComparable 接口例子
从图三可以看到自定义的对象实现WritableComparable接口除了实现readFields和write方法之外,还必须得实现compareTo()方法,这也是key值排序的关键,实现了改方法key值之间是可比较可排序的,所以也不用另外写排序函数了。Hadoop提供的数据类型如Text,IntWritable,LongWritable,DoubleWritable和FloatWritable等也都实现了WritableComparable接口。所以我们最开始写wordcount等例子,我们使用Longritable,Text等类型做key值并没有去实现compareTo函数,最后结果也是排序好的,就是因为Hadoop提供的数据类型已经实现了WC接口,已经实现了compareTo函数了。如果你有特殊要求,Text,IntWritable这些类型不能满足你对key值的要求,需要自己新建一个数据对象作为自己的key值类型,那就像上图 图三那样重写一个类实现WritableComparable接口,同时实现compareTo函数,函数内部实现你自己的排序规则,最后reduce的数据就会按key值排序了。
所以总结上面,hadoop会调用key值类型的compareTo函数按照该函数的要求对key值进行排序。所以你想对哪些列排序就要把哪些列并入到key值对象中,像二次排序那样,要对两列进行排序,两列值都要并入key中,则key成为包含两个属性的复合key,Hadoop 提供的key值可用的类型不能满足要求,那就重写一个对象实现WritableComparable接口(类图三),该对象包含连个属性,并实现compareTo函数,最后会根据key值对两列数据排序,从而实现二次排序。
下面是我自己写的一个
package sina.dip.logfilter.mr;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
public class ComplexKey implements WritableComparable<ComplexKey> {
private Text name;
private Text value;
private Text minf;
public ComplexKey() {
this.name = new Text();
this.value = new Text();
this.minf = new Text();
}
public ComplexKey(String name, String value, String minf) {
this.name = new Text(name);
this.value = new Text(value);
this.minf = new Text(minf);
}
public Text getName() {
return name;
}
public void setName(Text name) {
this.name = name;
}
public Text getValue() {
return value;
}
public void setValue(Text value) {
this.value = value;
}
public Text getMinf() {
return minf;
}
public void setMinf(Text minf) {
this.minf = minf;
}
@Override
public int compareTo(ComplexKey c) {
int compare = 0;
compare = name.compareTo(c.name);
if (compare != 0) {
return compare;
}
compare = value.compareTo(c.value);
if (compare != 0) {
return compare;
}
compare = minf.compareTo(c.minf);
if (compare != 0) {
return compare;
}
return 0;
}
@Override
public void readFields(DataInput in) throws IOException {
name.readFields(in);
value.readFields(in);
minf.readFields(in);
}
@Override
public void write(DataOutput out) throws IOException {
name.write(out);
value.write(out);
minf.write(out);
}
}