实战数据:
预期结果:
测试数据:
002|2014-09-10 00-09|东油大学
002|2014-09-10 09-17|学苑小区
001|2014-09-12 00-09|东油大学
001|2014-09-12 09-17|新玛特
002|2014-09-13 00-09|东油大学
002|2014-09-13 09-17|新玛特
003|2014-09-14 00-09|东油大学
003|2014-09-14 09-17|新玛特
003|2014-09-14 17-27|农垦大学
001|2014-10-10 00-09|东油大学
001|2014-10-10 09-17|学苑小区
Pig代码:
--加载数据 data = load '/user/hadoop/telecom/telecomdata' USING PigStorage('|') AS (imsi:chararray,time:chararray,loc : chararray); --转换格式
REGISTER /usr/local/pig-0.13.0/contrib/piggybank/java/piggybank.jar ;
REGISTER /usr/local/pig-0.13.0/contrib/piggybank/java/lib/joda-time-2.1.jar ;
DEFINE CustomFormatToISO org.apache.pig.piggybank.evaluation.datetime.convert.CustomFormatToISO();
toISO = FOREACH data GENERATE imsi, CustomFormatToISO(SUBSTRING(time,0,13), 'YYYY-MM-dd HH') AS time:chararray,loc; --按照用户imsi分组
grp = group toISO by imsi;
describe grp; --调用Datafu的MarkovPairs把连续位置放在同一行
REGISTER /usr/local/pig-0.13.0/contrib/piggybank/java/lib/datafu-1.0.0.jar;
define MarkovPairs datafu.pig.stats.MarkovPairs();
pairs = Foreach grp
{
sorted = ORDER toISO BY time;
pair = MarkovPairs(sorted);
generate Flatten(pair) as ( data:tuple(imsi,time,loc),next:tuple(imsi,time,loc));
}
describe pairs; --展开数据
prj = foreach pairs generate data.imsi as imsi, data.time as time ,next,time as next_time , data.loc as loc ,next.loc as next_loc; --过滤不在同一天的数据
DEFINE ISODaysBetween org.apache.pig.piggybank.evaluation.datetime.diff.ISODaysBetween();
flt = filter prj by ISODaysBetween(next_time, time)==0L;
describe flt; --分组计数
total_count = foreach (group flt by loc) generate group as loc,count(flt) as total;
describe total_count; --计算连续位置对(pair)的数目
pairs_count = foreach (group flt by (loc, next_loc))
generate flatten (group) as (loc,next_loc),count(flt) as cnt;
describe pairs_count; --表连接
jnd=join pairs_count by loc ,total_count by loc using 'replicated';
describe jnd; --计算相对频数,并保留前三
prob = foreach jnd generate pairs_count::loc as loc,pairs_count::next_loc as next_loc,(double)cnt / (double)total as probability;
describe prob; top3 = foreach (group prob by loc)
{
sorted = order prob by probability desc;
top = limit sorted 3;
generate flatten (top);
} describe top3;
--输出数据
store top3 into 'output';
代码说明:
1.piggybank
api:http://pig.apache.org/docs/r0.13.0/api/
2.datafu
api:http://datafu.incubator.apache.org/docs/datafu/1.0.0/
下载地址:http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22com.linkedin.datafu%22