函数如何使用:
hive> desc concat_test;
OK
a int
b string
hive> select * from concat_test;
OK
1 good
2 other
1 nice
1 hello
hive> select a,concat(b,',') from concat_test group by a;
OK
1 good,nice,hello
2 other
函数实现:
package com.hadoopbook.hive;
import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
public class concat extends UDAF {
public static class ConcatUDAFEvaluator implements UDAFEvaluator{
public static class PartialResult{
String result;
String delimiter;
}
private PartialResult partial;
public void init() {
partial = null;
}
public boolean iterate(String value,String deli){
if (value == null){
return true;
}
if (partial == null){
partial = new PartialResult();
partial.result = new String("");
if( deli == null || deli.equals("") )
{
partial.delimiter = new String(",");
}
else
{
partial.delimiter = new String(deli);
}
}
if ( partial.result.length() > 0 )
{
partial.result = partial.result.concat(partial.delimiter);
}
partial.result = partial.result.concat(value);
return true;
}
public PartialResult terminatePartial(){
return partial;
}
public boolean merge(PartialResult other){
if (other == null){
return true;
}
if (partial == null){
partial = new PartialResult();
partial.result = new String(other.result);
partial.delimiter = new String(other.delimiter);
}
else
{
if ( partial.result.length() > 0 )
{
partial.result = partial.result.concat(partial.delimiter);
}
partial.result = partial.result.concat(other.result);
}
return true;
}
public String terminate(){
return new String(partial.result);
}
}
}
关于UDAF开发注意点:
1.需要import org.apache.hadoop.hive.ql.exec.UDAF以及org.apache.hadoop.hive.ql.exec.UDAFEvaluator,这两个包都是必须的
2.函数类需要继承UDAF类,内部类Evaluator实现UDAFEvaluator接口
3.Evaluator需要实现 init、iterate、terminatePartial、merge、terminate这几个函数
1)init函数类似于构造函数,用于UDAF的初始化
2)iterate接收传入的参数,并进行内部的轮转。其返回类型为boolean
3)terminatePartial无参数,其为iterate函数轮转结束后,返回乱转数据,iterate和terminatePartial类似于hadoop的Combiner
4)merge接收terminatePartial的返回结果,进行数据merge操作,其返回类型为boolean
5)terminate返回最终的聚集函数结果