Flink 计算 Pv 和 Uv 的通用方法

时间:2022-03-10 02:51:26

Flink 计算 Pv 和 Uv 的通用方法

PV(访问量):即Page View, 即页面浏览量或点击量,用户每次刷新即被计算一次。

UV(独立访客):即Unique Visitor,访问您网站的一台电脑客户端为一个访客。00:00-24:00内相同的客户端只被计算一次。

计算网站App的实时pv和uv,是很常见的统计需求,这里提供通用的计算方法,不同的业务需求只需要小改即可拿来即用。

需求

利用Flink实时统计,从0点到当前的pv、uv。

一、需求分析

从Kafka发送过来的数据含有:时间戳、时间、维度、用户id,需要从不同维度统计从0点到当前时间的pv和uv,第二天0点重新开始计数第二天的。

二、技术方案

Kafka数据可能会有延迟乱序,这里引入watermark;

通过keyBy分流进不同的滚动window,每个窗口内计算pv、uv;

由于需要保存一天的状态,process里面使用ValueState保存pv、uv;

使用BitMap类型ValueState,占内存很小,引入支持bitmap的依赖;

保存状态需要设置ttl过期时间,第二天把第一天的过期,避免内存占用过大。

三、数据准备

这里假设是用户订单数据,数据格式如下:

  1. {"time":"2021-10-3122:00:01","timestamp":"1635228001","product":"苹果手机","uid":255420}
  2. {"time":"2021-10-3122:00:02","timestamp":"1635228001","product":"MacBookPro","uid":255421}

四、代码实现

整个工程代码截图如下(抹去了一些不方便公开的信息):

Flink 计算 Pv 和 Uv 的通用方法

pvuv-project

1. 环境

kafka:1.0.0;

Flink:1.11.0;

2. 发送测试数据

首先发送数据到kafka测试集群,maven依赖:

  1. org.apache.kafka
  2. kafka-clients
  3. 2.4.1

2.4.1

发送代码:

  1. importcom.alibaba.fastjson.JSON;
  2. importcom.alibaba.fastjson.JSONObject;
  3. importjodd.util.ThreadUtil;
  4. importorg.apache.commons.lang3.StringUtils;
  5. importorg.junit.Test;
  6. importjava.io.*;
  7. publicclassSendDataToKafka{
  8. @Test
  9. publicvoidsendData()throwsIOException{
  10. Stringinpath="E:\\我的文件\\click.txt";
  11. Stringtopic="click_test";
  12. intcnt=0;
  13. Stringline;
  14. InputStreaminputStream=newFileInputStream(inpath);
  15. Readerreader=newInputStreamReader(inputStream);
  16. LineNumberReaderlnr=newLineNumberReader(reader);
  17. while((line=lnr.readLine())!=null){
  18. //这里的KafkaUtil是个生产者、消费者工具类,可以自行实现
  19. KafkaUtil.sendDataToKafka(topic,String.valueOf(cnt),line);
  20. cnt=cnt+1;
  21. ThreadUtil.sleep(100);
  22. }
  23. }
  24. }

3. 主要程序

先定义个pojo:

  1. @NoArgsConstructor
  2. @AllArgsConstructor
  3. @Data
  4. @ToString
  5. publicclassUserClickModel{
  6. privateStringdate;
  7. privateStringproduct;
  8. privateintuid;
  9. privateintpv;
  10. privateintuv;
  11. }

接着就是使用Flink消费kafka,指定Watermark,通过KeyBy分流,进入滚动窗口函数通过状态保存pv和uv。

  1. publicclassUserClickMain{
  2. privatestaticfinalMapconfig=Configuration.initConfig("commons.xml");
  3. publicstaticvoidmain(String[]args)throwsException{
  4. //初始化环境,配置相关属性
  5. StreamExecutionEnvironmentsenv=StreamExecutionEnvironment.getExecutionEnvironment();
  6. senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  7. senv.enableCheckpointing(5000,CheckpointingMode.EXACTLY_ONCE);
  8. senv.setStateBackend(newFsStateBackend("hdfs://bigdata/flink/checkpoints/userClick"));
  9. //读取kafka
  10. PropertieskafkaProps=newProperties();
  11. kafkaProps.setProperty("bootstrap.servers",config.get("kafka-ipport"));
  12. kafkaProps.setProperty("group.id",config.get("kafka-groupid"));
  13. //kafkaProps.setProperty("auto.offset.reset","earliest");
  14. //watrmark允许数据延迟时间
  15. longmaxOutOfOrderness=5*1000L;
  16. SingleOutputStreamOperatordataStream=senv.addSource(
  17. newFlinkKafkaConsumer<>(
  18. config.get("kafka-topic"),
  19. newSimpleStringSchema(),
  20. kafkaProps
  21. ))
  22. //设置watermark
  23. .assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofMillis(maxOutOfOrderness))
  24. .withTimestampAssigner((element,recordTimestamp)->{
  25. //时间戳须为毫秒
  26. returnLong.valueOf(JSON.parseObject(element).getString("timestamp"))*1000;
  27. })).map(newFCClickMapFunction()).returns(TypeInformation.of(newTypeHint(){
  28. }));
  29. //按照(date,product)分组
  30. dataStream.keyBy(newKeySelector>(){
  31. @Override
  32. publicTuple2getKey(UserClickModelvalue)throwsException{
  33. returnTuple2.of(value.getDate(),value.getProduct());
  34. }
  35. })
  36. //一天为窗口,指定时间起点比时间戳时间早8个小时
  37. .window(TumblingEventTimeWindows.of(Time.days(1),Time.hours(-8)))
  38. //10s触发一次计算,更新统计结果
  39. .trigger(ContinuousEventTimeTrigger.of(Time.seconds(10)))
  40. //计算pvuv
  41. .process(newMyProcessWindowFunctionBitMap())
  42. //保存结果到mysql
  43. .addSink(newFCClickSinkFunction());
  44. senv.execute(UserClickMain.class.getSimpleName());
  45. }
  46. }

代码都是一些常规代码,但是还是有几点需要注意的。

注意

设置watermark,flink1.11中使用WatermarkStrategy,老的已经废弃了;

我的数据里面时间戳是秒,需要乘以1000,flink提取时间字段,必须为毫秒;

.window只传入一个参数,表明是滚动窗口,TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))这里指定了窗口的大小为一天,由于中国北京时间是东8区,比国际时间早8个小时,需要引入offset,可以自行进入该方法源码查看英文注释。

Rather than that,if you are living in somewhere which is not using UTC±00:00 time,

* such as China which is using UTC+08:00,and you want a time window with size of one day,

* and window begins at every 00:00:00 of local time,you may use {@code of(Time.days(1),Time.hours(-8))}.

* The parameter of offset is {@code Time.hours(-8))} since UTC+08:00 is 8 hours earlier than UTC time.

一天大小的窗口,根据watermark机制一天触发计算一次,显然是不合理的,需要用trigger函数指定触发间隔为10s一次,这样我们的pv和uv就是10s更新一次结果。

4. 关键代码,计算uv

由于这里用户id刚好是数字,可以使用bitmap去重,简单原理是:把 user_id 作为 bit 的偏移量 offset,设置为 1 表示有访问,使用 1 MB的空间就可以存放 800 多万用户的一天访问计数情况。

redis是自带bit数据结构的,不过为了尽量少依赖外部存储媒介,这里自己实现bit,引入相应maven依赖即可:

  1. org.roaringbitmap
  2. RoaringBitmap
  3. 0.8.0

计算pv、uv的代码其实都是通用的,可以根据自己的实际业务情况快速修改的:

  1. publicclassMyProcessWindowFunctionBitMapextendsProcessWindowFunction,TimeWindow>{
  2. privatetransientValueState<Integer>pvState;
  3. privatetransientValueStatebitMapState;
  4. @Override
  5. publicvoidopen(Configurationparameters)throwsException{
  6. super.open(parameters);
  7. ValueStateDescriptor<Integer>pvStateDescriptor=newValueStateDescriptor<>("pv",Integer.class);
  8. ValueStateDescriptorbitMapStateDescriptor=newValueStateDescriptor("bitMap"
  9. ,TypeInformation.of(newTypeHint(){}));
  10. //过期状态清除
  11. StateTtlConfigstateTtlConfig=StateTtlConfig
  12. .newBuilder(Time.days(1))
  13. .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
  14. .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
  15. .build();
  16. //开启ttl
  17. pvStateDescriptor.enableTimeToLive(stateTtlConfig);
  18. bitMapStateDescriptor.enableTimeToLive(stateTtlConfig);
  19. pvState=this.getRuntimeContext().getState(pvStateDescriptor);
  20. bitMapState=this.getRuntimeContext().getState(bitMapStateDescriptor);
  21. }
  22. @Override
  23. publicvoidprocess(Tuple2key,Contextcontext,Iterableelements,Collectorout)throwsException{
  24. //当前状态的pvuv
  25. Integerpv=pvState.value();
  26. Roaring64NavigableMapbitMap=bitMapState.value();
  27. if(bitMap==null){
  28. bitMap=newRoaring64NavigableMap();
  29. pv=0;
  30. }
  31. Iteratoriterator=elements.iterator();
  32. while(iterator.hasNext()){
  33. pv=pv+1;
  34. intuid=iterator.next().getUid();
  35. //如果userId可以转成long
  36. bitMap.add(uid);
  37. }
  38. //更新pv
  39. pvState.update(pv);
  40. UserClickModelUserClickModel=newUserClickModel();
  41. UserClickModel.setDate(key.f0);
  42. UserClickModel.setProduct(key.f1);
  43. UserClickModel.setPv(pv);
  44. UserClickModel.setUv(bitMap.getIntCardinality());
  45. out.collect(UserClickModel);
  46. }
  47. }

注意

由于计算uv第二天的时候,就不需要第一天数据了,要及时清理内存中前一天的状态,通过ttl机制过期;

最终结果保存到mysql里面,如果数据结果分类聚合太多,要注意mysql压力,这块可以自行优化;

五、其它方法

除了使用bitmap去重外,还可以使用Flink SQL,编码更简洁,还可以借助外面的媒介Redis去重:

  • 基于 set
  • 基于 bit
  • 基于 HyperLogLog
  • 基于bloomfilter

具体思路是,计算pv、uv都塞入redis里面,然后再获取值保存统计结果,也是比较常用的。

原文链接:https://mp.weixin.qq.com/s/6nApSSK-xDAwnXp1r2m-ug