美文网首页 > Java并发编程
Java自定义带阻塞策略线程池

Java自定义带阻塞策略线程池

作者: Chermack | 来源:发表于2020-10-18 17:58 被阅读0次
    package com.example.concurrenttest.pool;
    
    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.HashSet;
    import java.util.Optional;
    import java.util.Set;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;
    
    /**
     * Demo driver: submits ten slow tasks to a two-worker {@link ThreadPool} with a
     * five-slot queue, so that the configured rejection strategy gets exercised.
     */
    public class TestPool {
        public static void main(String[] args) {
            // Strategy applied when the wait queue is full. Alternatives that can be
            // swapped in for the lambda body:
            //   1. block until there is room:   return queue.put(task);
            //   2. wait with a timeout (used):  return queue.put(task, 1, TimeUnit.SECONDS);
            //   3. drop the task:               System.out.println("丢弃任务" + task); return false;
            //   4. fail loudly:                 throw new RuntimeException("队列已满!");
            //   5. run in the calling thread:   task.run(); return true;
            RejectStrategy<Runnable> whenFull =
                    (queue, task) -> queue.put(task, 1, TimeUnit.SECONDS);

            // Pool with an idle timeout: a worker exits after 1 second without work.
            ThreadPool pool = new ThreadPool(2, 1, TimeUnit.SECONDS, 5, whenFull);

            int id = 0;
            while (id < 10) {
                pool.execute(delayedPrint(id));
                id++;
            }
        }

        /**
         * Builds a task that sleeps for two seconds and then prints {@code id}.
         */
        private static Runnable delayedPrint(int id) {
            return () -> {
                try {
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(id);
            };
        }
    }
    
    /**
     * Rejection strategy: decides what happens to an element that could not be
     * added because the queue was full.
     *
     * <p>{@code BlockingQueue.tryPut} invokes this only when the queue is full and
     * returns whatever this method returns.
     *
     * @param <T> type of the elements held by the queue
     */
    @FunctionalInterface
    interface RejectStrategy<T> {
        /**
         * Handles an element that did not fit into {@code queue}.
         *
         * @param queue the queue that was full when the element was offered
         * @param task  the element that could not be added
         * @return the value {@code tryPut} should report to its caller;
         *         {@code ThreadPool.execute} prints a "rejected" message when this is
         *         {@code false} or when a {@link RuntimeException} is thrown
         */
        boolean reject(BlockingQueue<T> queue, T task);
    }
    
    /**
     * A small thread pool with at most {@code coreSize} worker threads and a bounded
     * wait queue. When all workers are busy, new tasks go to the queue; when the
     * queue is full the configured {@link RejectStrategy} decides what to do.
     */
    class ThreadPool {
        private final BlockingQueue<Runnable> taskQueue;
        // Registered workers. This set is also the monitor that guards worker
        // creation and removal.
        private final Set<Worker> workers = new HashSet<>();
        private final int coreSize;
        private final long times;
        private final TimeUnit timeUnit;
        private final RejectStrategy<Runnable> rejectStrategy;


        /**
         * Creates a pool. No thread is started until the first task is submitted.
         *
         * @param coreSize          maximum number of worker threads
         * @param times             idle timeout: a worker exits after waiting this long
         *                          without receiving a task; {@code 0} means workers never exit
         * @param timeUnit          unit of {@code times}
         * @param queueSize         maximum number of tasks waiting in the queue
         * @param rejectStrategy    strategy applied when the queue is full
         */
        public ThreadPool(int coreSize, long times, TimeUnit timeUnit, int queueSize,
                          RejectStrategy<Runnable> rejectStrategy) {
            this.coreSize = coreSize;
            this.times = times;
            this.timeUnit = timeUnit;
            this.rejectStrategy = rejectStrategy;
            taskQueue = new BlockingQueue<>(queueSize);
        }

        /**
         * Submits a task. If fewer than {@code coreSize} workers exist, a new worker
         * is started with this task; otherwise the task is offered to the wait queue
         * and, if the queue is full, handed to the rejection strategy.
         *
         * @param task the task to run; must not be {@code null}
         * @throws NullPointerException if {@code task} is {@code null}
         */
        public void execute(Runnable task) {
            if (task == null) {
                // Fail in the caller instead of creating a worker that dies immediately
                // and keeps occupying a slot in the worker set.
                throw new NullPointerException("task");
            }
            synchronized (workers) {
                if (workers.size() < coreSize) {
                    startWorker(task);
                    return;
                }
            }
            // The queue operation may block (depending on the strategy), so it is done
            // without holding the workers monitor; idle workers can still deregister.
            System.out.println("try put to wait queue:" + task);
            boolean accepted;
            try {
                accepted = taskQueue.tryPut(task, rejectStrategy);
            } catch (RuntimeException e) {
                // A strategy may signal rejection by throwing.
                accepted = false;
            }
            if (!accepted) {
                System.out.println("任务"+task+"被拒绝");
                return;
            }
            ensureWorkerForQueuedTask();
        }

        /**
         * Registers and starts a worker whose first task is {@code firstTask}.
         * Must be called while holding the {@code workers} monitor.
         */
        private void startWorker(Runnable firstTask) {
            Worker worker = new Worker(firstTask);
            System.out.println("create worker:" + worker + "," + firstTask);
            workers.add(worker);
            worker.start();
        }

        /**
         * Closes the window in which a worker timed out and deregistered just before a
         * task was enqueued: if a worker slot is free, pull one queued task (without
         * waiting) and start a worker for it so the task cannot be stranded.
         */
        private void ensureWorkerForQueuedTask() {
            synchronized (workers) {
                if (workers.size() < coreSize) {
                    // Zero timeout: returns immediately when the queue is empty.
                    Optional<Runnable> queued = taskQueue.take(0, TimeUnit.NANOSECONDS);
                    if (queued.isPresent()) {
                        startWorker(queued.get());
                    }
                }
            }
        }

        /**
         * Worker thread: runs its initial task, then keeps pulling tasks from the queue.
         */
        class Worker extends Thread {
            private Runnable task;

            public Worker(Runnable task) {
                this.task = task;
            }

            @Override
            public void run() {
                if (times == 0) {
                    runTask();
                } else {
                    runTimeoutTask();
                }
            }

            /** Runs tasks forever, blocking on the queue when it is empty. */
            private void runTask() {
                while (true) {
                    System.out.println("begin task:" + task);
                    runSafely(task);
                    task = taskQueue.take();
                }
            }

            /** Runs tasks until the queue stays empty for the idle timeout, then exits. */
            private void runTimeoutTask() {
                while (true) {
                    System.out.println("begin task:" + task);
                    runSafely(task);
                    Optional<Runnable> nextTask = taskQueue.take(times, timeUnit);
                    if (!nextTask.isPresent()) {
                        synchronized (workers) {
                            // Re-check under the monitor: a task may have arrived between
                            // the timeout and this point. Only deregister when the queue is
                            // still empty, so no queued task is left without a worker.
                            nextTask = taskQueue.take(0, TimeUnit.NANOSECONDS);
                            if (!nextTask.isPresent()) {
                                System.out.println("remove worker:" + this);
                                workers.remove(this);
                                return;
                            }
                        }
                    }
                    task = nextTask.get();
                }
            }

            /**
             * Runs one task. A RuntimeException thrown by the task is reported through
             * this thread's uncaught-exception handler but does not terminate the
             * worker; otherwise the dead thread would stay registered in the worker set
             * and the pool would permanently lose a slot.
             */
            private void runSafely(Runnable r) {
                try {
                    r.run();
                } catch (RuntimeException e) {
                    getUncaughtExceptionHandler().uncaughtException(this, e);
                }
            }
        }
    }
    
    /**
     * Bounded FIFO task queue guarded by a single lock with two conditions: one for
     * producers waiting for free space and one for consumers waiting for elements.
     *
     * @param <T> type of the queued elements
     */
    class BlockingQueue<T> {
        // 1. Backing storage, FIFO order.
        private final Deque<T> queue = new ArrayDeque<>();
        // 2. Lock guarding the deque.
        private final ReentrantLock lock = new ReentrantLock();
        // 3. Producers wait here while the queue is full; signalled after a removal.
        private final Condition fullWaitSet = lock.newCondition();
        // 4. Consumers wait here while the queue is empty; signalled after an insertion.
        private final Condition emptyWaitSet = lock.newCondition();
        // 5. Maximum number of queued elements.
        private final int capacity;

        public BlockingQueue(int capacity) {
            this.capacity = capacity;
        }

        /**
         * Removes and returns the head of the queue, waiting as long as necessary for
         * an element to become available. An interrupt does not abort the wait; the
         * thread's interrupt status is set when the method returns.
         */
        public T take() {
            lock.lock();
            try {
                while (isEmpty()) {
                    emptyWaitSet.awaitUninterruptibly();
                }
                T t = queue.removeFirst();
                fullWaitSet.signal();
                return t;
            } finally {
                lock.unlock();
            }
        }

        /**
         * Removes and returns the head of the queue, waiting at most the given time.
         * An interrupt does not abort the wait; the thread's interrupt status is
         * restored before returning.
         *
         * @param timeout maximum time to wait; {@code 0} or less means do not wait
         * @param unit    unit of {@code timeout}
         * @return the head element, or an empty Optional if the wait timed out
         */
        public Optional<T> take(long timeout, TimeUnit unit) {
            boolean interrupted = false;
            lock.lock();
            try {
                long nanos = unit.toNanos(timeout);
                final long deadline = System.nanoTime() + nanos;
                while (isEmpty()) {
                    if (nanos <= 0) {
                        return Optional.empty();
                    }
                    try {
                        nanos = emptyWaitSet.awaitNanos(nanos);
                    } catch (InterruptedException e) {
                        // Remember the interrupt and keep honouring the original deadline
                        // instead of restarting the wait with the stale remaining time.
                        interrupted = true;
                        nanos = deadline - System.nanoTime();
                    }
                }
                T t = queue.removeFirst();
                fullWaitSet.signal();
                return Optional.of(t);
            } finally {
                lock.unlock();
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }

        /**
         * Appends an element, waiting as long as necessary for free space. An
         * interrupt does not abort the wait; the thread's interrupt status is set
         * when the method returns.
         *
         * @return always {@code true}
         */
        public boolean put(T element) {
            lock.lock();
            try {
                while (isFull()) {
                    fullWaitSet.awaitUninterruptibly();
                }
                queue.addLast(element);
                emptyWaitSet.signal();
                return true;
            } finally {
                lock.unlock();
            }
        }

        /**
         * Appends an element, waiting at most the given time for free space.
         *
         * @param t       element to append
         * @param timeout maximum time to wait; {@code 0} or less means do not wait
         * @param unit    unit of {@code timeout}
         * @return {@code true} if the element was added, {@code false} on timeout
         */
        public boolean put(T t, long timeout, TimeUnit unit) {
            boolean interrupted = false;
            lock.lock();
            try {
                long nanos = unit.toNanos(timeout);
                final long deadline = System.nanoTime() + nanos;
                while (isFull()) {
                    if (nanos <= 0) {
                        return false;
                    }
                    try {
                        // Producers must wait on fullWaitSet: that is the condition the
                        // take methods signal after removing an element.
                        nanos = fullWaitSet.awaitNanos(nanos);
                    } catch (InterruptedException e) {
                        interrupted = true;
                        nanos = deadline - System.nanoTime();
                    }
                }
                queue.addLast(t);
                emptyWaitSet.signal();
                return true;
            } finally {
                lock.unlock();
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }

        /**
         * Adds the element immediately if there is room; otherwise delegates to the
         * rejection strategy and returns its result.
         *
         * <p>The strategy is invoked after the lock has been released, so a slow
         * strategy (for example one that runs the task in the calling thread) does not
         * block consumers of this queue.
         */
        public boolean tryPut(T task, RejectStrategy<T> rejectStrategy) {
            lock.lock();
            try {
                if (!isFull()) {
                    queue.addLast(task);
                    emptyWaitSet.signal();
                    return true;
                }
            } finally {
                lock.unlock();
            }
            return rejectStrategy.reject(this, task);
        }

        /** Returns the current number of queued elements. */
        public int size() {
            // The lock is reentrant, so this is safe both from inside the queue's own
            // methods and from external callers that do not hold the lock.
            lock.lock();
            try {
                return queue.size();
            } finally {
                lock.unlock();
            }
        }

        /** Returns {@code true} if the queue currently holds no elements. */
        public boolean isEmpty() {
            return size() == 0;
        }

        /** Returns {@code true} if the queue currently holds exactly {@code capacity} elements. */
        public boolean isFull() {
            return size() == capacity;
        }
    }
    

    相关文章

      网友评论

        本文标题:Java自定义带阻塞策略线程池

        本文链接:https://www.haomeiwen.com/subject/zfmcmktx.html