美文网首页
聊聊logback的AsyncAppender

聊聊logback的AsyncAppender

作者: go4it | 来源:发表于2023-10-27 19:21 被阅读0次

    本文主要研究一下logback的AsyncAppender

    AsyncAppender

    ch/qos/logback/classic/AsyncAppender.java

    public class AsyncAppender extends AsyncAppenderBase<ILoggingEvent> {
    
        boolean includeCallerData = false;
    
        /**
         * Events of level TRACE, DEBUG and INFO are deemed to be discardable.
         * @param event
         * @return true if the event is of level TRACE, DEBUG or INFO false otherwise.
         */
        protected boolean isDiscardable(ILoggingEvent event) {
            Level level = event.getLevel();
            return level.toInt() <= Level.INFO_INT;
        }
    
        protected void preprocess(ILoggingEvent eventObject) {
            eventObject.prepareForDeferredProcessing();
            if (includeCallerData)
                eventObject.getCallerData();
        }
    
        public boolean isIncludeCallerData() {
            return includeCallerData;
        }
    
        public void setIncludeCallerData(boolean includeCallerData) {
            this.includeCallerData = includeCallerData;
        }
    
    }
    

    AsyncAppender继承了AsyncAppenderBase,它新增了includeCallerData配置,另外覆盖了isDiscardable、preprocess方法,isDiscardable针对TRACE、DEBUG的级别返回true,INFO返回false;preprocess则判断是否includeCallerData,是的话则执行eventObject.getCallerData()

    AsyncAppenderBase

    ch/qos/logback/core/AsyncAppenderBase.java

    public class AsyncAppenderBase<E> extends UnsynchronizedAppenderBase<E> implements AppenderAttachable<E> {
    
        AppenderAttachableImpl<E> aai = new AppenderAttachableImpl<E>();
        BlockingQueue<E> blockingQueue;
    
        /**
         * The default buffer size.
         */
        public static final int DEFAULT_QUEUE_SIZE = 256;
        int queueSize = DEFAULT_QUEUE_SIZE;
    
        int appenderCount = 0;
    
        static final int UNDEFINED = -1;
        int discardingThreshold = UNDEFINED;
        boolean neverBlock = false;
    
        Worker worker = new Worker();
    
        /**
         * The default maximum queue flush time allowed during appender stop. If the 
         * worker takes longer than this time it will exit, discarding any remaining 
         * items in the queue
         */
        public static final int DEFAULT_MAX_FLUSH_TIME = 1000;
        int maxFlushTime = DEFAULT_MAX_FLUSH_TIME;
    
        /**
         * Is the eventObject passed as parameter discardable? The base class's implementation of this method always returns
         * 'false' but sub-classes may (and do) override this method.
         * <p/>
         * <p>Note that only if the buffer is nearly full are events discarded. Otherwise, when the buffer is "not full"
         * all events are logged.
         *
         * @param eventObject
         * @return - true if the event can be discarded, false otherwise
         */
        protected boolean isDiscardable(E eventObject) {
            return false;
        }
    
        /**
         * Pre-process the event prior to queueing. The base class does no pre-processing but sub-classes can
         * override this behavior.
         *
         * @param eventObject
         */
        protected void preprocess(E eventObject) {
        }
    
        @Override
        public void start() {
            if (isStarted())
                return;
            if (appenderCount == 0) {
                addError("No attached appenders found.");
                return;
            }
            if (queueSize < 1) {
                addError("Invalid queue size [" + queueSize + "]");
                return;
            }
            blockingQueue = new ArrayBlockingQueue<E>(queueSize);
    
            if (discardingThreshold == UNDEFINED)
                discardingThreshold = queueSize / 5;
            addInfo("Setting discardingThreshold to " + discardingThreshold);
            worker.setDaemon(true);
            worker.setName("AsyncAppender-Worker-" + getName());
            // make sure this instance is marked as "started" before staring the worker Thread
            super.start();
            worker.start();
        }
    
        @Override
        public void stop() {
            if (!isStarted())
                return;
    
            // mark this appender as stopped so that Worker can also processPriorToRemoval if it is invoking
            // aii.appendLoopOnAppenders
            // and sub-appenders consume the interruption
            super.stop();
    
            // interrupt the worker thread so that it can terminate. Note that the interruption can be consumed
            // by sub-appenders
            worker.interrupt();
    
            InterruptUtil interruptUtil = new InterruptUtil(context);
    
            try {
                interruptUtil.maskInterruptFlag();
    
                worker.join(maxFlushTime);
    
                // check to see if the thread ended and if not add a warning message
                if (worker.isAlive()) {
                    addWarn("Max queue flush timeout (" + maxFlushTime + " ms) exceeded. Approximately " + blockingQueue.size()
                                    + " queued events were possibly discarded.");
                } else {
                    addInfo("Queue flush finished successfully within timeout.");
                }
    
            } catch (InterruptedException e) {
                int remaining = blockingQueue.size();
                addError("Failed to join worker thread. " + remaining + " queued events may be discarded.", e);
            } finally {
                interruptUtil.unmaskInterruptFlag();
            }
        }
    
    
        @Override
        protected void append(E eventObject) {
            if (isQueueBelowDiscardingThreshold() && isDiscardable(eventObject)) {
                return;
            }
            preprocess(eventObject);
            put(eventObject);
        }
    
        protected boolean isDiscardable(E eventObject) {
            return false;
        }
    
        protected void preprocess(E eventObject) {
        }
    
        private boolean isQueueBelowDiscardingThreshold() {
            return (blockingQueue.remainingCapacity() < discardingThreshold);
        }
    
        private void put(E eventObject) {
            if (neverBlock) {
                blockingQueue.offer(eventObject);
            } else {
                putUninterruptibly(eventObject);
            }
        }
    
        private void putUninterruptibly(E eventObject) {
            boolean interrupted = false;
            try {
                while (true) {
                    try {
                        blockingQueue.put(eventObject);
                        break;
                    } catch (InterruptedException e) {
                        interrupted = true;
                    }
                }
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    
        //......
    }    
    

    AsyncAppenderBase继承了UnsynchronizedAppenderBase,实现了AppenderAttachable接口,它定义了queueSize、discardingThreshold、neverBlock等属性,其start方法会根据queueSize创建ArrayBlockingQueue,discardingThreshold默认为queueSize / 5,之后启动Wroker;stop方法则执行worker.interrupt(),然后等待maxFlushTime让log进行flush;其append方法会先判断isQueueBelowDiscardingThreshold及isDiscardable,都为true则直接返回,否则执行preprocess、put方法

    Worker

    ch/qos/logback/core/AsyncAppenderBase.java

        class Worker extends Thread {
    
            public void run() {
                AsyncAppenderBase<E> parent = AsyncAppenderBase.this;
                AppenderAttachableImpl<E> aai = parent.aai;
    
                // loop while the parent is started
                while (parent.isStarted()) {
                    try {
                        E e = parent.blockingQueue.take();
                        aai.appendLoopOnAppenders(e);
                    } catch (InterruptedException ie) {
                        break;
                    }
                }
    
                addInfo("Worker thread will flush remaining events before exiting. ");
    
                for (E e : parent.blockingQueue) {
                    aai.appendLoopOnAppenders(e);
                    parent.blockingQueue.remove(e);
                }
    
                aai.detachAndStopAllAppenders();
            }
        }
    

    Worker的run方法会不断循环从blockingQueue阻塞取出原生,然后添加到AppenderAttachableImpl;在started为false的时候跳槽循环,然后遍历blockingQueue,添加到AppenderAttachableImpl,然后将其从blockingQueue;最后执行detachAndStopAllAppenders

    AppenderAttachableImpl

    ch/qos/logback/core/spi/AppenderAttachableImpl.java

        public int appendLoopOnAppenders(E e) {
            int size = 0;
            final Appender<E>[] appenderArray = appenderList.asTypedArray();
            final int len = appenderArray.length;
            for (int i = 0; i < len; i++) {
                appenderArray[i].doAppend(e);
                size++;
            }
            return size;
        }
    
        /**
         * Remove and processPriorToRemoval all previously attached appenders.
         */
        public void detachAndStopAllAppenders() {
            for (Appender<E> a : appenderList) {
                a.stop();
            }
            appenderList.clear();
        }
    

    AppenderAttachableImpl的appendLoopOnAppenders方法会遍历所有的appenderList执行doAppend方法;其detachAndStopAllAppenders则遍历appenderList,挨个执行stop,最后clear掉整个appenderList

    小结

    logback的AsyncAppender使用ArrayBlockingQueue(默认size为256)来进行缓冲,每次append的时候会先判断isQueueBelowDiscardingThreshold及isDiscardable,为true则直接返回/丢弃,之后执行preprocess,最后执行put,put的时候有个参数neverBlock,为true则使用的是offer方法,队列满的时候会被丢弃,为false则是阻塞的方法,等到put成功才返回;另外它有个worker线程,不断从blockingQueue阻塞take元素出来然后写入到appenderList,在关闭时还会遍历队列写入到appenderList然后从队列移除,最后清空队列。

    相关文章

      网友评论

          本文标题:聊聊logback的AsyncAppender

          本文链接:https://www.haomeiwen.com/subject/gjwuidtx.html