今天某位仁兄给了一道Hive的题目
Hive里有个表,存储了(用户ID)、(点击时间)、(点击网址)三个字段,要求输出:用户ID、点击顺序、from url、to url。其中点击顺序是每个用户ID的点击记录按时间排序后的顺序号,from url为上一次点击的网址,to url为当次点击的网址。顺序号为1的时候,from url为空即可。
1.实现基于纯Hive SQL的ETL过程
2.实现一个能加速上述处理过程的Hive Generic UDF,并给出使用此UDF实现ETL过程的Hive SQL
按照俺的理解,这个问题用UDAF跟UDTF结合起来做比较容易
于是就着手写了一些代码
Part.1:
View Code
package Url;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;

/**
 * Builds the sequence of (from URL, to URL) visit pairs for one user.
 *
 * <p>The aggregate collects every (url, time) row of a group, sorts the rows by
 * time in ascending order and serialises consecutive visits into one string of
 * the form {@code from SEP_01 to SEP_02 from SEP_01 to SEP_02 ...} (without the
 * spaces). The first pair uses the literal text {@code NULL} as its "from" URL.
 *
 * @author juefan
 */
public class UDAFRank extends UDAF {

    /** Separates the "from" URL and the "to" URL inside one pair. */
    private static final String FIELD_SEPARATOR = "SEP_01";

    /** Terminates each serialised pair. */
    private static final String PAIR_SEPARATOR = "SEP_02";

    /** One visit record of a user: the visited URL and the visit time. */
    public static class State {
        String url;
        Long time;

        public State() {
            url = null;
            time = null;
        }
    }

    public static class UDAFRankEvaluator implements UDAFEvaluator {

        /** All visit records collected so far for the current group. */
        public final List<State> stateList;

        public UDAFRankEvaluator() {
            stateList = new ArrayList<State>();
        }

        /** Resets the evaluator by discarding every collected record. */
        public void init() {
            stateList.clear();
        }

        /**
         * Adds one visit record to the aggregation state.
         *
         * @param url  the visited URL
         * @param time the visit time
         * @return {@code false} when either argument is null (the row is
         *         ignored), {@code true} otherwise
         */
        public boolean iterate(String url, Long time) {
            if (url == null || time == null) {
                return false;
            }
            State state = new State();
            state.url = url;
            state.time = time;
            stateList.add(state);
            return true;
        }

        /** Returns the records collected so far as the partial aggregation result. */
        public List<State> terminatePartial() {
            return stateList;
        }

        /**
         * Merges the records of another partial aggregation into this one.
         *
         * @param other the partial result to merge; may be null
         * @return {@code false} when {@code other} is null, {@code true} otherwise
         */
        public boolean merge(List<State> other) {
            if (other == null) {
                return false;
            }
            stateList.addAll(other);
            return true;
        }

        /**
         * Sorts the collected records by time and serialises the visit pairs.
         *
         * @return the serialised pairs, or {@code null} when no record was
         *         collected (previously this case threw
         *         {@code IndexOutOfBoundsException})
         */
        public String terminate() {
            if (stateList.isEmpty()) {
                return null;
            }
            Collections.sort(stateList, new Statecomparator1());

            StringBuilder resultBuilder = new StringBuilder();
            // The earliest visit has no predecessor, so its "from" URL is the literal NULL.
            resultBuilder.append("NULL").append(FIELD_SEPARATOR)
                    .append(stateList.get(0).url).append(PAIR_SEPARATOR);
            // Every later visit is paired with the visit immediately before it.
            for (int i = 0; i < stateList.size() - 1; i++) {
                resultBuilder.append(stateList.get(i).url).append(FIELD_SEPARATOR)
                        .append(stateList.get(i + 1).url).append(PAIR_SEPARATOR);
            }
            return resultBuilder.toString();
        }

        /** Orders visit records by visit time, earliest first. */
        public class Statecomparator1 implements Comparator<Object> {
            public int compare(Object o1, Object o2) {
                State s1 = (State) o1;
                State s2 = (State) o2;
                // Must be negative / zero / positive; returning only 1 or 0 breaks the
                // Comparator contract and makes the sort result unreliable.
                return s1.time.compareTo(s2.time);
            }
        }
    }
}
Part.2:
View Code
package Url;

import java.util.ArrayList;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

/**
 * Splits a serialised list of visit pairs into one output row per pair.
 *
 * <p>The input is a single string in which pairs are terminated by
 * {@code SEP_02} and the two URLs of a pair are separated by {@code SEP_01}.
 * Each emitted row has three string columns: {@code from}, {@code to} and
 * {@code rank}.
 *
 * @author juefan
 */
public class UDTFPair extends GenericUDTF {

    /** Separates the "from" URL and the "to" URL inside one pair. */
    private static final String FIELD_SEPARATOR = "SEP_01";

    /** Terminates each serialised pair. */
    private static final String PAIR_SEPARATOR = "SEP_02";

    /** Nothing to release; no state is held between rows. */
    @Override
    public void close() throws HiveException {
    }

    /**
     * Validates the argument list and declares the output schema.
     *
     * @param args object inspectors of the call arguments
     * @return a struct inspector with the string columns {@code from},
     *         {@code to} and {@code rank}
     * @throws UDFArgumentException when the function is not called with
     *         exactly one primitive argument
     */
    @Override
    public StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException {
        if (args.length != 1) {
            throw new UDFArgumentLengthException("UDTFPair takes exactly one argument");
        }
        if (args[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
            throw new UDFArgumentException("UDTFPair takes a string as its parameter");
        }

        ArrayList<String> fieldNames = new ArrayList<String>();
        ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
        fieldNames.add("from");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        fieldNames.add("to");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        fieldNames.add("rank");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }

    /**
     * Emits one row per well-formed pair found in the input string.
     *
     * <p>A segment that does not split into both a "from" and a "to" part is
     * skipped. The rank is the 1-based position of the segment in the input,
     * so skipped segments leave gaps in the numbering.
     *
     * @param args the call arguments; {@code args[0]} is the serialised pair list
     * @throws HiveException when forwarding a row fails; such failures are
     *         propagated instead of being silently swallowed
     */
    @Override
    public void process(Object[] args) throws HiveException {
        // A NULL input carries no pairs, so there is nothing to emit.
        if (args == null || args.length == 0 || args[0] == null) {
            return;
        }

        String[] pairs = args[0].toString().split(PAIR_SEPARATOR);
        for (int i = 0; i < pairs.length; i++) {
            String[] urls = pairs[i].split(FIELD_SEPARATOR);
            if (urls.length < 2) {
                // Malformed segment: no "to" part present.
                continue;
            }
            String[] result = new String[3];
            result[0] = urls[0];
            result[1] = urls[1];
            result[2] = Integer.toString(i + 1);
            forward(result);
        }
    }
}
上面这种做法的好处就是运行速度较快
接下来按照题目的要求来写,就是纯Hive SQL的方式
代码如下:
-- Builds (ucookie, ranks, from_url, to_url) with plain Hive SQL:
-- each click is ranked inside its user's time-ordered sequence by a self-join,
-- then joined to the click whose rank is one lower to obtain from_url.
-- NOTE: clicks of one user that share the same currenttime still receive the
-- same rank, so the result is only correct when a user's click times are distinct.
create table dm_fan_test12 as
select
    a.ucookie,
    a.ranks,
    b.currenturl as from_url,
    a.currenturl as to_url
from (
    -- Rank of the click = number of the user's clicks that are not later than it.
    select
        a.ucookie,
        a.currenturl,
        sum(a.re) as ranks
    from (
        select
            a.ucookie,
            a.currenturl,
            a.currenttime,
            case when a.currenttime < b.currenttime then 0 else 1 end as re
        from dm_fan_test11 a
        join dm_fan_test11 b
            on a.ucookie = b.ucookie
    ) a
    -- currenttime is part of the key so that repeated visits to the same URL
    -- are ranked separately instead of being collapsed into one row.
    group by a.ucookie, a.currenturl, a.currenttime
) a
left outer join (
    -- Same ranking as above; supplies the previous click of each user.
    select
        a.ucookie,
        a.currenturl,
        sum(a.re) as ranks
    from (
        select
            a.ucookie,
            a.currenturl,
            a.currenttime,
            case when a.currenttime < b.currenttime then 0 else 1 end as re
        from dm_fan_test11 a
        join dm_fan_test11 b
            on a.ucookie = b.ucookie
    ) a
    group by a.ucookie, a.currenturl, a.currenttime
) b
    on a.ucookie = b.ucookie
    and (a.ranks - 1) = b.ranks;
总结:
使用UDAF跟UDTF的方式最终计算只用到一个节点,运算时间是35秒;纯Hive SQL的方式也是只用到一个节点,运算时间是225秒,两者相差约6.4倍,这就是自定义函数的魅力了。而且,纯Hive SQL的方式还有一个问题:当一个用户在同一时间点访问了多个URL时,数据会有所丢失,导致结果不正确!