美文网首页
2019-03-13 自定义连接池

2019-03-13 自定义连接池

作者: 江江江123 | 来源:发表于2019-11-18 18:56 被阅读0次

    连接池:即线程池
    要自定义先要了解线程池模型,即线程池的核心参数
    1.coresize核心线程池,即运行的线程
    2.maxsize最大线程数,即线程池的容量
    3.ThreadFactory线程池工程,但在连接池中是很多connect 即实现autoclosed的连接

    1. queue 消息队列 , 当加入的任务过多线程无法及时处理时,将任务加入队列等待
    2. maxTime 即一个线程允许相应的最大时长,超出自动中断
    3. handle 超出应对策略:即如果线程池的队列满了,还有请求进来该如何处理

    关于为啥不用java中excutors直接构建线程池的问题:

    核心的问题:

    内存溢出;

    为什么会出现内存溢出:

    情况1:maxInt的最大线程数 如cachePool
    情况2:maxInt的消息队列 如fixPool的blockLinkQueue
    为了防止这些问题,所以希望通过基础方式合理创建

    附上大神neo的代码

    public class Pool<T extends AutoCloseable> {
        final BlockingDeque<PoolItem<T>> idleItems = new LinkedBlockingDeque<>();
        final String name;
        final AtomicInteger size = new AtomicInteger(0);
        private final Logger logger = LoggerFactory.getLogger(Pool.class);
        private final Supplier<T> factory;
        public Duration maxIdleTime = Duration.ofMinutes(30);
        private int minSize = 1;
        private int maxSize = 50;
        private long checkoutTimeoutInMs = Duration.ofSeconds(30).toMillis();
    
        public Pool(Supplier<T> factory, String name) {
            this.factory = factory;
            this.name = name;
        }
    
        public void size(int minSize, int maxSize) {
            this.minSize = minSize;
            this.maxSize = maxSize;
        }
    
        public void checkoutTimeout(Duration timeout) {
            checkoutTimeoutInMs = timeout.toMillis();
        }
    
        public PoolItem<T> borrowItem() {
            PoolItem<T> item = idleItems.poll();
            if (item != null) return item;
    
            if (size.get() < maxSize) {
                return createNewItem();
            } else {
                return waitNextAvailableItem();
            }
        }
    
        public void returnItem(PoolItem<T> item) {
            if (item.broken) {
                closeResource(item.resource);
            } else {
                item.returnTime = System.currentTimeMillis();
                idleItems.push(item);
            }
        }
    
        private PoolItem<T> waitNextAvailableItem() {
            var watch = new StopWatch();
            try {
                PoolItem<T> item = idleItems.poll(checkoutTimeoutInMs, TimeUnit.MILLISECONDS);
                if (item == null) throw new PoolException("timeout to wait for next available resource", "POOL_TIME_OUT");
                return item;
            } catch (InterruptedException e) {
                throw new Error("interrupted during waiting for next available resource", e);
            } finally {
                logger.debug("wait for next available resource, pool={}, elapsed={}", name, watch.elapsed());
            }
        }
    
        private PoolItem<T> createNewItem() {
            var watch = new StopWatch();
            size.incrementAndGet();
            try {
                return new PoolItem<>(factory.get());
            } catch (Throwable e) {
                size.getAndDecrement();
                throw e;
            } finally {
                logger.debug("create new resource, pool={}, elapsed={}", name, watch.elapsed());
            }
        }
    
        public void refresh() {
            logger.info("refresh resource pool, pool={}", name);
            recycleIdleItems();
            replenish();
        }
    
        int activeCount() {
            return totalCount() - idleItems.size();
        }
    
        int totalCount() {
            return size.get();
        }
    
        private void recycleIdleItems() {
            Iterator<PoolItem<T>> iterator = idleItems.descendingIterator();
            long maxIdleTimeInMs = maxIdleTime.toMillis();
            long now = System.currentTimeMillis();
    
            while (iterator.hasNext()) {
                PoolItem<T> item = iterator.next();
                if (now - item.returnTime >= maxIdleTimeInMs) {
                    boolean removed = idleItems.remove(item);
                    if (!removed) return;
                    closeResource(item.resource);
                } else {
                    return;
                }
            }
        }
    
        private void replenish() {
            while (size.get() < minSize) {
                returnItem(createNewItem());
            }
        }
    
        private void closeResource(T resource) {
            size.decrementAndGet();
            try {
                resource.close();
            } catch (Exception e) {
                logger.warn("failed to close resource, pool={}", name, e);
            }
        }
    
        public void close() {
            size.set(maxSize);   // make sure no more new resource will be created
            while (true) {
                PoolItem<T> item = idleItems.poll();
                if (item == null) return;
                closeResource(item.resource);
            }
        }
    }
    

    相关文章

      网友评论

          本文标题:2019-03-13 自定义连接池

          本文链接:https://www.haomeiwen.com/subject/pcmtmqtx.html