美文网首页
flink - 实时 - UV统计 - 布隆过滤器实现

flink - 实时 - UV统计 - 布隆过滤器实现

作者: 坨坨的大数据 | 来源:发表于2022-04-13 22:53 被阅读0次

    1.知识点

    • scala输入输出样例类
    • keyBy并行度为1计算UV的技巧

    map(data => ("uv", data.userId))..keyBy(_._1)

    • keyBy并行度>1 计算UV的技巧

    自定义MapFunction,随机自定义key+"uv"

    Random.nextString(10) + "uv"

    • WindowedStream.trigger的使用
      trigger触发器,每来一条数据直接清空窗口,放到redis进行计算
    • trigger返回WindowedStream,继续调用process(ProcessWindowFunction)
    • WindowedStream.process()的使用
      windowStream调用接口
    • 布隆过滤器的实现

    2.业务目标

    滚动输出最近1小时内的PV

    窗口:1小时

    指标:点击量

    3.流程心法

    总流程:创建输入输出类--->执行环境--->transform转换--->各类窗口函数的调用

    主Object:

    1.创建执行环境,设置时间语义,并行度等

    2.transform api map转换为输入样例类,并设置watermark

    3.key 定义成常量"v",那么keyBy就分为同一组,如果并行则可以自定义mapFunction

    4.实现trigger

    5.实现processWindowFunction

    4.模块详解

    4.1 创建输入输出样例类

    4.2 主object实现

    4.2.1 创建执行环境并添加数据源

    val env = StreamExecutionEnvironment.getExecutionEnvironment    
      env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)    
      env.setParallelism(1)     
    // 从文件中读取数据    
    val resource = getClass.getResource("/UserBehavior.csv")    
    val inputStream: DataStream[String] = env.readTextFile(resource.getPath)
    

    4.2.2 Datastream map转换为输入样例类

     // 转换成样例类类型并提取时间戳和watermark    
    val dataStream: DataStream[UserBehavior] = inputStream      
      .map(data => {        
        val arr = data.split(",")        
        UserBehavior(arr(0).toLong, arr(1).toLong, arr(2).toInt, arr(3), arr(4).toLong)      
      })      
      .assignAscendingTimestamps(_.timestamp * 1000L)
    

    4.2.3 处理逻辑(1)----filter类型,timeWindow

    val uvStream = dataStream      
        .filter(_.behavior == "pv")      
        .map( data => ("uv", data.userId) ) //如果要并行,并行自定义mymapper      
        .keyBy(_._1)      
        .timeWindow(Time.hours(1))  //滚动窗口      
        .trigger(new MyTrigger())  //trigger触发器,每来一条数据直接清空窗口,放到redis计算。      
        .process( new UvCountWithBloom() )
    

    4.2.4 处理逻辑(2)----Trigger实现

    class MyTrigger() extends Trigger[(String,Long),TimeWindow]{  
      override def onElement(t: (String, Long), l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = {     
        TriggerResult.FIRE_AND_PURGE   
      }   
      //系统时间有进展时做什么操作  
      override def onProcessingTime(l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = TriggerResult.CONTINUE   
    
      //watermark改变做什么操作  
      override def onEventTime(l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = TriggerResult.CONTINUE   
    
      override def clear(w: TimeWindow, triggerContext: Trigger.TriggerContext): Unit = {   
      }
    }
    

    4.2.5 处理逻辑(2)----ProcessWindowFunction实现

    1.定义redis中存储位图的key ,本例为窗口结束时间

    2.定义一个redis hash表,保存统计之后的每个窗口结束时间的uv count.

    表名:uvcount

    KEY: 窗口结束时间

    VALUE:uv count值

    3. 对userid进行hash, 从位图中查看hash后的偏移量是否窜在,若存在则uvcount不操作。若不存在则uvcount+1,位图也相应更新

    class UvCountWithBloom() extends ProcessWindowFunction[(String, Long), UvCount, String, TimeWindow]{  
      // 定义redis连接以及布隆过滤器  
      lazy val jedis = new Jedis("localhost", 6379)  
      lazy val bloomFilter = new Bloom(1<<29)    // 2的29次方,1左移29位。 位的个数:2^6(64) * 2^20(1M) * 2^3(8bit) ,64MB   
    
      // 本来是收集齐所有数据、窗口触发计算的时候才会调用;现在每来一条数据都调用一次  
      override def process(key: String, context: Context, elements: Iterable[(String, Long)], out: Collector[UvCount]): Unit = {    
        // 先定义redis中存储位图的key    
        val storedBitMapKey = context.window.getEnd.toString      
        
        //另外将当前窗口的uv count值,作为状态保存到redis里,用一个叫做uvcount的hash表来保存(windowEnd,count)    
        val uvCountMap = "uvcount"    
        val currentKey = context.window.getEnd.toString    
        var count = 0L     
    
        // 从redis中取出当前窗口的uv count值    
        if(jedis.hget(uvCountMap, currentKey) != null)      
          count = jedis.hget(uvCountMap, currentKey).toLong     
        
        // 去重:判断当前userId的hash值对应的位图位置,是否为0    
        val userId = elements.last._2.toString    
        // 计算hash值,就对应着位图中的偏移量    
        val offset = bloomFilter.hash(userId, 61)    
        val isExist = jedis.getbit(storedBitMapKey, offset)     
    
        if(!isExist){      
          // 如果不存在,那么位图对应位置置1,并且将count值加1            
          jedis.setbit(storedBitMapKey, offset, true)      
          jedis.hset(uvCountMap, currentKey, (count + 1).toString)    
        }  
      }
    }
    

    4.2.6 处理逻辑(3)----布隆过滤器实现

    也可以调用外部google等现成的布隆过滤器.

    设计布隆过滤器的要点:
    1.选好点的hash函数
    2.不同userid经过hash到同一位上。不要那么稠密。
    即1亿的user,我们给出2亿的位,出现碰撞的概率就特别小。
    10B * 1亿,大概1GB, 用位来存,1bit * 1亿 大概10m,放redis放内存都是个很好的 选择。
    即使我们扩大位防止碰撞,放6亿,也是68M,可以放到redis中。有可能出现hash碰撞

    class Bloom(size: Long) extends Serializable{  
      private val cap = size    // 默认cap应该是2的整次幂   
    
      //hash函数 value即userid,seed随机数种子  
      def hash(value: String, seed: Int): Long = {    
        var result = 0    
        //遍历userid,对每一位进行随机数种子的处理    
        for( i <- 0 until value.length ){      
          result = result * seed + value.charAt(i)    
        }     
        // 返回hash值,要映射到cap范围内    
        (cap - 1) & result  
      }
    }
    

    4.3 完整代码






    相关文章

      网友评论

          本文标题:flink - 实时 - UV统计 - 布隆过滤器实现

          本文链接:https://www.haomeiwen.com/subject/eukasrtx.html