美文网首页
累加器应用-日志采集

累加器应用-日志采集

作者: ryancao_b9b9 | 来源:发表于2020-05-02 20:45 被阅读0次

一、需求背景
在Driver端统一采集各Executor输出的日志信息(后续将汇聚至ES中)

二、软件版本(Spark 2.3)
Spark2.x之后,之前的的accumulator被废除,用AccumulatorV2代替;
累加器(accumulator):Accumulator是仅仅被相关操作累加的变量,因此可以在并行中被有效地支持。它们可用于实现计数器(如MapReduce)或总和计数。
Accumulator是存在于Driver端的,Executor节点不断把值发到Driver端,在Driver端计数(Spark UI在SparkContext创建时被创建,即在Driver端被创建,因此它可以读取Accumulator的数值),存在于Driver端的一个值,从节点是读取不到的。
Spark提供的Accumulator主要用于多个节点对一个变量进行共享性的操作。Accumulator只提供了累加的功能,但是却给我们提供了多个task对于同一个变量并行操作的功能,但是task只能对Accumulator进行累加操作,不能读取它的值,只有Driver程序可以读取Accumulator的值。
自定义累加器类型的功能在1.X版本中就已经提供了,但是使用起来比较麻烦,在2.0版本后,累加器的易用性有了较大的改进,而且官方还提供了一个新的抽象类:AccumulatorV2来提供更加友好的自定义类型累加器的实现方式。

注意:用累加器的过程中只能使用一次action的操作才能保证结果的准确性,否则,要使用cache,persist将之前的依赖截断。不然你action第二次,将会连着第一次的结果加上

三、AccumulatorV2接口梳理

isZero: 当AccumulatorV2中存在类似数据不存在这种问题时,是否结束程序。 
copy: 拷贝一个新的AccumulatorV2 
reset: 重置AccumulatorV2中的数据 
add: 操作数据累加方法实现 
merge: 合并数据 
value: AccumulatorV2对外访问的数据结果

四、自定义累加器实现部分代码
1、LogAccumulator

class LogAccumulator extends AccumulatorV2[LogKey, ConcurrentLinkedQueue[String]] {

  // 用于限定日志最少保存量,防止当日志量达到maxLogSize时频繁的进行clear操作
  private lazy val minLogSize = PropUtils.getInt(PropKeys.SPARK_FIRE_ACC_LOG_MIN_SIZE, DefaultVals.minLogSize).abs
  // 用于限定日志最大保存量,防止日志量过大,撑爆driver
  private lazy val maxLogSize = PropUtils.getInt(PropKeys.SPARK_FIRE_ACC_LOG_MAX_SIZE, DefaultVals.maxLogSize).abs
  // 用于存放日志的队列
  private val logQueue = new ConcurrentLinkedQueue[String]
  // 判断是否打开日志累加器
  private lazy val isEnable = PropUtils.getBoolean(PropKeys.SPARK_FIRE_ACC_ENABLE, true) && PropUtils.getBoolean(PropKeys.SPARK_FIRE_ACC_LOG_ENABLE, true)

  /**
    * 判断累加器是否为空
    */
  override def isZero: Boolean = this.logQueue.size() == 0

  /**
    * 用于复制累加器
    */
  override def copy(): AccumulatorV2[LogKey, ConcurrentLinkedQueue[String]] = new LogAccumulator

  /**
    * driver端执行有效,用于清空累加器
    */
  override def reset(): Unit = this.logQueue.clear

  /**
    * executor端执行,用于收集日志信息
    *
    * @param timeCost
    * 日志记录对象
    */
  override def add(logKey: LogKey): Unit = {
    if (this.isEnable && logKey != null) {
      this.logQueue.add(JSON.toJSONString(logKey, SerializerFeature.WriteNullStringAsEmpty))
      this.clear
    }
  }

  /**
    * executor端向driver端merge累加数据
    *
    * @param other
    * executor端累加结果
    */
  override def merge(other: AccumulatorV2[LogKey, ConcurrentLinkedQueue[String]]): Unit = {
    if (other != null && other.value.size() > 0) {
      this.logQueue.addAll(other.value)
      this.clear
    }
  }

  /**
    * driver端获取累加器的值
    *
    * @return
    * 收集到的日志信息
    */
  override def value: ConcurrentLinkedQueue[String] = this.logQueue

  /**
    * 当日志累积量超过maxLogSize所设定的值时清理过期的日志数据
    * 直到达到minLogSize所设定的最小值,防止频繁的进行清理
    */
  def clear: Unit = {
    if (this.logQueue.size() > this.maxLogSize) {
      while (this.logQueue.size() > this.minLogSize) {
        this.logQueue.poll
      }
    }
  }
}

2、LogKey

public class LogKey implements Serializable {
    private String id = UUID.randomUUID().toString();
    private String ip;
    private String load;
    private static String applicationId;
    private static String executorId;
    private Integer stageId;
    private Long taskId;
    private Integer partitionId;
    private String stackTraceInfo;
    private String level = "WARN";
    private Long start;
    private String startTime;
    private String endTime;
    ...
}

相关文章

  • 累加器应用-日志采集

    一、需求背景在Driver端统一采集各Executor输出的日志信息(后续将汇聚至ES中) 二、软件版本(Spar...

  • 流量回放原理

    流量回放是通过日志采集再处理来获取所需要的数据 日志采集 通过logstash工具从应用容器拉取日志信息,可以根据...

  • 流量回放原理

    流量回放是通过日志采集再处理来获取所需要的数据 日志采集 通过logstash工具从应用容器拉取日志信息,可以根据...

  • 字段修改产生的异常

    由于我们的项目的日志采集系统应用的是logback-spring 日志采集所以很多错误由于日志级别的更改打印出来的...

  • Kubernetes日志收集方案

    承接上文Kubernetes部署应用服务,本文介绍基于ELK的k8s应用日志收集 一、日志架构 日志采集方式一般分...

  • 第一章 总述

    大数据系统体系:数据采集、数据计算、数据服务、数据应用 一. 数据采集层: 1)web端日志采集技术方案:Aplu...

  • 大数据之数据采集

    大数据体系一般分为:数据采集、数据计算、数据服务、以及数据应用 几大层次。 在数据采集层,主要分为 日志采集 和 ...

  • 《大数据之路》读书笔记

    第2章 日志采集 2.1 浏览器的页面日志采集 主要分为两类:页面展现日志采集、页面交互日志采集 2.1.1 页面...

  • 读书笔记|为产品经理总结阿里大数据之路

    阿里大数据体系分为四层,由下而上分别是数据采集层、数据计算层、数据服务层和数据应用层。 一、数据采集分为日志采集和...

  • flume采集应用日志信息

    Logger logger=Logger.getRootLogger(); 然后将war 包传到tomcat 的w...

网友评论

      本文标题:累加器应用-日志采集

      本文链接:https://www.haomeiwen.com/subject/njpcghtx.html