美文网首页
Flink - CEP监控下单付款超时通知

Flink - CEP监控下单付款超时通知

作者: 大猪大猪 | 来源:发表于2019-07-13 22:51 被阅读0次

    在电商领域通常会有这样一种需要,如果客户下单了,但是在10分钟内不付款,应该需要通知客服,再由客服寻问客户为什么还没有付款,从而提高付款效率,我们可以采用Flink - CEP的超时机制来处理。

    执行流程

    Flink-CEP监控

    Usage

    导入依赖

    compile group: 'org.apache.flink', name: 'flink-streaming-scala_2.11', version: "1.6.2"
    compile group: 'org.apache.flink', name: 'flink-cep-scala_2.11', version: "1.6.2"
    

    使用Iterator模拟用户下单行为

    case class OrderEvent(
                           userId: String,
                           `type`: String
                         )
    class DataSource extends Iterator[OrderEvent] with Serializable {
      val atomicInteger = new AtomicInteger(0)
    
      val orderEventList = List(
        OrderEvent("1", "create"),
        OrderEvent("2", "create"),
        OrderEvent("2", "pay")
      )
    
      override def hasNext: Boolean = {
        TimeUnit.SECONDS.sleep(1)
        true
      }
    
      override def next(): OrderEvent = {
        orderEventList(atomicInteger.getAndIncrement() % 3)
      }
    }
    

    创建定单流

    val orderEventStream = env.fromCollection(new DataSource())
    

    创建定单匹配流程
    以下表示如果在1秒钟内创建定单并付款则完成购物操作

    val orderPayPattern = Pattern.begin[OrderEvent]("begin")
          .where(_.`type`.equals("create"))
          .next("next")
          .where(_.`type`.equals("pay"))
          .within(Time.seconds(1))
    

    创建侧输出流

    val orderTiemoutOutput = OutputTag[OrderEvent]("orderTimeout")
    

    把定单流应用到匹配流程中

    val patternStream = CEP.pattern(orderEventStream.keyBy("userId"), orderPayPattern)
    

    将正常定单流与侧超时流分开

    val complexResult = patternStream.select(orderTiemoutOutput) {
          (pattern: Map[String, Iterable[OrderEvent]], timestamp: Long) => {
            val createOrder = pattern.get("begin")
            OrderEvent("timeout", createOrder.get.iterator.next().userId)
          }
        } {
          pattern: Map[String, Iterable[OrderEvent]] => {
            val payOrder = pattern.get("next")
            OrderEvent("success", payOrder.get.iterator.next().userId)
          }
        }
    

    将正常定单流与超时定单流打印输出

    val timeoutResult = complexResult.getSideOutput(orderTiemoutOutput)
    
    complexResult.print()
    timeoutResult.print()
    
    env.execute
    

    也可以自行添加Sink将消息发送到消息队列等

    timeoutResult.addSink(new SinkFunction[OrderEvent] {
          override def invoke(value: OrderEvent, context: SinkFunction.Context[_]): Unit = {
            //do something or send message
          }
        })
    

    完整项目

    JavaScala两个版本
    传送门 https://github.com/dounine/flink-cep-demos/tree/master/order-timeout


    相关文章

      网友评论

          本文标题:Flink - CEP监控下单付款超时通知

          本文链接:https://www.haomeiwen.com/subject/sxjvkctx.html