美文网首页
Spark-streaming监控批次处理信息

Spark-streaming监控批次处理信息

作者: 海轩_fan | 来源:发表于2020-07-24 15:44 被阅读0次

    实时任务监控原因

    在实时任务执行的过程中,由于数据突然激增或网络阻塞等情况,使得任务数据堆积或失败等

    解决办法

    通过实现SparkListener和StreamingListener重写其中的相关方法,便能监控任务的执行情况,对症下药

    class NeiStreamingListener(ssc:StreamingContext,delay:Long) extends SparkListener with StreamingListener{
      override def onBatchCompleted(batchStarted: StreamingListenerBatchCompleted): Unit = synchronized{
        //  获取任务名称
        val appName = ssc.sparkContext.appName
        if (batchStarted != null){
          val batchInfo = batchStarted.batchInfo
          // 接收批次数据量
          val numRecords = batchInfo.numRecords
          //   调度延迟时间
          val schedulingTime = batchInfo.schedulingDelay.getOrElse(0L)
          //   执行时间
          val processingTime = batchInfo.processingDelay.getOrElse(0L)
          //  总延迟时间
          val totalDelay = batchInfo.totalDelay.getOrElse(0L)
          if(delay != -1 && schedulingTime > delay ){
            val yarnManager = new YarnAppManager()
            yarnManager.getJobState()
            val sparMap = yarnManager.sparkMap()
            if (sparMap.containsKey(appName) ) {
              val appid: String = sparMap.get(appName).getApplicationId.toString
              //  可以在此处添加对应逻辑,告警邮件或短信等
            }
          }
        }
      }
    }
    

    以上代码为重写onBatchCompleted方法,获取实时任务中批次数据处理延迟时间,并通过yarn获取到任务的applicationId;

    相关文章

      网友评论

          本文标题:Spark-streaming监控批次处理信息

          本文链接:https://www.haomeiwen.com/subject/fkyjlktx.html