一、需求背景
在Driver端统一采集各Executor输出的日志信息(后续将汇聚至ES中)
二、软件版本(Spark 2.3)
Spark2.x之后,之前的的accumulator被废除,用AccumulatorV2代替;
累加器(accumulator):Accumulator是仅仅被相关操作累加的变量,因此可以在并行中被有效地支持。它们可用于实现计数器(如MapReduce)或总和计数。
Accumulator是存在于Driver端的,Executor节点不断把值发到Driver端,在Driver端计数(Spark UI在SparkContext创建时被创建,即在Driver端被创建,因此它可以读取Accumulator的数值),存在于Driver端的一个值,从节点是读取不到的。
Spark提供的Accumulator主要用于多个节点对一个变量进行共享性的操作。Accumulator只提供了累加的功能,但是却给我们提供了多个task对于同一个变量并行操作的功能,但是task只能对Accumulator进行累加操作,不能读取它的值,只有Driver程序可以读取Accumulator的值。
自定义累加器类型的功能在1.X版本中就已经提供了,但是使用起来比较麻烦,在2.0版本后,累加器的易用性有了较大的改进,而且官方还提供了一个新的抽象类:AccumulatorV2来提供更加友好的自定义类型累加器的实现方式。
注意:用累加器的过程中只能使用一次action的操作才能保证结果的准确性,否则,要使用cache,persist将之前的依赖截断。不然你action第二次,将会连着第一次的结果加上
三、AccumulatorV2接口梳理
isZero: 当AccumulatorV2中存在类似数据不存在这种问题时,是否结束程序。
copy: 拷贝一个新的AccumulatorV2
reset: 重置AccumulatorV2中的数据
add: 操作数据累加方法实现
merge: 合并数据
value: AccumulatorV2对外访问的数据结果
四、自定义累加器实现部分代码
1、LogAccumulator
class LogAccumulator extends AccumulatorV2[LogKey, ConcurrentLinkedQueue[String]] {
// 用于限定日志最少保存量,防止当日志量达到maxLogSize时频繁的进行clear操作
private lazy val minLogSize = PropUtils.getInt(PropKeys.SPARK_FIRE_ACC_LOG_MIN_SIZE, DefaultVals.minLogSize).abs
// 用于限定日志最大保存量,防止日志量过大,撑爆driver
private lazy val maxLogSize = PropUtils.getInt(PropKeys.SPARK_FIRE_ACC_LOG_MAX_SIZE, DefaultVals.maxLogSize).abs
// 用于存放日志的队列
private val logQueue = new ConcurrentLinkedQueue[String]
// 判断是否打开日志累加器
private lazy val isEnable = PropUtils.getBoolean(PropKeys.SPARK_FIRE_ACC_ENABLE, true) && PropUtils.getBoolean(PropKeys.SPARK_FIRE_ACC_LOG_ENABLE, true)
/**
* 判断累加器是否为空
*/
override def isZero: Boolean = this.logQueue.size() == 0
/**
* 用于复制累加器
*/
override def copy(): AccumulatorV2[LogKey, ConcurrentLinkedQueue[String]] = new LogAccumulator
/**
* driver端执行有效,用于清空累加器
*/
override def reset(): Unit = this.logQueue.clear
/**
* executor端执行,用于收集日志信息
*
* @param timeCost
* 日志记录对象
*/
override def add(logKey: LogKey): Unit = {
if (this.isEnable && logKey != null) {
this.logQueue.add(JSON.toJSONString(logKey, SerializerFeature.WriteNullStringAsEmpty))
this.clear
}
}
/**
* executor端向driver端merge累加数据
*
* @param other
* executor端累加结果
*/
override def merge(other: AccumulatorV2[LogKey, ConcurrentLinkedQueue[String]]): Unit = {
if (other != null && other.value.size() > 0) {
this.logQueue.addAll(other.value)
this.clear
}
}
/**
* driver端获取累加器的值
*
* @return
* 收集到的日志信息
*/
override def value: ConcurrentLinkedQueue[String] = this.logQueue
/**
* 当日志累积量超过maxLogSize所设定的值时清理过期的日志数据
* 直到达到minLogSize所设定的最小值,防止频繁的进行清理
*/
def clear: Unit = {
if (this.logQueue.size() > this.maxLogSize) {
while (this.logQueue.size() > this.minLogSize) {
this.logQueue.poll
}
}
}
}
2、LogKey
public class LogKey implements Serializable {
private String id = UUID.randomUUID().toString();
private String ip;
private String load;
private static String applicationId;
private static String executorId;
private Integer stageId;
private Long taskId;
private Integer partitionId;
private String stackTraceInfo;
private String level = "WARN";
private Long start;
private String startTime;
private String endTime;
...
}
网友评论