美文网首页
flink example: APP推广渠道统计

flink example: APP推广渠道统计

作者: 阿猫阿狗Hakuna | 来源:发表于2020-06-07 14:46 被阅读0次
    /**
     * Entry point of the "app marketing by channel" example.
     *
     * Pipeline:
     *  1. read simulated events from [[SimulatedEventSource]] and use each event's `timestamp`
     *     as its (ascending) event-time timestamp;
     *  2. drop events whose behavior is "UNINSTALL";
     *  3. key the remaining events by the (channel, behavior) pair;
     *  4. count them in 1-hour sliding event-time windows that advance every 10 seconds;
     *  5. print one result per key and window.
     */
    object AppMarketingByChannel {

      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

        // Source plus event-time assignment (timestamps are assumed to be non-decreasing).
        val events = env
          .addSource(new SimulatedEventSource())
          .assignAscendingTimestamps(_.timestamp)

        // Every surviving event contributes a count of 1 under its (channel, behavior) key.
        val keyedOnes = events
          .filter(event => event.behavior != "UNINSTALL")
          .map(event => ((event.channel, event.behavior), 1L))
          .keyBy(_._1)

        // 1-hour windows, sliding every 10 seconds.
        val windowCounts = keyedOnes
          .timeWindow(Time.hours(1), Time.seconds(10))
          .process(new MarketingCountByChannel())

        windowCounts.print()
        env.execute("app marketing by channel")
      }

    }
    
    /**
     * Custom data source that emits randomly generated `MarketingUserBehavior` events.
     *
     * Each iteration emits one event with:
     *  - a random UUID string as the id,
     *  - a behavior picked from `behivorTypes`,
     *  - a channel picked from `channelSets`,
     *  - the current wall-clock time in milliseconds as the timestamp.
     * It then sleeps for `emitIntervalMillis`.
     *
     * The loop ends when `maxElements` events have been emitted or `cancel()` has been called.
     *
     * @param maxElements        maximum number of events emitted by one `run` call
     *                           (default `Long.MaxValue`, i.e. effectively unbounded)
     * @param emitIntervalMillis pause between two consecutive events, in milliseconds (default 10)
     */
    class SimulatedEventSource(maxElements: Long = Long.MaxValue, emitIntervalMillis: Long = 10L)
      extends RichSourceFunction[MarketingUserBehavior] {
      // Run flag. Flink calls cancel() from a different thread than the one executing run(), so
      // the field must be @volatile; otherwise the loop in run() may never observe `false`.
      @volatile var running = true
      // Candidate user behaviors (identifier spelling kept for compatibility with existing callers).
      val behivorTypes: Seq[String] = Seq("CLICK", "DOWNLOAD", "INSTALL", "UNINSTALL")
      // Candidate promotion channels.
      val channelSets: Seq[String] = Seq("WECHAT", "WEIBO", "APPSTORE", "HUAWEISTORE")
      // Random generator used to pick a behavior and a channel for each event.
      val rand: Random = new Random()

      override def run(sourceContext: SourceFunction.SourceContext[MarketingUserBehavior]): Unit = {
        var count = 0L

        // Generate events until cancelled or the configured cap is reached.
        while (running && count < maxElements) {
          val id = UUID.randomUUID().toString
          val behavior = behivorTypes(rand.nextInt(behivorTypes.size))
          val channel = channelSets(rand.nextInt(channelSets.size))
          val ts = System.currentTimeMillis()

          sourceContext.collect(MarketingUserBehavior(id, behavior, channel, ts))

          count += 1
          TimeUnit.MILLISECONDS.sleep(emitIntervalMillis)
        }
      }

      /** Requests the emission loop in `run` to stop after its current iteration. */
      override def cancel(): Unit = {
        running = false
      }
    }
    
    /**
     * Window function that turns all elements of one (channel, behavior) key in one window into a
     * single `MarketingViewCount`.
     *
     * The emitted record holds:
     *  - the window start and end, rendered with `java.sql.Timestamp#toString`;
     *  - the channel and behavior taken from the key;
     *  - the number of elements that fell into the window.
     *
     * NOTE(review): this counts elements rather than summing their `Long` payloads. The two are
     * equivalent only while upstream always emits `1L` per event.
     * A ProcessWindowFunction also buffers every element of the window in state; an incremental
     * AggregateFunction would avoid that if memory becomes a concern.
     */
    class MarketingCountByChannel extends ProcessWindowFunction[((String, String), Long), MarketingViewCount, (String, String), TimeWindow] {
      override def process(key: (String, String), context: Context, elements: Iterable[((String, String), Long)], out: Collector[MarketingViewCount]): Unit = {
        val (channel, behavior) = key
        val window = context.window
        val windowStart = new Timestamp(window.getStart).toString
        val windowEnd = new Timestamp(window.getEnd).toString

        out.collect(MarketingViewCount(windowStart, windowEnd, channel, behavior, elements.size))
      }
    }
    

    相关文章

      网友评论

          本文标题:flink example: APP推广渠道统计

          本文链接:https://www.haomeiwen.com/subject/jaggzhtx.html