并发编程漫谈

作者: 竹羔 | 来源:发表于2017-06-13 21:24 被阅读73次

    几个基本概念

    • 为什么需要并发?
      这些年。并发。多线程这些词不绝于耳。
      感觉现在没有没有搭上这些词都感觉落后了。但是想专注学一个东西时要先搞清为什么。
      那么。为什么需要并发?
      我看过一些资料上有几个原因。但我个人偏看好以下这点是主要原因。
      由于硬件的发展。近年来的cpu主频已经到达了瓶颈。为了提高性能。而转向多核的集成。这时候把传统串行程序改造成串行+并行的程序(并行部分就能够合理利用多核的优势)。
      那有哪些领域需要这些性能呢?
      毫无疑问,就是计算量大,访问量大的领域需要这些性能。自然而然也就是图像处理和服务端了。
      除了服务端和图像处理领域。其他领域不建议使用并发编程。毕竟并发编程就是典型的把简单问题复杂化的做法。

    • 概念之间的关系

    概念关系图
    • 并发、并行
      并发:在单核处理器上实现多线程编程,即多个任务一起执行,表面上我们感受到是一起执行的,但实际上是交替执行的(串行)
      并行:在多核处理器上能把并发上交替执行的部分变成同时执行。

    • 同步、异步
      同步:主线程在同步调用时,只有等到子线程线程完成方法调用返回后,主线程才能开始下面的任务。
      异步:异步调用时,主线程会开辟一个单独的线程来处理这个任务,然后直接处理主线程接下来的任务,而不等待子线程调用返回

    • 同步、互斥
      在操作系统课上。谈到进程时就有讲到进程之间的关系,同步和互斥。作为『轻量级的进程』--线程,同样也有这样的关系。具体概念如下:
      互斥:在一个线程执行某一任务时,不允许其他线程一起来执行这个任务,只能等待他执行完成才能接着执行该任务
      同步:多个线程之间按照规定的次序来执行一项任务,在某一线程运行期间同样具有排他性(即是一种更为复杂的互斥)。这里和上面同步的概念并没有矛盾。只是看待角度不一样。其实是同一个东西。

    • 临界区
      表示一种共享资源/数据。在多线程环境中,临界资源应该只有一个线程能调用他,其他线程必须等待其释放这个资源。

    • 堵塞、非堵塞
      由于临界区的存在,并发需要得到控制。也就有了堵塞的概念。
      堵塞:一个线程被堵塞,那么在其他线程释放资源前,他是无法运行的。
      非堵塞:具有三种级别:无障碍,无锁,无等待。(非堵塞级别逐步增高)。他们之间的差别这里不谈,只是会说下非堵塞是采取某些手段使得临界区资源可以被多个线程同时访问,某种意义上提高了程序的并行比例(注1),从而提高性能。

    • 死锁、饥饿
      由于临界区的存在,线程需要等待,在等待时会出现以下情况:
      死锁:线程a持有临界资源1,并且需要临界资源2才能处理接下来的任务,与此同时,线程b持有临界资源2,并且需要临界资源1才能处理接下来的任务。这个时候两个线程彼此都在等待,就产生了死锁。
      饥饿:解释饥饿需要引入线程的优先级,对于优先级高的线程会被优先处理,而导致优先级低的线程一直处于等待状态。这时候就是饥饿。

    多线程的协作

    基本操作

    • wait、notify、notifyAll
      在synchronized块中,一个线程A在处理一个临界资源对象object可以调用wait(为了协调多个线程之间的工作),object.wait();这个时候线程A被置入object的等待队列,并且释放object的锁,另外一个线程B由于临界资源的没有被占用,自然也是可以取得该object来做相应的处理,这时候线程B由于业务上的需要可以通知线程A继续任务时,就可以调用object.notifyAll();或者object.notify();(在明确等待队列只有一个时。)。值得注意的是。线程B调用这个函数的同时,并不会马上释放object的锁,而是要等到线程B执行完毕后才会释放,也就是说线程A要等到线程B完全执行完任务后才能被唤醒。

    • interrupt、sleep
      interrupt:一个线程调用这个方法并不会直接使其退出,而是给该线程发送一个退出的消息,至于什么时候退出由线程自己决定。
      sleep:让当前线程休眠一段时间

    • join
      在main函数中,若新建一个线程A并执行,然后要等待A执行完得到某些结果,main函数根据这些结果才接着执行,这时候就可以在main函数中接着调用a.join();。这样就能完成上述需求。
      同样。在A线程需要其他多个线程,例如B,C,D线程的处理结果时,可以在A线程创建好BCD线程并start这些线程后,调用b.join();c.join();d.join();即可

    jdk提供的工具

    一般开发过程中,不会直接使用上述基本方法,而是直接使用jdk提供的工具。这些工具让我们更好的去处理并发问题。

    • ReentrantLock、Condition
      这两个东西分别代替synchronized和wait,notify。
      在一个实现了Runnable的类A中,可以创建一个ReentrantLock lock,当多个实现类A的线程对象在操作临界资源前,可以使用lock.lock();来加锁,主要要在try-catch-finally块中的finally部分调用lock.unlock();来解锁,否则其他线程无法取得锁进而一直处于等待状态。

    在上述情景中,lock.newCondition()可以取得一个Contition c对象,这个对象具有c.await();和c.signal()、c.signalAll()方法,对应wait,notify,notifyAll

    • ReadWriteLock
      读写锁是非堵塞的概念的一个实现,他能够让多个线程访问临界资源时的读与读操作之间不堵塞。但是写-写,读-写,写-读这三个方面仍然是堵塞的。
    ReentrantReadWriteLock lock=new ReentrantReadWriteLock(); 
    lock.readLock();
    lock.writeLock();
    

    可以分别取得读锁和写锁。

    具体使用可以参考
    Java并发编程实战P236
    实战Java高并发程序设计的p85
    JDK8 API中java.util.concurrent.locks包下的ReentrantReadWriteLock类中的sample demo
    (ReentrantLock的使用和这个没有基本差别,至于Condition部分也请自行找具体demo,在下面参考资料上都有基本的用法实例。之后的内容也一样。)

    线程池

    为了避免频繁创建和销毁线程所带来的负担,我们可以让创建的线程进行复用。

    jdk支持
    • 基本用法:
    ExecutorService s=Executors.newFixedThreadPool(3);//Executors为工厂
    for(int i=0;i<10;i++){
       s.execute(new Task());//Task实现了Runnable
    }
    

    即在一个3个线程的线程池里复用线程来解决这10个任务。

    • 4种线程池
      public static ExecutorService newFixedThreadPool(int nThreads)
      public static ExecutorService newSingleThreadExecutor()
      public static ExecutorService newCachedThreadPool()
      public static ScheduledExecutorServicenewScheduledThreadPool(int corePoolSize)
    • jdk8有新增:


      Executors工厂

    至于这些线程池的说明可以参考其他资料,这里就不赘述了。

    内部实现
    • 线程池框架结构图


      线程池框架结构图
    public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }
    public static ExecutorService newSingleThreadExecutor() {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
        }
    public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
        }
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
            return new ScheduledThreadPoolExecutor(corePoolSize);
        }
    
    //jdk8新增
    public static ExecutorService newWorkStealingPool() {
            return new ForkJoinPool
                (Runtime.getRuntime().availableProcessors(),
                 ForkJoinPool.defaultForkJoinWorkerThreadFactory,
                 null, true);
        }
    public static ExecutorService unconfigurableExecutorService(ExecutorService executor) {
            if (executor == null)
                throw new NullPointerException();
            return new DelegatedExecutorService(executor);
        }
    

    从源码和类图可以看到,Executors类要么是调用ThreadPoolExecutor,要么调用ScheduledThreadPoolExecutor来创建以上4个线程池。(jdk8里面新增的线程池是通过DelegatedExecutorService和ForkJoinPool来创建的。)

    现在我们着重分析ThreadPoolExecutor的实现:

    ThreadPoolExecutor构造器

    从ThreadPoolExecutor构造器中我们可以看到他有几个重载形式,其中包含全部参数的构造器代码如下:

    /**
         * Creates a new {@code ThreadPoolExecutor} with the given initial
         * parameters and default thread factory.
         *
         * @param corePoolSize the number of threads to keep in the pool, even
         *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
         * @param maximumPoolSize the maximum number of threads to allow in the
         *        pool
         * @param keepAliveTime when the number of threads is greater than
         *        the core, this is the maximum time that excess idle threads
         *        will wait for new tasks before terminating.
         * @param unit the time unit for the {@code keepAliveTime} argument
         * @param workQueue the queue to use for holding tasks before they are
         *        executed.  This queue will hold only the {@code Runnable}
         *        tasks submitted by the {@code execute} method.
         * @param handler the handler to use when execution is blocked
         *        because the thread bounds and queue capacities are reached
         * @throws IllegalArgumentException if one of the following holds:<br>
         *         {@code corePoolSize < 0}<br>
         *         {@code keepAliveTime < 0}<br>
         *         {@code maximumPoolSize <= 0}<br>
         *         {@code maximumPoolSize < corePoolSize}
         * @throws NullPointerException if {@code workQueue}
         *         or {@code handler} is null
         */
    public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  RejectedExecutionHandler handler) {
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                 Executors.defaultThreadFactory(), handler);
        }
    

    具体参数分析:
    前面4个参数根据javadoc上的注释已经足够了,这里着重看后面两个参数:

    • BlockingQueue<Runnable> workQueue
      这个队列用来存储已提交但并未处理的Runnable对象。至于BlockingQueue有许多类实现其中一个为LinkedBlockingQueue,这个队列如名所述,是一个线程安全的链表实现的队列,也就能够存无限多的Runnable对象,直至资源耗尽。当然也有ArrayListBlockingQueue这个基于数组的实现,也就是等待队列具有最大容量,达到最大容量就执行RejectedExecutionHandler。
      当然,还有其他形式的队列:
      1.比如newScheduledThreadPool中,这个队列传入参数就是DelayedWorkQueue(),这个队列是一个特殊的延迟队列,基于堆的数据结构,在取消该延迟任务所需要的查找能力和移除该任务能力都很出色。也因为实现了BlockingQueue,所以具备应当有的线程安全能力。
      2.在newCachedThreadPool中,这个队列传入参数是SynchronousQueue,这个队列也是一个特殊的BlockingQueue实现,SynchronousQueue没有容量,所提交任务都会直接交于线程池处理。这里的maximumPoolSize被设置为Integer.MAX_VALUE,就是为了防止出现RejectedExecutionHandler。
      3.PriorityBlockingQueue等。。。
    • RejectedExecutionHandler handler
      内置的4种拒绝策略实现如下(解释截取对应实现类的javadoc):
      1.CallerRunsPolicy:A handler for rejected tasks that runs the rejected task directly in the calling thread of the execute method, unless the executor has been shut down, in which case the task is discarded.
      2.DiscardOldestPolicy:A handler for rejected tasks that discards the oldest unhandled request and then retries execute, unless the executor is shut down, in which case the task is discarded.
      3.AbortPolicy:A handler for rejected tasks that throws a RejectedExecutionException.
      4.DiscardPolicy:A handler for rejected tasks that silently discards the rejected task.
    RejectedExecutionHandler实现

    了解了这些参数后再来看下线程池工作的核心代码
    ThreadPoolExecutor的execute方法:

    /**
         * Executes the given task sometime in the future.  The task
         * may execute in a new thread or in an existing pooled thread.
         *
         * If the task cannot be submitted for execution, either because this
         * executor has been shutdown or because its capacity has been reached,
         * the task is handled by the current {@code RejectedExecutionHandler}.
         *
         * @param command the task to execute
         * @throws RejectedExecutionException at discretion of
         *         {@code RejectedExecutionHandler}, if the task
         *         cannot be accepted for execution
         * @throws NullPointerException if {@code command} is null
         */
        public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
            /*
             * Proceed in 3 steps:
             *
             * 1. If fewer than corePoolSize threads are running, try to
             * start a new thread with the given command as its first
             * task.  The call to addWorker atomically checks runState and
             * workerCount, and so prevents false alarms that would add
             * threads when it shouldn't, by returning false.
             *
             * 2. If a task can be successfully queued, then we still need
             * to double-check whether we should have added a thread
             * (because existing ones died since last checking) or that
             * the pool shut down since entry into this method. So we
             * recheck state and if necessary roll back the enqueuing if
             * stopped, or start a new thread if there are none.
             *
             * 3. If we cannot queue task, then we try to add a new
             * thread.  If it fails, we know we are shut down or saturated
             * and so reject the task.
             */
            int c = ctl.get();
            if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
            else if (!addWorker(command, false))
                reject(command);
        }
    

    这里对方法内注释做点自己的理解:
    1.线程池接受到新任务时,若正在运行的线程数量少于corePoolSize,则将任务通过addWork()直接执行并返回;
    2.当workerCountOf(c) > corePoolSize时,会通过workQueue.offer加入等待队列(会做两次检查),但是加入等待队列时有两种情况:a成功加入时,该任务就等待执行,b加入失败时,通过addWorker(null, false);提交给线程池处理,当线程池达到maximumPoolSize就执行RejectedExecutionHandler;若未达到就分配线程来执行。

    具备异常堆栈线程池

    参考《实战Java高并发程序设计》P113章节

    注解

    1.在衡量并发程序比之前串行程序性能提高多少时,有一个概念:并发程序并不是每一个部分都能并发的,有涉及到临界区时也是和原来串行一样,所以并发程序=并发部分+串行部分。一般来说,并发部分越多,其性能也就越高。而非堵塞这个机制就是提高并发部分的一个手段。

    参考资料

    JDK API 并发部分
    JDK源码 线程池部分
    《Java并发编程实战》
    《实战Java高并发程序设计》

    相关文章

      网友评论

        本文标题:并发编程漫谈

        本文链接:https://www.haomeiwen.com/subject/vofjqxtx.html