美文网首页
Apache common pool2 对象驱逐解析

Apache common pool2 对象驱逐解析

作者: zh_harry | 来源:发表于2020-03-15 22:55 被阅读0次
    /*
     * Licensed to the Apache Software Foundation (ASF) under one or more
     * contributor license agreements.  See the NOTICE file distributed with
     * this work for additional information regarding copyright ownership.
     * The ASF licenses this file to You under the Apache License, Version 2.0
     * (the "License"); you may not use this file except in compliance with
     * the License.  You may obtain a copy of the License at
     *
     *      http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    package org.apache.commons.pool2.impl;
    
    import org.apache.commons.pool2.PooledObject;
    
    /**
     * To provide a custom eviction policy (i.e. something other than {@link
     * DefaultEvictionPolicy}) for a pool, users must provide an implementation of
     * this interface that provides the required eviction policy.
     *
     * @param <T> the type of objects in the pool
     *
     * @since 2.0
     */
    public interface EvictionPolicy<T> {
    
        /**
         * This method is called to test if an idle object in the pool should be
         * evicted or not.
         *
         * @param config    The pool configuration settings related to eviction
         * @param underTest The pooled object being tested for eviction
         * @param idleCount The current number of idle objects in the pool including
         *                      the object under test
         * @return {@code true} if the object should be evicted, otherwise
         *             {@code false}
         */
        boolean evict(EvictionConfig config, PooledObject<T> underTest, int idleCount);
    }
    
    

    默认实现

    /*
     * Licensed to the Apache Software Foundation (ASF) under one or more
     * contributor license agreements.  See the NOTICE file distributed with
     * this work for additional information regarding copyright ownership.
     * The ASF licenses this file to You under the Apache License, Version 2.0
     * (the "License"); you may not use this file except in compliance with
     * the License.  You may obtain a copy of the License at
     *
     *      http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    package org.apache.commons.pool2.impl;
    
    import org.apache.commons.pool2.PooledObject;
    
    /**
     * Default {@link EvictionPolicy} used by the pools. An idle object is
     * evicted when either of the following holds:
     * <ul>
     * <li>it has been idle longer than
     *     {@link GenericObjectPool#getMinEvictableIdleTimeMillis()} /
     *     {@link GenericKeyedObjectPool#getMinEvictableIdleTimeMillis()}</li>
     * <li>the pool holds more than {@link GenericObjectPool#getMinIdle()} /
     *     {@link GenericKeyedObjectPoolConfig#getMinIdlePerKey()} idle objects
     *     and it has been idle longer than
     *     {@link GenericObjectPool#getSoftMinEvictableIdleTimeMillis()} /
     *     {@link GenericKeyedObjectPool#getSoftMinEvictableIdleTimeMillis()}</li>
     * </ul>
     * <p>
     * This class holds no state, so it is immutable and thread-safe.
     * </p>
     *
     * @param <T> the type of objects in the pool
     *
     * @since 2.0
     */
    public class DefaultEvictionPolicy<T> implements EvictionPolicy<T> {

        /**
         * Decides whether the idle object under test should be evicted.
         *
         * @param config    eviction thresholds (hard idle time, soft idle time, minimum idle count)
         * @param underTest the pooled object being examined
         * @param idleCount number of idle objects in the pool, including {@code underTest}
         * @return {@code true} if the soft rule or the hard rule matches, otherwise {@code false}
         */
        @Override
        public boolean evict(final EvictionConfig config, final PooledObject<T> underTest,
                final int idleCount) {

            // Soft rule: idle past the soft threshold while the pool still has spare idle objects.
            final boolean softLimitExceeded =
                    config.getIdleSoftEvictTime() < underTest.getIdleTimeMillis()
                            && config.getMinIdle() < idleCount;
            if (softLimitExceeded) {
                return true;
            }

            // Hard rule: idle past the hard threshold, regardless of how many idle objects remain.
            return config.getIdleEvictTime() < underTest.getIdleTimeMillis();
        }
    }
    
    

    对应配置

     /**
         * Builds an eviction configuration from the given pool settings.
         * Instances are immutable.
         * <p>
         * A non-positive idle time is stored as {@link Long#MAX_VALUE}.
         * </p>
         *
         * @param poolIdleEvictTime Expected to be provided by
         *        {@link BaseGenericObjectPool#getMinEvictableIdleTimeMillis()}
         * @param poolIdleSoftEvictTime Expected to be provided by
         *        {@link BaseGenericObjectPool#getSoftMinEvictableIdleTimeMillis()}
         * @param minIdle Expected to be provided by
         *        {@link GenericObjectPool#getMinIdle()} or
         *        {@link GenericKeyedObjectPool#getMinIdlePerKey()}
         */
        public EvictionConfig(final long poolIdleEvictTime, final long poolIdleSoftEvictTime,
                final int minIdle) {
            idleEvictTime = poolIdleEvictTime > 0 ? poolIdleEvictTime : Long.MAX_VALUE;
            idleSoftEvictTime = poolIdleSoftEvictTime > 0 ? poolIdleSoftEvictTime : Long.MAX_VALUE;
            this.minIdle = minIdle;
        }
    
     // Build the eviction configuration from this pool's hard idle time, soft idle time
     // and per-key minimum idle count getters.
     final EvictionConfig evictionConfig = new EvictionConfig(
                        getMinEvictableIdleTimeMillis(),
                        getSoftMinEvictableIdleTimeMillis(),
                        getMinIdlePerKey());
    

    启动驱逐器

     /**
         * Configures the pause, in milliseconds, between runs of the idle object evictor
         * and (re)starts the evictor with that value.
         * <ul>
         * <li>A positive value starts the idle object evictor.</li>
         * <li>A non-positive value leaves no idle object evictor running.</li>
         * </ul>
         *
         * @param timeBetweenEvictionRunsMillis
         *            number of milliseconds to sleep between evictor runs
         *
         * @see #getTimeBetweenEvictionRunsMillis
         */
        public final void setTimeBetweenEvictionRunsMillis(final long timeBetweenEvictionRunsMillis) {
            this.timeBetweenEvictionRunsMillis = timeBetweenEvictionRunsMillis;
            // Hand the new interval to the evictor start-up logic so it takes effect.
            startEvictor(timeBetweenEvictionRunsMillis);
        }
    

    仅当 timeBetweenEvictionRunsMillis > 0 时才会启动驱逐器

        /**
         * <p>Starts the evictor with the given delay. If there is an evictor
         * running when this method is called, it is stopped and replaced with a
         * new evictor with the specified delay.</p>
         *
         * <p>This method needs to be final, since it is called from a constructor.
         * See POOL-195.</p>
         *
         * @param delay time in milliseconds before start and between eviction runs
         */
        final void startEvictor(final long delay) {
            synchronized (evictionLock) {
                // Always cancel the current evictor (if any) and reset the eviction state first.
                EvictionTimer.cancel(evictor, evictorShutdownTimeoutMillis, TimeUnit.MILLISECONDS);
                evictor = null;
                evictionIterator = null;

                // A non-positive delay means no evictor should be running.
                if (delay <= 0) {
                    return;
                }

                // The same value is used as the initial delay and as the period between runs.
                evictor = new Evictor();
                EvictionTimer.schedule(evictor, delay, delay);
            }
        }
    

    驱逐定时器

    /**
         * Registers the given eviction task with the shared timer, creating the
         * single-threaded executor on first use. Every task added here *must*
         * later be cancelled through this timer to prevent memory and/or thread
         * leaks in application server environments.
         *
         * @param task      Task to be scheduled.
         * @param delay     Delay in milliseconds before task is executed.
         * @param period    Time in milliseconds between executions.
         */
        static synchronized void schedule(
                final BaseGenericObjectPool<?>.Evictor task, final long delay, final long period) {
            if (executor == null) {
                // One core thread, built by the evictor thread factory.
                executor = new ScheduledThreadPoolExecutor(1, new EvictorThreadFactory());
                executor.setRemoveOnCancelPolicy(true);
            }
            // Give the task its own future handle.
            task.setScheduledFuture(
                    executor.scheduleWithFixedDelay(task, delay, period, TimeUnit.MILLISECONDS));
        }
    

    驱逐器线程名为 commons-pool-evictor-thread，且只有一个线程，由多个对象池共享

    类 EvictionTimer
     /** Executor instance; static, so one instance serves the whole class. It is created on demand by {@code schedule}. */
        private static ScheduledThreadPoolExecutor executor; 
    
     /**
         * Thread factory producing the daemon thread named
         * {@code commons-pool-evictor-thread}, whose context class loader is the
         * loader of this class.
         */
        private static class EvictorThreadFactory implements ThreadFactory {

            /**
             * Creates the evictor thread for the given runnable.
             *
             * @param runnable the work the new thread will execute
             * @return a daemon thread that has not been started
             */
            @Override
            public Thread newThread(final Runnable runnable) {
                final Thread evictorThread = new Thread(null, runnable, "commons-pool-evictor-thread");
                // POOL-363 - Required for applications using Runtime.addShutdownHook().
                evictorThread.setDaemon(true);
                // Set the context class loader inside a privileged action.
                AccessController.doPrivileged((PrivilegedAction<Void>) () -> {
                    evictorThread.setContextClassLoader(EvictorThreadFactory.class.getClassLoader());
                    return null;
                });
                return evictorThread;
            }
        }
    

    不足minIdle 时补足 ensureMinIdle ( Inner classes)

    
    
        /**
         * The idle object evictor {@link Runnable}.
         *
         * @see GenericKeyedObjectPool#setTimeBetweenEvictionRunsMillis
         */
        class Evictor implements Runnable {
    
            private ScheduledFuture<?> scheduledFuture;
    
            /**
             * Performs one round of pool maintenance: evicts qualifying objects,
             * then makes sure the minimum number of idle instances is available.
             * The timer that invokes evictors is shared by all pools, and pools
             * may live in different class loaders, so the work runs under the
             * class loader of the factory associated with the pool and the
             * previous context class loader is restored afterwards.
             */
            @Override
            public void run() {
                final Thread current = Thread.currentThread();
                final ClassLoader previousLoader = current.getContextClassLoader();
                try {
                    if (factoryClassLoader != null) {
                        final ClassLoader factoryLoader = factoryClassLoader.get();
                        if (factoryLoader == null) {
                            // The pool has been dereferenced and its class loader GC'd.
                            // Cancel this task so the pool can be GC'd as well.
                            cancel();
                            return;
                        }
                        // Run the maintenance work under the factory's class loader.
                        current.setContextClassLoader(factoryLoader);
                    }

                    // Step 1: evict from the pool.
                    try {
                        evict();
                    } catch (final Exception e) {
                        swallowException(e);
                    } catch (final OutOfMemoryError oome) {
                        // Log the problem but let the evictor thread continue,
                        // in case the error is recoverable.
                        oome.printStackTrace(System.err);
                    }

                    // Step 2: re-create idle instances up to the configured minimum.
                    try {
                        ensureMinIdle();
                    } catch (final Exception e) {
                        swallowException(e);
                    }
                } finally {
                    // Restore the previous context class loader.
                    current.setContextClassLoader(previousLoader);
                }
            }
    

    空闲连接数达到或超过 maxIdle 时，对象会被直接 destroy
    maxIdleSave <= idleObjects.size() [即 idleObjects.size() >= maxIdleSave]

     // Destroy p instead of keeping it idle when the pool is closed, or when a
     // maximum idle count is set (maxIdleSave > -1) and the idle collection already
     // holds at least that many objects. Note that && binds tighter than ||, so
     // isClosed() alone is enough to take this branch.
     final int maxIdleSave = getMaxIdle();
            if (isClosed() || maxIdleSave > -1 && maxIdleSave <= idleObjects.size()) {
                try {
                    destroy(p);
                } catch (final Exception e) {
                    // A failed destroy is swallowed, not propagated.
                    swallowException(e);
                }
                // NOTE(review): presumably tops the idle set back up after the destroy —
                // confirm against ensureIdle.
                try {
                    ensureIdle(1, false);
                } catch (final Exception e) {
                    swallowException(e);
                }
            }
    

    连接池 DBCP 和 Lettuce 的 Redis 连接池都是基于 Apache Commons Pool 实现的

    连接池druid 与dbcp 不同

     /**
      * Runnable that shrinks the pool with time checks enabled and, when
      * abandoned-connection removal is switched on, removes abandoned connections.
      */
     public class DestroyTask implements Runnable {

            @Override
            public void run() {
                // Time-based shrink; keepAlive is passed through unchanged.
                shrink(true, keepAlive);

                if (!isRemoveAbandoned()) {
                    return;
                }
                removeAbandoned();
            }
        }
    
    • 缩身 驱逐方法

    idleMillis < minEvictableIdleTimeMillis 不驱逐

      // Idle for less than the minimum evictable time: leave the enclosing loop
      // without evicting this connection.
      if (idleMillis < minEvictableIdleTimeMillis) {
                            break;
                        }
    

    idleMillis > maxEvictableIdleTimeMillis 时驱逐

     if (idleMillis > maxEvictableIdleTimeMillis) {
                            evictConnections[evictCount++] = connection;
    
    /**
     * Shrinks the idle pool by selecting holders for eviction or keep-alive
     * validation, removing them from {@code connections} while holding the lock,
     * and then closing or validating them after the lock is released.
     *
     * <p>With {@code checkTime} set, a holder is evicted when its physical
     * lifetime exceeds {@code phyTimeoutMillis} (if positive). Otherwise the scan
     * stops at the first holder idle for less than
     * {@code minEvictableIdleTimeMillis}. Holders before that point are evicted
     * when their index is below {@code poolingCount - minIdle} or they are idle
     * longer than {@code maxEvictableIdleTimeMillis}. The remaining scanned
     * holders go to keep-alive validation when {@code keepAlive} is set, and stay
     * pooled otherwise. Without {@code checkTime}, only the first
     * {@code poolingCount - minIdle} holders are evicted.</p>
     *
     * <p>Selected holders need not be a contiguous prefix of the array: a
     * physical-timeout eviction can follow a holder that stays pooled. The array
     * is therefore compacted in place, so every unselected holder stays pooled
     * and every selected holder is removed.</p>
     *
     * @param checkTime whether idle/physical time limits are applied
     * @param keepAlive whether scanned holders that are not evicted are validated and re-pooled
     */
    public void shrink(boolean checkTime, boolean keepAlive) {
        try {
            lock.lockInterruptibly();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the calling thread can still observe the interruption.
            Thread.currentThread().interrupt();
            return;
        }

        int evictCount = 0;
        int keepAliveCount = 0;
        try {
            if (!inited) {
                return;
            }

            final int checkCount = poolingCount - minIdle;
            final long currentTimeMillis = System.currentTimeMillis();

            // retainedCount is the write index for scanned holders that stay pooled.
            // It never exceeds the read index i, so writing in place is safe.
            int retainedCount = 0;
            int i = 0;
            for (; i < poolingCount; ++i) {
                DruidConnectionHolder connection = connections[i];

                if (checkTime) {
                    if (phyTimeoutMillis > 0) {
                        long phyConnectTimeMillis = currentTimeMillis - connection.connectTimeMillis;
                        if (phyConnectTimeMillis > phyTimeoutMillis) {
                            evictConnections[evictCount++] = connection;
                            continue;
                        }
                    }

                    long idleMillis = currentTimeMillis - connection.lastActiveTimeMillis;

                    if (idleMillis < minEvictableIdleTimeMillis) {
                        // This and all later holders are left untouched.
                        break;
                    }

                    if (i < checkCount) {
                        evictConnections[evictCount++] = connection;
                    } else if (idleMillis > maxEvictableIdleTimeMillis) {
                        evictConnections[evictCount++] = connection;
                    } else if (keepAlive) {
                        keepAliveConnections[keepAliveCount++] = connection;
                    } else {
                        // Scanned but not selected: keep it in the pool.
                        connections[retainedCount++] = connection;
                    }
                } else {
                    if (i < checkCount) {
                        evictConnections[evictCount++] = connection;
                    } else {
                        break;
                    }
                }
            }

            int removeCount = evictCount + keepAliveCount;
            if (removeCount > 0) {
                // Invariant: retainedCount + removeCount == i (the number of scanned holders).
                // Move the unscanned tail directly behind the retained holders, then clear
                // the freed slots at the end of the array.
                System.arraycopy(connections, i, connections, retainedCount, poolingCount - i);
                Arrays.fill(connections, poolingCount - removeCount, poolingCount, null);
                poolingCount -= removeCount;
            }
            keepAliveCheckCount += keepAliveCount;
        } finally {
            lock.unlock();
        }

        // Close evicted connections after the lock has been released.
        if (evictCount > 0) {
            for (int i = 0; i < evictCount; ++i) {
                DruidConnectionHolder item = evictConnections[i];
                Connection connection = item.getConnection();
                JdbcUtils.close(connection);
                destroyCountUpdater.incrementAndGet(this);
            }
            Arrays.fill(evictConnections, null);
        }

        if (keepAliveCount > 0) {
            this.getDataSourceStat().addKeepAliveCheckCount(keepAliveCount);
            // keep order
            for (int i = keepAliveCount - 1; i >= 0; --i) {
                DruidConnectionHolder holder = keepAliveConnections[i];
                Connection connection = holder.getConnection();
                holder.incrementKeepAliveCheckCount();

                boolean validate = false;
                try {
                    this.validateConnection(connection);
                    validate = true;
                } catch (Throwable error) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("keepAliveErr", error);
                    }
                    // skip: a failed validation falls through to the close below
                }

                if (validate) {
                    // Validation counts as activity; put the holder back in the pool.
                    holder.lastActiveTimeMillis = System.currentTimeMillis();
                    put(holder);
                } else {
                    JdbcUtils.close(connection);
                }
            }
            Arrays.fill(keepAliveConnections, null);
        }
    }
    

    MySQL Connector/J 5.1.46 的内存泄漏

    /**
         * Creates a connection to a MySQL Server.
         * <p>
         * As its last visible step the constructor registers the new connection
         * via {@code NonRegisteringDriver.trackConnection(this)}.
         * </p>
         * 
         * @param hostToConnectTo
         *            the hostname of the database server
         * @param portToConnectTo
         *            the port number the server is listening on
         * @param info
         *            a Properties object holding the user and password
         * @param databaseToConnectTo
         *            the database to connect to
         * @param url
         *            the URL of the connection
         * @exception SQLException
         *                if a database access error occurs
         */
        protected ConnectionImpl(String hostToConnectTo, int portToConnectTo, Properties info,
                String databaseToConnectTo, String url)
                throws SQLException {
        
            ...
    
            // Register this connection with the driver's connection tracking.
            NonRegisteringDriver.trackConnection(this);
        }
    
    /**
     * Wraps the new connection in a phantom reference bound to {@code refQueue}
     * and records that reference in {@code connectionPhantomRefs}, keyed by itself.
     *
     * @param newConn the connection to track; it is cast to {@code ConnectionImpl}
     */
    protected static void trackConnection(Connection newConn) {
            final ConnectionImpl tracked = (ConnectionImpl) newConn;
            final ConnectionPhantomReference reference = new ConnectionPhantomReference(tracked, refQueue);
            connectionPhantomRefs.put(reference, reference);
        }
    
    
    // Phantom references of tracked connections, each stored as both key and value.
    // The map itself holds strong references to them.
    protected static final ConcurrentHashMap<ConnectionPhantomReference, ConnectionPhantomReference> connectionPhantomRefs = new ConcurrentHashMap<ConnectionPhantomReference, ConnectionPhantomReference>();
        // Queue the tracked phantom references are registered with.
        protected static final ReferenceQueue<ConnectionImpl> refQueue = new ReferenceQueue<ConnectionImpl>();
    

    清除线程

    // Daemon thread that drains refQueue forever: each dequeued reference is cleaned up
    // and then removed from connectionPhantomRefs.
    Thread referenceThread = new Thread("Abandoned connection cleanup thread") {
                  public void run() {
                      while (true) {
                          try {
                              // ReferenceQueue.remove() blocks until a reference is available.
                              Reference<? extends ConnectionImpl> ref = refQueue.remove();
                              try {
                                  ((ConnectionPhantomReference) ref).cleanup();
                              } finally {
                                  // Drop the map entry even if cleanup() throws.
                                  connectionPhantomRefs.remove(ref);
                              }
                          } catch (Exception ex) {
                            // no where to really log this if we're static
                            // NOTE(review): this also swallows InterruptedException, so an
                            // interrupt does not end the loop.
                          }
                      }
                  }
              };
              
              referenceThread.setDaemon(true);
              referenceThread.start();
        }
    

    清除线程会关闭物理连接：((ConnectionPhantomReference) ref).cleanup();

    /**
     * Phantom reference to a {@code ConnectionImpl} that also captures the
     * connection's network resources at construction time, so they can be
     * force-closed later through {@link #cleanup()}.
     */
    static class ConnectionPhantomReference extends PhantomReference<ConnectionImpl> {
            /** Network resources of the referent; {@code null} if unavailable or already cleaned up. */
            private NetworkResources io;

            /**
             * @param connectionImpl the connection to reference
             * @param q              the queue this reference is registered with
             */
            ConnectionPhantomReference(ConnectionImpl connectionImpl, ReferenceQueue<ConnectionImpl> q) {
                super(connectionImpl, q);

                try {
                    this.io = connectionImpl.getIO().getNetworkResources();
                } catch (SQLException e) {
                    // if we somehow got here and there's really no i/o, we deal with it later
                }
            }

            /** Force-closes the captured network resources, if any, and then forgets them. */
            void cleanup() {
                if (this.io == null) {
                    return;
                }
                try {
                    this.io.forceClose();
                } finally {
                    // Cleared even if forceClose() throws.
                    this.io = null;
                }
            }
        }
    

    驱逐线程会调用 ConnectionImpl close方法

    /**
     * Closes this connection: under the connection mutex, closes every registered
     * lifecycle interceptor (if any) and then calls {@code realClose}.
     *
     * @throws SQLException the translation of any {@code CJException} raised while closing
     */
    public void close() throws SQLException {
            try {
                synchronized (this.getConnectionMutex()) {
                    if (this.connectionLifecycleInterceptors != null) {
                        for (Iterator<?> interceptors = this.connectionLifecycleInterceptors.iterator();
                                interceptors.hasNext();) {
                            ((ConnectionLifecycleInterceptor) interceptors.next()).close();
                        }
                    }

                    this.realClose(true, true, false, (Throwable) null);
                }
            } catch (CJException cause) {
                throw SQLExceptionsMapping.translateException(cause, this.getExceptionInterceptor());
            }
        }
    

    相关文章

      网友评论

          本文标题:Apache common pool2 对象驱逐解析

          本文链接:https://www.haomeiwen.com/subject/ygibehtx.html