美文网首页
聊聊reactor-logback的AsyncAppender

聊聊reactor-logback的AsyncAppender

作者: go4it | 来源:发表于2023-12-17 09:33 被阅读0次

本文主要研究一下reactor-logback的AsyncAppender

AsyncAppender

reactor-logback/src/main/java/reactor/logback/AsyncAppender.java

public class AsyncAppender extends ContextAwareBase
        implements Appender<ILoggingEvent>, AppenderAttachable<ILoggingEvent>,
                   CoreSubscriber<ILoggingEvent> {

    private final AppenderAttachableImpl<ILoggingEvent>    aai      =
            new AppenderAttachableImpl<ILoggingEvent>();
    private final FilterAttachableImpl<ILoggingEvent>      fai      =
            new FilterAttachableImpl<ILoggingEvent>();
    private final AtomicReference<Appender<ILoggingEvent>> delegate =
            new AtomicReference<Appender<ILoggingEvent>>();

    private String                            name;
    private WorkQueueProcessor<ILoggingEvent> processor;

    private int     backlog           = 1024 * 1024;
    private boolean includeCallerData = false;
    private boolean started           = false;

    //......
}   

AsyncAppender继承了ContextAwareBase,同时实现了Appender、AppenderAttachable、CoreSubscriber接口

CoreSubscriber

reactor/core/CoreSubscriber.java

public interface CoreSubscriber<T> extends Subscriber<T> {

    /**
     * Request a {@link Context} from dependent components which can include downstream
     * operators during subscribing or a terminal {@link org.reactivestreams.Subscriber}.
     *
     * @return a resolved context or {@link Context#empty()}
     */
    default Context currentContext(){
        return Context.empty();
    }

    /**
     * Implementors should initialize any state used by {@link #onNext(Object)} before
     * calling {@link Subscription#request(long)}. Should further {@code onNext} related
     * state modification occur, thread-safety will be required.
     * <p>
     *    Note that an invalid request {@code <= 0} will not produce an onError and
     *    will simply be ignored or reported through a debug-enabled
     *    {@link reactor.util.Logger}.
     *
     * {@inheritDoc}
     */
    @Override
    void onSubscribe(Subscription s);
}

CoreSubscriber继承了Subscriber接口,Subscriber接口定义了onSubscribe(Subscription s)、onNext、onError、onComplete方法

onSubscribe

    public void onSubscribe(Subscription s) {
        try {
            doStart();
        }
        catch (Throwable t) {
            addError(t.getMessage(), t);
        }
        finally {
            started = true;
            s.request(Long.MAX_VALUE);
        }
    }

    protected void doStart() {
    }   

onSubscribe方法执行doStart,标记started为true,同时触发s.request(Long.MAX_VALUE)

onNext

    public void onNext(ILoggingEvent iLoggingEvent) {
        aai.appendLoopOnAppenders(iLoggingEvent);
    }

onNext调用AppenderAttachableImpl的appendLoopOnAppenders方法

onError

    public void onError(Throwable t) {
        addError(t.getMessage(), t);
    }

onError主要是添加错误信息到logback的status

onComplete

    public void onComplete() {
        try {
            Appender<ILoggingEvent> appender = delegate.getAndSet(null);
            if (appender != null){
                doStop();
                appender.stop();
                aai.detachAndStopAllAppenders();
            }
        }
        catch (Throwable t) {
            addError(t.getMessage(), t);
        }
        finally {
            started = false;
        }
    }

    protected void doStop() {
    }   

onComplete则执行doStop、appender.stop()、aai.detachAndStopAllAppenders(),最后标记started为false

Appender.doAppend

    public void doAppend(ILoggingEvent evt) throws LogbackException {
        if (getFilterChainDecision(evt) == FilterReply.DENY) {
            return;
        }
        evt.prepareForDeferredProcessing();
        if (includeCallerData) {
            evt.getCallerData();
        }
        try {
            queueLoggingEvent(evt);
        }
        catch (Throwable t) {
            addError(t.getMessage(), t);
        }
    }

    protected void queueLoggingEvent(ILoggingEvent evt) {
        if (null != delegate.get()) {
            processor.onNext(evt);
        }
    }   

doAppend方法先判断是否需要DENY,是则直接返回,之后主要执行queueLoggingEvent,它在delegate不为null时执行processor.onNext(evt)

LifeCycle.start

    public void start() {
        startDelegateAppender();

        processor = WorkQueueProcessor.<ILoggingEvent>builder().name("logger")
                                                               .bufferSize(backlog)
                                                               .autoCancel(false)
                                                               .build();
        processor.subscribe(this);
    }

    private void startDelegateAppender() {
        Appender<ILoggingEvent> delegateAppender = delegate.get();
        if (null != delegateAppender && !delegateAppender.isStarted()) {
            delegateAppender.start();
        }
    }

    public void addAppender(Appender<ILoggingEvent> newAppender) {
        if (delegate.compareAndSet(null, newAppender)) {
            aai.addAppender(newAppender);
        }
        else {
            throw new IllegalArgumentException(delegate.get() + " already attached.");
        }
    }       

start方法执行startDelegateAppender,然后创建WorkQueueProcessor(默认bufferSize为1024 * 1024),并subscribe当前实例;addAppender方法会设置delegate,并往AppenderAttachableImpl添加appender

stop

    public void stop() {
        processor.onComplete();
    }

stop方法执行processor.onComplete()

小结

reactor-logback基于WorkQueueProcessor提供了另外一种AsyncAppender,它不是基于BlockingQueue而是基于RingBuffer来实现的。其onSubscribe方法执行doStart,标记started为true,同时触发s.request(Long.MAX_VALUE);onNext调用AppenderAttachableImpl的appendLoopOnAppenders方法;onComplete则执行doStop、appender.stop()、aai.detachAndStopAllAppenders(),最后标记started为false;doAppend方法先判断是否需要DENY,是则直接返回,之后主要执行queueLoggingEvent,它在delegate不为null时执行processor.onNext(evt)。

相关文章

  • 既然使用Logback,应该对它多些了解(三)

    1. AsyncAppender 异步记录日志 AsyncAppender仅仅是做为一个日志分发器存在,因此,它...

  • LogBack AsyncAppender

    工作原理 AsyncAppender并不处理日志, 只是将日志缓冲到一个BlockingQueue里面去, 并在内...

  • logback性能优化 AsyncAppender && Fil

    导读 cache vs buffer 在系统设计中通常会有cache 及 buffer的设计: cache :设备...

  • 聊聊…聊聊?

    世界不大,一座城市里,用高楼大厦圈出来的的圈子更小了… 心再大,也会被城市里喧嚣的汽笛压抑自己 不记得有多久没有好...

  • 聊聊聊

    今天主要的时间是和阿q过的,非常开心我们有了这么一次聊天! 我觉得自己不孤单了。我俩目前拥有的感情非常相似,是比较...

  • 聊聊聊出来的感情!

    刚好回学校那天晚上,我有个比赛,以此草草结束了聊天。等忙完,我吱了一声,就直接洗洗睡了,用行动加强自己的决心。 没...

  • 无聊聊聊

  • 聊聊,聊聊选择

    今早梦到一杯豆浆15元,我给自己的孩子买了一杯50元的奶茶,对她感叹“在我们那个年代一杯奶茶才10元”孩子问我那么...

  • 聊聊,聊聊闲时

    有段时间着了迷一样的看伍迪艾伦电影,印象最深的就是电影开场他一张大脸挤满了屏幕,絮絮叨叨两分钟,正片开始。 后来得...

  • 聊聊聊的一天

    今天的更新就算是一篇日记吧。 早上接到妹妹的电话说想买衣服让我陪,早上十点多见面,先喝杯奶茶聊会...

网友评论

      本文标题:聊聊reactor-logback的AsyncAppender

      本文链接:https://www.haomeiwen.com/subject/rhfogdtx.html