项目源码：https://github.com/haha174/spark-session.git
/*
 * Step 5: walk through the sessions of every day/hour and extract the ones whose
 * position within that day/hour matches one of the pre-computed random indices.
 */
// groupByKey turns <dateHour, sessionAggrInfo> pairs into <dateHour, (sessionAggrInfo...)>.
JavaPairRDD<String, Iterable<String>> time2sessionsRDD = time2sessionidRDD.groupByKey();
// flatMapToPair visits every <dateHour, (sessionAggrInfo...)> group. For each session whose
// 0-based position in its group is one of that day/hour's random extraction indices, it:
//   - inserts a SessionRandomExtract record through ISessionRandomExtractDAO
//     (per the original notes, the MySQL random_extract_session table), and
//   - emits a <sessionid, sessionid> pair.
// Per the original notes, the extracted session ids are later joined with their detailed
// action records and written to the session table (that step is not part of this block).
//
// NOTE(review): the insert runs inside a lazy transformation. If Spark evaluates this RDD
// more than once (task retry, or several actions without persist()), the same sessions are
// inserted again. Consider persisting extractSessionidsRDD or doing the write in an action.
JavaPairRDD<String, String> extractSessionidsRDD = time2sessionsRDD.flatMapToPair(
        new PairFlatMapFunction<Tuple2<String, Iterable<String>>, String, String>() {
            private static final long serialVersionUID = 1L;

            /**
             * Extracts the sessions of one day/hour group that sit at the random indices.
             *
             * @param tuple <dateHour, all sessionAggrInfo strings for that dateHour>
             * @return iterator over <sessionid, sessionid> pairs of the extracted sessions;
             *         empty when the day/hour has no extraction indices
             */
            @Override
            public Iterator<Tuple2<String, String>> call(
                    Tuple2<String, Iterable<String>> tuple) throws Exception {
                List<Tuple2<String, String>> extractSessionids = new ArrayList<>();

                // dateHour is split on "_" into date and hour; split it only once.
                String[] dateHourParts = tuple._1.split("_");
                String date = dateHourParts[0];
                String hour = dateHourParts[1];

                // A day/hour without an extraction plan has nothing to extract. The previous
                // chained get(date).get(hour) threw NullPointerException in that case.
                List<Integer> extractIndexList = dateHourExtractMap.get(date) != null
                        ? dateHourExtractMap.get(date).get(hour)
                        : null;
                if (extractIndexList == null || extractIndexList.isEmpty()) {
                    return extractSessionids.iterator();
                }

                ISessionRandomExtractDAO sessionRandomExtractDAO =
                        DAOFactory.getSessionRandomExtractDAO();

                // index = 0-based position of the current session within this group.
                int index = 0;
                for (String sessionAggrInfo : tuple._2) {
                    if (extractIndexList.contains(index)) {
                        String sessionid = StringUtils.getFieldFromConcatString(
                                sessionAggrInfo, "\\|", Constants.SESSION_PROJECT.FIELD_SESSION_ID);

                        // Persist the extracted session's summary fields.
                        SessionRandomExtract sessionRandomExtract = new SessionRandomExtract();
                        sessionRandomExtract.setTaskid(taskid);
                        sessionRandomExtract.setSessionid(sessionid);
                        sessionRandomExtract.setStartTime(StringUtils.getFieldFromConcatString(
                                sessionAggrInfo, "\\|", Constants.SESSION_PROJECT.FIELD_START_TIME));
                        sessionRandomExtract.setSearchKeywords(StringUtils.getFieldFromConcatString(
                                sessionAggrInfo, "\\|", Constants.SESSION_PROJECT.FIELD_SEARCH_KEYWORDS));
                        sessionRandomExtract.setClickCategoryIds(StringUtils.getFieldFromConcatString(
                                sessionAggrInfo, "\\|", Constants.SESSION_PROJECT.FIELD_CLICK_CATEGORY_IDS));
                        sessionRandomExtractDAO.insert(sessionRandomExtract);

                        // Emit the session id (as both key and value) for the later join.
                        extractSessionids.add(new Tuple2<String, String>(sessionid, sessionid));
                    }
                    index++;
                }
                return extractSessionids.iterator();
            }
        });