美文网首页Java技术Java多线程
【浅谈Java并发编程中的若干核心技术】

【浅谈Java并发编程中的若干核心技术】

作者: 一字马胡 | 来源:发表于2020-08-23 10:16 被阅读0次

作者: 一字马胡
转载标志1 【2017-11-01】
转载标志2 【内容来自 一字马胡】

更新日志

日期 更新内容 备注
2017-11-01 新建文章 V1
2018-05-21 新增无锁并发设计须知 V2
2020-08-23 重新发布 V3

批注-2020-08-23
文章内容较多,且因为写的比较早,有些内容可能存在错误或者不再正确,文章会被持续迭代,整理下来只是为了后续能不断回头看看,不断迭代,仅此而已。

本文主要内容索引

1、Java线程
2、线程模型
3、Java线程池
4、Future(各种Future)
5、Fork/Join框架
6、volatile
7、CAS(原子操作)
8、AQS(并发同步框架)
9、synchronized(同步锁)
10、并发队列(阻塞队列)
11、无锁并发设计须知

本文仅分析java并发编程中的若干核心问题,对于上面没有提到但是又和java并发编程有密切关系的技术将会不断添加进来完善文章,本文将长期更新,不断迭代。本文试图从一个更高的视觉来总结Java语言中的并发编程内容,希望阅读完本文之后,可以收获一些内容,至少应该知道在java中做并发编程实践的时候应该注意什么,应该关注什么,如何保证线程安全,以及如何选择合适的工具来满足需求。当然,更深层次的内容就会涉及到jvm层面的知识,包括底层对java内存的管理,对线程的管理等较为核心的问题,当然,本文的定位在于抽象与总结,更为具体而深入的内容就需要自己去实践,考虑到可能篇幅过长、重复描述某些内容,以及自身技术深度等原因,本文将在深度和广度上做一些权衡,某些内容会做一些深入的分析,而有些内容会一带而过,点到为止,总之,本文就当是对学习java并发编程内容的一个总结,以及给哪些希望快速了解java并发编程内容的读者抛砖引玉,不足之处还望指正。

Java线程

一般来说,在java中实现高并发是基于多线程编程的,所谓并发,也就是多个线程同时工作,来处理我们的业务,在机器普遍多核心的今天,并发编程的意义极为重大,因为我们有多个cpu供线程使用,如果我们的应用依然只使用单线程模式来工作的话,对极度浪费机器资源的。所以,学习java并发知识的首要问题是:如何创建一个线程,并且让这个线程做一些事情?这是java并发编程内容的起点,下面将分别介绍多个创建线程,并且让线程做一些事情的方法。

继承Thread类

继承Thread类,然后重写run方法,这是第一种创建线程的方法。run方法里面就是我们要做的事情,可以在run方法里面写我们想要在新的线程里面运行的任务,下面是一个小例子,我们继承了Thread类,并且在run方法里面打印出了当然线程的名字,然后sleep1秒中之后就退出了:


/**
 * Created by hujian06 on 2017/10/31.
 * 
 * the demo of thread
 */
public class ThreadDemo {
    
    public static void main(String ... args) {
        
        AThread aThread = new AThread();
        
        //start the thread
        aThread.start();
        
    }
    
}

class AThread extends Thread {
    @Override
    public void run() {
        System.out.println("Current Thread Name:" +
                Thread.currentThread().getName());
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}


如果我们想要启动这个线程,只需要像上面代码中那样,调用Thread类的start方法就可以了。

实现Runnable接口

启动一个线程的第二种方法是实现Runnable接口,然后实现其run方法,将你想要在新线程里面执行的业务代码写在run方法里面,下面的例子展示了这种方法启动线程的示例,实现的功能和上面的第一种示例是一样的:


/**
 * Created by hujian06 on 2017/10/31.
 *
 * the demo of Runnable
 */
public class ARunnableaDemo {
    
    public static void main(String ... args) {
        
        ARunnanle aRunnanle = new ARunnanle();
        Thread thread = new Thread(aRunnanle);
        
        thread.start();
        
    }
    
}

class ARunnanle implements Runnable {

    @Override
    public void run() {
        System.out.println("Current Thread Name:" +
                Thread.currentThread().getName());
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}


在启动线程的时候,依然还是使用了Thread这个类,只是我们在构造函数中将我们实现的Runnable对象传递进去了,所以在我们执行Thread类的start方法的时候,实际执行的内容是我们的Runnable的run方法。

使用FutureTask

启动一个新的线程的第三种方法是使用FutureTask,下面来看一下FutureTask的类图,就可以明白为什么可以使用FutureTask来启动一个新的线程了:

FutureTask的类图

从FutureTask的类图中可以看出,FutureTask实现了Runnable接口和Future接口,所以它兼备Runnable和Future两种特性,下面先来看看如何使用FutureTask来启动一个新的线程:


import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

/**
 * Created by hujian06 on 2017/10/31.
 *
 * the demo of FutureTask
 */
public class FutureTaskDemo {

    public static void main(String ... args) {
        
        ACallAble callAble = new ACallAble();
        
        FutureTask<String> futureTask = new FutureTask<>(callAble);
        
        Thread thread = new Thread(futureTask);
        
        thread.start();
        
        do {
            
        }while (!futureTask.isDone());

        try {
            String result = futureTask.get();
            
            System.out.println("Result:" + result);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }

    }

}

class ACallAble implements Callable<String> {

    @Override
    public String call() throws Exception {
        Thread.sleep(1000);
        return "Thread-Name:" + 
                Thread.currentThread().getName();
    }
}


可以看到,使用FutureTask来启动一个线程之后,我们可以监控这个线程是否完成,上面的示例中主线程会一直等待这个新创建的线程直到它返回,其实只要是Future提供的接口,我们在FutureTask中都可以使用,这极大的方便了我们,Future在并发编程中的意义极为重要,Future代表一个未来会发生的东西,它是一种暗示,一种占位符,它示意我们它可能不会立即得到结果,因为它的任务还在运行,但是我们可以得到一个对这个线程的监控对象,我们可以对线程的执行做一些判断,甚至是控制,比如,如果我们觉得我们等了太久,并且我们觉得没有必要再等待下去的时候,就可以将这个Task取消,还有一点需要提到的是,Future代表它可能正在运行,也可能已经返回,当然Future更多的暗示你可以在等待这个结果的同时可以使用其他的线程做一些其他的事情,当你真的需要这个结果的时候再来获取就可以了,这就是并发,理解这一点非常重要。

本小节通过介绍三种创建并启动一个新线程的方法,为进行并发编程开了一个头,目前,我们还只是在能创建多个线程,然后让多个线程做不同个的事情的阶段,当然,这是学习并发编程最为基础的,无论如何,现在,我们可以让我们的应用运行多个线程了,下面的文章将会基于这个假设(一个应用开启了多个线程)讨论一些并发编程中值得关注的内容。关于本小节更为详细的内容,可以参考文章Java CompletableFuture中的部分内容。

线程模型

我们现在可以启动多个线程,但是好像并没有形成一种类似于模型的东西,非常混乱,并且到目前为止我们的多个线程依然只是各自做各自的事情,互不相干,多个线程之间并没有交互(通信),这是最简单的模型,也是最基础的模型,本小节试图介绍线程模型,一种指导我们的代码组织的思想,线程模型确定了我们需要处理那些多线程的问题,在一个系统中,多个线程之间没有通信是不太可能的,更为一般的情况是,多个线程共享一些资源,然后相互竞争来获取资源权限,多个线程相互配合,来提高系统的处理能力。正因为多个线程之间会有通信交互,所以本文接下来的讨论才有了意义,如果我们的系统里面有几百个线程在工作,但是这些线程互不相干,那么这样的系统要么实现的功能非常单一,要么毫无意义(当然不是绝对的,比如Netty的线程模型)。

继续来讨论线程模型,上面说到线程模型是一种指导代码组织的思想,这是我自己的理解,不同的线程模型需要我们使用不同的代码组织,好的线程模型可以提高系统的并发度,并且可以使得系统的复杂度降低,这里需要提一下Netty 4的线程模型,Netty 4的线程模型使得我们可以很容易的理解Netty的事件处理机制,这种优秀的设计基于Reactor线程模型,Reactor线程模型分为单线程模型、多线程模型以及主从多线程模型,Netty的线程模型类似于Reactor主从多线程模型。

当然线程模型是一种更高级别的并发编程内容,它是一种编程指导思想,尤其在我们进行底层框架设计的时候特别需要注意线程模型,因为一旦线程模型设计不合理,可能会导致后面框架代码过于复杂,并且可能因为线程同步等问题造成问题不可控,最终导致系统运行失控。类似于Netty的线程模型是一种好的线程模型,下面展示了这种模型:

Netty线程模型

简单来说,Netty为每个新建立的Channel分配一个NioEventLoop,而每个NioEventLoop内部仅使用一个线程,这就避免了多线程并发的同步问题,因为为每个Channel处理的线程仅有一个,所以不需要使用锁等线程同步手段来做线程同步,在我们的系统设计的时候应该借鉴这种线程模型的设计思路,可以避免我们走很多弯路。关于线程池以及Netty线程池这部分的内容,可以参考文章Netty线程模型及EventLoop详解

Java线程池

池化技术是一种非常有用的技术,对于线程来说,创建一个线程的代价是很高的,如果我们在创建了一个线程,并且让这个线程做一个任务之后就回收的话,那么下次要使用线程来执行我们的任务的时候又需要创建一个新的线程,是否可以在创建完成一个线程之后一直缓冲,直到系统关闭的时候再进行回收呢?java线程池就是这样的组件,使用线程池,就没必要频繁创建线程,线程池会为我们管理线程,当我们需要一个新的线程来执行我们的任务的时候,就向线程池申请,而线程池会从池子里面找到一个空闲的线程返回给请求者,如果池子里面没有可用的线程,那么线程池会根据一些参数指标来创建一个新的线程,或者将我们的任务提交到任务队列中去,等待一个空闲的线程来执行这个任务。细节内容在下文中进行分析,目前我们只需要明白,线程池里面有很多线程,这些线程会一直到系统关系才会被回收,否则一直会处于处理任务或者等待处理任务的状态。

首先,如何使用线程池呢?下面的代码展示了如何使用java线程池的例子:


import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Created by hujian06 on 2017/10/31.
 *
 * the demo of Executors
 */
public class ExecutorsDemo {

    public static void main(String ... args) {

        int cpuCoreCount = Runtime.getRuntime().availableProcessors();
        AThreadFactory threadFactory = new AThreadFactory();
        ARunnanle runnanle = new ARunnanle();
        
        ExecutorService fixedThreadPool=
                Executors.newFixedThreadPool(cpuCoreCount, threadFactory);

        ExecutorService cachedThreadPool = 
                Executors.newCachedThreadPool(threadFactory);

        ScheduledExecutorService newScheduledThreadPool = 
                Executors.newScheduledThreadPool(cpuCoreCount, threadFactory);

        ScheduledExecutorService singleThreadExecutor = 
                Executors.newSingleThreadScheduledExecutor(threadFactory);
        
        fixedThreadPool.submit(runnanle);
        cachedThreadPool.submit(runnanle);
        newScheduledThreadPool.scheduleAtFixedRate(runnanle, 0, 1, TimeUnit.SECONDS);
        singleThreadExecutor.scheduleWithFixedDelay(runnanle, 0, 100, TimeUnit.MILLISECONDS);

        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        fixedThreadPool.shutdownNow();
        cachedThreadPool.shutdownNow();
        newScheduledThreadPool.shutdownNow();
        singleThreadExecutor.shutdownNow();
    }

}

class ARunnable implements Runnable {

    @Override
    public void run() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        System.out.println("Current Thread Name:" + 
                Thread.currentThread().getName());
    }
}

/**
 * the thread factory
 */
class AThreadFactory implements ThreadFactory {
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    @Override
    public Thread newThread(Runnable r) {
        return new Thread("aThread-" + threadNumber.incrementAndGet());
    }
}

更为丰富的应用应该自己去探索,结合自身的需求来借助线程池来实现,下面来分析一下Java线程池实现中几个较为重要的内容。

ThreadPoolExecutor和ScheduledThreadPoolExecutor

ThreadPoolExecutor和ScheduledThreadPoolExecutor是java实现线程池的核心类,不同类型的线程池其实就是在使用不同的构造函数,以及不同的参数来构造出ThreadPoolExecutor或者ScheduledThreadPoolExecutor,所以,学习java线程池的重点也在于学习这两个核心类。前者适用于构造一般的线程池,而后者继承了前者,并且很多内容是通用的,但是ScheduledThreadPoolExecutor增加了schedule功能,也就是说,ScheduledThreadPoolExecutor使用于构造具有调度功能的线程池,在需要周期性调度执行的场景下就可以使用ScheduledThreadPoolExecutor。关于ThreadPoolExecutor与ScheduledThreadPoolExecutor较为详细深入的分析可以参考下面的文章:

下面展示了ThreadPoolExecutor和ScheduledThreadPoolExecutor的类图,可以看出他们的关系,以及他们的继承关系:

ThreadPoolExecutor类图 ScheduledThreadPoolExecutor类图

关于较为细节的内容不再本文的叙述范围之内,如果想要了解这些内容的详细内容,可以参考文章中给出的链接,这些文章较为深入的分析和总结了相关的内容。

上文中提到,线程池会管理着一些线程,这些线程要么处于运行状态,要么处于等待任务的状态,当然这只是我们较为形象的描述,一个线程的状态不仅有运行态与等待状态,还有其他的状态,但是对我我们来说,线程池里面的线程确实是要么处于运行状态,要么处于等待任务的状态,这体现在,当我们向一个线程池提交一个任务的时候,可能会被等待任务的线程立即执行,但是可能线程池里面的线程都处于忙碌状态,那么我们提交的任务就会被加入到等待运行的任务队列中去,当有空闲线程了,或者队列也满了,那么线程池就会采用一些策略来执行任务,并且在某些时刻会拒绝提交的任务,这些细节都可以在ThreadPoolExecutor的实现中找到。

在线程池的实现中,有一个角色特别重要,那就是任务队列,当线程池里面没有空闲的线程来执行我们的任务的时候,我们的任务就会被添加到任务队列中去等待执行,而这个任务队列可能会被多个线程并发读写,所以需要支持多线程安全访问,java提供了一类支持并发环境的队列,称为阻塞队列,这是一类特殊的队列,他们的使用时非常广泛的,特别是在jdk自身的类库建设上,当然在我们实际的工作中也是有很多使用场景的。

关于ThreadPoolExecutor是如何处理一个提交的任务的细节,可以参考下面的代码:


   public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

下面来看一下java中借助ThreadPoolExecutor来构造的几个线程池的特性:

1、newFixedThreadPool

使用ThreadPoolExecutor构造一个newCachedThreadPool的流程如下:


    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }


    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }
    
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }    

在任意时刻,newFixedThreadPool构造出来的线程池中最多只可能存活着nThreads个线程,如果所有的线程都在运行任务,那么这个时候提交的任务将会被添加到任务队列中去等待执行。我们可以控制corePoolSize和maximumPoolSize来使得通过ThreadPoolExecutor构造出来的线程池具有一些不一样的特性,但是需要注意的是,当我们设置的maximumPoolSize大于corePoolSize的时候,如果当前线程池里面的线程数量已经达到了corePoolSize了,并且当前所以线程都处于运行任务的状态,那么在这个时候提交的任务会被添加到任务队列中去,只有在任务队列满了的时候,才会去创建新的线程,如果线程数量已经达到了maximumPoolSize了,那么到此就会拒绝提交的任务,这些流程可以参考上面展示出来的execute方法的实现。该类型的线程池使用的任务队列是LinkedBlockingQueue类型的阻塞队列。

2、newCachedThreadPool

通过ThreadPoolExecutor构造一个newCachedThreadPool线程池的流程如下:


    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }
    
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
        

newCachedThreadPool适合于类似秒杀系统中,它可以按需创建线程。每个线程在空闲了一段时间之后会被回收,然后需要创建的时候再创建出来,在使用的时候应该使用合适的构造参数。该类型使用的任务队列是SynchronousQueue这种同步队列,这是一种特别的队列,每个线程都是有使命的,每个线程都会等待另外一个线程和自己交易,在交易完成之前都会阻塞住线程,他们之间有一种传递关系,数据是从一个线程直接传递到例外一个线程中去的,SynchronousQueue这种队列不存储实际的数据,而是存储着一些线程的信息,而SynchronousQueue管理着这些线程之间的交易,更为详细的细节参考后面的文章。

上面提到,ScheduleThreadPoolExecutor是继承自ThreadPoolExecutor的,而且从类图中也可以看出来这种关系,所以其实ScheduleThreadPoolExecutor是对ThreadPoolExecutor的增强,它增加了schedule功能,使用与那些需要周期性调度执行,或者是延时执行的任务,在ScheduleThreadPoolExecutor中使用了一种阻塞队列称为延时阻塞队列,这种队列有能力持有一段时间数据,我们可以设定这种时间,时间没到的时候尝试获取数据的线程会被阻塞,直到设定的时间到了,线程才会被唤醒来消费数据。而关于ScheduleThreadPoolExecutor是如何运作的,包括他的周期性任务调度是如何工作的,可以参考上面提到的链接。

Future

Future代表一种未来某个时刻会发生的事情,在并发环境下使用Future是非常重要的,使用Future的前提是我们可以容许线程执行一段时间来完成这个任务,但是需要在我们提交了任务的时候就返回一个Future,这样在接下来的时间程序员可以根据实际情况来取消任务或者获取任务,在多个任务没有相互依赖关系的时候,使用Future可以实现多线程的并发执行,多个线程可以执行在不同的处理器上,然后在某个时间点来统一获取结果就可以了。上文中已经提到了FutureTask,FutureTask既是一种Runnable,也是一种Future,并且结合了两种类型的特性。下面展示了Future提供的一些方法,使用这些方法可以很方便的进行任务控制:


public interface Future<V> {

    boolean cancel(boolean mayInterruptIfRunning);

    boolean isCancelled();

    boolean isDone();

    V get() throws InterruptedException, ExecutionException;

    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

在java 8中增加了一个新的类CompletableFuture,这是对Future的极大增强,CompletableFuture提供了非常丰富的操作可以来控制我们的任务,并且可以根据多种规则来关联多个Future。关于CompletableFuture的详细分析总结可以参考文章Java CompletableFuture

Fork/Join框架

Fork/Join框架是一种并行框架,它可以将一个较大的任务切分成一些小任务来执行,并且多个线程之间会相互配合,每个线程都会有一个任务队列,对于某些线程来说它们可能很快完成了自己的任务队列中的任务,但是其他的线程还没有完成,那么这些线程就会去窃取那些还没有完成任务执行的线程的任务来执行,这成为“工作窃取”算法,关于Fork/Join中的工作窃取,其实现还是较为复杂的,如果想对Fork/Join框架有一个大致的认识,可以参考文章Java Fork/Join并行框架。下面展示了Fork/Join框架的工作模式:

Fork/Join工作模式

可以从上面的图中看出,一个较大的任务会被切分为一个小任务,并且小任务还会继续切分,直到符合我们设定的执行阈值,然后就会执行,执行完成之后会进行join,也就是将小任务的结果组合起来,组装出我们提交的整个任务的结果,这是一种非常先进的工作模式,非常有借鉴意义。当然,使用Fork/Join框架的前提是我们的任务时可以拆分成小任务来执行的,并且小人物的结果可以组装出整个大任务的结果,归并排序是一种可以借助Fork/Join框架来提供处理速度的算法,下面展示了使用Fork/Join框架来执行归并排序的代码,可以试着调整参数来进行性能测试:


import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

/**
 * Created by hujian06 on 2017/10/23.
 *
 * merge sort by fork/join
 */
public class ForkJoinMergeSortDemo {

    public static void main(String ... args) {
        new Worker().runWork();
    }

}

class Worker {

    private static final boolean isDebug = false;

    public void runWork() {

        int[] array = mockArray(200000000, 1000000); // mock the data

        forkJoinCase(array);
        normalCase(array);

    }

    private void printArray(int[] arr) {

        if (isDebug == false) {
            return;
        }

        for (int i = 0; i < arr.length; i ++) {
            System.out.print(arr[i] + " ");
        }

        System.out.println();
    }

    private void forkJoinCase(int[] array) {
        ForkJoinPool pool = new ForkJoinPool();

        MergeSortTask mergeSortTask = new MergeSortTask(array, 0, array.length - 1);

        long start = System.currentTimeMillis();

        pool.invoke(mergeSortTask);

        long end = System.currentTimeMillis();

        printArray(array);

        System.out.println("[for/join mode]Total cost: " + (end - start) / 1000.0 + " s, for " +
                array.length + " items' sort work.");
    }

    private void normalCase(int[] array) {

        long start = System.currentTimeMillis();

        new MergeSortWorker().sort(array, 0, array.length - 1);

        long end = System.currentTimeMillis();

        printArray(array);

        System.out.println("[normal mode]Total cost: " + (end - start) / 1000.0 + " s, for " +
                array.length + " items' sort work.");
    }

    private static final  int[] mockArray(int length, int up) {
        if (length <= 0) {
            return null;
        }

        int[] array = new int[length];

        Random random = new Random(47);

        for (int i = 0; i < length; i ++) {
            array[i] = random.nextInt(up);
        }

        return array;
    }
}

class MergeSortTask extends RecursiveAction {

    private static final int threshold = 100000;
    private final MergeSortWorker mergeSortWorker = new MergeSortWorker();

    private int[] data;

    private int left;
    private int right;

    public MergeSortTask(int[] array, int l, int r) {
        this.data = array;
        this.left = l;
        this.right = r;
    }

    @Override
    protected void compute() {
        if (right - left < threshold) {
            mergeSortWorker.sort(data, left, right);
        } else {
            int mid = left + (right - left) / 2;
            MergeSortTask l = new MergeSortTask(data, left, mid);
            MergeSortTask r = new MergeSortTask(data, mid + 1, right);

            invokeAll(l, r);

            mergeSortWorker.merge(data, left, mid, right);
        }
    }
}

class MergeSortWorker {

    // Merges two subarrays of arr[].
    // First subarray is arr[l..m]
    // Second subarray is arr[m+1..r]
    void merge(int arr[], int l, int m, int r) {
        // Find sizes of two subarrays to be merged
        int n1 = m - l + 1;
        int n2 = r - m;

        /* Create temp arrays */
        int L[] = new int[n1];
        int R[] = new int[n2];

        /*Copy data to temp arrays*/
        for (int i = 0; i < n1; ++i)
            L[i] = arr[l + i];
        for (int j = 0; j < n2; ++j)
            R[j] = arr[m + 1 + j];

        /* Merge the temp arrays */

        // Initial indexes of first and second subarrays
        int i = 0, j = 0;

        // Initial index of merged subarry array
        int k = l;
        while (i < n1 && j < n2) {
            if (L[i] <= R[j]) {
                arr[k ++] = L[i ++];
            } else {
                arr[k ++] = R[j ++];
            }
        }

        /* Copy remaining elements of L[] if any */
        while (i < n1) {
            arr[k ++] = L[i ++];
        }

        /* Copy remaining elements of R[] if any */
        while (j < n2) {
            arr[k ++] = R[j ++];
        }
    }

    // Main function that sorts arr[l..r] using
    // merge()
    void sort(int arr[], int l, int r) {
        if (l < r) {
            // Find the middle point
            int m = l + (r - l) / 2;

            // Sort first and second halves
            sort(arr, l, m);
            sort(arr, m + 1, r);

            // Merge the sorted halves
            merge(arr, l, m, r);
        }
    }
}

在jdk中,使用Fork/Join框架的一个典型案例是Streams API,Streams API试图简化我们的并发编程,可以使用很简单的流式API来处理我们的数据流,在我们无感知的状态下,其实Streams的实现上借助了Fork/Join框架来实现了并发计算,所以强烈建议使用Streams API来处理我们的流式数据,这样可以充分的利用机器的多核心资源,来提高数据处理的速度。关于java的Streams API的分析总结可以参考文章Java Streams API,关于Stream API是如何使用Fork/Join框架来实现并行计算的内容可以参考文章Java Stream的并行实现。鉴于Fork/Join框架的先进思想,理解并且学会使用Fork/Join框架来处理我们的实际问题是非常有必要的。

Java volatile关键字

volatile解决的问题是多个线程的内存可见性问题,在并发环境下,每个线程都会有自己的工作空间,每个线程只能访问各自的工作空间,而一些共享变量会被加载到每个线程的工作空间中,所以这里面就有一个问题,内存中的数据什么时候被加载到线程的工作缓存中,而线程工作空间中的内容什么时候会回写到内存中去。这两个步骤处理不当就会造成内存可加性问题,也就是数据的不一致,比如某个共享变量被线程A修改了,但是没有回写到内存中去,而线程B在加载了内存中的数据之后读取到的共享变量是脏数据,正确的做法应该是线程A的修改应该对线程B是可见的,更为通用一些,就是在并发环境下共享变量对多个线程是一致的。

对于内存可见性的一点补充是,之所以会造成多个线程看到的共享变量的值不一样,是因为线程在占用CPU时间的时候,cpu为了提高处理速度不会直接和内存交互,而是会先将内存中的共享内容读取到内部缓存中(L1,L2),然后cpu在处理的过程中就只会和内部缓存交互,在多核心的机器中这样的处理方式就会造成内存可见性问题。

volatile可以解决并发环境下的内存可见性问题,只需要在共享变量前面加上volatile关键字就可以解决,但是需要说明的是,volatile仅仅是解决内存可见性问题,对于像i++这样的问题还是需要使用其他的方式来保证线程安全。使用volatile解决内存可见性问题的原理是,如果对被volatile修饰的共享变量执行写操作的话,JVM就会向cpu发送一条Lock前缀的指令,cpu将会这个变量所在的缓存行(缓存中可以分配的最小缓存单位)写回到内存中去。但是在多处理器的情况下,将某个cpu上的缓存行写回到系统内存之后,其他cpu上该变量的缓存还是旧的,这样再进行后面的操作的时候就会出现问题,所以为了使得所有线程看到的内容都是一致的,就需要实现缓存一致性协议,cpu将会通过监控总线上传递过来的数据来判断自己的缓存是否过期,如果过期,就需要使得缓存失效,如果cpu再来访问该缓存的时候,就会发现缓存失效了,这时候就会重新从内存加载缓存。
总结一下,volatile的实现原则有两条:
1、JVM的Lock前缀的指令将使得cpu缓存写回到系统内存中去
2、为了保证缓存一致性原则,在多cpu的情景下,一个cpu的缓存回写内存会导致其他的cpu上的缓存都失效,再次访问会重新从系统内存加载新的缓存内容。

原子操作CAS

原子操作表达的意思是要么一个操作成功,要么失败,中间过程不会被其他的线程中断,这一点对于并发编程来说非常重要,在java中使用了大量的CAS来做并发编程,包括jdk的ConcurrentHsahMap的实现,还有AtomicXXX的实现等其他一些并发工具的实现都使用了CAS这种技术,CAS包括两部分,也就是Compare and swap,首先是比较,然后再交互,这样做的原因是,在并发环境下,可能不止一个线程想要来改变某个共享变量的值,那么在进行操作之前使用一个比较,而这个比较的值是当前线程认为(知道)该共享变量最新的值,但是可能其他线程已经改变了这个值,那么此时CAS操作就会失败,只有在共享变量的值等于线程提供的用于比较的值的时候才会进行原子改变操作。

java中有一个类是专门用于提供CAS操作支持的,那就是Unsafe类,但是我们不能直接使用Unsafe类,因为Unsafe类提供的一些底层的操作,需要非常专业的人才能使用好,并且Unsafe类可能会造成一些安全问题,所以不建议直接使用Unsafe类,但是如果想使用Unsafe类的话还是有方法的,那就是通过反射来获取Unsafe实例,类似于下面的代码:


class UnsafeHolder {

    private static Unsafe U = null;

    public static Unsafe getUnsafe() {
        if (U == null) {
            synchronized (UnsafeHolder.class) {
                if (U == null) {

                    List<Exception> exception = null;
                    try {
                        Field field = Unsafe.class.getDeclaredField("theUnsafe");

                        field.setAccessible(true);

                        try {
                            U = (Unsafe) field.get(null);
                        } catch (IllegalAccessException e) {

                            exception.add(e);
                        }
                    } catch (NoSuchFieldException e) {

                        exception.add(e);
                    } finally {

                        if (exception != null) {
                            reportException(exception);
                        }

                    }
                }
            }
        }

        return U;
    }

    /**
     * handler the exception in this method .
     * @param e The exception
     */
    private static void reportException(List<Exception> e) {
        e.forEach(System.out::println);
    }

}

如果想要了解Unsafe类到底提供了哪些较为底层的操作,可以直接参考Unsafe的源码。CAS操作解决了原子操作问题,只要进行操作,CAS就会保证操作会成功,不会被中断,这是一种非常好非常强大的特性,下面就java 8中的ConcurrentHashMap的size实现来谈谈CAS操作在并发环境下的使用案例。

在java 7中,ConcurrentHashMap的实现是基于分段锁协议的实现,本质上还是使用了锁,只是基于一种考虑,就是多个线程访问哈希桶具有随机性,基于这种考虑来将数据存储在不同的哈希段上面,然后每一个段配有一把锁,在需要写某个段的时候需要加锁,而在这个时候,其他访问其他段的线程是不需要阻塞的,但是对于该段的线程访问就需要等待,直到这个加锁的线程释放了锁,其他线程才能进行访问。在java 8中,ConcurrentHashMap的实现抛弃了这种复杂的架构设计,但是继承了这种分散线程竞争压力的思想,其实就提高系统的并发度这一维度来说,分散竞争压力是一种最为直接明了的解决方案,而java 8在实现ConcurrentHashMap的时候大量使用了CAS操作,减少了使用锁的频度来提高系统的响应度,其实使用锁和使用CAS来做并发在复杂度上不是一个数量级的,使用锁在很大程度上假设了多个线程的排斥性,并且使用锁会将线程阻塞等待,也就是说使用锁来做线程同步的时候,线程的状态是会改变的,但是使用CAS是不会改变线程的状态的(不太严谨的说),所以使用CAS比起使用synchronized或者使用Lcok来说更为轻量级。

现在就ConcurrentHashMap的size方法来分析一下如何将线程竞争的压力分散出去。在java 7的实现上,在调用size方法之后,ConcurrentHashMap会进行两次对哈希桶中的记录累加的操作,这两次累加的操作是不加锁的,然后判断两次结果是否一致,如果一致就说明目前的系统是读多写少的场景,并且可能目前没有线程竞争,所以直接返回就可以,这就避免了使用锁,但是如果两次累加结果不一致,那就说明此时可能写的线程较多,或者线程竞争较为严重,那么此时ConcurrentHashMap就会进行一个重量级的操作,对所有段进行加锁,然后对每一个段进行记录计数,然后求得最终的结果返回。在最有情况下,size方法需要做两次累加计数,最坏情况需要三次,并且会涉及全局加锁这种重量级的加锁操作,性能肯定是不高的。而在java 8的实现上,ConcurrentHashMap的size方法实际上是与ConcurrentHashMap是解耦的,size方法更像是接入了一个额外的并发计数系统,在进行size方法调用的时候是不会影响数据的存取的,这其实是一种非常先进的思想,就是一个系统模块化,然后模块可以进行更新,系统解耦,比如java 8中接入了并发计数组件Striped64来作为size方法的支撑,可能未来出现了比Striped64更为高效的算法来计数,那么只需要将Striped64模块换成新的模块就可以了,对原来的核心操作是不影响的,这种模块化系统设定的思想应该在我们的项目中具体实践。

上面说到java 8在进行size方法的设计上引入了Striped64这种并发计数组件,这种组件的计数思想其实也是分散竞争,Striped64的实现上使用了volatile和CAS,在Striped64的实现中是看不到锁的使用的,但是Striped64确实是一种高效的适用于并发环境下的计数组件,它会基于请求计数的线程,Striped64的计数会根据两部分的内容来得到最后的结果,类似于java 7中ConcurrentHashMap的size方法的实现,在Striped64的实现上也是借鉴了这种思想的,Striped64会首先尝试将某个线程的计数请求累加到一个base共享变量上,如果成功了,那么说明目前的竞争不是很激烈,也就没必要后面的操作了,但是很多情况下,并发环境下的线程竞争是很激烈的,所以尝试累加到base上的计数请求很大概率是会失败的,那么Striped64会维护一个Cell数组,每个Cell是一个计数组件,Striped64会为每个请求计数的线程计算一个哈希值,然后哈希到Cell数组中的某个位置上,然后这个线程的计数就会累加到该Cell上面去。当然,Striped64的实现细节是非常复杂的,想要了解Striped64的实现细节的读者可以参考文章Java 并发计数组件Striped64详解,配合Striped64的源码效果更佳。

并发同步框架AQS

AQS是java中实现Lock的基础,也是实现线程同步的基础,AQS提供了锁的语义,并且支持独占模式和共享模式,对应于悲观锁和乐观锁,独占模式的含义是说同一时刻只能有一个线程获取锁,而其他试图获取锁的线程都需要阻塞等待,而共享锁的含义是说可以有多个线程获得锁,两种模式在不同的场景下使用。

而锁在并发编程中的地位不言而喻,多个线程的同步很多时候是需要锁来做同步的,比如对于某些资源,我们希望可以有多个线程获得锁来读取,但是只允许有一个线程获得锁来执行写操作,这种锁称为读写锁,它的实现上结合了AQS的共享模式和独占模式,共享模式对应于可以使得多个线程获得锁来进行读操作,独占模式对应于只允许有一个线程获得锁来进行写操作。关于java中多个Lock的实现细节,以及是如何借助AQS来实现其具体逻辑的内容,可以参考文章ava可重入锁详解。该文章详细讲述了多个Lock接口的实现类,以及他们是如何借助AQS来实现的具体细节。

某些时候,我们需要定制我们自己的线程同步策略,个性化的线程同步借助AQS可以很容易的实现,比如我们的需求是允许限定个数的线程获得锁来进行一些操作,想要实现这样的语义,只需要实现一个类,继承AQS,然后重写方法下面两个方法:


    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }
    
    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }    

关于AQS的具体分析,可以参考文章Java同步框架AbstractQueuedSynchronizer

还需要提到的一点是,锁分为公平锁和非公平锁,java中大多数时候会使用队列来实现公平锁,而使用栈来实现非公平锁,当然这是基于队列和栈这两种数据结构的特点来实现的,直观的来说,使用队列的FIFO的特性就可以实现类似排队的效果,也就保证了公平性,而栈是一个后进先出的数据结构,它的这种结构造成的结果就是,最新进入的线程可能比那些等待过一段时间的线程更早的获得锁,更为具体的内容可以参考上面的文章进行了解。

synchronized(同步锁)

相对于volatile,synchronized就显得比较重量级了。
首先,我们应该知道,在java中,所有的对象都可以作为锁。可以分为下面三种情况:
1、普通方法同步,锁是当前对象
2、静态方法同步,锁是当前类的Class对象
3、普通块同步,锁是synchronize里面配置的对象
当一个线程试图访问同步代码时,必须要先获得锁,退出或者抛出异常时必须要释放锁。
JVM基于进入和退出Monitor对象来实现方法同步和代码块同步,可以使用monitorenter和monitorexit指令实现。monitorenter指令是在编译后插入到同步代码块的开始位置,而monitorexit指令则插入到方法结束和异常处,JVM保证每个monitorenter都有一个monitorexit阈值相对应。线程执行到monitorenter的时候,会尝试获得对象所对应的monitor的锁,然后才能获得访问权限,synchronize使用的锁保存在Java对象头中。

并发队列(阻塞队列,同步队列)

并发队列,也就是可以在并发环境下使用的队列,为什么一般的队列不能再并发环境下使用呢?因为在并发环境下,可能会有多个线程同时来访问一个队列,这个时候因为上下文切换的原因可能会造成数据不一致的情况,并发队列解决了这个问题,并且java中的并发队列的使用时非常广泛的,比如在java的线程池的实现上使用了多种不同特性的阻塞队列来做任务队列,对于阻塞队列来说,它要解决的首要的两个问题是:

  1. 多线程环境支持,多个线程可以安全的访问队列
  2. 支持生产和消费等待,多个线程之间互相配合,当队列为空的时候,消费线程会阻塞等待队列不为空;当队列满了的时候,生产线程就会阻塞直到队列不满。

java中提供了丰富的并发队列实现,下面展示了这些并发队列的概览:

java并发队列概览

根据上面的图可以将java中实现的并发队列分为几类:

  1. 一般的阻塞队列
  2. 支持双端存取的并发队列
  3. 支持延时获取数据的延时阻塞队列
  4. 支持优先级的阻塞队列

这些队列的区别就在于从队列中存取数据时的具体表现,比如对于延时队列来说,获取数据的线程可能被阻塞等待一段时间,也可能立刻返回,对于优先级阻塞队列,获取的数据是根据一定的优先级取到的。下面展示了一些队列操作的具体表现:

操作类型 Throws Exception Special Value Blocked Timed out
插入 add(o) offer(o) put(o) offer(o, timeout, unit)
取出(删除) remove(o) poll() take() poll(timeout, unit)
  • Throws Exception 类型的插入和取出在不能立即被执行的时候就会抛出异常。
  • Special Value 类型的插入和取出在不能被立即执行的情况下会返回一个特殊的值(true 或者 false)
  • Blocked 类型的插入和取出操作在不能被立即执行的时候会阻塞线程直到可以操作的时候会被其他线程唤醒
  • Timed out 类型的插入和取出操作在不能立即执行的时候会被阻塞一定的时候,如果在指定的时间内没有被执行,那么会返回一个特殊值

关于更为具体的分析总结可以参考文章Java 并发队列详解

无锁并发设计须知

在并发系统设计的时候,为了数据安全等原因需要对共享数据进行加锁访问,但是使用锁必然会有开销,在并并发系统较为繁忙的时候,这个开销就变得很可观了,为了规避这个问题,无锁并发系统设计成为一种趋势。

所谓无锁并发,即在进行并发系统设计的时候不使用锁,而使用一些其他的技术来达到锁的效果,在java中,CAS技术是无锁并发系统设计的基础技术,结合CAS和spin即可实现无锁并发系统的设计,但是,因为涉及spin,也就是自旋,所以需要特别注意CPU 100%的问题,以及因为使用无锁,如果没有做好线程竞争管理,就会出现线程饥饿的问题,下面是在使用CAS进行无锁并发系统设计时需要注意的一些问题:

  • (1)、使用队列来管理竞争线程,解决线程饥饿的问题
  • (2)、在多次CAS无果之后,请让线程休息一会,让出CPU使得一些其他的事情得到处理
  • (3)、避免出现CPU 100%的问题

如果没有足够的CAS经验,不推荐使用CAS来进行无锁并发设计,使用锁可以方便快速的解决并发问题,并且性能问题逐渐得到解决,所以,如果对性能不是要求特别严苛,使用锁即可,当然,锁的使用也需要合理,否则性能依然会成为一个很大的问题。

总结

本文总结了java并发编程中的若干核心技术,并且对每一个核心技术都做了一些分析,并给出了参考链接,可以在参考链接中查找到更为具体深入的分析总结内容。java并发编程需要解决一些问题,比如线程间同步问题,如何保证数据可见性问题,以及如何高效的协调多个线程工作等内容,本文在这些维度上都有所设计,本文作为对阅读java.util.Concurrent包的源码阅读的一个总结,同时本文也作为一个起点,一个开始更高层次分析总结的起点,之前的分析都是基于jdk源码来进行的,并且某些细节的内容还没有完全搞明白,其实在阅读了一些源码之后就会发现,如果想要深入分析某个方面的内容,就需要一些底层的知识,否则很难完整的分析总结出来,但是这种不彻底的分析又是很有必要的,至少可以对这些内容有一些大概的了解,并且知道自己的不足,以及未来需要了解的底层内容。对于java并发包的分析研究,深入到底层就是对jvm如何管理内容,如何管理线程的分析,在深入下去,就是操作系统对内存的管理,对线程的管理等内容,从操作系统再深入下去,就是去理解cpu的指令系统,学习磁盘知识等内容,当然,知识的关联是无止境的,学习也是无止境的,目前来说,首要解决的问题是可以熟练的使用java提供的并发包内容来进行并发编程,在业务上提高并发处理能力,在出现问题的时候可以很快找到问题并且解决问题,在达到这个要求之后,可以去了解一些jvm层次的内容,比如jvm的内存模型,以及线程的实现,并且可以与学习操作系统的相关内容并行进行。

相关文章

网友评论

    本文标题:【浅谈Java并发编程中的若干核心技术】

    本文链接:https://www.haomeiwen.com/subject/pajijktx.html