flink(七) 电商用户行为分析(七)订单支付实时监控之订单超时、订单交易匹配

时间:2024-03-07 15:06:32

1 简介

  在电商网站中,订单的支付作为直接与营销收入挂钩的一环,在业务流程中非常重要。对于订单而言,为了正确控制业务流程,也为了增加用户的支付意愿,网站一般会设置一个支付失效时间,超过这段时间仍未支付的订单就会被取消。另外,对于订单的支付,我们还应保证用户支付的正确性,这可以通过第三方支付平台的交易数据来做实时对账。在接下来的内容中,我们将实现这两个需求:订单超时检测和订单交易匹配。

2 模块创建和数据准备

  同样地,在 UserBehaviorAnalysis 下新建一个 maven module 作为子项目,命名为 OrderTimeoutDetect(注意:下文代码中的包名和文件路径使用的是 OrderPayDetect / orderpay_detect,实际创建时请保持模块名与之一致)。在这个子模块中,我们同样会用到 flink 的 CEP 库来实现事件流的模式匹配,所以需要在 pom 文件中引入 CEP 的相关依赖:
    <!-- Flink CEP library (Scala API); the CEP-based order-timeout job below imports org.apache.flink.cep.*. -->
    <!-- NOTE(review): scala.binary.version and flink.version are presumably defined in the parent pom - confirm. -->
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
package com.atguigu.orderpay_detect

import java.util

import org.apache.flink.cep.{PatternSelectFunction, PatternTimeoutFunction}
import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time


// Case classes for the job's input and output records

/**
  * One parsed line of the order log.
  *
  * @param orderId   id of the order; the jobs in this file key the stream by it
  * @param eventType kind of event; the values "create" and "pay" are matched on
  * @param txId      transaction id taken from the third CSV column
  * @param eventTime event time; multiplied by 1000 when used as the Flink timestamp
  *                  (presumably epoch seconds - confirm against the data file)
  */
case class OrderEvent(
  orderId: Long,
  eventType: String,
  txId: String,
  eventTime: Long
)

/**
  * Result emitted for an order.
  *
  * @param orderId   id of the order the result refers to
  * @param resultMsg human-readable outcome message
  */
case class OrderResult(
  orderId: Long,
  resultMsg: String
)

/**
  * Order-timeout detection job built on Flink CEP.
  *
  * Reads order events from OrderLog.csv, keys them by order id and looks for a "create"
  * event followed by a "pay" event within 15 minutes. Complete matches are printed with
  * the prefix "payed"; partial matches that time out are routed to a side output and
  * printed with the prefix "timeout".
  */
object OrderTimeOut {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    // Resolve the input file from the classpath so the job runs on any machine.
    // Only plain "file" URLs are usable as a path; for anything else (resource missing,
    // or packaged inside a jar) fall back to the path the job used originally.
    val orderLogPath: String = Option(getClass.getResource("/OrderLog.csv"))
      .filter(_.getProtocol == "file")
      .map(url => java.nio.file.Paths.get(url.toURI).toString)
      .getOrElse("C:\\Users\\DELL\\IdeaProjects\\UserBehaviorAnalysis\\OrderPayDetect\\src\\main\\resources\\OrderLog.csv")

    // Parse each CSV line (orderId,eventType,txId,eventTime) into an OrderEvent and
    // assign event-time timestamps, tolerating 3 seconds of out-of-orderness.
    val orderEventStream: DataStream[OrderEvent] = env.readTextFile(orderLogPath)
      .map(data => {
        val dataArray = data.split(",")
        OrderEvent(dataArray(0).toLong, dataArray(1), dataArray(2), dataArray(3).toLong)
      })
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[OrderEvent](Time.seconds(3)) {
        override def extractTimestamp(t: OrderEvent): Long = t.eventTime * 1000L
      })

    // 1. Pattern to match: a "create" event, later followed by a "pay" event, within 15 minutes
    val orderPayPattern = Pattern
      .begin[OrderEvent]("create").where(_.eventType == "create")
      .followedBy("pay").where(_.eventType == "pay")
      .within(Time.minutes(15))

    // 2. Apply the pattern to the stream keyed by orderId
    val patternStream = CEP.pattern(orderEventStream.keyBy(_.orderId), orderPayPattern)

    // 3. Side-output tag that carries the timed-out orders
    val orderTimeOutOutputTag = new OutputTag[OrderResult]("order timeout")

    // 4. select() converts timed-out partial matches (side output) and complete matches (main output)
    val resultStream: DataStream[OrderResult] = patternStream
      .select(orderTimeOutOutputTag, new OrderTimeoutSelect(), new OrderPaySelect())

    // 5. Print both outputs
    resultStream.print("payed")
    resultStream.getSideOutput(orderTimeOutOutputTag).print("timeout")

    env.execute(" order timeout detect job")

  }

}

/**
  * Timeout handler: turns a partial match that timed out into an [[OrderResult]].
  *
  * The order id is read from the first event captured under the pattern name "create";
  * the timeout timestamp passed in by CEP is appended to the message.
  */
class OrderTimeoutSelect() extends PatternTimeoutFunction[OrderEvent, OrderResult]{
  override def timeout(map: util.Map[String, util.List[OrderEvent]], l: Long): OrderResult = {
    // Events matched so far for the "create" step of the pattern
    val createEvents = map.get("create")
    val firstCreate = createEvents.iterator().next()
    OrderResult(firstCreate.orderId, "timeout at" + l)
  }
}

/**
  * Match handler: turns a complete create -> pay match into an [[OrderResult]].
  *
  * The order id is read from the first event captured under the pattern name "pay".
  */
class OrderPaySelect() extends PatternSelectFunction[OrderEvent, OrderResult]{
  override def select(map: util.Map[String, util.List[OrderEvent]]): OrderResult = {
    // Events matched for the "pay" step of the pattern
    val payEvents = map.get("pay")
    val firstPay = payEvents.get(0)
    OrderResult(firstPay.orderId, "payed successfully")
  }

}

withoutCEP

package com.atguigu.orderpay_detect

import com.atguigu.orderpay_detect.OrderTimeOut.getClass
import org.apache.flink.api.common.state._
import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector


// Case classes for the job's input and output records

/**
  * One parsed line of the order log.
  *
  * @param orderId   id of the order; the stream is keyed by it
  * @param eventType kind of event; the values "create" and "pay" are handled
  * @param txId      transaction id taken from the third CSV column
  * @param eventTime event time; multiplied by 1000 when used as the Flink timestamp
  *                  (presumably epoch seconds - confirm against the data file)
  */
case class OrderEvent(
  orderId: Long,
  eventType: String,
  txId: String,
  eventTime: Long
)

/**
  * Result emitted for an order.
  *
  * @param orderId   id of the order the result refers to
  * @param resultMsg human-readable outcome message
  */
case class OrderResult(
  orderId: Long,
  resultMsg: String
)

/**
  * Order-timeout detection job implemented with a KeyedProcessFunction instead of CEP.
  *
  * Reads order events from OrderLog.csv, keys them by order id and lets
  * OrderPayMatchDetect decide per order whether it was paid or timed out. Paid orders
  * (main output) are printed with the prefix "payed"; the "timeout" side output is
  * printed with the prefix "timeout".
  */
object OrderTimeoutWithoutCEP {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    // Resolve the input file from the classpath so the job runs on any machine.
    // Only plain "file" URLs are usable as a path; for anything else (resource missing,
    // or packaged inside a jar) fall back to the path the job used originally.
    val orderLogPath: String = Option(getClass.getResource("/OrderLog.csv"))
      .filter(_.getProtocol == "file")
      .map(url => java.nio.file.Paths.get(url.toURI).toString)
      .getOrElse("C:\\Users\\DELL\\IdeaProjects\\UserBehaviorAnalysis\\OrderPayDetect\\src\\main\\resources\\OrderLog.csv")

    // Parse each CSV line (orderId,eventType,txId,eventTime) into an OrderEvent and
    // assign event-time timestamps, tolerating 3 seconds of out-of-orderness.
    val orderEventStream: DataStream[OrderEvent] = env.readTextFile(orderLogPath)
      .map(data => {
        val dataArray = data.split(",")
        OrderEvent(dataArray(0).toLong, dataArray(1), dataArray(2), dataArray(3).toLong)
      })
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[OrderEvent](Time.seconds(3)) {
        override def extractTimestamp(t: OrderEvent): Long = t.eventTime * 1000L
      })

    // Fine-grained flow control with a custom process function, one instance of state per orderId
    val orderResultStream: DataStream[OrderResult] = orderEventStream
      .keyBy(_.orderId)
      .process(new OrderPayMatchDetect())

    // Print the results. The main output is the result stream (the original code
    // printed the raw input stream here, so paid orders were never shown).
    orderResultStream.print("payed")
    // The tag id must equal the one used inside OrderPayMatchDetect ("timeout")
    orderResultStream.getSideOutput(new OutputTag[OrderResult]("timeout")).print("timeout")

    env.execute(" order timeout detect job")
  }
}

/**
  * Keyed process function that pairs the "create" and "pay" events of one order.
  *
  * Orders paid in time are emitted on the main output; timeouts and anomalies are
  * emitted on the side output tagged "timeout".
  */
class OrderPayMatchDetect() extends KeyedProcessFunction[Long, OrderEvent, OrderResult]{
  // Keyed state: whether a pay / create event has been seen, and the timestamp of the
  // event-time timer that is currently registered for this order.
  lazy val isPayedState: ValueState[Boolean] =
    getRuntimeContext.getState(new ValueStateDescriptor[Boolean]("is-payed", classOf[Boolean]))
  lazy val isCreatedState: ValueState[Boolean] =
    getRuntimeContext.getState(new ValueStateDescriptor[Boolean]("is-created", classOf[Boolean]))
  lazy val timerTsState: ValueState[Long] =
    getRuntimeContext.getState(new ValueStateDescriptor[Long]("time-ts", classOf[Long]))

  val orderTimeoutOutputTag = new OutputTag[OrderResult]("timeout")

  /**
    * Handles one order event; the four cases are distinguished by the event type and
    * by which counterpart event has already been recorded in state.
    */
  override def processElement(value: OrderEvent, context: KeyedProcessFunction[Long, OrderEvent, OrderResult]#Context, collector: Collector[OrderResult]): Unit = {
    // Snapshot the state once, before branching
    val payedBefore = isPayedState.value()
    val createdBefore = isCreatedState.value()
    val pendingTimerTs = timerTsState.value()

    val timers = context.timerService()
    val eventTs = value.eventTime * 1000L

    value.eventType match {
      // create arrives after its pay: the pair is complete
      case "create" if payedBefore =>
        collector.collect(OrderResult(value.orderId, "payed successfully"))
        isPayedState.clear()
        timerTsState.clear()
        timers.deleteEventTimeTimer(pendingTimerTs)

      // create arrives first: start waiting 15 minutes for the pay
      case "create" =>
        val deadline = eventTs + 15 * 60 * 1000L
        timers.registerEventTimeTimer(deadline)
        timerTsState.update(deadline)
        isCreatedState.update(true)

      // pay arrives after its create: check it against the registered deadline
      case "pay" if createdBefore =>
        val payedInTime = eventTs < pendingTimerTs
        if (payedInTime) {
          collector.collect(OrderResult(value.orderId, "payed successfully"))
        } else {
          context.output(orderTimeoutOutputTag, OrderResult(value.orderId, "payed but already timeout"))
        }
        // A result was emitted either way, so reset the state and drop the timer
        isCreatedState.clear()
        timerTsState.clear()
        timers.deleteEventTimeTimer(pendingTimerTs)

      // pay arrives without a create: wait for an out-of-order create by registering
      // a timer at the pay's own timestamp
      case "pay" =>
        timers.registerEventTimeTimer(eventTs)
        timerTsState.update(eventTs)
        isPayedState.update(true)

      // any other event type is ignored
      case _ =>
    }

  }

  /**
    * Fires when a registered timer expires: reports either a pay without a create, or
    * a create that was not paid before the deadline, then resets all state.
    */
  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, OrderEvent, OrderResult]#OnTimerContext, out: Collector[OrderResult]): Unit = {
    // A set pay flag means the create never showed up (possible data loss);
    // otherwise the order really was not paid before the deadline.
    val alertMsg =
      if (isPayedState.value()) "already payed but not found created log"
      else "order timeout"
    ctx.output(orderTimeoutOutputTag, OrderResult(ctx.getCurrentKey, alertMsg))

    // Reset the state for this order
    isPayedState.clear()
    isCreatedState.clear()
    timerTsState.clear()
  }

}

3 来自两条流的订单交易匹配

  对于订单支付事件,用户支付完成其实并不算完,我们还得确认平台账户上是否到账了。而这往往会来自不同的日志信息,所以我们要同时读入两条流的数据来做合并处理。这里我们利用 connect 将两条流进行连接,然后用自定义的 CoProcessFunction 进行处理。
package com.atguigu.orderpay_detect

import com.atguigu.orderpay_detect.OrderTimeoutWithoutCEP.getClass
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

// Case classes for the two input streams

/**
  * One parsed line of the receipt log.
  *
  * @param txId       transaction id; the receipt stream is keyed by it
  * @param payChannel second CSV column of the receipt log
  * @param timestamp  event time; multiplied by 1000 when used as the Flink timestamp
  *                   (presumably epoch seconds - confirm against the data file)
  */
case class ReceiptEvent(
  txId: String,
  payChannel: String,
  timestamp: Long
)

/**
  * One parsed line of the order log.
  *
  * @param orderId   id of the order
  * @param eventType kind of event, e.g. "pay"
  * @param txId      transaction id; the order stream is keyed by it
  * @param eventTime event time; multiplied by 1000 when used as the Flink timestamp
  *                  (presumably epoch seconds - confirm against the data file)
  */
case class OrderEvent(
  orderId: Long,
  eventType: String,
  txId: String,
  eventTime: Long
)

/**
  * Reconciliation job: matches "pay" events from the order log with receipt events from
  * the payment platform by transaction id, using connect + a CoProcessFunction.
  *
  * Matched pairs are printed with the prefix "matched"; pays and receipts that found no
  * counterpart are printed from the side outputs "unmatched-pays" and "unmatched-receipts".
  */
object OrderPayTxMatch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    // Resolve the order log from the classpath so the job runs on any machine.
    // Only plain "file" URLs are usable as a path; for anything else (resource missing,
    // or packaged inside a jar) fall back to the path the job used originally.
    val orderLogPath: String = Option(getClass.getResource("/OrderLog.csv"))
      .filter(_.getProtocol == "file")
      .map(url => java.nio.file.Paths.get(url.toURI).toString)
      .getOrElse("C:\\Users\\DELL\\IdeaProjects\\UserBehaviorAnalysis\\OrderPayDetect\\src\\main\\resources\\OrderLog.csv")

    // Order stream: parse, assign event time (3 s out-of-orderness), keep pay events, key by txId
    val orderEventStream: KeyedStream[OrderEvent, String] = env.readTextFile(orderLogPath)
      .map(data => {
        val dataArray = data.split(",")
        OrderEvent(dataArray(0).toLong, dataArray(1), dataArray(2), dataArray(3).toLong)
      })
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[OrderEvent](Time.seconds(3)) {
        override def extractTimestamp(t: OrderEvent): Long = t.eventTime * 1000L
      })
      // Keep only pay events that carry a transaction id. The previous predicate
      // (eventType != "") let every event through, so events without a txId were all
      // keyed under "" and ended up in the unmatched-pays output.
      .filter(event => event.eventType == "pay" && event.txId != "")
      .keyBy(_.txId)

    // Resolve the receipt log the same way (the original looked up OrderLog.csv here by mistake)
    val receiptLogPath: String = Option(getClass.getResource("/ReceiptLog.csv"))
      .filter(_.getProtocol == "file")
      .map(url => java.nio.file.Paths.get(url.toURI).toString)
      .getOrElse("C:\\Users\\DELL\\IdeaProjects\\UserBehaviorAnalysis\\OrderPayDetect\\src\\main\\resources\\ReceiptLog.csv")

    // Receipt stream: parse (txId,payChannel,timestamp), assign event time, key by txId
    val receiptEventStream: KeyedStream[ReceiptEvent, String] = env.readTextFile(receiptLogPath)
      .map(data => {
        val dataArray = data.split(",")
        ReceiptEvent(dataArray(0), dataArray(1), dataArray(2).toLong)
      })
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[ReceiptEvent](Time.seconds(3)) {
        override def extractTimestamp(t: ReceiptEvent): Long = t.timestamp * 1000L
      })
      .keyBy(_.txId)

    // Connect the two keyed streams and match events per transaction id
    val resultStream: DataStream[(OrderEvent, ReceiptEvent)] = orderEventStream
      .connect(receiptEventStream)
      .process(new OrderPayTxDetect())

    // Tag ids must equal the ones used inside OrderPayTxDetect
    val unmatchedPays = new OutputTag[OrderEvent]("unmatched-pays")
    val unmatchedReceipts = new OutputTag[ReceiptEvent]("unmatched-receipts")

    resultStream.print("matched")
    resultStream.getSideOutput(unmatchedPays).print("unmatched-pays")
    resultStream.getSideOutput(unmatchedReceipts).print("unmatched-receipts")
    env.execute("order pay tx match job")

  }
}

/**
  * CoProcessFunction that pairs a pay event with the receipt event of the same key.
  *
  * Whichever side arrives first is parked in state and a timer is registered; when the
  * counterpart arrives the pair is emitted on the main output. If the timer fires while
  * an event is still parked, that event goes to the matching "unmatched" side output.
  */
class OrderPayTxDetect() extends CoProcessFunction[OrderEvent, ReceiptEvent,(OrderEvent, ReceiptEvent)]{
  // Keyed state holding the pay / receipt event that is still waiting for its counterpart
  lazy val payState: ValueState[OrderEvent] =
    getRuntimeContext.getState(new ValueStateDescriptor[OrderEvent]("pay", classOf[OrderEvent]))
  lazy val receiptState: ValueState[ReceiptEvent] =
    getRuntimeContext.getState(new ValueStateDescriptor[ReceiptEvent]("receipt", classOf[ReceiptEvent]))

  val unmatchedPays = new OutputTag[OrderEvent]("unmatched-pays")
  val unmatchedReceipts = new OutputTag[ReceiptEvent]("unmatched-receipts")


  /** A pay event arrived: emit the pair if its receipt is already parked, else park the pay. */
  override def processElement1(pay: OrderEvent, context: CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#Context, collector: Collector[(OrderEvent, ReceiptEvent)]): Unit = {
    Option(receiptState.value()) match {
      case Some(receipt) =>
        collector.collect((pay, receipt))
        receiptState.clear()

      case None =>
        // No receipt yet: remember the pay and wait until 5 seconds after its event time
        payState.update(pay)
        context.timerService().registerEventTimeTimer(pay.eventTime *1000L + 5000L)
    }

  }

  /** A receipt event arrived: emit the pair if its pay is already parked, else park the receipt. */
  override def processElement2(receipt: ReceiptEvent, context: CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#Context, collector: Collector[(OrderEvent, ReceiptEvent)]): Unit = {
    Option(payState.value()) match {
      case Some(pay) =>
        collector.collect((pay, receipt))
        payState.clear()

      case None =>
        // No pay yet: remember the receipt and wait until 3 seconds after its event time
        receiptState.update(receipt)
        context.timerService().registerEventTimeTimer(receipt.timestamp *1000L + 3000L)
    }
  }

  /** Timer fired: whatever is still parked never found its counterpart. */
  override def onTimer(timestamp: Long, ctx: CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#OnTimerContext, out: Collector[(OrderEvent, ReceiptEvent)]): Unit = {
    // A parked pay means the receipt did not arrive, and vice versa
    Option(payState.value()).foreach(pendingPay => ctx.output(unmatchedPays, pendingPay))
    Option(receiptState.value()).foreach(pendingReceipt => ctx.output(unmatchedReceipts, pendingReceipt))

    payState.clear()
    receiptState.clear()

  }
}

withJOIN

package com.atguigu.orderpay_detect

import com.atguigu.orderpay_detect.OrderPayTxMatch.getClass
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

// Case classes for the two input streams

/**
  * One parsed line of the receipt log.
  *
  * @param txId       transaction id; the receipt stream is keyed by it
  * @param payChannel second CSV column of the receipt log
  * @param timestamp  event time; multiplied by 1000 when used as the Flink timestamp
  *                   (presumably epoch seconds - confirm against the data file)
  */
case class ReceiptEvent(
  txId: String,
  payChannel: String,
  timestamp: Long
)

/**
  * One parsed line of the order log.
  *
  * @param orderId   id of the order
  * @param eventType kind of event, e.g. "pay"
  * @param txId      transaction id; the order stream is keyed by it
  * @param eventTime event time; multiplied by 1000 when used as the Flink timestamp
  *                  (presumably epoch seconds - confirm against the data file)
  */
case class OrderEvent(
  orderId: Long,
  eventType: String,
  txId: String,
  eventTime: Long
)

/**
  * Reconciliation job: matches "pay" events from the order log with receipt events from
  * the payment platform by transaction id, using an interval join.
  *
  * Only matched (order, receipt) pairs are produced and printed; this variant has no
  * output for events without a counterpart.
  */
object OrderPayTxMatchWithJoin {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    // Resolve the order log from the classpath so the job runs on any machine.
    // Only plain "file" URLs are usable as a path; for anything else (resource missing,
    // or packaged inside a jar) fall back to the path the job used originally.
    val orderLogPath: String = Option(getClass.getResource("/OrderLog.csv"))
      .filter(_.getProtocol == "file")
      .map(url => java.nio.file.Paths.get(url.toURI).toString)
      .getOrElse("C:\\Users\\DELL\\IdeaProjects\\UserBehaviorAnalysis\\OrderPayDetect\\src\\main\\resources\\OrderLog.csv")

    // Order stream: parse, assign event time (3 s out-of-orderness), keep pay events, key by txId
    val orderEventStream: KeyedStream[OrderEvent, String] = env.readTextFile(orderLogPath)
      .map(data => {
        val dataArray = data.split(",")
        OrderEvent(dataArray(0).toLong, dataArray(1), dataArray(2), dataArray(3).toLong)
      })
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[OrderEvent](Time.seconds(3)) {
        override def extractTimestamp(t: OrderEvent): Long = t.eventTime * 1000L
      })
      // Keep only pay events that carry a transaction id. The previous predicate
      // (eventType != "") let every event through, including events without a txId.
      .filter(event => event.eventType == "pay" && event.txId != "")
      .keyBy(_.txId)

    // Resolve the receipt log the same way (the original looked up OrderLog.csv here by mistake)
    val receiptLogPath: String = Option(getClass.getResource("/ReceiptLog.csv"))
      .filter(_.getProtocol == "file")
      .map(url => java.nio.file.Paths.get(url.toURI).toString)
      .getOrElse("C:\\Users\\DELL\\IdeaProjects\\UserBehaviorAnalysis\\OrderPayDetect\\src\\main\\resources\\ReceiptLog.csv")

    // Receipt stream: parse (txId,payChannel,timestamp), assign event time, key by txId
    val receiptEventStream: KeyedStream[ReceiptEvent, String] = env.readTextFile(receiptLogPath)
      .map(data => {
        val dataArray = data.split(",")
        ReceiptEvent(dataArray(0), dataArray(1), dataArray(2).toLong)
      })
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[ReceiptEvent](Time.seconds(3)) {
        override def extractTimestamp(t: ReceiptEvent): Long = t.timestamp * 1000L
      })
      .keyBy(_.txId)

    // Interval join: a receipt matches a pay with the same txId when the receipt's
    // timestamp lies between 3 seconds before and 5 seconds after the pay's timestamp
    val resultStream: DataStream[(OrderEvent, ReceiptEvent)] = orderEventStream
      .intervalJoin(receiptEventStream)
      .between(Time.seconds(-3), Time.seconds(5))
      .process(new OrderPayTxDetectWithJoin())

    resultStream.print()
    env.execute("order pay tx match with join job")

  }

}

/**
  * ProcessJoinFunction for the interval join: emits every joined pair unchanged as an
  * (order event, receipt event) tuple.
  */
class OrderPayTxDetectWithJoin() extends ProcessJoinFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]{
  override def processElement(left: OrderEvent, right: ReceiptEvent, context: ProcessJoinFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#Context, collector: Collector[(OrderEvent, ReceiptEvent)]): Unit = {
    // left comes from the order stream, right from the receipt stream
    val matchedPair = (left, right)
    collector.collect(matchedPair)
  }
}