项目源码:https://github.com/haha174/spark-session.git
文章地址:http://www.haha174.top/article/details/255956
一般的情况下,我们都不会直接输出所有的结果,需要对过滤条件对数据进行一定的过滤最后得到需要的数据,本篇介绍一下session 过滤
首先设置过滤条件参数
String FIELD_SESSION_ID="sessionid";
String PARAM_START_DATE="startDate";
String PARAM_END_DATE="endDate";
String FIELD_CITY="city";
String FIELD_SEARCH_KEYWORDS = "searchKeywords";
String FIELD_CLICK_CATEGORY_IDS = "clickCategoryIds";
String PARAM_SEX="sex";
String PARAM_KEYWORDS="keywords";
String PARAM_CITIES="cities";
String PARAM_PROFESSIONALS="professionals";
String PARAM_ENDAGE="endAge";
String PARAM_STARTAGE="startAge";
String PARAM_CATEGORY_IDS="categoryIds";
下面编写校验的工具类 为什么将过滤单独编写一个工具类呢主要为了后期的维护和优化
/** * 校验工具类 * @author wchen129 * @date 2018-03-21 */
public class ValidUtils {
/** * 校验数据中的指定字段,是否在指定范围内 * @param data 数据 * @param dataField 数据字段 * @param parameter 参数 * @param startParamField 起始参数字段 * @param endParamField 结束参数字段 * @return 校验结果 */
public static boolean between(String data, String dataField,
String parameter, String startParamField, String endParamField) {
String startParamFieldStr = StringUtils.getFieldFromConcatString(
parameter, "\\|", startParamField);
String endParamFieldStr = StringUtils.getFieldFromConcatString(
parameter, "\\|", endParamField);
if(startParamFieldStr == null || endParamFieldStr == null) {
return true;
}
int startParamFieldValue = Integer.valueOf(startParamFieldStr);
int endParamFieldValue = Integer.valueOf(endParamFieldStr);
String dataFieldStr = StringUtils.getFieldFromConcatString(
data, "\\|", dataField);
if(dataFieldStr != null) {
int dataFieldValue = Integer.valueOf(dataFieldStr);
if(dataFieldValue >= startParamFieldValue &&
dataFieldValue <= endParamFieldValue) {
return true;
} else {
return false;
}
}
return false;
}
/** * 校验数据中的指定字段,是否有值与参数字段的值相同 * @param data 数据 * @param dataField 数据字段 * @param parameter 参数 * @param paramField 参数字段 * @return 校验结果 */
public static boolean in(String data, String dataField,
String parameter, String paramField) {
String paramFieldValue = StringUtils.getFieldFromConcatString(
parameter, "\\|", paramField);
if(paramFieldValue == null) {
return true;
}
String[] paramFieldValueSplited = paramFieldValue.split(",");
String dataFieldValue = StringUtils.getFieldFromConcatString(
data, "\\|", dataField);
if(dataFieldValue != null) {
String[] dataFieldValueSplited = dataFieldValue.split(",");
for(String singleDataFieldValue : dataFieldValueSplited) {
for(String singleParamFieldValue : paramFieldValueSplited) {
if(singleDataFieldValue.equals(singleParamFieldValue)) {
return true;
}
}
}
}
return false;
}
/** * 校验数据中的指定字段,是否在指定范围内 * @param data 数据 * @param dataField 数据字段 * @param parameter 参数 * @param paramField 参数字段 * @return 校验结果 */
public static boolean equal(String data, String dataField,
String parameter, String paramField) {
String paramFieldValue = StringUtils.getFieldFromConcatString(
parameter, "\\|", paramField);
if(paramFieldValue == null) {
return true;
}
String dataFieldValue = StringUtils.getFieldFromConcatString(
data, "\\|", dataField);
if(dataFieldValue != null) {
if(dataFieldValue.equals(paramFieldValue)) {
return true;
}
}
return false;
}
}
过滤的业务逻辑
private static JavaPairRDD<String, String> filterSession(
JavaPairRDD<String, String> sessionid2AggrInfoRDD,
final JSONObject taskParam) {
// 为了使用我们后面的ValieUtils,所以,首先将所有的筛选参数拼接成一个连接串
// 此外,这里其实大家不要觉得是多此一举
// 其实我们是给后面的性能优化埋下了一个伏笔
String startAge = ParamUtils.getParam(taskParam, Constants.SESSION_PROJECT.PARAM_STARTAGE);
String endAge = ParamUtils.getParam(taskParam, Constants.SESSION_PROJECT.PARAM_ENDAGE);
String professionals = ParamUtils.getParam(taskParam, Constants.SESSION_PROJECT.PARAM_PROFESSIONALS);
String cities = ParamUtils.getParam(taskParam, Constants.SESSION_PROJECT.PARAM_CITIES);
String sex = ParamUtils.getParam(taskParam, Constants.SESSION_PROJECT.PARAM_SEX);
String keywords = ParamUtils.getParam(taskParam, Constants.SESSION_PROJECT.PARAM_KEYWORDS);
String categoryIds = ParamUtils.getParam(taskParam, Constants.SESSION_PROJECT.PARAM_CATEGORY_IDS);
String _parameter = (startAge != null ? Constants.SESSION_PROJECT.PARAM_STARTAGE + "=" + startAge + "|" : "")
+ (endAge != null ? Constants.SESSION_PROJECT.PARAM_ENDAGE + "=" + endAge + "|" : "")
+ (professionals != null ? Constants.SESSION_PROJECT.PARAM_PROFESSIONALS + "=" + professionals + "|" : "")
+ (cities != null ? Constants.SESSION_PROJECT.PARAM_CITIES + "=" + cities + "|" : "")
+ (sex != null ? Constants.SESSION_PROJECT.PARAM_SEX + "=" + sex + "|" : "")
+ (keywords != null ? Constants.SESSION_PROJECT.PARAM_KEYWORDS + "=" + keywords + "|" : "")
+ (categoryIds != null ? Constants.SESSION_PROJECT.PARAM_CATEGORY_IDS + "=" + categoryIds: "");
if(_parameter.endsWith("\\|")) {
_parameter = _parameter.substring(0, _parameter.length() - 1);
}
final String parameter = _parameter;
// 根据筛选参数进行过滤
JavaPairRDD<String, String> filteredSessionid2AggrInfoRDD = sessionid2AggrInfoRDD.filter(
new Function<Tuple2<String,String>, Boolean>() {
private static final long serialVersionUID = 1L;
@Override
public Boolean call(Tuple2<String, String> tuple) throws Exception {
// 首先,从tuple中,获取聚合数据
String aggrInfo = tuple._2;
// 接着,依次按照筛选条件进行过滤
// 按照年龄范围进行过滤(startAge、endAge)
if(!ValidUtils.between(aggrInfo, Constants.FIELD.FIELD_AGE,
parameter, Constants.SESSION_PROJECT.PARAM_STARTAGE, Constants.SESSION_PROJECT.PARAM_ENDAGE)) {
return false;
}
// 按照职业范围进行过滤(professionals)
// 互联网,IT,软件
// 互联网
if(!ValidUtils.in(aggrInfo, Constants.FIELD.FIELD_PROFESSIONAL,
parameter, Constants.SESSION_PROJECT.PARAM_PROFESSIONALS)) {
return false;
}
// 按照城市范围进行过滤(cities)
// 北京,上海,广州,深圳
// 成都
if(!ValidUtils.in(aggrInfo, Constants.FIELD.FIELD_CITY,
parameter, Constants.SESSION_PROJECT.PARAM_CITIES)) {
return false;
}
// 按照性别进行过滤
// 男/女
// 男,女
if(!ValidUtils.equal(aggrInfo, Constants.FIELD.FIELD_SEX,
parameter, Constants.SESSION_PROJECT.PARAM_SEX)) {
return false;
}
// 按照搜索词进行过滤
// 我们的session可能搜索了 火锅,蛋糕,烧烤
// 我们的筛选条件可能是 火锅,串串香,iphone手机
// 那么,in这个校验方法,主要判定session搜索的词中,有任何一个,与筛选条件中
// 任何一个搜索词相当,即通过
if(!ValidUtils.in(aggrInfo, Constants.SESSION_PROJECT.FIELD_SEARCH_KEYWORDS,
parameter, Constants.SESSION_PROJECT.PARAM_KEYWORDS)) {
return false;
}
// 按照点击品类id进行过滤
if(!ValidUtils.in(aggrInfo, Constants.SESSION_PROJECT.FIELD_CLICK_CATEGORY_IDS,
parameter, Constants.SESSION_PROJECT.PARAM_CATEGORY_IDS)) {
return false;
}
return true;
}
});
return filteredSessionid2AggrInfoRDD;
}