美文网首页
tomcat 8.x NioEndpoint之Acceptor组

tomcat 8.x NioEndpoint之Acceptor组

作者: 杭州_mina | 来源:发表于2017-06-27 22:03 被阅读0次

    简书 杭州_mina

    tomcat 8.x NioEndpoint核心组件浅析1

    1. Acceptor 浅析

       /**
         * The background thread that listens for incoming TCP/IP connections and
         * hands them off to an appropriate processor.
         */
        protected class Acceptor extends AbstractEndpoint.Acceptor {
    
            @Override
            public void run() {
    
                int errorDelay = 0;
    
                // Loop until we receive a shutdown command
                // 一直循环直到接收到关闭命令
                while (running) {
    
                    // paused 只有在unbind的时候会设置
                    while (paused && running) {
                        //变更Acceptor的状态
                        state = AcceptorState.PAUSED;
                        try {
                            Thread.sleep(50);
                        } catch (InterruptedException e) {
                            // Ignore
                        }
                    }
                    
                    if (!running) {
                        break;
                    }
                    // Acceptor 设置成running 
                    state = AcceptorState.RUNNING;
    
                    try {
                        //if we have reached max connections, wait
                        //连接数达到最大,暂停线程。
                        //这里用到的是connectionLimitLatch锁,可以理解为一个闭锁
                        //我理解connectionLimitLatch和coundownlatch类似
                        //补充一点这个是先增加计数器、如果超过最大连接数则减少计时器、然后线程暂停,你这个计时器就是当前连接数
                        //程序的下面我会简单的讲一下这个方法的实现
                        countUpOrAwaitConnection();
    
                        SocketChannel socket = null;
                        try {
                            // Accept the next incoming connection from the server
                            // socket
                            //调用serversocket的accept方法
                            socket = serverSock.accept();
                        } catch (IOException ioe) {
                            // We didn't get a socket
                            // 出错了要减去连接数
                            countDownConnection();
                            if (running) {
                                // Introduce delay if necessary
                                errorDelay = handleExceptionWithDelay(errorDelay);
                                // re-throw
                                throw ioe;
                            } else {
                                break;
                            }
                        }
                        // Successful accept, reset the error delay
                        errorDelay = 0;
    
                        // Configure the socket
                        if (running && !paused) {
                            // setSocketOptions() will hand the socket off to
                            // an appropriate processor if successful
                            // 把socket扔到poller中
                            if (!setSocketOptions(socket)) {
                                // 如果加入poller失败关闭连接
                                closeSocket(socket);
                            }
                        } else {
                            closeSocket(socket);
                        }
                    } catch (Throwable t) {
                        ExceptionUtils.handleThrowable(t);
                        log.error(sm.getString("endpoint.accept.fail"), t);
                    }
                }
                state = AcceptorState.ENDED;
            }
        }
    
    

    下面来看setSocketOptions方法中如何把SocketChannel加入到poller中

    protected boolean setSocketOptions(SocketChannel socket) {
            // Process the connection
            try {
                //disable blocking, APR style, we are gonna be polling it
                socket.configureBlocking(false); //设置成非阻塞
                //获取socket
                Socket sock = socket.socket();
               //配置socket信息
                socketProperties.setProperties(sock);
                //创建一个NioChannel 他封装了SocketChannel
                NioChannel channel = nioChannels.pop();
                if (channel == null) {
                    //如果为null 创建一个NioChannel 这里使用系统内存
                   //使用系统内存可以省去一步从系统内存拷贝到堆内存的动作、性能上会有很大的提升,nioChannels初始化默认为128个 
                   //当socket 关闭的重新清理NioChannel而不是销毁这个对象可以达到对象复用的效果、因为申请系统内存的开销比申请堆内存的开销要大很多
                    SocketBufferHandler bufhandler = new SocketBufferHandler(
                            socketProperties.getAppReadBufSize(),
                            socketProperties.getAppWriteBufSize(),
                            socketProperties.getDirectBuffer());
                    if (isSSLEnabled()) {
                        channel = new SecureNioChannel(socket, bufhandler, selectorPool, this);
                    } else {
                        channel = new NioChannel(socket, bufhandler);
                    }
                } else {
                    //如果不为null设置SocketChannel
                    channel.setIOChannel(socket);
                    //将channle复位 以上就是重置系统内存把指针重新定位到buff的开始位置
                    channel.reset();
                }
                //丢入poller队列中
                getPoller0().register(channel);
            } catch (Throwable t) {
                ExceptionUtils.handleThrowable(t);
                try {
                    log.error("",t);
                } catch (Throwable tt) {
                    ExceptionUtils.handleThrowable(tt);
                }
                // Tell to close the socket
                return false;
            }
            return true;
        }
    

    最后一步register方法

     public void register(final NioChannel socket) {
                 //设置poller 对象
                socket.setPoller(this); 
                //设置一个附件待会把这个NioSocketWrapper注册到poller对象中的selector上去           
                NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this);//获取
                socket.setSocketWrapper(ka);
                ka.setPoller(this);
                ka.setReadTimeout(getSocketProperties().getSoTimeout());
                ka.setWriteTimeout(getSocketProperties().getSoTimeout());
                ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
                ka.setSecure(isSSLEnabled());
                ka.setReadTimeout(getSoTimeout());
                ka.setWriteTimeout(getSoTimeout());
                PollerEvent r = eventCache.pop();
                ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
                //生成一个PollerEvent对象 这个对象继承了Runnable
                //这个对象的主要作用就是把NioSocketWrapper注册到poller对象中的selector上去           
                //还有就是获取SelectionKey 该对象是用于跟踪这些被注册事件的句柄
                if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);
                else r.reset(socket,ka,OP_REGISTER);
                //把pollerEvent放入到poller的队列中
               addEvent(r);
            }
    

    2. LimitLatch 浅析

    • 2.1 AbstractEndpoint 封装了LimitLatch一些方法
        //默认最大连接数为10000
        private int maxConnections = 10000;
        //获取最大连接数
        public int  getMaxConnections() {
            return this.maxConnections;
        }
        //初始化闭锁
        protected LimitLatch initializeConnectionLatch() {
            if (maxConnections==-1) return null;
            if (connectionLimitLatch==null) {
                //
                connectionLimitLatch = new LimitLatch(getMaxConnections());
            }
            return connectionLimitLatch;
        }
        //将会释放所有的线程
        protected void releaseConnectionLatch() {
            LimitLatch latch = connectionLimitLatch;
            if (latch!=null) latch.releaseAll();
            connectionLimitLatch = null;
        }
        //增加计数,如果太大,那么等待
        protected void countUpOrAwaitConnection() throws InterruptedException {
            if (maxConnections==-1) return;
            LimitLatch latch = connectionLimitLatch;
            if (latch!=null) latch.countUpOrAwait();
        }
        //减少计数
        protected long countDownConnection() {
            if (maxConnections==-1) return -1;
            LimitLatch latch = connectionLimitLatch;
            if (latch!=null) {
                long result = latch.countDown();
                if (result<0) {
                    getLog().warn(sm.getString("endpoint.warn.incorrectConnectionCount"));
                }
                return result;
            } else return -1
        }
    
    • 2.2 LimitLatch 源码浅析
    /*
     *  Licensed to the Apache Software Foundation (ASF) under one or more
     *  contributor license agreements.  See the NOTICE file distributed with
     *  this work for additional information regarding copyright ownership.
     *  The ASF licenses this file to You under the Apache License, Version 2.0
     *  (the "License"); you may not use this file except in compliance with
     *  the License.  You may obtain a copy of the License at
     *
     *      http://www.apache.org/licenses/LICENSE-2.0
     *
     *  Unless required by applicable law or agreed to in writing, software
     *  distributed under the License is distributed on an "AS IS" BASIS,
     *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     *  See the License for the specific language governing permissions and
     *  limitations under the License.
     */
    package org.apache.tomcat.util.threads;
    
    import org.apache.juli.logging.Log;
    import org.apache.juli.logging.LogFactory;
    
    import java.util.Collection;
    import java.util.concurrent.atomic.AtomicLong;
    import java.util.concurrent.locks.AbstractQueuedSynchronizer;
    
    /**
     * Shared latch that allows the latch to be acquired a limited number of times
     * after which all subsequent requests to acquire the latch will be placed in a
     * FIFO queue until one of the shares is returned.
     */
    public class LimitLatch {
    
        private static final Log log = LogFactory.getLog(LimitLatch.class);
         //构建sync对象、主要用来同步、阻塞两个功能
        private class Sync extends AbstractQueuedSynchronizer {
            private static final long serialVersionUID = 1L;
    
            public Sync() {
            }
            @Override
            protected int tryAcquireShared(int ignored) {
                 //增加计数器
                long newCount = count.incrementAndGet();
                //如果计数器大于最大limit 则计数器减一 返回-1 否则返回 1 
                //这里的limit 其实就是maxConnections
                if (!released && newCount > limit) {
                    // Limit exceeded
                    count.decrementAndGet();
                    return -1;
                } else {
                    return 1;
                }
            }
    
            @Override
            protected boolean tryReleaseShared(int arg) {
                //计数器减一
                count.decrementAndGet();
                return true;
            }
        }
    
        private final Sync sync;
        //计数器
        private final AtomicLong count;
        //最大连接数
        private volatile long limit;
        //是否全部释放
        private volatile boolean released = false;
    
        /**
         * Instantiates a LimitLatch object with an initial limit.
         *
         * @param limit - maximum number of concurrent acquisitions of this latch
         */
        public LimitLatch(long limit) {
            this.limit = limit;
            this.count = new AtomicLong(0);
            this.sync = new Sync();
        }
    
        /**
         * Returns the current count for the latch
         *
         * @return the current count for latch
         */
        //获取当前计数器
        public long getCount() {
            return count.get();
        }
    
        /**
         * Obtain the current limit.
         *
         * @return the limit
         */
       //获取最大连接数
        public long getLimit() {
            return limit;
        }
    
    
        /**
         * Sets a new limit. If the limit is decreased there may be a period where
         * more shares of the latch are acquired than the limit. In this case no
         * more shares of the latch will be issued until sufficient shares have been
         * returned to reduce the number of acquired shares of the latch to below
         * the new limit. If the limit is increased, threads currently in the queue
         * may not be issued one of the newly available shares until the next
         * request is made for a latch.
         *
         * @param limit The new limit
         */
       //设置最大连接数
        public void setLimit(long limit) {
            this.limit = limit;
        }
    
    
        /**
         * Acquires a shared latch if one is available or waits for one if no shared
         * latch is current available.
         *
         * @throws InterruptedException If the current thread is interrupted
         */
        //增加计数器如果超过最大连接数、则等待并且计数器减一
        public void countUpOrAwait() throws InterruptedException {
            if (log.isDebugEnabled()) {
                log.debug("Counting up[" + Thread.currentThread().getName() + "] latch=" + getCount());
            }
            sync.acquireSharedInterruptibly(1);
        }
    
        /**
         * Releases a shared latch, making it available for another thread to use.
         *
         * @return the previous counter value
         */
        //减少计数器其实就是减少连接数
        public long countDown() {
            sync.releaseShared(0);
            long result = getCount();
            if (log.isDebugEnabled()) {
                log.debug("Counting down[" + Thread.currentThread().getName() + "] latch=" + result);
            }
            return result;
        }
    
        /**
         * Releases all waiting threads and causes the {@link #limit} to be ignored
         * until {@link #reset()} is called.
         *
         * @return <code>true</code> if release was done
         */
        //释放所有线程
        public boolean releaseAll() {
            released = true;
            return sync.releaseShared(0);
        }
    
        /**
         * Resets the latch and initializes the shared acquisition counter to zero.
         *
         * @see #releaseAll()
         */
        //重置计数器
        public void reset() {
            this.count.set(0);
            released = false;
        }
    
        /**
         * Returns <code>true</code> if there is at least one thread waiting to
         * acquire the shared lock, otherwise returns <code>false</code>.
         *
         * @return <code>true</code> if threads are waiting
         */
        //是否有等待线程
        public boolean hasQueuedThreads() {
            return sync.hasQueuedThreads();
        }
    
        /**
         * Provide access to the list of threads waiting to acquire this limited
         * shared latch.
         *
         * @return a collection of threads
         */
        //获取所有等待线程
        public Collection<Thread> getQueuedThreads() {
            return sync.getQueuedThreads();
        }
    }
    
    

    3.总结Acceptor流程

    虽然看上去代码有点复杂,但是实际上就是一句话概括。获取socket、并且对socket进行封装,扔到Poller的队列中。

    相关文章

      网友评论

          本文标题:tomcat 8.x NioEndpoint之Acceptor组

          本文链接:https://www.haomeiwen.com/subject/tcaecxtx.html