美文网首页
数据分析实践 | flink | 流程优化篇

数据分析实践 | flink | 流程优化篇

作者: Sevsea | 来源:发表于2018-11-25 17:47 被阅读0次

    0x01flink执行流程了解一下

    流程如下:

    flink执行流程
    由一个Source数据处理,结果分发到四个窗口进行处理。
    

    0x02表象:

    flink需要优化,最先表现出来的现状就是:
    窗口中使用metric体现出每秒的数据处理量很低,或停止。

    1.代码中添加metric使用方法可参考:https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html
    2.如果使用flink dashboard也可以使用metric�功能进行统计
    

    此处以flink dashboard为例。

    0x03问题点及优化:

    1.数据反压

    数据体现(背压(Backpressure)机制):
    -> 一个window中数据处理的速率慢
    -> 导致Source数据处理过程越来越慢
    -> 再导致所有窗口处理越来越慢。
    
    dashboard体现:

    dashboard可以在背压这里看到HIGH时,则存在数据反压问题。


    flink数据反压

    反压逻辑:
    若流程为A-B-C-D-E-F ,ABCDE出现反压(即这里status为high),则表示F处理流程导致E -> D-> C->B ->A 相继慢。

    优化方式:

    1.数据标记分流[详细代码见通用优化]
    2.窗口优化[详细代码见通用优化]

    2.数据倾斜

    在多进程环境下:

    数据体现:
    -> 每个窗口中所有数据的分布不平均,某个窗口处理数据量太大导致速率慢。
    -> 导致Source数据处理过程越来越慢
    -> 再导致所有窗口处理越来越慢。
    
    dashboard体现:
    dashboard中Subtasks中打开每个窗口可以看到每个窗口进程的运行情况:
    
    flink数据倾斜

    如上图,数据分布很不均匀,导致部分窗口数据处理缓慢。

    优化方式:

    1.数据标记分流[详细代码见通用优化]
    2.窗口优化[详细代码见通用优化]
    3.在不影响逻辑的前提下,keyby对数据分流时选择较为均匀的数据。

    3.消费滞后

    尚未出现数据反压和数据倾斜的状况,但是flink的watermarks追不上实时时间,不能实时处理。

    需单进程确认点:

    1. flink读取的数据是否产生的及时。
    2. 窗口Aggregate处理是否存在死循环或较慢的点
        (如:正则/redis/http等)
    3. flink计算结果的输出处理慢。
        (如:使用.disablechain.addsink()后再在dashboard中查看窗口和输出分别处理的速率)
    

    可优化点:

    1. 将窗口的处理逻辑优化的简单一些,将较长时间的处理放在数据处理部分或windowFunction部分。

    4.需在窗口内做大量的外连情况,如redis/es等,redis连接过多会慢或直接报错。[2019.11.17更新]

    解决方案:

    1.可以在窗口外面申请全局redis连接池作为全局变量。

    class MyProcessWindowFunction extends RichWindowFunction[Accumulator,String,String,TimeWindow] {
      @transient var config_redis = new JedisPoolConfig()
      config_redis.setMaxTotal(300)
      config_redis.setMaxWaitMillis (2*1000)
    
      @transient var jedisPool = new JedisPool(config_redis,"10.10.10.10.",1234,0,"yourpassword")
      @transient var client = Esinit() // 此处为es外联的申明
      @transient var log = LoggerFactory.getLogger(getClass)
      //其他的一些全局变量也可以在这里定义,如log
      LoginCheck_api.KeepSession() 
      //检查保持状态的函数也可以在这里处理,这样不会每个窗口都处理一遍。
    
      override def apply (key: String, window: TimeWindow, input: Iterable[Accumulator], out: Collector[String]): Unit = {
        ... 
        //窗口如果定义为null则重新做定义
        if(jedisPool==null){
          w_log = LoggerFactory.getLogger(getClass)
        
          config_redis = new JedisPoolConfig()
          config_redis.setMaxTotal(300)
          config_redis.setMaxWaitMillis (2*1000)
          jedisPool = new JedisPool(config_redis,"10.10.10.10.",1234,0,"yourpassword")
          LoginCheck_api.KeepSession()
        }
        if(client==null){
          client = Esinit()
        }
        ...
    
    

    2.网络延时问题[2019.12.4更新]
    场景:flink反压,且排查redis无太多慢查日志
    检查提交集群对redis的延时情况,正常应该在0.099ms以内不会影响到程序的处理过程。

    3.将对外操作放进单独多线程操作(如果上述两个问题都解决不了问题)[2019.12.4更新]
    以redis举例:

    import java.util.concurrent.{Callable, ExecutorService, Executors, FutureTask}
    
    import redis.clients.jedis.{JedisPool, JedisPoolConfig}
    
    object ThreadPool {
      var config_redis = new JedisPoolConfig()
      config_redis.setMaxTotal(500)
      config_redis.setMaxIdle(500)
      config_redis.setBlockWhenExhausted(false)
      config_redis.setMaxWaitMillis (1000)
      config_redis.setMinEvictableIdleTimeMillis(6000)
      config_redis.setTimeBetweenEvictionRunsMillis(3000)
      var jedisPool = new JedisPool(config_redis,"10.10.10.10",1234,0,"yourpassword")
      val threadPool:ExecutorService=Executors.newFixedThreadPool(500)
      def sadd(key:String,value:String):Int= {
        var r = 1
        try {
          val future=new FutureTask[String](new Callable[String] {
            override def call():String = {
              var isexists = 1L //sadd返回1为添加成功,0为已存在/添加不成功
              var jedis = jedisPool.getResource
              try{
                isexists = jedis.sadd(bolt_url,id_str)
              }catch{
                case e=>
              }finally {
                jedis.close()
              }
              return isexists.toString
            }
          })
          threadPool.execute(future)
          r = future.get().toInt //导出结果
          if(r==1){
          ...//逻辑操作
          }else{
          ...//逻辑操作
          }
        }finally {
    //      threadPool.shutdown()
        }
        return r//可选择是否返回结果
      }
    
      def main (args: Array[String]): Unit = {
        var t =sadd("a","b")
        println(t) 
        threadPool.shutdown()
      }
    
    
    }
    
    

    而后在窗口中调用ThreadPool.sadd方法,获取到redis操作结果后的逻辑操作也可在窗口外进行,窗口只负责调度。

    5.通用优化:

    1.数据标记分流:
    
    使用数据标记过滤进入窗口的数据,
    而非使用filter,map等方式去筛选数据。
    split分流 select选择分流. 
    
    val frequency_ = Features.split(
            (s:Map[String,Any])=>
              s.get("method").get.toString  match{
              
                case "a"|"b"|"c"|
                      => List("str")
                case "1"|"2"
                      =>List("int")
                case _
                      =>List("normal")
              }
    
          )
    
    val all = frequency_.select("str","int").assignTimestampsAndWatermarks(new TimestampExtractor())
    
    all.keyby().aggregate()
          
          ...
          
         
        
    Ps. https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/
      
    
    2.窗口聚合计算
    
    window apply窗口最后触发时进行一次性计算 aggregate来一条数据计算一次。
    
    Ps.https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/
    
    3.keyby关键词无法自行选择较均匀的情况下,
    可以采用keyby(Random(20)+key)的形式进行分配窗口。
    
    最好的方式:
    原有DataStream中添加专门用于分窗口的字段,但是可能会影响你窗口聚合的结果。
    
    def dealing_input(str):(String,String){
        val keyby_key = scala.util.Random.nextInt(20).toString+"-"+key
        return (data,keyby_key)
    }
    
    input.keyby(_._2).window().xxx
    如何在处理完将随时数去掉请参考另一篇文章:
    https://www.jianshu.com/p/1bca3c2758c1
    
    

    遇坑待更新

    相关文章

      网友评论

          本文标题:数据分析实践 | flink | 流程优化篇

          本文链接:https://www.haomeiwen.com/subject/dsapqqtx.html