美文网首页
Java 并发编程

Java 并发编程

作者: 清雨季 | 来源:发表于2019-04-15 22:22 被阅读0次

    一 多线程基础

    1.1 线程的状态及其变更

    线程的状态及其变更

    1.2涉及到的各种方法

    • Thread.start()
      让线程进入就绪状态,不是立即执行
    • Thread.yield()
      Thread类的静态方法,让当前线程放弃CPU时间,从运行中状态变为就绪状态(可能又马上被CPU执行了)
    • Thread.join()
      在当前线程中调用另一线程对象的join方法,表示当前线程会等待另一线程执行结束之后才继续执行。
    • Object.wait()
      虽然是Object的方法,但是必需是"锁"才能调用此方法,调用完成后当前线程会放弃该锁的所有权,并进入等待状态,方法可以带有参数,时间到了之后会进入就绪状态。
    • Object.notify()
      同上,只有锁才能调用,调用后会随机唤醒一个此锁上等待线程,进入就绪状态。不会暂停此线程。
    • Object.notifyAll()
      与notify方法的区别是,这个方法会唤醒所有的等待线程。

    二 并发包之Executor

    2.1 Future

    Future的体系结构:https://blog.csdn.net/u014209205/article/details/80598209

    1.1.1 RunnableFuture

    Runnalbe接口和Future接口的组合类,可以被线程执行,执行成功后可以获取结果

    • 通过CAS操作保证状态的变更
    • 通过自旋锁+线程挂起+唤醒的方式来实现阻塞实现类:FutureTask

    2.1.2 CompleteableFuture

    用于处理Future之间的组合调用,依赖等问题,参考文档20种CompletableFuture使用方式

    2.2 线程池

    • newCachedThreadPool 使用ThreadPoolExecutor实现,缓存空闲线程,默认最小线程数为0,最大为无限,探活时间为60s
    • newFixedThreadPool 也是使用ThreadPoolExecutor实现,定长
    • newSingleThreadExecutor 也是使用ThreadPoolExecutor实现,定长为1,可以指定执行任务的顺序FIFO或者LIFO
    • newScheduledThreadPool 内部由ScheduledThreadPoolExecutor实现,用于延时调用,周期性调用等
      关于线程池的原理,可以参考:https://blog.csdn.net/luanmousheng/article/details/77688356

    2.3 锁

    自旋锁:一种通过循环检测方式实现的锁。

    class SpinLock {
            //java中原子(CAS)操作
        AtomicReference<Thread> owner = new AtomicReference<Thread>();//持有自旋锁的线程对象
        private int count;
        public void lock() {
            Thread cur = Thread.currentThread();
            while (!owner.compareAndSet(null, cur)){
            }
        }
        public void unLock() {
            Thread cur = Thread.currentThread();
                owner.compareAndSet(cur, null);
            }
        }
    }
    

    重入锁:可以重复获取的锁,如ReentrantLock和synchronized。
    重量级锁:底层依赖操作系统互斥量来实现,成本比较高(用户态与内核态切换,线程切换),但是不需要自旋,适合锁竞争激烈的时候使用。
    轻量级锁:通过自旋来实现,避免内核切换和线程切换,但是锁竞争激烈的时候会膨胀成重量级锁,适合锁竞争不激烈的时候适用。
    偏向锁:轻量级锁进一步优化得到的锁,假定只有一个线程会获取锁,只有一次CAS操作。
    公平锁:公平锁是指当锁可用时,在锁上等待时间最长的线程将获得锁的使用权。而非公平锁则随机分配这种使用权。公平锁可以避免饥饿,但是非公平锁更高效。
    参考资料

    2.4 java中的锁实现

    • synchronized 锁定的是对象,如果在方法上,则锁定的是this对象,如果在静态方法上,则锁定的是类的Class对象。在JVM层面上实现,出现异常可以自动释放,但是不能指定获取锁的时间。
    • ReentrantLock 类似于synchronized,可以指定获取锁的超时时间,支持锁投票,支持公平锁,支持重入,但是需要自行释放锁。死锁的情况下可以手动中断,使用比较灵活。
    • ReadWritelock 读写锁,读读不互斥,读写互斥,写写互斥,支持锁降级,但是不支持升级。

    AbstractQueuedSynchronizer

    所有同步对象的基础,基于原子操作和valitale关键字实现最基础的同步。
    获取独占锁流程:
    1)获取同步状态,原子操作设置状态
    2)若成功,则获取成功,走下面的代码
    3)不成功,则创建新结点,使用CAS加入到队尾中
    4)进入自旋,判断是否获取到了锁,没有则进入阻塞状态,被唤醒后进入下一次自旋,直到获取到锁为止
    5)获取到锁则把自己设置为头节点,释放锁时唤醒下一个节点

    Reentranklock

    使用AQS完成,相比普通的锁,多处理了重入的过程
    使用AQS中的state表示重入次数,lock 时加1 unlock时减1
    内部使用两个内部类(都是AQS的子类)来完成,分别是Sync(公平锁) NonfairSync(非公平锁)
    公平锁相对多了一个判断:加锁时要求无线程等待,或者当前线程是队头节点

    ReadWriteLock

    读写锁,使用AQS完成,AQS的state的高16位表示读锁,低16位表示写锁

    获取读锁的条件:没有线程获取了写锁,等待队列中头节点的等待类型不是写类型,获取写锁的次数没达到上限

    获取写锁的条件:没有线程获取了读锁,没有线程获取读锁,或者仅有当前线程获取了写锁

    Condition

    用于线程之间的通信,相比Object中的方法,可以控制到线程级别的粒度,可以支持超时等待。

    Condition中维护了一个等待队列,线程调用Condition.await时,把当前线程节点从同步队列移动至等待队列,然后LockSupport.park()

    当调用Condition.signal 方法时,会把等待队列的头节点移动至同步队列中,然后LockSupport.unpark唤醒该节点

    ConcurrentHashMap

    线程安全,效率比Hashtable高,底层使用Segment实现,一个Segment的结构类似于一个HashMap,同时它也是Lock的子类,相当于做了一个锁粒度的变更

    添加元素的过程:

    • 计算hash:取key的hashCode中高16位的1和低16位的0组成的新值,即:(h ^ (h >>> 16))
    • 如果Node数组没有初始化,则初始化桶,为了防止多次初始化,使用一个int类型的sizeCtl的CAS操作来控制并发
                if ((sc = sizeCtl) < 0)
                    Thread.yield(); // lost initialization race; just spin
                else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                    doRealInit();
                }
    
    • 使用hash定位元素在桶中的下标,定位方式为:(nodes.size - 1) & hash,如果对应下标的Node为null,说明这个位置之前没有加入过任何元素,直接new一个Node(key, value)即可。
    • 如果对应下标的已经加入过元素了,则synchronized把此节点锁住,然后链式查找,如果存在某个节点的key与新加入的key一致,则替换原一的value
    • 如果key不存在,则创建新节点加入至队尾
    • 如果这个链的长度大于8,则把链表改成红黑树

    阻塞队列

    BlockingQueue接口中定义的方法

    记忆方法:带字母t的是阻塞的,除add外,有连续两个字母相同的是返回特殊值的

    LinkedBlockingQueue

    底层采用链表实现:

        static class Node<E> {
            E item;
            Node<E> next;
            Node(E x) { item = x; }
        }
    

    使用两个Condition来实现阻塞功能

        /** Lock held by take, poll, etc */
        private final ReentrantLock takeLock = new ReentrantLock();
    
        /** Wait queue for waiting takes */
        private final Condition notEmpty = takeLock.newCondition();
    
        /** Lock held by put, offer, etc */
        private final ReentrantLock putLock = new ReentrantLock();
    
        /** Wait queue for waiting puts */
        private final Condition notFull = putLock.newCondition();
    

    put方法中,会先putLock.lock,然后如果当前队列满了,则会notFull.await(),此外如果原来队列中没有元素,则还需要notEmpty.signal();

    由于可能有多个线程block在put方法中,因此put完成后如果发现队列还没有满,还需要发一下notFull.signal()

    get方法中,先判断是否为空,是则notEmpty.await(),如果不为空,则取数据。如果取之前队列是满的,则取完后需要发一下notFull.signal()。

    同理,取完后如果队列不为空,则需要发一个notEmpty.signal()

        public E take() throws InterruptedException {
            E x;
            int c = -1;
            final AtomicInteger count = this.count;
            final ReentrantLock takeLock = this.takeLock;
            takeLock.lockInterruptibly();
            try {
                while (count.get() == 0) {
                    notEmpty.await();
                }
                x = dequeue();
                c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            } finally {
                takeLock.unlock();
            }
            if (c == capacity)
                signalNotFull();
            return x;
        }
    
        public void put(E e) throws InterruptedException {
            if (e == null) throw new NullPointerException();
            int c = -1;
            Node<E> node = new Node<E>(e);
            final ReentrantLock putLock = this.putLock;
            final AtomicInteger count = this.count;
            putLock.lockInterruptibly();
            try {
                while (count.get() == capacity) {
                    notFull.await();
                }
                enqueue(node);
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            } finally {
                putLock.unlock();
            }
            if (c == 0)
                signalNotEmpty();
        }
    
    ArrayBlockingQueue

    与LinkedBlockingQueue的区别是,底层采用的是数组的方式实现,并且ArrayBlockingQueue读写共用一把锁,因此读写是串行的,阻塞的实现方式与LinkedBlockingQueue是一样的

    PriorityBlockingQueue

    先复习PriorityQueue:优先级队列,并不是"先进先出"的队列,而按照优先级出队,底层采用数组实现,元素的排序方式可以使用Comparetor对象,也可以让元素实现Compareable

    PriorityBlockingQueue 底层同样使用数组实现,同样需要为元素指定排序规则。
    与前两种Blocking的区别是,这是一个无界队列,会自动扩容,因此它只会阻塞读,但是不会阻塞写。

    由于只需要阻塞读,因此只使用了一个notEmpty Condition。

    DelayQueue

    延迟队列,底层由PriorityQueue实现,是一个无界队列,会对数据排序,越早完成延期的数组排在前面。

    读取数据时会读队头节点,即最先完成延时的节点,因此如果队头节点没有完成延时,则会阻塞

    SynchronousQueue

    不储存元素的阻塞队列,相当于只有一个容量,只做了元素的传递,从写线程传到消费线程。

    并发工具

    CountDownLatch

    创建对象是会指定一个int类型初始值count,执行await()方法时会阻塞住,其他线程执行countDown()时会把count减1,当count为0时,被await()阻塞的线程会被唤醒继续执行

    private static void testCDL() throws Exception{
        CountDownLatch countDownLatch = new CountDownLatch(5);
        new Thread(){
            @Override
            public void run() {
                try {
                    while(countDownLatch.getCount() > 0) {
                        TimeUnit.SECONDS.sleep(1);
                        countDownLatch.countDown();
                        System.out.println("剩余时间:" + countDownLatch.getCount());
                    }
                }catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }.start();
        countDownLatch.await();
        System.out.println("Finish");
    }
    

    实现方式:

    • 底层使用AQS实现,初始化的count值即为AQS中state的初始值
    • 执行countDown方法时会把state值减1,如果减之后为0,则唤醒AQS同步队列中的头节点。
    • 执行await方法时,会判断线程是否为0,如果为0则可以直接通过,如果不为0,需要把当前线程加入至同步队列,然后LockSupport.park()住。

    通过源码可以发现:

    • 可以多个线程共同await在同一个CountDownLatch对象上
    • 任何线程都可以执行CountDownLatch的countDown方法
    • 一个线程可以多次countDown

    CyclicBarrier

    可循环使用的屏障,让一组线程到达一个屏障时被阻塞,直到最后一个线程到达屏障时再唤醒所有阻塞线程

        private static void testCyclicBarrier() throws Exception{
            CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
            executeOnNewThread(new Runnable() {
                @Override
                public void run() {
                    try {
                        TimeUnit.SECONDS.sleep(1);
                        cyclicBarrier.await();
                        System.out.println(1);
                        cyclicBarrier.await();
                        System.out.println(2);
                    }catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }, "Thread-1");
            cyclicBarrier.await();
            System.out.println(1);
            cyclicBarrier.await();
            System.out.println(2);
        }
    

    实现方式应该与CountDownLatch类似,就不再看源码了。从示例中可以看出与CountDownLatch的不同之处:

    • CyclicBarrier可以循环使用
    • CyclicBarrier在一次使用中,一个线程只能await一次,即只能使记数器减1

    信号量

    用于做流量控制的,同一时间内只允许有限个线程执行,示例代码如下:

        private static void testSemaphore() throws Exception{
            Semaphore semaphore = new Semaphore(3);
            for (int j = 0; j < 10; j++) {
                executeOnNewThread(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            semaphore.acquire();
                            TimeUnit.SECONDS.sleep(1);
                            System.out.println(Thread.currentThread().getName());
                        } catch (Exception e) {
                            e.printStackTrace();
                        } finally {
                            semaphore.release();
                        }
                    }
                }, "Thread-" + j);
            };
        }
    

    同一时间只有三个线程可以执行。
    依然是使用AQS锁实现的,使用state表实当前剩余的信号量,acquire时,如果当前信号量够,则减1返回,如果信号量不够,则把线程加入同步列队中并且LockSupport.park()暂停住。
    release时,如果release成功,则唤醒同步队列中的头节点

    Exchanger

    用于交换线程的数据

    线程池

    线程池中的几个参数

    corePoolSize:核心线程数
    runnableTaskQueue:任务队列,BlockingQueue的实现类

    • ArrayBlockingQueue
    • LinkedBlockingQueue (Fixed线程池使用)
    • SynchronousQueue 不存储元素的阻塞队列(Cached线程池使用)

    maximumPoolSize :最大线程数
    ThreadFactory :线程工厂,用于创建线程
    RejectedExecutionHandler 饱和策略,指工作队列满了之后的策略,可选以下几种

    • AbortPolicy : 直接抛出异常
    • CallerRunsPolicy : 只用调用者所在的线程在执行
    • DiscardOldestPolicy:去除队列中最近的一个任务
    • DiscardPolicy : 不处理,丢弃掉

    也可以自己实现一个RejectedExecutionHandler

    keepAliveTime : 线程的存活时间

    提交任务的两种方式

    execute: 提交线程,无返回值
    submit: 提交线程,要返回值

    关闭线程池的方式

    shutdown: 将线程池至为SHUTDOWN状态,然后遍历并中断所有没有正在执行任务的线程。
    shutdownNow: 立即将线程池状态至为STOP状态,然后尝试停止所有任务。
    也就是说shutdown会让已经开始的任务执行完,而shutdownNow则会立即停止所有线程,可能有的任务会只执行到一半。

    Executor 框架

    ThreadPoolExecutor

    线程池的状态及流转:
    • RUNNING:被创建之后的状态,可以接收新的任务,并且会循环处理队列中的任务
    • SHUTDOWN:调用shutdown方法后的状态,不再接受新的任务,但是还会循环处理队列中的任务
    • STOP:调用shutdownNow方法后的状态,不再接受新的任务,不会处理队列中的任务,还会中断正在执行的任务
    • TIDYING:从SHUTDOWN或STOP中流转,当线程池中的线程已经被清理时,自动流转到这个状态。
    • TERMINATED:线程池处于TIDYING状态后会马上调用terminated方法,调用完成后就会转到TERMINATED状态。
    几个关键的字段:
    • ctl:这是一个AtomicInteger变量,用于保存工作线程数和线程池状态两部分数据,int类型共32位,这里使用前三位表示状态,后面29位表示工作线程数量。
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static int runStateOf(int c)  { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }
    
    • workQueue :任务队列,必需是一个BlockingQueue
    • workers :一个HashSet,保存线程
    线程池收后一个新任务后的处理流程

    1)判断core thread 是否满了,不是,则直接创建线程执行,否则转2)
    2)工作队列是否满了,没满则加入,否则转3)
    3)工作线程是否是上限,没有则创建新的工作线程执行,否则转4)
    4)使用饱合策略
    只有第1,3步会加锁,第一步通常都是满了,所以通常只有第三步会加锁,使用prestartAllCoreThread方法可以提前创建好所有的核心线程。
    源码如下:

        public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
            int c = ctl.get();
            if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
            else if (!addWorker(command, false))
                reject(command);
        }
    
    添加线程的流程

    步骤一:检查。只有线程池处理RUNNING状态,或者处于SHUTDOWN状态并且任务队列不为空的情况下才能继续添加线程。
    步骤二:检查。检查线程池数量是否超过上限,如果添加的是核心线程,上限即corePoolSize,如果不是核心线程,上限即为maximumPoolSize。
    步骤三:将工作线程数量原子加1,如果加成功,则继续步骤四添加工作线程。如果不成功,说明ctl字段已经被其他线程修改过了,所以要回到步聚一重新检查。
    步骤四:创建一个Worker对象(Worker是一个内部类,可以认为就是一个线程)
    步骤五:加锁 mainLock.lock()
    步骤六:重新检查线程池状态
    步骤七:把创建好的Worker对象加入到workers中,并且更新数量

            retry:
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
                if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                       firstTask == null &&
                       ! workQueue.isEmpty()))
                    return false;
    
                for (;;) {
                    int wc = workerCountOf(c);
                    if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                        return false;
                    if (compareAndIncrementWorkerCount(c))
                        break retry;
                    c = ctl.get();  // Re-read ctl
                    if (runStateOf(c) != rs)
                        continue retry;
                    // else CAS failed due to workerCount change; retry inner loop
                }
            }
    
    Worker内部类工作逻辑

    Worker类的定义如下:

        private final class Worker
            extends AbstractQueuedSynchronizer
            implements Runnable {
            private static final long serialVersionUID = 6138294804551838833L;
    
            /** Thread this worker is running in.  Null if factory fails. */
            final Thread thread;
            /** Initial task to run.  Possibly null. */
            Runnable firstTask;
            /** Per-thread task counter */
            volatile long completedTasks;
            
            public void run() {
                runWorker(this);
            }
        }
    

    Worker类封闭了一个Thread对象,同时实现了AQS锁,这个锁主要是在中断线程时使用。
    Worker的主要作用是让线程不断循环,从任务队列中取任务执行,核心循环代码如下:

        final void runWorker(Worker w) {
            Thread wt = Thread.currentThread();
            Runnable task = w.firstTask;
            w.firstTask = null;
            w.unlock(); // allow interrupts
            boolean completedAbruptly = true;
            try {
                while (task != null || (task = getTask()) != null) {
                    w.lock();
                    if ((runStateAtLeast(ctl.get(), STOP) ||
                         (Thread.interrupted() &&
                          runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                        wt.interrupt();
                    try {
                        beforeExecute(wt, task);
                        Throwable thrown = null;
                        try {
                            task.run();
                        } catch (RuntimeException x) {
                            thrown = x; throw x;
                        } catch (Error x) {
                            thrown = x; throw x;
                        } catch (Throwable x) {
                            thrown = x; throw new Error(x);
                        } finally {
                            afterExecute(task, thrown);
                        }
                    } finally {
                        task = null;
                        w.completedTasks++;
                        w.unlock();
                    }
                }
                completedAbruptly = false;
            } finally {
                processWorkerExit(w, completedAbruptly);
            }
        }
    

    从源码上看,这个循环其实就是处理了三件事:

    • 从队列中取任务
    • 检查线程池状态,检查线程是否被中断
    • 执行任务

    Java中提供的四种线程池

    • FixedThreadPool:线程数不可伸缩,队列采用LinkedBlockingQueue,回收线程。
    new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>());
    
    • SingleThreadExecutor:单线程,不可伸缩,使用LinkedBlockingQueue
    new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>())
    
    • CachedThreadPool :核心线程数为0,不限最大线程数,线程探活时间为60秒,使用SynchronousQueue
    • ScheduledThreadPool:这个比较特殊,底层采用的是ThreadPoolExecutor的子类ScheduledThreadPoolExecutor,可以实现一些特殊的功能,例如周期调用某个任务。
        public ScheduledThreadPoolExecutor(int corePoolSize) {
            super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
                  new DelayedWorkQueue());
        }
    

    ScheduledThreadPool 使用

    不怎么用这个类,这里就暂时先不看它的源码了,使用的几个关键方法如下:

    • schedule :只执行一次,可以给定一个延时时间,延时完成后再执行。
    public ScheduledFuture<?> schedule(Runnable command,
                                         long delay, TimeUnit unit);
    

    command: 任务, delay和unit:指定延时时间

    • scheduleAtFixedRate:第一次延时initialDelay时间执行,以后每隔period时间执行一次
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                      long initialDelay,
                                                      long period,
                                                      TimeUnit unit);
    

    示例代码如下:3秒后每隔1秒输出一次当前时间

            executorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println(System.currentTimeMillis());
                        Thread.sleep(500);
                    }catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }, 3, 1, TimeUnit.SECONDS);
    
    • scheduleWithFixedDelay:第一次延时initialDelay时间执行,以后每隔period时间执行一次
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                         long initialDelay,
                                                         long delay,
                                                         TimeUnit unit);
    

    它与上一个的区别在于,这个方法是执行完成任务后才开始算间隔时间,而scheduleAtFixedRate是从任务开始执行时就计算间隔时间。
    如果上面的示例代码改使用scheduleWithFixedDelay方法,则会变成3秒后每隔1.5秒输出一次系统时间

    内存模型

    三种重排序类型

    1. 编译器优化重排序
      例如下面的代码:
    a = 1;
    b = true;
    

    由于这两行代码在单线程的条件下没有任何关系。因此可以被优化成:

    b = true;
    a = 1;
    

    这样改动两行代码的顺序,在单线程的情况下效果是一样的。

    1. 指令级并行重排序
      现代CPU采用了指令级并行技术来将多条指令重叠执行,如果不存在数据依赖性,处理器可以改变语句对应机器指令的执行顺序。

    2. 内存系统的重排序
      由于处理器使用了缓存和读/写缓冲区,这使用加载和存储操作看上去可能是乱序的。
      如下图所示:假如有两个处理器分别执行A1,A2和B1,B2操作,可能得到的结果是x=y=0。



      执行的过程可以如下图所示:


    按照时间顺序,可能的执行顺序是:A1 -> B1 -> A2 -> B2 -> A3 -> B3
    由于A2,B2在执行时A3和B3没有被执行,所以读到的数据还都是0。

    虽然处理器执行的顺序是A1 -> A2,但是实际内存操作的顺序却成了A2 -> A1了。

    以上说的三种重排序中,只有1属于编译器重排序,而2,3属于处理器重排序。

    处理器重排序和内存屏障

    处理器重排序规则:


    可以看出所有的处理器都允许Store-Load 重排序。

    为了保证内存可见性,Java编译器在生成指令序列的适当位置插入内存屏障指令来禁止特定类型的处理器重排序:


    happens-before原则

    如果A操作happens-before B操作,指的是A操作执行的结果对B操作一定可见(并不意味着A操作要在B操作之前执行)。

    Java语言中定义的happens-before原则(以下称先行发生)有以下八个,最重要的是前面四个:

    • 程序次序规则:在一个线程内,书写在前面的操作先行发生于书写在后面的操作。
    • 管程锁定规则:一个unlock操作先行发生于\color{red}{后面}对同一个锁的lock操作。
    • volatile变量规则:对一个volatile的写操作先行发生于\color{red}{后面}对这个变量的读操作
    • 传递规则:A先行发生于B,B先行发生于C,则A先行发生于C。
    • 线程启动规则:Thread对象的start方法先行发生于此线程的每一个动作。
    • 线程终止规则:线程中所有操作先行发生于对此线程的终止检测。
    • 线程中断规则:对线程interrupt方法的调用先行发生于被中断线程的检测。
    • 对象终止规则:一个对象的初始化完成先行发生于此对象的finalize方法。

    以上加红的两个后面指的是时间上的先后。

    数据依赖性

    如果两个操作存在数据依赖性,则这两个操作不被重排序,如下代码所示:

    //情况一
    a = 1;
    b = a;
    //情况二
    a = 1;
    a = 2;
    //情况三
    a = b;
    b = 1;
    

    这里说的数据依赖指的是单线程的依赖。

    happens-before 原则,数据依赖性和重排序

    • 重排序不能改变程序的执行结果(单线程执行的结果)
    • A先行发生于B也可能先执行A,即A先行发生于B不等于A比B先执行。

    如以下代码:

    a = 1;          //A
    b = 2;          //B
    c = a + b;    //C
    

    根据程序次序规则,以上三行代码存在三个先行发生原则:
    A happens-before B
    B happens-before C
    A happens-before C

    同时还存在两个数据依赖:
    C依赖A
    C依赖B

    注意A和B不存在数据依赖
    处理器的执行顺序可能有两种情况:
    A -> B -> C
    B -> A -> C

    即虽然A happens-before B ,但是还是会被重排序。

    volatile关键字

    volatile变量自身具有以下特性:

    • 可见性:对一个volatile变量的读,总是能看到任意线程最后对他的写入
    • 原子性:对任意单个volatile变量的读/写具有原子性(i++这种复合操作除外)

    volatile写的内存语义:当写一个volatile变量时,会把写后值刷新到主内存。
    volatile读的内存语义:当读一个volatile变量时,会把线程本地内存置为无效,直接从主内存读取数据。

    volatile的实现:JMM使用内存屏障来禁用特定的重排序来实现volatile关键字

    • 禁用所有volatile读操作与他后面的任意操作的重排序
    • 禁用所有volatile写操作与他前面的任意操作的重排序
    • 如果第一个操作是volatile读,第二个操作是volatile写,也要禁用重排序

    内存屏障插入策略:

    • 在每个volatile写操作前面插入一个SS屏障
    • 在每个volatile写操作后面插入一个SL屏障
    • 在每个volatile读操作后面插入一个LL屏障
    • 在每个volatile读操作后面插入一个LS屏障

    相关文章

      网友评论

          本文标题:Java 并发编程

          本文链接:https://www.haomeiwen.com/subject/snpzbqtx.html