Stream Computing Overview and Spark Streaming Tips

Author: 这个该叫什么呢 | Published 2018-01-17 15:24

    Stream computing overview

    Categories of conventional computation engines

    1. Batch processing
      • High throughput, high latency
      • Processes static data sets
      • Minute- or even hour-level latency
      • e.g. MapReduce, Spark
    2. Stream computing
      • Processes data at the row (record) level
      • Millisecond-level latency
      • e.g. Storm

    Categories of stream computing

    1. Row-oriented
      Apache Flink --- processes incoming data one record at a time
      Storm
    2. Micro-batch oriented
      Spark Streaming --- collects a small batch of data, then processes it as a whole
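    The contrast between the two styles can be sketched in plain Scala; the function names are illustrative, not actual engine APIs:

```scala
// Row-oriented (Flink/Storm style): handle each record as soon as it arrives.
def processPerRecord[A, B](records: Iterator[A])(f: A => B): Iterator[B] =
  records.map(f)

// Micro-batch (Spark Streaming style): buffer records into small batches,
// then process each batch as a whole.
def processMicroBatch[A, B](records: Iterator[A], batchSize: Int)(f: Seq[A] => Seq[B]): Iterator[B] =
  records.grouped(batchSize).flatMap(f)
```

    Both produce the same results; the difference is that micro-batching trades per-record latency for per-batch throughput.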

    Common stages of a streaming pipeline

    Data source ---> Data buffer ---> Streaming engine ---> Result store
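    A toy version of these four stages in plain Scala, assuming an in-memory queue stands in for the buffer (Kafka, in practice) and a map for the result store:

```scala
import scala.collection.mutable

// Illustrative only: source -> buffer -> engine -> result store,
// where the "engine" just counts occurrences of each record.
def runPipeline(source: Seq[String]): Map[String, Int] = {
  val buffer = mutable.Queue.empty[String]   // data buffer (e.g. Kafka in a real pipeline)
  buffer ++= source                          // ingest from the data source
  val results = mutable.Map.empty[String, Int] // result store (e.g. a database)
  // streaming engine: drain the buffer and update counts
  while (buffer.nonEmpty) {
    val record = buffer.dequeue()
    results(record) = results.getOrElse(record, 0) + 1
  }
  results.toMap
}
```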

    Stream computation models

    1. Fixed window
      The mode Spark Streaming supports by default (each batch interval is one fixed window)

    2. Sliding window ( window )

    3. Session-style computation ( mapWithState )
      Keeps per-key state in Spark Streaming (similar to a session) and supports expiring stale state
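    The session model can be sketched as a per-key state-update function, which is roughly the shape mapWithState expects; Session, updateSession, and the timeout handling here are illustrative plain Scala, not the actual Spark API:

```scala
// Hypothetical per-key session state: event count plus last-seen timestamp.
case class Session(count: Int, lastSeen: Long)

// Compute the new state for a key when an event arrives at time `now`.
// A session idle longer than `timeoutMs` is treated as expired and restarted,
// mirroring mapWithState's state-timeout behavior.
def updateSession(state: Option[Session], now: Long, timeoutMs: Long): Option[Session] =
  state match {
    case Some(s) if now - s.lastSeen > timeoutMs => Some(Session(1, now)) // expired: new session
    case Some(s)                                 => Some(Session(s.count + 1, now)) // extend session
    case None                                    => Some(Session(1, now)) // first event for key
  }
```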

    Spark Streaming programming tips

    Spark Streaming aims for exactly-once delivery
    Caveats: failure recomputation, speculative execution, etc. may replay records, so output operations should be idempotent
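    The replay caveat is commonly handled with an idempotent sink keyed by record id, so that writing the same record twice has no extra effect. A minimal plain-Scala sketch (IdempotentSink is an illustrative name, not a Spark API):

```scala
import scala.collection.mutable

// An in-memory stand-in for an idempotent store (e.g. an upsert into a
// key-value database): replaying a (key, value) write leaves it unchanged.
class IdempotentSink[K, V] {
  private val store = mutable.Map.empty[K, V]
  def write(key: K, value: V): Unit = store.update(key, value) // upsert by key
  def snapshot: Map[K, V] = store.toMap
}
```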

    1. Monitoring and managing jobs
    • Where to run the driver?
      Yarn cluster mode. The driver keeps running even if the client machine goes down.
    • How to restart the driver?
      Set up automatic restart
      in the Spark configuration (e.g. spark-defaults.conf):
    spark.yarn.maxAppAttempts=2  # max number of application attempts
    spark.yarn.am.attemptFailuresValidityInterval=1h  # interval after which the attempt counter resets
    spark.yarn.max.executor.failures={8 * num_executors}  # max executor failures before the app fails
    spark.yarn.executor.failuresValidityInterval=1h  # interval after which the failure counter resets
    spark.task.maxFailures=8  # task retry count (default is 4)
    spark.speculation=true  # speculative execution; requires tasks to be idempotent
    
    • Summary
      Monitor through the various Listener interfaces (e.g. StreamingListener)
    2. How to manage (stop) a streaming app gracefully
      Idea: thread hooks that check an external flag every N seconds
    /**
     * Stop the execution of the streams, with the option of ensuring all received data
     * has been processed.
     *
     * @param stopSparkContext if true, stops the associated SparkContext. The underlying
     *                         SparkContext will be stopped regardless of whether this
     *                         StreamingContext has been started.
     * @param stopGracefully if true, stops gracefully by waiting for the processing of all
     *                       received data to be completed
     */
    def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit = {
      receiverTracker.stop(processAllReceivedData) // default waits 10 seconds, graceful waits until done
      jobGenerator.stop(processAllReceivedData)    // will use spark.streaming.gracefulStopTimeout
      jobExecutor.shutdown()
      val terminated = if (processAllReceivedData) {
        jobExecutor.awaitTermination(1, TimeUnit.HOURS) // just a very large period of time
      } else {
        jobExecutor.awaitTermination(2, TimeUnit.SECONDS)
      }
      if (!terminated) {
        jobExecutor.shutdownNow()
      }
    }

    How to be graceful?
    • Method 1: command line
    – $SPARK_HOME_DIR/bin/spark-submit --master $MASTER_REST_URL --kill $DRIVER_ID
    – spark.streaming.stopGracefullyOnShutdown=true

    private def stopOnShutdown(): Unit = {
      val stopGracefully = conf.getBoolean("spark.streaming.stopGracefullyOnShutdown", false)
      logInfo(s"Invoking stop(stopGracefully=$stopGracefully) from shutdown hook")
      // Do not stop SparkContext, let its own shutdown hook stop it
      stop(stopSparkContext = false, stopGracefully = stopGracefully)
    }
    

    • Method 2: by marker file (recommended)
    – Touch a file on HDFS when starting the app
    – Remove the file when you want to stop the app
    – A separate thread in the Spark app calls

    streamingContext.stop(stopSparkContext = true, stopGracefully = true)
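    The marker-file loop might look like the sketch below. MarkerFileWatcher is a hypothetical name, and it polls a local path via java.nio.file for self-containedness; a real job would poll an HDFS path through Hadoop's FileSystem API and pass streamingContext.stop(stopSparkContext = true, stopGracefully = true) as the callback:

```scala
import java.nio.file.{Files, Path}

// Background thread that polls for the marker file every `pollMs` milliseconds
// and invokes `onStop` once the file disappears.
class MarkerFileWatcher(marker: Path, pollMs: Long, onStop: () => Unit) {
  @volatile private var running = true

  private val watcher = new Thread(new Runnable {
    def run(): Unit = {
      while (running) {
        if (!Files.exists(marker)) { // marker removed => request graceful stop
          onStop()
          running = false
        } else {
          Thread.sleep(pollMs)
        }
      }
    }
  })
  watcher.setDaemon(true) // do not keep the JVM alive on its own

  def start(): Unit = watcher.start()
  def awaitStopped(): Unit = watcher.join()
}
```

    Polling from a separate thread (rather than a shutdown hook) is what lets the app decide to stop on its own schedule while batches keep running.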
    
