美文网首页
多线程与并发(九):线程池相关

多线程与并发(九):线程池相关

作者: lilykeke | 来源:发表于2021-09-10 09:21 被阅读0次

为什么说频繁的创建和销毁线程会浪费大量的系统资源?

线程的创建需要开辟虚拟机栈、本地方法栈、程序计数器等线程私有的内存空间。在线程销毁时需要回收这些系统资源。频繁的创建和销毁线程会浪费大量的系统资源

线程池的作用

  • 利用线程池管理并复用线程、控制最大并发数等
  • 实现任务线程队列缓存策略和拒绝机制
  • 实现某些与时间相关的功能,如定时执行、周期执行等
  • 隔离线程环境。通过配置独立的线程池,将一些服务隔开,避免个服务相互影响

合理的使用线程池能够带来三个好处

  • 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁的消耗
  • 提高响应速度。当任务到达时,任务可以不需要等待线程创建就能立即执行
  • 提高线程的可管理性。

1. 自定义线程池-阻塞队列

自定义线程池.jpg
public class ThreadPool {
    public static void main(String[] args) {
        Pool pool = new Pool(1, 1000, TimeUnit.MILLISECONDS, 1, (queue, task) -> {
            //拒绝策略
            //queue.put(task); //死等
            //queue.offer(task,500,TimeUnit.MILLISECONDS); //带超时等待
            //System.out.println("放弃");
        });

        for (int i = 0; i < 3; i++) {
            int j = i;
            pool.execute(() -> {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + " execute " + j);

            });
        }
    }
}

@FunctionalInterface
interface RejectPolicy<T> {
    void reject(BlockingQueue<T> queue, T task);
}

class Pool {
    private BlockingQueue<Runnable> taskQueue;

    //线程集合
    private HashSet<Worker> workers = new HashSet<>();

    //线程数
    private int coreSize;

    //获取任务的超时时间
    private long timeout;

    private TimeUnit unit;

    private RejectPolicy<Runnable> rejectPolicy;

    public Pool(int coreSize, long timeout, TimeUnit unit, int queueCapacity, RejectPolicy<Runnable> rejectPolicy) {
        this.coreSize = coreSize;
        this.timeout = timeout;
        this.unit = unit;
        taskQueue = new BlockingQueue<>(queueCapacity);
        this.rejectPolicy = rejectPolicy;
    }

    //执行的方法
    public void execute(Runnable task) {
        synchronized (workers) {
            if (workers.size() < coreSize) {
                Worker worker = new Worker(task);
                System.out.println(Thread.currentThread().getName() + " add workers" + task);
                workers.add(worker);
                worker.start();
            } else {
                //taskQueue.put(task);

                //策略模式
                //1 死等 2. 带超时等待 3. 放弃任务执行 4. 抛出异常 5. 让调用者自己执行任务
                taskQueue.tryPut(rejectPolicy, task);
            }
        }
    }

    class Worker extends Thread {

        private Runnable task;

        public Worker(Runnable task) {

            this.task = task;
        }

        @Override
        public void run() {
            //while (task!= null || (task = taskQueue.take()) != null){
            while (task != null || (task = taskQueue.poll(timeout, unit)) != null) {
                try {
                    System.out.println(Thread.currentThread().getName() + " 执行 " + task );
                    task.run();
                } catch (Exception e) {
                } finally {
                    task = null;
                }
            }
            synchronized (workers) {
                System.out.println(Thread.currentThread().getName() + " 移除");
                workers.remove(this);
            }
        }
    }

}

class BlockingQueue<T> {
    Deque<T> queue = new ArrayDeque<>();

    int capacity;

    Lock lock = new ReentrantLock();

    Condition emptyWaitSet = lock.newCondition();
    Condition fullWaitSet = lock.newCondition();

    public BlockingQueue(int queueCapacity) {
        this.capacity = queueCapacity;
    }

    //添加带超时的等待
    public T poll(long timeout, TimeUnit unit) {
        lock.lock();
        try {
            long nanos = unit.toNanos(timeout);
            while (queue.isEmpty()) {
                try {
                    //返回的是剩余的时间
                    if (nanos <= 0) {
                        System.out.println(Thread.currentThread().getName() + " 没等到,返回");
                        return null; //没等到
                    }
                    nanos = emptyWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            T task = queue.removeFirst();
            System.out.println(Thread.currentThread().getName() + " 出队" + task);
            fullWaitSet.signal();
            return task;
        } finally {
            lock.unlock();
        }
    }

    //取任务
    public T take() {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                try {
                    System.out.println(Thread.currentThread().getName() + " 队列为空,等待");
                    emptyWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            T task = queue.removeFirst();
            System.out.println(Thread.currentThread().getName() + " 出队 " + task);
            fullWaitSet.signal();
            return task;
        } finally {
            lock.unlock();
        }
    }

    //添加任务
    public void put(T task) {

        lock.lock();
        try {
            while (queue.size() == capacity) {
                try {
                    System.out.println(Thread.currentThread().getName() + " 队列已满,等待进入队列");
                    fullWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            queue.addLast(task);
            System.out.println(Thread.currentThread().getName() + " 入队 " + task);
            emptyWaitSet.signal();
        } finally {
            lock.unlock();
        }
    }

    //带超时的阻塞添加
    public boolean offer(T task, long timeout, TimeUnit unit) {
        lock.lock();
        try {
            long nanos = unit.toNanos(timeout);
            while (queue.size() == capacity) {
                try {
                    if (nanos <= 0) {
                        return false;
                    }
                    System.out.println(Thread.currentThread().getName() + " 队列已满,等待进入队列");
                    nanos = fullWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            queue.addLast(task);
            System.out.println(Thread.currentThread().getName() + " 入队 " + task);
            emptyWaitSet.signal();
            return true;
        } finally {
            lock.unlock();
        }
    }


    public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
        lock.lock();
        try {
            //判断队列是否已满
            if (queue.size() == capacity) {
                rejectPolicy.reject(this, task);
            } else {
                queue.addLast(task);
                emptyWaitSet.signal();
                System.out.println(Thread.currentThread().getName() + " 入队 " + task);
            }
        } finally {
            lock.unlock();
        }
    }
}

执行结果:
(放弃任务)


线程池.png
  • 自定义拒绝策略接口
@FunctionalInterface
interface RejectPolicy{
    void reject(BlockingQueue<T> queue,  T task);
}

2. ThreadPoolExecutor

2.1线程池状态

ThreadPoolExecutor 使用int 高三位来表示线程池状态,低29位表示线程池数量

状态名 高3位 接受新任务 处理阻塞队列任务 说明
RUNNING 111 Y Y
SHUTDOWN 000 N Y 不会接受新任务,会处理阻塞队列剩余任务
STOP 001 N N 会中断正在执行的任务,并抛弃阻塞队列任务
TIDYING 010 - - 任务全执行完毕,活动线程为0即将进入终结
TERMINATED 011 - - 终结状态

这些信息存储在一个原子变量ctl中,目的是将线程池状态与线程个数合二为一,这样就可以用一次CAS原子操作进行赋值

//c为旧值,ctlOf返回的结果为新值
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
//rs为高三位代表线程池状态,wc为低29位代表线程个数,ctl是合并它们
private static int ctlOf(int rs, int wc) { return rs | wc; }

2.2 构造方法

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)
  • corePoolSize 核心线程数目
  • maximumPoolSize 最大线程数目
  • keepAliveTime 生存时间--针对救急线程
  • unit 时间单位--针对救急线程
  • workQueue 阻塞队列
  • threadFactory 线程工厂--可以为线程创建时起名字
  • handler 拒绝策略

最大线程数= 核心线程数+ 救急线程数

  • 线程池中刚开始没有线程,当一个任务提交给线程池后,线程池会创建一个新线程来执行任务

  • 当线程数达到corePoolSize 并没有线程空闲,这时再加入任务,新加的任务会被加入workQueue队列排队,直到有空闲的线程

  • 如果队列选择了有界队列,那么任务超过了队列大小时,会创建maximunPoolSize-corePoolSize数目的线程来救急

  • 如果线程达到了maximunPoolSize 仍然有新任务这时会执行拒绝策略。拒绝策略jdk提供了四种实现,其实著名框架也提供了实现

    • AbortPolicy 让调用者抛出RejectExecutionException异常,这是默认策略
    • CallerRunsPolicy 让调用者运行任务
    • DiscardPolicy 放弃本次任务
    • DiscardOldestPolicy 放弃队列中最早的任务,本任务取而代之
    • Dubbo 的实现,在抛出 RejectExecutionException 异常之前会记录日志,并dump线程栈信息,方便定位问题
    • Netty 的实现,是创建一个新线程来执行任务
    • ActiveMQ的实现,带超时等待(60s)尝试放入队列,类似我们之前自定义的拒绝策略
    • PinPoint 的实现,它使用了一个拒绝策略链,会逐一尝试策略链中的每种拒绝策略
  • 当高峰过去后,超过coreSize的救急线程如果一段时间没有任务做,需要结束资源,这个时间由keepAliveTime 和 unit来控制。

根据这个构造方法,JDK Executors类中提供了众多工厂方法来创建各种用途的线程池

2.3 newFixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

特点:

  • 核心线程数== 最大线程数(没有救急线程被创建),因此无需超时时间
  • 阻塞队列是无界的,可以放任意数量的任务

评价:

  • 适合于任务量已知,相对耗时的任务

2.4 newCacheThreadPool

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}

特点:

  • 核心线程数是0,最大线程数是Integer.MAX_VALUE,救急线程的空闲生存时间是60s,意味着
    • 全部是救急线程(60s后可以回收)
    • 救急线程可以无限创建
  • 队列采用了SynchronousQueue实现的特点是,它没有容量,没有线程来取是放不进去的(一手交钱一手交货)

评价:

  • 整个线程池表现为线程数会根据任务量不断增长,没有上限,当任务执行完毕,空闲1分钟后释放线程。
  • 适合任务数比较密集,但每个任务执行时间短的情况

2.5 newSingleThreadPool

public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
    return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory));
}

使用场景:
希望多个任务排队执行。线程数固定为1,任务数多于1时,会放入无界队列排队。任务执行完成,这唯一的线程不会被释放

区别:

  • 自己创建一个单线程串行执行任务,如果任务执行失败而终止那么没有任何补救措施,而线程池还会新建一个线程,保证池的正常工作
  • 线程个数始终为1,不能修改
    • FinalizableDelegateExecutorService应用的是装饰器模式,只对外暴露了ExecutorService接口,因此不能调用ThreadPoolExecutor 中特有的方法
  • Executors.newFixedThreadPool(1) 初始为1,后面还可以修改
    • 对外暴露的是ThreadPoolExecutor对象,可以强转后调用setCorePoolSize等方法进行修改

2.6 提交任务

//执行任务
void execute(Runnable command);
//提交任务task, 用返回值Future 获得任务执行结果
<T> Future<T> submit(Callable<T> task);
//提交tasks中所有任务
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;
//提交tasks中所有任务,带超时时间
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;
//提交tasks中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其他任务取消
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;
//提交tasks中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其他任务取消,带超时时间
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;

2.7 关闭线程池

shutdown

/**
 * 线程池状态变为SHUTDOWN
 * 不会接收新任务
 * 但已提交任务会执行完
 * 此方法不会阻塞调用线程的执行
 */
void shutdown();
public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        //修改线程池状态
        advanceRunState(SHUTDOWN);
        //仅会打断空闲线程
        interruptIdleWorkers();
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    //尝试终结(没有运行的线程可以立刻终结,如果还有运行的线程也不会等)
    tryTerminate();
}

shutdownNow

/**
 * 线程池状态变为STOP
 * 不会接收新任务
 * 会将队列中的任务返回
 * 并用 interrupt 的方式中断正在执行的任务
 */
public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        //修改线程池状态
        advanceRunState(STOP);
        //打断所有线程
        interruptWorkers();
        //获取队列中剩余任务
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}

其他方法

//不在RUNNING 状态的线程池,此方法返回true
boolean isShutDown();

//线程池状态是否是TERMINATED
boolean isTerminated();

//调用shutdown 后,由于调用线程并不会等待所有任务运行结束,因此如果它想在线程池TERMINATED 后做一些事情,可以用此方法等待
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
public class ShutDownTest {

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);

        Future<String> future1 = pool.submit(()->{
            System.out.println("running 1");
            Thread.sleep(1000);
            System.out.println("finish 1");
            return "1";

        });
        Future<String> future2 = pool.submit(()->{
            System.out.println("running 2");
            Thread.sleep(1000);
            System.out.println("finish 2");
            return "2";

        });

        Future<String> future3 = pool.submit(()->{
            System.out.println("running 3");
            Thread.sleep(1000);
            System.out.println("finish 3");
            return "3";

        });

        System.out.println("shutdown");

        pool.shutdown();

    }
}

执行结果:
shutdown
running 1
running 2
finish 1
finish 2
running 3
finish 3


public class ShutDownTest {

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);

        Future<String> future1 = pool.submit(()->{
            System.out.println("running 1");
            Thread.sleep(1000);
            System.out.println("finish 1");
            return "1";

        });
        Future<String> future2 = pool.submit(()->{
            System.out.println("running 2");
            Thread.sleep(1000);
            System.out.println("finish 2");
            return "2";

        });

        Future<String> future3 = pool.submit(()->{
            System.out.println("running 3");
            Thread.sleep(1000);
            System.out.println("finish 3");
            return "3";

        });

        System.out.println("shutdown");

        try {
            pool.awaitTermination(3, TimeUnit.SECONDS); //不用,因为不知道等多久合适
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("other...");

    }
}

执行结果:
shutdown
running 1
running 2
finish 1
finish 2
running 3
finish 3
other...


public class ShutDownTest {

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);

        Future<String> future1 = pool.submit(()->{
            System.out.println("running 1");
            Thread.sleep(1000);
            System.out.println("finish 1");
            return "1";

        });
        Future<String> future2 = pool.submit(()->{
            System.out.println("running 2");
            Thread.sleep(1000);
            System.out.println("finish 2");
            return "2";

        });

        Future<String> future3 = pool.submit(()->{
            System.out.println("running 3");
            Thread.sleep(1000);
            System.out.println("finish 3");
            return "3";

        });

        System.out.println("shutdownNow");

        List<Runnable> task = pool.shutdownNow();//返回队列中任务

    }
}

执行结果:
shutdownNow
running 1
running 2


2.8. 池大小

  • 过小会导致程序不能充分利用系统资源、容易导致饥饿
  • 过大会导致更多的线程上下文切换,占用更多的资源

2.8.1 CPU密集型

通常采用 CPU核数 + 1 能够实现最优的CPU利用率,+1 是保证当线程由于页缺失故障(操作系统)或其他原因导致暂停时,额外的这个线程就能顶上去,保证CPU时钟周期不被浪费

2.8.2 I/O密集型

CPU不总是处于繁忙状态,例如,当你执行业务计算时,这时候会使用CPU资源,但当你执行I/O操作时,远程RPC调用时,包括进行数据库操作时,这时候CPU就闲下来了,你可以利用多线程提高他的利用率

经验公式如下
线程数 = 核数 * CPU利用率 * 总时间(CPU计算时间 + 等待时间)/ CPU计算时间

例如:4核CPU计算时间是50%,其他等待时间是50%,期望CPU被100%利用,套用公式

4*100% *100% / 50% = 8

2.9 任务调度线程池

在任务调度线程池功能加入之前,可以使用java.util.Timer来实现定时功能,Timer 的优点在于简单易用,但由于所有的任务都是由同一个线程来调度,因此所有任务都是串行执行的,同一时间只能有一个任务在执行,前一个任务的延迟或异常都将会影响到之后的任务。

public class TimerTest {

    public static void main(String[] args){
        Timer timer = new Timer();

        TimerTask timerTask1 = new TimerTask() {
            @Override
            public void run() {
                System.out.println(" task1");
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };

        TimerTask timerTask2 = new TimerTask() {
            @Override
            public void run() {
                System.out.println(" task2");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };

        //使用Timer添加两个任务,希望他们都在 1s 后执行
        //Timer 内只有一个线程来顺序执行队列中的任务

        timer.schedule(timerTask1, 1000);
        timer.schedule(timerTask2, 1000);
    }
}

2.9.1 使用任务调度线程池改进

public class TimerTest {

    public static void main(String[] args){
        
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);

        pool.schedule(()->{
            System.out.println(" task1");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, 1, TimeUnit.SECONDS);

        pool.schedule(()->{
            System.out.println(" task2");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, 1, TimeUnit.SECONDS);

    }
}

3. Fork/Join

Fork/Join 是JDK 1.7 新加入的线程池实现,

相关文章

网友评论

      本文标题:多线程与并发(九):线程池相关

      本文链接:https://www.haomeiwen.com/subject/oqqpuktx.html