spark 大型项目实战(十五):用户访问session分析(十五) --按照session 粒度进行数据聚合

时间:2022-05-20 00:47:31

文章地址:http://www.haha174.top/article/details/253715
源码:https://github.com/haha174/spark-session.git
上一遍中通过代码构造出了测试的数据下面开始对该数据进行聚合
首先分析一下

/** * * 接收用户创建的分析任务,用户可能指定的条件如下: * * 1、时间范围:起始日期~结束日期 * 2、性别:男或女 * 3、年龄范围 * 4、职业:多选 * 5、城市:多选 * 6、搜索词:多个搜索词,只要某个session中的任何一个action搜索过指定的关键词,那么session就符合条件 * 7、点击品类:多个品类,只要某个session中的任何一个action点击过某个品类,那么session就符合条件 * * 我们的spark作业如何接受用户创建的任务? * * J2EE平台在接收用户创建任务的请求之后,会将任务信息插入MySQL的task表中,任务参数以JSON格式封装在task_param * 字段中 * * 接着J2EE平台会执行我们的spark-submit shell脚本,并将taskid作为参数传递给spark-submit shell脚本 * spark-submit shell脚本,在执行时,是可以接收参数的,并且会将接收的参数,传递给Spark作业的main函数 * 参数就封装在main函数的args数组中 * 这是spark本身提供的特性 */

下面创建一个task (向task 表中插入一条数据 在测试的时候写的是固定的值但是实际上是在 环境信息中获取的)
使用之前开发好的jdbc 组件插入一条数据如下

  private static  void setTask() {
        Map<String, String> map = new HashMap();
        map.put("startDate", "2018-05-01");
        map.put("endDate", "2018-06-31");
        String sql = "insert into task(task_id,task_name,task_param) values('" + taskid + "','test01','" + JSON.toJSONString(map) + "')";
        SessionFactory sessionFactory = SessionFactory.getSessionFactory();
        sessionFactory.executeUpdate(sql, null);
    }

下面获取当前的taskId 取得进行session 参数条件

//taskDAO  请参考  https://blog.csdn.net/u012957549/article/details/80445329
   Task task = taskDAO.findById(taskid);
        JSONObject taskParam = JSONObject.parseObject(task.getTaskParam());

获取指定参数内的session 数据

   /** * 获取指定日期范围内的用户访问行为数据 * @param sqlContext SQLContext * @param taskParam 任务参数 * @return 行为数据RDD */
    private static JavaRDD<Row> getActionRDDByDateRange(
            SQLContext sqlContext, JSONObject taskParam) {
        String startDate = ParamUtils.getParam(taskParam, Constants.SESSION_PROJECT.PARAM_START_DATE);
        String endDate = ParamUtils.getParam(taskParam, Constants.SESSION_PROJECT.PARAM_END_DATE);

        String sql =
                "select * "
                        + "from user_visit_action "
                        + "where date>='" + startDate + "' "
                        + "and date<='" + endDate + "'";

        Dataset actionDF = sqlContext.sql(sql);

        return actionDF.javaRDD();
    }
   // 首先,可以将行为数据,按照session_id进行groupByKey分组
        // 此时的数据的粒度就是session粒度了,然后呢,可以将session粒度的数据
        // 与用户信息数据,进行join
        // 然后就可以获取到session粒度的数据,同时呢,数据里面还包含了session对应的user的信息
        JavaPairRDD<String, String> sessionid2AggrInfoRDD =
                aggregateBySession(sqlContext, actionRDD);
 /** * 对行为数据按session粒度进行聚合 * @param actionRDD 行为数据RDD * @return session粒度聚合数据 */
    private static JavaPairRDD<String, String> aggregateBySession(SQLContext sqlContext, JavaRDD<Row> actionRDD) {
        // 现在actionRDD中的元素是Row,一个Row就是一行用户访问行为记录,比如一次点击或者搜索
        // 我们现在需要将这个Row映射成<sessionid,Row>的格式
        JavaPairRDD<String, Row> sessionid2ActionRDD = actionRDD.mapToPair(

                /** * PairFunction * 第一个参数,相当于是函数的输入 * 第二个参数和第三个参数,相当于是函数的输出(Tuple),分别是Tuple第一个和第二个值 */
                new PairFunction<Row, String, Row>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<String, Row> call(Row row) throws Exception {
                        return new Tuple2<String, Row>(row.getString(2), row);
                    }

                });

        // 对行为数据按session粒度进行分组
        JavaPairRDD<String, Iterable<Row>> sessionid2ActionsRDD =
                sessionid2ActionRDD.groupByKey();

        // 对每一个session分组进行聚合,将session中所有的搜索词和点击品类都聚合起来
        // 到此为止,获取的数据格式,如下:<userid,partAggrInfo(sessionid,searchKeywords,clickCategoryIds)>
        JavaPairRDD<Long, String> userid2PartAggrInfoRDD = sessionid2ActionsRDD.mapToPair(

                new PairFunction<Tuple2<String, Iterable<Row>>, Long, String>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<Long, String> call(Tuple2<String, Iterable<Row>> tuple)
                            throws Exception {
                        String sessionid = tuple._1;
                        Iterator<Row> iterator = tuple._2.iterator();

                        StringBuffer searchKeywordsBuffer = new StringBuffer("");
                        StringBuffer clickCategoryIdsBuffer = new StringBuffer("");

                        Long userid = null;

                        // 遍历session所有的访问行为
                        while (iterator.hasNext()) {
                            // 提取每个访问行为的搜索词字段和点击品类字段
                            Row row = iterator.next();
                            if (userid == null) {
                                userid = row.getLong(1);
                            }
                            String searchKeyword = row.getString(5);
                            Long clickCategoryId = null;
                            if (row.get(6) != null) {
                                clickCategoryId = row.getLong(6);
                            }
                            // 实际上这里要对数据说明一下
                            // 并不是每一行访问行为都有searchKeyword何clickCategoryId两个字段的
                            // 其实,只有搜索行为,是有searchKeyword字段的
                            // 只有点击品类的行为,是有clickCategoryId字段的
                            // 所以,任何一行行为数据,都不可能两个字段都有,所以数据是可能出现null值的

                            // 我们决定是否将搜索词或点击品类id拼接到字符串中去
                            // 首先要满足:不能是null值
                            // 其次,之前的字符串中还没有搜索词或者点击品类id

                            if (StringUtils.isNotEmpty(searchKeyword)) {
                                if (!searchKeywordsBuffer.toString().contains(searchKeyword)) {
                                    searchKeywordsBuffer.append(searchKeyword + ",");
                                }
                            }
                            if (clickCategoryId != null) {
                                if (!clickCategoryIdsBuffer.toString().contains(
                                        String.valueOf(clickCategoryId))) {
                                    clickCategoryIdsBuffer.append(clickCategoryId + ",");
                                }
                            }
                        }

                        String searchKeywords = StringUtils.trimComma(searchKeywordsBuffer.toString());
                        String clickCategoryIds = StringUtils.trimComma(clickCategoryIdsBuffer.toString());

                        // 我们返回的数据格式,即使<sessionid,partAggrInfo>
                        // 但是,这一步聚合完了以后,其实,我们是还需要将每一行数据,跟对应的用户信息进行聚合
                        // 问题就来了,如果是跟用户信息进行聚合的话,那么key,就不应该是sessionid
                        // 就应该是userid,才能够跟<userid,Row>格式的用户信息进行聚合
                        // 如果我们这里直接返回<sessionid,partAggrInfo>,还得再做一次mapToPair算子
                        // 将RDD映射成<userid,partAggrInfo>的格式,那么就多此一举

                        // 所以,我们这里其实可以直接,返回的数据格式,就是<userid,partAggrInfo>
                        // 然后跟用户信息join的时候,将partAggrInfo关联上userInfo
                        // 然后再直接将返回的Tuple的key设置成sessionid
                        // 最后的数据格式,还是<sessionid,fullAggrInfo>

                        // 聚合数据,用什么样的格式进行拼接?
                        // 我们这里统一定义,使用key=value|key=value
                        String partAggrInfo = Constants.SESSION_PROJECT.FIELD_SESSION_ID + "=" + sessionid + "|"
                                + Constants.SESSION_PROJECT.FIELD_SEARCH_KEYWORDS + "=" + searchKeywords + "|"
                                + Constants.SESSION_PROJECT.FIELD_CLICK_CATEGORY_IDS + "=" + clickCategoryIds;

                        return new Tuple2<Long, String>(userid, partAggrInfo);
                    }

                });

        // 查询所有用户数据,并映射成<userid,Row>的格式
        String sql = "select * from user_info";
        JavaRDD<Row> userInfoRDD = sqlContext.sql(sql).javaRDD();

        JavaPairRDD<Long, Row> userid2InfoRDD = userInfoRDD.mapToPair(

                new PairFunction<Row, Long, Row>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<Long, Row> call(Row row) throws Exception {
                        return new Tuple2<Long, Row>(row.getLong(0), row);
                    }

                });

        // 将session粒度聚合数据,与用户信息进行join
        JavaPairRDD<Long, Tuple2<String, Row>> userid2FullInfoRDD =
                userid2PartAggrInfoRDD.join(userid2InfoRDD);

        // 对join起来的数据进行拼接,并且返回<sessionid,fullAggrInfo>格式的数据
        JavaPairRDD<String, String> sessionid2FullAggrInfoRDD = userid2FullInfoRDD.mapToPair(

                new PairFunction<Tuple2<Long, Tuple2<String, Row>>, String, String>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<String, String> call(
                            Tuple2<Long, Tuple2<String, Row>> tuple)
                            throws Exception {
                        String partAggrInfo = tuple._2._1;
                        Row userInfoRow = tuple._2._2;

                        String sessionid = StringUtils.getFieldFromConcatString(
                                partAggrInfo, "\\|", Constants.SESSION_PROJECT.FIELD_SESSION_ID);

                        int age = userInfoRow.getInt(3);
                        String professional = userInfoRow.getString(4);
                        String city = userInfoRow.getString(5);
                        String sex = userInfoRow.getString(6);

                        String fullAggrInfo = partAggrInfo + "|"
                                + Constants.FIELD.FIELD_AGE + "=" + age + "|"
                                + Constants.FIELD.FIELD_PROFESSIONAL + "=" + professional + "|"
                                + Constants.FIELD.FIELD_CITY + "=" + city + "|"
                                + Constants.FIELD.FIELD_SEX + "=" + sex;

                        return new Tuple2<String, String>(sessionid, fullAggrInfo);
                    }

                });

        return sessionid2FullAggrInfoRDD;
    }

欢迎关注,更多福利

spark 大型项目实战(十五):用户访问session分析(十五) --按照session 粒度进行数据聚合