spark 大型项目实战(十六):用户访问session分析(十六) --session 聚合过滤

时间:2021-06-15 00:51:44

项目源码:https://github.com/haha174/spark-session.git
文章地址:http://www.haha174.top/article/details/255956
一般的情况下,我们都不会直接输出所有的结果,需要对过滤条件对数据进行一定的过滤最后得到需要的数据,本篇介绍一下session 过滤
首先设置过滤条件参数

        String FIELD_SESSION_ID="sessionid";
         String PARAM_START_DATE="startDate";
         String PARAM_END_DATE="endDate";
         String FIELD_CITY="city";
        String FIELD_SEARCH_KEYWORDS = "searchKeywords";
        String FIELD_CLICK_CATEGORY_IDS = "clickCategoryIds";
        String PARAM_SEX="sex";
        String PARAM_KEYWORDS="keywords";
        String PARAM_CITIES="cities";
        String PARAM_PROFESSIONALS="professionals";
        String PARAM_ENDAGE="endAge";
        String PARAM_STARTAGE="startAge";
        String PARAM_CATEGORY_IDS="categoryIds";

下面编写校验的工具类 为什么将过滤单独编写一个工具类呢主要为了后期的维护和优化

/** * 校验工具类 * @author wchen129 * @date 2018-03-21 */
public class ValidUtils {

    /** * 校验数据中的指定字段,是否在指定范围内 * @param data 数据 * @param dataField 数据字段 * @param parameter 参数 * @param startParamField 起始参数字段 * @param endParamField 结束参数字段 * @return 校验结果 */
    public static boolean between(String data, String dataField, 
            String parameter, String startParamField, String endParamField) {
        String startParamFieldStr = StringUtils.getFieldFromConcatString(
                parameter, "\\|", startParamField);
        String endParamFieldStr = StringUtils.getFieldFromConcatString(
                parameter, "\\|", endParamField); 
        if(startParamFieldStr == null || endParamFieldStr == null) {
            return true;
        }

        int startParamFieldValue = Integer.valueOf(startParamFieldStr);
        int endParamFieldValue = Integer.valueOf(endParamFieldStr);

        String dataFieldStr = StringUtils.getFieldFromConcatString(
                data, "\\|", dataField);
        if(dataFieldStr != null) {
            int dataFieldValue = Integer.valueOf(dataFieldStr);
            if(dataFieldValue >= startParamFieldValue &&
                    dataFieldValue <= endParamFieldValue) {
                return true;
            } else {
                return false;
            }
        }

        return false;
    }

    /** * 校验数据中的指定字段,是否有值与参数字段的值相同 * @param data 数据 * @param dataField 数据字段 * @param parameter 参数 * @param paramField 参数字段 * @return 校验结果 */
    public static boolean in(String data, String dataField, 
            String parameter, String paramField) {
        String paramFieldValue = StringUtils.getFieldFromConcatString(
                parameter, "\\|", paramField);
        if(paramFieldValue == null) {
            return true;
        }
        String[] paramFieldValueSplited = paramFieldValue.split(",");  

        String dataFieldValue = StringUtils.getFieldFromConcatString(
                data, "\\|", dataField);
        if(dataFieldValue != null) {
            String[] dataFieldValueSplited = dataFieldValue.split(",");

            for(String singleDataFieldValue : dataFieldValueSplited) {
                for(String singleParamFieldValue : paramFieldValueSplited) {
                    if(singleDataFieldValue.equals(singleParamFieldValue)) {
                        return true;
                    }
                }
            }
        }

        return false;
    }

    /** * 校验数据中的指定字段,是否在指定范围内 * @param data 数据 * @param dataField 数据字段 * @param parameter 参数 * @param paramField 参数字段 * @return 校验结果 */
    public static boolean equal(String data, String dataField, 
            String parameter, String paramField) {  
        String paramFieldValue = StringUtils.getFieldFromConcatString(
                parameter, "\\|", paramField);
        if(paramFieldValue == null) {
            return true;
        }

        String dataFieldValue = StringUtils.getFieldFromConcatString(
                data, "\\|", dataField);
        if(dataFieldValue != null) {
            if(dataFieldValue.equals(paramFieldValue)) {
                return true;
            }
        }

        return false;
    }

}

过滤的业务逻辑

private static JavaPairRDD<String, String> filterSession(
            JavaPairRDD<String, String> sessionid2AggrInfoRDD,
            final JSONObject taskParam) {
        // 为了使用我们后面的ValieUtils,所以,首先将所有的筛选参数拼接成一个连接串
        // 此外,这里其实大家不要觉得是多此一举
        // 其实我们是给后面的性能优化埋下了一个伏笔
        String startAge = ParamUtils.getParam(taskParam, Constants.SESSION_PROJECT.PARAM_STARTAGE);
        String endAge = ParamUtils.getParam(taskParam, Constants.SESSION_PROJECT.PARAM_ENDAGE);
        String professionals = ParamUtils.getParam(taskParam, Constants.SESSION_PROJECT.PARAM_PROFESSIONALS);
        String cities = ParamUtils.getParam(taskParam, Constants.SESSION_PROJECT.PARAM_CITIES);
        String sex = ParamUtils.getParam(taskParam, Constants.SESSION_PROJECT.PARAM_SEX);
        String keywords = ParamUtils.getParam(taskParam, Constants.SESSION_PROJECT.PARAM_KEYWORDS);
        String categoryIds = ParamUtils.getParam(taskParam, Constants.SESSION_PROJECT.PARAM_CATEGORY_IDS);

        String _parameter = (startAge != null ? Constants.SESSION_PROJECT.PARAM_STARTAGE + "=" + startAge + "|" : "")
                + (endAge != null ? Constants.SESSION_PROJECT.PARAM_ENDAGE + "=" + endAge + "|" : "")
                + (professionals != null ? Constants.SESSION_PROJECT.PARAM_PROFESSIONALS + "=" + professionals + "|" : "")
                + (cities != null ? Constants.SESSION_PROJECT.PARAM_CITIES + "=" + cities + "|" : "")
                + (sex != null ? Constants.SESSION_PROJECT.PARAM_SEX + "=" + sex + "|" : "")
                + (keywords != null ? Constants.SESSION_PROJECT.PARAM_KEYWORDS + "=" + keywords + "|" : "")
                + (categoryIds != null ? Constants.SESSION_PROJECT.PARAM_CATEGORY_IDS + "=" + categoryIds: "");

        if(_parameter.endsWith("\\|")) {
            _parameter = _parameter.substring(0, _parameter.length() - 1);
        }

        final String parameter = _parameter;

        // 根据筛选参数进行过滤
        JavaPairRDD<String, String> filteredSessionid2AggrInfoRDD = sessionid2AggrInfoRDD.filter(

                new Function<Tuple2<String,String>, Boolean>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Boolean call(Tuple2<String, String> tuple) throws Exception {
                        // 首先,从tuple中,获取聚合数据
                        String aggrInfo = tuple._2;

                        // 接着,依次按照筛选条件进行过滤
                        // 按照年龄范围进行过滤(startAge、endAge)
                        if(!ValidUtils.between(aggrInfo, Constants.FIELD.FIELD_AGE,
                                parameter, Constants.SESSION_PROJECT.PARAM_STARTAGE, Constants.SESSION_PROJECT.PARAM_ENDAGE)) {
                            return false;
                        }

                        // 按照职业范围进行过滤(professionals)
                        // 互联网,IT,软件
                        // 互联网
                        if(!ValidUtils.in(aggrInfo, Constants.FIELD.FIELD_PROFESSIONAL,
                                parameter, Constants.SESSION_PROJECT.PARAM_PROFESSIONALS)) {
                            return false;
                        }

                        // 按照城市范围进行过滤(cities)
                        // 北京,上海,广州,深圳
                        // 成都
                        if(!ValidUtils.in(aggrInfo, Constants.FIELD.FIELD_CITY,
                                parameter, Constants.SESSION_PROJECT.PARAM_CITIES)) {
                            return false;
                        }

                        // 按照性别进行过滤
                        // 男/女
                        // 男,女
                        if(!ValidUtils.equal(aggrInfo, Constants.FIELD.FIELD_SEX,
                                parameter, Constants.SESSION_PROJECT.PARAM_SEX)) {
                            return false;
                        }

                        // 按照搜索词进行过滤
                        // 我们的session可能搜索了 火锅,蛋糕,烧烤
                        // 我们的筛选条件可能是 火锅,串串香,iphone手机
                        // 那么,in这个校验方法,主要判定session搜索的词中,有任何一个,与筛选条件中
                        // 任何一个搜索词相当,即通过
                        if(!ValidUtils.in(aggrInfo, Constants.SESSION_PROJECT.FIELD_SEARCH_KEYWORDS,
                                parameter, Constants.SESSION_PROJECT.PARAM_KEYWORDS)) {
                            return false;
                        }

                        // 按照点击品类id进行过滤
                        if(!ValidUtils.in(aggrInfo, Constants.SESSION_PROJECT.FIELD_CLICK_CATEGORY_IDS,
                                parameter, Constants.SESSION_PROJECT.PARAM_CATEGORY_IDS)) {
                            return false;
                        }

                        return true;
                    }

                });

        return filteredSessionid2AggrInfoRDD;
    }

欢迎关注,更多福利

spark 大型项目实战(十六):用户访问session分析(十六) --session 聚合过滤