Using a Spark Listener to Monitor Job Completion Progress

Author: alexlee666 | Published 2019-10-24 21:01

1. Background

At the time I was working on a data lake project that used Spark SQL for ETL: full-table data was read from an RDBMS in parallel, transformed, and imported into HDFS. The web UI had to display the ETL progress, so we needed to know how many rows had been imported so far. But because multiple executors read the data concurrently, finding out how many rows each executor has imported is a problem, and Spark SQL itself does not expose such an API. This article shows how to use a Spark listener to estimate the progress of a running job.


2. Implementation

• First, define a custom listener class that extends SparkListener and overrides the callback methods of interest;
• Then instantiate that class and register the resulting listener object on the SparkContext. A minimal skeleton of these two steps is sketched below; the full example follows in the next section.
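A minimal sketch of the two steps, assuming nothing beyond the Spark listener API (the class name ProgressListener is illustrative, not part of the original code):

    import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

    // Step 1: a custom listener that overrides the callbacks of interest.
    class ProgressListener extends SparkListener {
      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
        // react to every finished task here
      }
    }

    // Step 2: register it on an existing SparkContext `sc`:
    // sc.addSparkListener(new ProgressListener)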

3. Example Code

    import org.apache.spark.scheduler._
    import org.slf4j.LoggerFactory

    /*
     * This class listens to the progress of a submitted Spark job.
     * The number of completed tasks is counted, so that a rough
     * progress estimate for the job can be derived.
     */
    class MySparkListener(instanceName: String, schemaName: String, tableName: String,
                          ceilNum: Long, rowCount: Long, parallelismNum: Int,
                          partitionNum: Int) extends SparkListener {

      val logger = LoggerFactory.getLogger(classOf[MySparkListener])
      // number of tasks that have completed so far
      var taskCount: Int = 0

      override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = {
        super.onApplicationStart(applicationStart)
        logger.info("\n\n\n>>>>>> Spark application started")
      }

      override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
        super.onApplicationEnd(applicationEnd)
        logger.info("\n\n\n>>>>>> Spark application ended")
      }

      override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
        super.onJobEnd(jobEnd)
      }

      override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
        super.onStageCompleted(stageCompleted)
      }

      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
        super.onTaskEnd(taskEnd)
        taskCount = taskCount + 1
        if (taskCount <= parallelismNum + 1) {
          // heuristic: 10% baseline, plus 76% spread evenly over parallelismNum + 1 tasks
          val sparkJobProgress = Math.floor(rowCount * (0.1 + taskCount * 0.76 / (parallelismNum + 1))).toLong
          if (sparkJobProgress <= rowCount) {
            // business logic: update the progress shown on the web UI here ...
          }
        }
      }
    }
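To make the estimate in onTaskEnd concrete: the formula starts from a 10% baseline and spreads a further 76% evenly across the parallelismNum + 1 expected tasks, so the reported progress tops out at 86% of rowCount and never overshoots the real total. A small stand-alone sketch with hypothetical values (rowCount = 1,000,000 and parallelismNum = 4 are assumptions for illustration):

    object ProgressFormulaDemo {
      def main(args: Array[String]): Unit = {
        val rowCount = 1000000L   // hypothetical total number of rows to import
        val parallelismNum = 4    // hypothetical read parallelism
        for (taskCount <- 1 to parallelismNum + 1) {
          val estimated = Math.floor(rowCount * (0.1 + taskCount * 0.76 / (parallelismNum + 1))).toLong
          println(s"$taskCount task(s) finished -> estimated $estimated of $rowCount rows")
        }
        // each finished task adds ~15.2 percentage points on top of the 10% baseline,
        // so the estimates are roughly 25.2%, 40.4%, 55.6%, 70.8% and 86.0% of rowCount
      }
    }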
    
    
    
    import org.apache.spark.sql.SparkSession
    import org.slf4j.LoggerFactory

    object Main {
      val logger = LoggerFactory.getLogger(getClass)
      def main(args: Array[String]): Unit = {
        val sparkSession = SparkSession.builder().master("yarn").appName("Datalake").getOrCreate()
        val sc = sparkSession.sparkContext
        logger.info(">>>>>> start spark listener")
        // the constructor arguments are assumed to be defined elsewhere, e.g. parsed from args
        val sparkListener = new MySparkListener(instanceName, schemaName, tableName,
          ceilNum, rowCount, parallelismNum, partitionNum)
        sc.addSparkListener(sparkListener)
      }
    }
    

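One caveat worth noting: taskCount in MySparkListener is a plain var. Spark normally dispatches listener events from its listener-bus thread, so the simple increment generally works; if you are unsure whether your Spark version guarantees single-threaded delivery, an AtomicInteger makes the counter safe without changing the logic. A hedged sketch (the class name SafeProgressListener is illustrative):

    import java.util.concurrent.atomic.AtomicInteger
    import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

    // Same counting idea as MySparkListener, but the AtomicInteger guarantees
    // that no increment is lost even if callbacks arrive from several threads.
    class SafeProgressListener extends SparkListener {
      private val taskCount = new AtomicInteger(0)
      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
        val completed = taskCount.incrementAndGet()
        // compute and publish the progress estimate from `completed` here ...
      }
    }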
If you spot any mistakes, corrections are welcome!
