序
本文主要研究一下LoggingEventAsyncDisruptorAppender
LoggingEventAsyncDisruptorAppender
net/logstash/logback/appender/LoggingEventAsyncDisruptorAppender.java
/**
 * Delegating asynchronous disruptor appender bound to logback {@link ILoggingEvent}s.
 * <p>
 * On top of what the parent class does, it can optionally ask each event for its
 * caller data while the event is being prepared for deferred processing.
 */
public class LoggingEventAsyncDisruptorAppender extends DelegatingAsyncDisruptorAppender<ILoggingEvent, AppenderListener<ILoggingEvent>> {

    /**
     * Whether caller data must be captured before the event is published
     * to the {@link RingBuffer}. Defaults to {@code false}.
     */
    private boolean includeCallerData;

    /**
     * Enables or disables capturing of caller data before publication.
     *
     * @param includeCallerData {@code true} to capture caller data
     */
    public void setIncludeCallerData(boolean includeCallerData) {
        this.includeCallerData = includeCallerData;
    }

    /**
     * @return {@code true} when caller data is captured before publication
     */
    public boolean isIncludeCallerData() {
        return includeCallerData;
    }

    /**
     * Runs the parent preparation first, then requests the caller data from the
     * event when {@link #isIncludeCallerData()} is enabled.
     *
     * @param event the event about to be published
     */
    protected void prepareForDeferredProcessing(ILoggingEvent event) {
        super.prepareForDeferredProcessing(event);
        if (!includeCallerData) {
            return;
        }
        // The returned value is intentionally ignored; only the call itself matters here.
        event.getCallerData();
    }
}
LoggingEventAsyncDisruptorAppender继承了DelegatingAsyncDisruptorAppender,它定义了includeCallerData属性,其prepareForDeferredProcessing在includeCallerData为true时执行event.getCallerData()
DelegatingAsyncDisruptorAppender
net/logstash/logback/appender/DelegatingAsyncDisruptorAppender.java
/**
 * An {@link AsyncDisruptorAppender} that forwards events to a set of attached
 * delegate appenders, exposed through the {@link AppenderAttachable} contract.
 *
 * @param <Event> type of event handled by this appender and its delegates
 * @param <Listener> type of listener notified by this appender
 */
public abstract class DelegatingAsyncDisruptorAppender<Event extends DeferredProcessingAware, Listener extends AppenderListener<Event>> extends AsyncDisruptorAppender<Event, Listener> implements AppenderAttachable<Event> {

    /**
     * Holder of the delegate appenders; every {@link AppenderAttachable} method
     * of this class simply forwards to it.
     */
    private final AppenderAttachableImpl<Event> appenders = new AppenderAttachableImpl<>();

    /**
     * @return a new {@code DelegatingEventHandler}
     */
    @Override
    protected EventHandler<LogEvent<Event>> createEventHandler() {
        return new DelegatingEventHandler();
    }

    /**
     * Starts the delegate appenders first, then this appender itself.
     */
    @Override
    public void start() {
        startDelegateAppenders();
        super.start();
    }

    /**
     * Stops this appender, then its delegates. Does nothing when not started.
     */
    @Override
    public void stop() {
        if (isStarted()) {
            super.stop();
            stopDelegateAppenders();
        }
    }

    /**
     * Starts every delegate that is not yet started, handing it this appender's
     * context when it has none of its own.
     */
    private void startDelegateAppenders() {
        Iterator<Appender<Event>> delegates = appenders.iteratorForAppenders();
        while (delegates.hasNext()) {
            Appender<Event> delegate = delegates.next();
            if (delegate.getContext() == null) {
                delegate.setContext(getContext());
            }
            if (!delegate.isStarted()) {
                delegate.start();
            }
        }
    }

    /**
     * Stops every delegate that is currently started.
     */
    private void stopDelegateAppenders() {
        Iterator<Appender<Event>> delegates = appenders.iteratorForAppenders();
        while (delegates.hasNext()) {
            Appender<Event> delegate = delegates.next();
            if (delegate.isStarted()) {
                delegate.stop();
            }
        }
    }

    @Override
    public void addAppender(Appender<Event> newAppender) {
        appenders.addAppender(newAppender);
    }

    @Override
    public Iterator<Appender<Event>> iteratorForAppenders() {
        return appenders.iteratorForAppenders();
    }

    @Override
    public Appender<Event> getAppender(String name) {
        return appenders.getAppender(name);
    }

    @Override
    public boolean isAttached(Appender<Event> appender) {
        return appenders.isAttached(appender);
    }

    @Override
    public void detachAndStopAllAppenders() {
        appenders.detachAndStopAllAppenders();
    }

    @Override
    public boolean detachAppender(Appender<Event> appender) {
        return appenders.detachAppender(appender);
    }

    @Override
    public boolean detachAppender(String name) {
        return appenders.detachAppender(name);
    }
}
DelegatingAsyncDisruptorAppender继承了AsyncDisruptorAppender,它定义了AppenderAttachableImpl,其createEventHandler创建的是DelegatingEventHandler,其start方法会执行startDelegateAppenders,其stop方法会执行stopDelegateAppenders,其addAppender会添加appender到AppenderAttachableImpl中
DelegatingEventHandler
/**
 * Event handler that passes each event taken from the ring buffer to every
 * attached delegate appender, flushing the delegates at the end of a batch.
 */
private class DelegatingEventHandler implements EventHandler<LogEvent<Event>> {

    /**
     * {@code true} when the previous event already produced a forwarding error,
     * in which case further errors are not reported until an event goes through cleanly.
     */
    private boolean silentError;

    /**
     * Forwards the wrapped event to each delegate. A failure in one delegate is
     * recorded (and reported unless errors are currently silenced) without
     * preventing the remaining delegates from receiving the event.
     *
     * @param logEvent holder of the event to forward
     * @param sequence ring buffer sequence of the event (unused)
     * @param endOfBatch whether this is the last event of the current batch
     */
    @Override
    public void onEvent(LogEvent<Event> logEvent, long sequence, boolean endOfBatch) throws Exception {
        boolean failed = false;

        Iterator<Appender<Event>> delegates = appenders.iteratorForAppenders();
        while (delegates.hasNext()) {
            Appender<Event> delegate = delegates.next();
            try {
                delegate.doAppend(logEvent.event);

                // Optimization: flush delegates that are Flushable or
                // OutputStreamAppender only once the batch is complete.
                if (endOfBatch) {
                    flushAppender(delegate);
                }
            } catch (Exception e) {
                failed = true;
                if (!this.silentError) {
                    addError(String.format("Unable to forward event to appender [%s]: %s", delegate.getName(), e.getMessage()), e);
                }
            }
        }

        // Errors stay silenced for the next event only if this one failed too.
        this.silentError = failed;
    }

    /**
     * Flushes a started delegate when it is {@link Flushable} or an
     * {@link OutputStreamAppender}; any other delegate is left untouched.
     */
    private void flushAppender(Appender<Event> appender) throws IOException {
        // Similar to #doAppend() - a stopped appender is never flushed
        if (appender.isStarted()) {
            if (appender instanceof Flushable) {
                flushAppender((Flushable) appender);
            } else if (appender instanceof OutputStreamAppender) {
                flushAppender((OutputStreamAppender<Event>) appender);
            }
        }
    }

    /**
     * Flushes the appender's output stream, unless the appender is configured
     * for immediate flush or has no output stream.
     */
    private void flushAppender(OutputStreamAppender<Event> appender) throws IOException {
        if (appender.isImmediateFlush()) {
            return;
        }
        OutputStream stream = appender.getOutputStream();
        if (stream == null) {
            return;
        }
        stream.flush();
    }

    /**
     * Flushes a {@link Flushable} delegate.
     */
    private void flushAppender(Flushable appender) throws IOException {
        appender.flush();
    }
}
DelegatingEventHandler实现了EventHandler接口,其onEvent方法主要是遍历AppenderAttachableImpl,挨个执行appender.doAppend(logEvent.event),在endOfBatch的时候会执行flushAppender
AsyncDisruptorAppender
net/logstash/logback/appender/AsyncDisruptorAppender.java
/**
* Abstract appender, built on {@link UnsynchronizedAppenderBase}, that publishes events
* onto the {@link RingBuffer} of a {@link Disruptor}.
* <p>
* Only the constants and configuration fields are shown in this excerpt; the
* remaining members are elided.
*
* @param <Event> type of event accepted by this appender
* @param <Listener> type of listener notified by this appender
*/
public abstract class AsyncDisruptorAppender<Event extends DeferredProcessingAware, Listener extends AppenderListener<Event>> extends UnsynchronizedAppenderBase<Event> {
/**
* Time in nanos to wait between drain attempts during the shutdown phase
*/
private static final long SLEEP_TIME_DURING_SHUTDOWN = 50 * 1_000_000L; // 50ms
/**
* Format specifier for the first thread-name argument (the appender name).
*/
protected static final String APPENDER_NAME_FORMAT = "%1$s";
/**
* Format specifier for the second thread-name argument (the numerical thread index).
*/
protected static final String THREAD_INDEX_FORMAT = "%2$d";
/**
* Default thread name pattern: {@code logback-appender-<appender name>-<thread index>}.
*/
public static final String DEFAULT_THREAD_NAME_FORMAT = "logback-appender-" + APPENDER_NAME_FORMAT + "-" + THREAD_INDEX_FORMAT;
/**
* Default value of {@link #ringBufferSize}.
*/
public static final int DEFAULT_RING_BUFFER_SIZE = 8192;
/**
* Default value of {@link #producerType}.
*/
public static final ProducerType DEFAULT_PRODUCER_TYPE = ProducerType.MULTI;
/**
* Default value of {@link #waitStrategy}.
* NOTE(review): this single instance is used by every appender that keeps the
* default - confirm that sharing one strategy instance is intended.
*/
public static final WaitStrategy DEFAULT_WAIT_STRATEGY = new BlockingWaitStrategy();
/**
* Default value of {@link #droppedWarnFrequency}.
*/
public static final int DEFAULT_DROPPED_WARN_FREQUENCY = 1000;
/**
* Shared exception instance; its stack trace is replaced by a single
* synthetic element in the static initializer below.
*/
private static final RingBufferFullException RING_BUFFER_FULL_EXCEPTION = new RingBufferFullException();
static {
RING_BUFFER_FULL_EXCEPTION.setStackTrace(new StackTraceElement[] {new StackTraceElement(AsyncDisruptorAppender.class.getName(), "append(..)", null, -1)});
}
/**
* The size of the {@link RingBuffer}.
* Defaults to {@value #DEFAULT_RING_BUFFER_SIZE}.
* <p>
* Must be a positive power of 2.
*/
private int ringBufferSize = DEFAULT_RING_BUFFER_SIZE;
/**
* The {@link ProducerType} to use to configure the Disruptor.
* Only set to {@link ProducerType#SINGLE} if only one thread
* will ever be appending to this appender.
*/
private ProducerType producerType = DEFAULT_PRODUCER_TYPE;
/**
* The {@link WaitStrategy} used by the RingBuffer
* when pulling events to be processed by {@link #eventHandler}.
* <p>
* By default, a {@link BlockingWaitStrategy} is used, which is the most
* CPU conservative, but results in a higher latency.
* If you need lower latency (at the cost of higher CPU usage),
* consider using a {@link SleepingWaitStrategy} or a {@link PhasedBackoffWaitStrategy}.
*/
private WaitStrategy waitStrategy = DEFAULT_WAIT_STRATEGY;
/**
* Pattern used by the {@link WorkerThreadFactory} to set the
* handler thread name.
* Defaults to {@value #DEFAULT_THREAD_NAME_FORMAT}.
* <p>
*
* If you change the {@link #threadFactory}, then this
* value may not be honored.
* <p>
*
* The string is a format pattern understood by {@link Formatter#format(String, Object...)}.
* {@link Formatter#format(String, Object...)} is used to
* construct the actual thread name prefix.
* The first argument (%1$s) is the string appender name.
* The second argument (%2$d) is the numerical thread index.
* Other arguments can be made available by subclasses.
*/
private String threadNameFormat = DEFAULT_THREAD_NAME_FORMAT;
/**
* When true, child threads created by this appender will be daemon threads,
* and therefore allow the JVM to exit gracefully without
* needing to explicitly shut down the appender.
* Note that in this case, it is possible for log events to not
* be handled.
* <p>
*
* When false, child threads created by this appender will not be daemon threads,
* and therefore will prevent the JVM from shutting down
* until the appender is explicitly shut down.
* Set this to false if you want to ensure that every log event
* prior to shutdown is handled.
* <p>
*
* If you change the {@link #threadFactory}, then this
* value may not be honored.
*/
private boolean useDaemonThread = true;
/**
* When true, if no status listener is registered, then a default {@link OnConsoleStatusListener}
* will be registered, so that error messages are seen on the console.
*/
private boolean addDefaultStatusListener = true;
/**
* For every droppedWarnFrequency consecutive dropped events, log a warning.
* Defaults to {@value #DEFAULT_DROPPED_WARN_FREQUENCY}.
*/
private int droppedWarnFrequency = DEFAULT_DROPPED_WARN_FREQUENCY;
/**
* The {@link ThreadFactory} used to create the handler thread.
*/
private ThreadFactory threadFactory = new WorkerThreadFactory();
/**
* The {@link Disruptor} containing the {@link RingBuffer} onto
* which to publish events.
*/
private Disruptor<LogEvent<Event>> disruptor;
/**
* Sets the {@link LogEvent#event} to the logback Event.
* Used when publishing events to the {@link RingBuffer}.
*/
private EventTranslatorOneArg<LogEvent<Event>, Event> eventTranslator = new LogEventTranslator<>();
/**
* Defines what happens when there is an exception during
* {@link RingBuffer} processing.
*/
private ExceptionHandler<LogEvent<Event>> exceptionHandler = new LogEventExceptionHandler();
/**
* Consecutive number of dropped events.
*/
private final AtomicLong consecutiveDroppedCount = new AtomicLong();
/**
* The {@link EventFactory} used to create {@link LogEvent}s for the RingBuffer.
*/
private LogEventFactory<Event> eventFactory = new LogEventFactory<>();
/**
* Incrementor number used as part of thread names for uniqueness.
* Starts at 1.
*/
private final AtomicInteger threadNumber = new AtomicInteger(1);
/**
* These listeners will be notified when certain events occur on this appender.
*/
protected final List<Listener> listeners = new ArrayList<>();
/**
* Maximum time to wait when appending events to the ring buffer when full before the event
* is dropped. Use the following values:
* <ul>
* <li>{@code -1} to disable timeout and wait until space becomes available.
* <li>{@code 0} for no timeout and drop the event immediately when the buffer is full.
* <li>{@code > 0} to retry during the specified amount of time.
* </ul>
* Defaults to 0 milliseconds.
*/
private Duration appendTimeout = Duration.buildByMilliseconds(0);
/**
* Delay between consecutive attempts to append an event in the ring buffer when
* full. Defaults to 5 milliseconds.
*/
private Duration appendRetryFrequency = Duration.buildByMilliseconds(5);
/**
* How long to wait for in-flight events during shutdown.
* Defaults to 1 minute.
*/
private Duration shutdownGracePeriod = Duration.buildByMinutes(1);
/**
* Lock used to limit the number of concurrent threads retrying at the same time
*/
private final ReentrantLock lock = new ReentrantLock();
// Remaining members are elided in this excerpt.
//......
}
AsyncDisruptorAppender继承了logback的UnsynchronizedAppenderBase,它使用了Disruptor的RingBuffer来进行异步,其默认的WaitStrategy为BlockingWaitStrategy,默认的ringBufferSize为8192,默认的producerType为ProducerType.MULTI,droppedWarnFrequency为1000
start
/**
 * Starts the appender: optionally registers a console status listener,
 * builds and starts the {@link Disruptor}, marks the appender as started
 * and finally notifies the listeners.
 */
public void start() {
    // A default listener is only installed when requested, when a status manager
    // exists, and when nobody has registered a listener yet.
    boolean installConsoleListener = addDefaultStatusListener
            && getStatusManager() != null
            && getStatusManager().getCopyOfStatusListenerList().isEmpty();

    if (installConsoleListener) {
        LevelFilteringStatusListener consoleListener = new LevelFilteringStatusListener();
        consoleListener.setLevelValue(Status.WARN);
        consoleListener.setDelegate(new OnConsoleStatusListener());
        consoleListener.setContext(getContext());
        consoleListener.start();
        getStatusManager().add(consoleListener);
    }

    disruptor = new Disruptor<>(
            eventFactory,
            ringBufferSize,
            threadFactory,
            producerType,
            waitStrategy);

    // The exception handler must be registered before any event handler
    // so that it applies to every handler added afterwards.
    disruptor.setDefaultExceptionHandler(exceptionHandler);
    disruptor.handleEventsWith(new EventClearingEventHandler<>(createEventHandler()));
    disruptor.start();

    super.start();
    fireAppenderStarted();
}
其start方法根据eventFactory、ringBufferSize、threadFactory、producerType、waitStrategy创建Disruptor,然后设置defaultExceptionHandler,设置EventHandler为EventClearingEventHandler,然后执行disruptor.start(),再执行super.start()
stop
/**
 * Stops the appender: refuses new events, waits (up to the shutdown grace
 * period) for the ring buffer to drain, halts the Disruptor and notifies
 * the listeners.
 */
public void stop() {
    // super.isStarted() is used on purpose: subclasses may override isStarted()
    // with additional conditions that must not be evaluated here. Such checks
    // belong in the subclass, before it calls super.stop().
    if (!super.isStarted()) {
        return;
    }

    // From this point on no further event can be appended.
    super.stop();

    // Disruptor#shutdown() would wait for all enqueued events in a busy-spin.
    // To avoid wasting CPU, poll the ring buffer at a fixed interval instead,
    // for at most the configured grace period, then halt the Disruptor.
    long graceMillis = getShutdownGracePeriod().getMilliseconds();
    long deadline;
    if (graceMillis < 0) {
        deadline = Long.MAX_VALUE;
    } else {
        deadline = System.currentTimeMillis() + graceMillis;
    }

    while (true) {
        if (isRingBufferEmpty() || System.currentTimeMillis() >= deadline) {
            break;
        }
        LockSupport.parkNanos(SLEEP_TIME_DURING_SHUTDOWN);
    }

    disruptor.halt();

    if (!isRingBufferEmpty()) {
        addWarn("Some queued events have not been logged due to requested shutdown");
    }

    fireAppenderStopped();
}
stop方法先执行super.stop()不让event再进来,然后根据shutdownGracePeriod计算deadline(小于0时表示一直等待),在isRingBufferEmpty为false且尚未到达deadline的时候每隔50ms等待一次,之后执行disruptor.halt();如果此时RingBuffer仍不为空则打印warning,最后执行fireAppenderStopped()
append
/**
 * Prepares the event for deferred processing and tries to enqueue it in the ring buffer.
 * <p>
 * On success, listeners are notified and, if events had been dropped just before,
 * a warning with the total number of dropped events is logged and the counter is reset.
 * On failure (ring buffer full), the consecutive dropped counter is incremented, a
 * warning is logged on the first drop and then once every {@code droppedWarnFrequency}
 * drops, and listeners are notified of the failure.
 * <p>
 * A {@code droppedWarnFrequency} of 1 warns on every dropped event; a value of 0
 * produces no periodic warning.
 *
 * @param event the event to append
 */
protected void append(Event event) {
    long startTime = System.nanoTime();

    // A failure to prepare the event must not prevent it from being logged.
    try {
        prepareForDeferredProcessing(event);
    } catch (RuntimeException e) {
        addWarn("Unable to prepare event for deferred processing. Event output might be missing data.", e);
    }

    try {
        if (enqueue(event)) {
            // Log warning if we had drop before.
            // The compare-and-set ensures only one thread reports and resets the counter.
            long consecutiveDropped = this.consecutiveDroppedCount.get();
            if (consecutiveDropped != 0 && this.consecutiveDroppedCount.compareAndSet(consecutiveDropped, 0L)) {
                addWarn("Dropped " + consecutiveDropped + " total events due to ring buffer at max capacity [" + this.ringBufferSize + "]");
            }

            // Notify listeners
            fireEventAppended(event, System.nanoTime() - startTime);

        } else {
            // Log a warning status about the failure
            long consecutiveDropped = this.consecutiveDroppedCount.incrementAndGet();

            // Warn on drops 1, 1 + f, 1 + 2f, ... Written as (n - 1) % f == 0 rather than
            // n % f == 1 so that f == 1 warns on every drop (n % 1 is always 0), and
            // guarded against f == 0 to avoid an ArithmeticException.
            int warnFrequency = this.droppedWarnFrequency;
            if (warnFrequency != 0 && ((consecutiveDropped - 1) % warnFrequency) == 0) {
                addWarn("Dropped " + consecutiveDropped + " events (and counting...) due to ring buffer at max capacity [" + this.ringBufferSize + "]");
            }

            // Notify listeners
            fireEventAppendFailed(event, RING_BUFFER_FULL_EXCEPTION);
        }
    } catch (ShutdownInProgressException e) {
        // Same message as if Appender#append is called after the appender is stopped...
        addWarn("Attempted to append to non started appender [" + this.getName() + "].");

    } catch (InterruptedException e) {
        // be silent but re-interrupt the thread
        Thread.currentThread().interrupt();
    }
}
其append方法先执行prepareForDeferredProcessing,再执行enqueue,enqueue不成功则递增consecutiveDroppedCount,再根据droppedWarnFrequency判断是否需要打印warning日志
enqueue
/**
 * Publishes the event to the ring buffer, retrying while the buffer is full
 * according to {@code appendTimeout}.
 *
 * @param event the event to publish
 * @return {@code true} if the event was published, {@code false} if it must be dropped
 * @throws ShutdownInProgressException if the appender is no longer started while retrying
 * @throws InterruptedException if the calling thread is interrupted while waiting for the
 *         lock or while retrying
 */
private boolean enqueue(Event event) throws ShutdownInProgressException, InterruptedException {
    // Fast path: a single attempt without any locking.
    if (this.disruptor.getRingBuffer().tryPublishEvent(this.eventTranslator, event)) {
        return true;
    }

    // A timeout of zero means "never retry": drop right away.
    final long timeoutMillis = this.appendTimeout.getMilliseconds();
    if (timeoutMillis == 0) {
        return false;
    }

    // Only one thread at a time is allowed to retry, to avoid burning CPU
    // cycles "for nothing" in CPU constrained environments.
    final long deadline;
    if (timeoutMillis < 0) {
        // No timeout: wait for the lock as long as needed, retry without deadline.
        lock.lockInterruptibly();
        deadline = Long.MAX_VALUE;
    } else {
        deadline = System.currentTimeMillis() + timeoutMillis;
        if (!lock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS)) {
            return false;
        }
    }

    // Pause between attempts starts at 1ns and doubles up to the retry frequency.
    long pauseNanos = 1L;
    final long maxPauseNanos = TimeUnit.MILLISECONDS.toNanos(this.appendRetryFrequency.getMilliseconds());

    try {
        while (true) {
            if (!isStarted()) {
                throw new ShutdownInProgressException();
            }
            if (System.currentTimeMillis() >= deadline) {
                return false;
            }
            if (Thread.currentThread().isInterrupted()) {
                throw new InterruptedException();
            }

            LockSupport.parkNanos(pauseNanos);
            pauseNanos = Math.min(pauseNanos * 2, maxPauseNanos);

            if (this.disruptor.getRingBuffer().tryPublishEvent(this.eventTranslator, event)) {
                return true;
            }
        }
    } finally {
        lock.unlock();
    }
}
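enqueue方法先执行disruptor.getRingBuffer().tryPublishEvent,成功则返回true,否则根据appendTimeout判断是否需要重试:为0则直接返回false;小于0则通过lock.lockInterruptibly()获取锁,且不设deadline(一直重试);大于0则根据appendTimeout计算deadline并执行lock.tryLock,获取不到锁则返回false。拿到锁之后循环尝试disruptor.getRingBuffer().tryPublishEvent,每次重试前通过LockSupport.parkNanos进行退避(从1纳秒开始翻倍,上限为appendRetryFrequency),直到成功或者deadline小于等于当前时间;如果期间appender已经停止则抛出ShutdownInProgressException,线程被中断则抛出InterruptedException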
小结
reactor-logback已经不再维护了(参见 EOL reactor-logback in 3.3+ #204),官方推荐使用logstash-logback-encoder。LoggingEventAsyncDisruptorAppender继承了DelegatingAsyncDisruptorAppender,主要是根据includeCallerData属性判断是否需要计算callerData;DelegatingAsyncDisruptorAppender继承了AsyncDisruptorAppender,它主要是组合了AppenderAttachableImpl实现了AppenderAttachable接口;AsyncDisruptorAppender则是使用了Disruptor的RingBuffer来进行异步,其默认的WaitStrategy为BlockingWaitStrategy,默认的ringBufferSize为8192,默认的producerType为ProducerType.MULTI,droppedWarnFrequency为1000。
网友评论