UDAF: many rows in, one result out.
GenericUDAFEvaluator: runs different methods depending on which stage of the job is executing.
Hive uses GenericUDAFEvaluator.Mode to determine the current stage:
PARTIAL1: from raw data to a partial aggregation; calls iterate and terminatePartial
PARTIAL2: from partial aggregations to a partial aggregation; calls merge and terminatePartial
FINAL: from partial aggregations to the full aggregation; calls merge and terminate
COMPLETE: from raw data straight to the full aggregation; calls iterate and terminate
Besides iterate, merge, and terminatePartial, an evaluator also implements init (initializes the evaluator and returns the output type), getNewAggregationBuffer (creates a new buffer, i.e. the object that carries intermediate state between the methods), and reset (resets the buffer).
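To make the mode-to-method mapping concrete, here is a minimal sketch of a hypothetical count evaluator (the class name CountEvaluator and its buffer are illustrative, not part of the example below); the comments mark which modes drive each method:

import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
import org.apache.hadoop.io.LongWritable;

// hypothetical sketch, not the SUM example below
public class CountEvaluator extends GenericUDAFEvaluator {

    // the buffer carrying intermediate state between the methods
    static class CountBuffer implements AggregationBuffer {
        long n;
    }

    private PrimitiveObjectInspector inputOI;

    @Override
    public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
        super.init(m, parameters);
        // PARTIAL1/COMPLETE: parameters[0] inspects the raw input column (iterate will run).
        // PARTIAL2/FINAL:    parameters[0] inspects terminatePartial's output (merge will run).
        inputOI = (PrimitiveObjectInspector) parameters[0];
        // both the partial and the final result are longs here
        return PrimitiveObjectInspectorFactory.writableLongObjectInspector;
    }

    @Override
    public AggregationBuffer getNewAggregationBuffer() throws HiveException {
        return new CountBuffer();
    }

    @Override
    public void reset(AggregationBuffer agg) throws HiveException {
        ((CountBuffer) agg).n = 0;
    }

    // PARTIAL1 and COMPLETE: consume one raw input row
    @Override
    public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
        if (parameters[0] != null) {
            ((CountBuffer) agg).n++;
        }
    }

    // PARTIAL1 and PARTIAL2: emit the intermediate result
    @Override
    public Object terminatePartial(AggregationBuffer agg) throws HiveException {
        return new LongWritable(((CountBuffer) agg).n);
    }

    // PARTIAL2 and FINAL: fold another partial result into the buffer
    @Override
    public void merge(AggregationBuffer agg, Object partial) throws HiveException {
        if (partial != null) {
            ((CountBuffer) agg).n += PrimitiveObjectInspectorUtils.getLong(partial, inputOI);
        }
    }

    // FINAL and COMPLETE: produce the final value
    @Override
    public Object terminate(AggregationBuffer agg) throws HiveException {
        return new LongWritable(((CountBuffer) agg).n);
    }
}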
Requirement: implement a custom sum function that supports both integer and floating-point inputs.
A simple example: re-implementing the SUM function.
package com.hive.udaf;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfo;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.AbstractPrimitiveWritableObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
import org.apache.hadoop.io.LongWritable;

/**
 * @author liuwl
 * mysum: a re-implementation of SUM supporting integral and floating-point types
 */
public class mysum extends AbstractGenericUDAFResolver {

    @Override
    public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info) throws SemanticException {
        // reject the all-columns form, i.e. mysum(*)
        if (info.isAllColumns()) {
            throw new SemanticException("this function does not support the all-columns (*) form");
        }
        // exactly one column parameter is expected
        ObjectInspector[] inspectors = info.getParameterObjectInspectors();
        if (inspectors.length != 1) {
            throw new SemanticException("this function takes exactly one column as its parameter");
        }
        if (inspectors[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
            throw new SemanticException("the parameter must be a primitive type");
        }
        // pick an evaluator based on the input's primitive category
        AbstractPrimitiveWritableObjectInspector woi = (AbstractPrimitiveWritableObjectInspector) inspectors[0];
        switch (woi.getPrimitiveCategory()) {
        case INT:
        case LONG:
        case BYTE:
        case SHORT:
            return new udafLong();
        case FLOAT:
        case DOUBLE:
            return new udafDouble();
        default:
            throw new SemanticException("the parameter's primitive category is not supported");
        }
    }

    /**
     * sums integral (long) data
     */
    public static class udafLong extends GenericUDAFEvaluator {

        // inspector for the incoming values (raw column or partial result)
        public PrimitiveObjectInspector longInputor;

        // aggregation buffer: the state passed between the phases
        static class sumlongagg implements AggregationBuffer {
            long sum;
            boolean empty;
        }

        @Override
        public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
            super.init(m, parameters);
            if (parameters.length != 1) {
                throw new UDFArgumentException("exactly one argument is expected");
            }
            if (this.longInputor == null) {
                this.longInputor = (PrimitiveObjectInspector) parameters[0];
            }
            return PrimitiveObjectInspectorFactory.writableLongObjectInspector;
        }

        @Override
        public AggregationBuffer getNewAggregationBuffer() throws HiveException {
            sumlongagg slg = new sumlongagg();
            this.reset(slg);
            return slg;
        }

        @Override
        public void reset(AggregationBuffer agg) throws HiveException {
            sumlongagg slg = (sumlongagg) agg;
            slg.sum = 0;
            slg.empty = true;
        }

        @Override
        public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
            if (parameters.length != 1) {
                throw new UDFArgumentException("exactly one argument is expected");
            }
            this.merge(agg, parameters[0]);
        }

        @Override
        public Object terminatePartial(AggregationBuffer agg) throws HiveException {
            return this.terminate(agg);
        }

        @Override
        public void merge(AggregationBuffer agg, Object partial) throws HiveException {
            sumlongagg slg = (sumlongagg) agg;
            if (partial != null) {
                slg.sum += PrimitiveObjectInspectorUtils.getLong(partial, longInputor);
                slg.empty = false;
            }
        }

        @Override
        public Object terminate(AggregationBuffer agg) throws HiveException {
            sumlongagg slg = (sumlongagg) agg;
            if (slg.empty) {
                return null;
            }
            return new LongWritable(slg.sum);
        }
    }

    /**
     * sums floating-point (double) data
     */
    public static class udafDouble extends GenericUDAFEvaluator {

        // inspector for the incoming values (raw column or partial result)
        public PrimitiveObjectInspector doubleInputor;

        // aggregation buffer: the state passed between the phases
        static class sumdoubleagg implements AggregationBuffer {
            double sum;
            boolean empty;
        }

        @Override
        public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
            super.init(m, parameters);
            if (parameters.length != 1) {
                throw new UDFArgumentException("exactly one argument is expected");
            }
            if (this.doubleInputor == null) {
                this.doubleInputor = (PrimitiveObjectInspector) parameters[0];
            }
            return PrimitiveObjectInspectorFactory.writableDoubleObjectInspector;
        }

        @Override
        public AggregationBuffer getNewAggregationBuffer() throws HiveException {
            sumdoubleagg sdg = new sumdoubleagg();
            this.reset(sdg);
            return sdg;
        }

        @Override
        public void reset(AggregationBuffer agg) throws HiveException {
            sumdoubleagg sdg = (sumdoubleagg) agg;
            sdg.sum = 0;
            sdg.empty = true;
        }

        @Override
        public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
            if (parameters.length != 1) {
                throw new UDFArgumentException("exactly one argument is expected");
            }
            this.merge(agg, parameters[0]);
        }

        @Override
        public Object terminatePartial(AggregationBuffer agg) throws HiveException {
            return this.terminate(agg);
        }

        @Override
        public void merge(AggregationBuffer agg, Object partial) throws HiveException {
            sumdoubleagg sdg = (sumdoubleagg) agg;
            if (partial != null) {
                sdg.sum += PrimitiveObjectInspectorUtils.getDouble(partial, doubleInputor);
                sdg.empty = false;
            }
        }

        @Override
        public Object terminate(AggregationBuffer agg) throws HiveException {
            sumdoubleagg sdg = (sumdoubleagg) agg;
            if (sdg.empty) {
                return null;
            }
            return new DoubleWritable(sdg.sum);
        }
    }
}
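Before testing, compile the class against the Hive and Hadoop jars on your classpath and package it as a jar (mysum.jar below); the exact classpath depends on your installation.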
Test
hive (workdb)> add jar /home/liuwl/opt/datas/mysum.jar;
hive (workdb)> create temporary function mysum as 'com.hive.udaf.mysum';
hive (workdb)> select sum(deptno),mysum(deptno) from emp;
Result:
_c0    _c1
310    310
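The query above exercises only the long branch, since deptno is an int column. To exercise the double branch, assuming emp also carries a floating-point column such as sal (an assumption about the demo table), compare against the built-in sum in the same way:
hive (workdb)> select sum(sal),mysum(sal) from emp;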