美文网首页
《实战高并发程序设计》读书笔记-无锁

《实战高并发程序设计》读书笔记-无锁

作者: 乙腾 | 来源:发表于2021-06-21 06:36 被阅读0次

无锁

保护共享资源的方式:

  • 控制资源的访问:加锁:

    • 保护共享资源的前提是:同一把锁,锁,此时锁分为:

      • 对象锁,这种最好通过单例模式来控制多线程使用的是同一个对象锁。

      • 类锁

  • 增加资源:无锁:

    • 锁分离思想,这种思想可能不需要锁,比如ThreadLocal,队列等。

    • CAS

CAS

介绍

如果<font color=red>线程的期望值</font>和<font color=red>物理内存的真实值</font>一样,则修改更新值

Compare And Set 比较并交换

先为原子变量赋值,即主物理内存值,下次操作的时候,通过和预期值比较(主物理内存值),如果不同,线程不安全。

使用


public static void main(String[] args) {

    //主物理内存值(原值是5)

    AtomicInteger atomicInteger = new AtomicInteger(5);

    //下次操作的时候,将atomicInteger于期望值(原值)比较,如果和期望值一直,则线程安全

    System.out.println(atomicInteger.compareAndSet(5, 2019)+"\t current data: "+atomicInteger.get());

    System.out.println(atomicInteger.compareAndset(5, 1024)+"\t current data: "+atomicInteger.get());

}


true    2019

false   2019

原理

实现角度

  CAS通过UnSafe类保证操作的原子性,java无法直接操作内存,通过UnSafe类调用本地方法即本地动态库,来操作内存。CAS是CPU的并发原语,判断内存某个位置的值是否为预期值,如果是则更改为新的值,这个过程是原子性的,都是基于CPPU的原子指令,不会造成所谓的数据不一致问题。

计算机角度

  先声明一个变量在主物理内存中,其他线程将之赋值到自己的工作内存中,操作后将之写回主物理内存中,如果发现和期望值相同,则将为其修改值。

[图片上传失败...(image-852378-1624228194305)]

如果线程的期望值和物理内存的真实值一样,则修改更新值。

** jdk中提供了一系列的无锁但线程安全的操作***

一.无锁的线程安全整数:AtomicInteger

JDK并发包中有一个atomic包,里面实现了一些直接使用CAS操作的线程安全的类型,<font color=red>对其进行修改等任何操作,都是用CAS指令进行的</font>。

AtomicInteger的一些主要方法

对于其他原子类,操作也是非常类似的:


public final int get()                                 //取得当前值

public final void set(int newValue)                    //设置当前值

public final int getAndSet(int newValue)               //设置新值,并返回旧值

public final boolean compareAndSet(int expect, int u)  //如果当前值为expect,则设置为u

public final int getAndIncrement()                     //当前值加1,返回旧值

public final int getAndDecrement()                     //当前值减1,返回旧值

public final int getAndAdd(int delta)                  //当前值增加delta,返回旧值

public final int incrementAndGet()                     //当前值加1,返回新值

public final int decrementAndGet()                     //当前值减1,返回新值

public final int addAndGet(int delta)                  //当前值增加delta,返回新值

AtomicInteger的incrementAndGet()和getAndIncrement()原理

在介绍这两个方法之前,先说明就内部实现上来说,AtomicInteger中保存一个核心字段:


private volatile int value;

它就代表了AtomicInteger的当前实际取值。此外还有一个:


private static final long valueOffset;


incrementAndGet()的内部实现

书中基于JDK 1.7分析,JDK 1.8与1.7的实现有所不同


1 public final int incrementAndGet() {

2     for (;;) {

3         int current = get();//取得当前值

4         int next = current + 1;

5         if (compareAndSet(current, next))

6             return next;

7     }

8 }

  这里让人映像深刻的,应该是incrementAndGet()方法的第2行for循环吧!如果你是初次看到这样的代码,可能会觉得很奇怪,为什么连设置一个值那么简单的操作都需要一个死循环呢?原因就是:<font color=red>CAS操作未必是成功的,因此对于不成功的情况,我们就需要进行不断的尝试</font>。第3行的get()取得当前值,接着加1后得到新值next。这里,我们就得到了CAS必需的两个参数:期望值以及新值。<font color=red>使用compareAndSet()方法将新值next写入,成功的条件是在写入的时刻,当前的值应该要等于刚刚取得的current。如果不是这样,就说明AtomicInteger的值在第3行到第5行代码之间,又被其他线程修改过了。当前线程看到的状态就是一个过期状态</font>。

因此,compareAndSet返回失败,需要进行下一次重试,直到成功。

getAndIncrement()自增原理


AtomicInteger i=new AtomicInteger(0);

//自增

i.getAndIncrement();

其实AtomicInteger的getAndIncrement方法实现安全的自增,底层原理就是利用了cas。

AtomicInteger类getAndIncrement相关部分源码


public class AtomicInteger extends Number implements java.io.Serializable {

    private static final long serialVersionUID = 6214790243416807050L;

    // setup to use Unsafe.compareAndSwapInt for updates

    private static final Unsafe unsafe = Unsafe.getUnsafe();

    private static final long valueOffset;

    static {

        try {

            valueOffset = unsafe.objectFieldOffset

                (AtomicInteger.class.getDeclaredField("value"));

        } catch (Exception ex) { throw new Error(ex); }

    }

    private volatile int value;

    /**

     * Atomically increments by one the current value.

     *

     * @return the previous value

     */

    public final int getAndIncrement() {

        return unsafe.getAndAddInt(this, valueOffset, 1);

    }

    ...

}

getAndAddInt源码


/**

* Atomically increments by one the current value.

*

* @return the previous value

*/

public final int getAndIncrement() {

    return unsafe.getAndAddInt(this, valueOffset, 1);

}

getAndAddInt源码


//var1 AtomicInteger对象本身,var2 该对象值的引用地址,var4 需要变动的数量,var5是用过var1,var2找出的主内存中真实的值

public final int getAndAddInt(Object var1, long var2, int var4) {

    int var5;

    do {

        var5 = this.getIntVolatile(var1, var2);

    } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

    return var5;

}

代码逻辑:

用该对象当前的值与var5比较:

  • 如果相同,更新var5+var4并且返回true,

  • 如果不同,继续取值然后再比较,直到更新完成。

实际上就是通过cas循环比较,直到拿到最新的物理值后才进行自增。

这里打个比方更易于理解:

假设线程A和线程B两个线程同时执行getAndAddInt操作(分别跑在不同CPU上) :

  • Atomiclnteger里面的value原始值为3,即主内存中Atomiclnteger的value为3,根据JMM模型,线程A和线程B各自持有一份值为3的value的副本分别到各自的工作内存。

  • 线程A通过getIntVolatile(var1, var2)拿到value值3,这时线程A被挂起。

  • 线程B也通过getintVolatile(var1, var2)方法获取到value值3,此时刚好线程B没有被挂起并执行compareAndSwapInt方法比较内存值也为3,成功修改内存值为4,线程B打完收工,一切OK。

  • 这时线程A恢复,执行compareAndSwapInt方法比较,发现自己手里的值数字3和主内存的值数字4不一致,说明该值己经被其它线程抢先一步修改过了,那A线程本次修改失败,只能重新读取重新来一遍了。

  • 线程A重新获取value值,因为变量value被volatile修饰,所以其它线程对它的修改,线程A总是能够看到,线程A继续执行compareAndSwaplnt进行比较替换,直到成功。


  以上就是CAS操作的基本思想。在后面我们会看到,无论程序多么复杂,其基本原理总是不变的。

和AtomicInteger类似的类还有AtomicLong用来代表long型,AtomicBoolean表示boolean型,AtomicReference表示对象引用。

AtomicInteger的使用示例:


01 public class AtomicIntegerDemo {

02     static AtomicInteger i=new AtomicInteger();

03     public static class AddThread implements Runnable{

04         public void run(){

05            for(int k=0;k<10000;k++)

06                i.incrementAndGet();

07         }

08     }

09     public static void main(String[] args) throws InterruptedException {

10         Thread[] ts=new Thread[10];

11         for(int k=0;k<10;k++){

12             ts[k]=new Thread(new AddThread());

13         }

14         for(int k=0;k<10;k++){ts[k].start();}

15         for(int k=0;k<10;k++){ts[k].join();}

16         System.out.println(i);

17     }

18 }

第6行的AtomicInteger.incrementAndGet()方法会使用CAS操作将自己加1,同时也会返回当前值(这里忽略了当前值)。如果你执行这段代码,你会看到程序输出了100000。这说明程序正常执行,没有错误。如果不是线程安全,i的值应该会小于100000才对。

Java中的指针:Unsafe类

compareAndSet()的实现


public final boolean compareAndSet(int expect, int update) {

    return unsafe.compareAndSwapInt(this, valueOffset, expect, update);

}   

  在这里,我们看到一个特殊的变量unsafe,它是sun.misc.Unsafe类型。从名字看,这个类应该是封装了一些不安全的操作。那什么操作是不安全的呢?学习过C或者C++的话,大家应该知道,<font color=red>指针是不安全的,这也是在Java中把指针去除的重要原因</font>。如果指针指错了位置,或者计算指针偏移量时出错,结果可能是灾难性的,你很有可能会覆盖别人的内存,导致系统崩溃。

compareAndSwapInt()

  而这里的Unsafe就是封装了一些类似指针的操作。compareAndSwapInt()方法是一个navtive(本地方法)方法,它的几个参数含义如下:


public final native boolean compareAndSwapInt(Object o, long offset,int expected,int x);

  第一个参数o为给定的对象,offset为对象内的偏移量(其实就是一个字段到对象头部的偏移量,通过这个偏移量可以快速定位字段),expected表示期望值,x表示要设置的值。如果指定的字段的值等于expected,那么就会把它设置为x。

  不难看出,compareAndSwapInt()方法的内部,必然是使用CAS原子指令来完成的。

Unsafe的一些主要方法

Unsafe类还提供了一些方法,主要有以下几个(以Int操作为例,其他数据类型是类似的):


//获得给定对象偏移量上的int值

public native int getInt(Object o, long offset);

//设置给定对象偏移量上的int值

public native void putInt(Object o, long offset, int x);

//获得字段在对象中的偏移量

public native long objectFieldOffset(Field f);

//设置给定对象的int值,使用volatile语义

public native void putIntVolatile(Object o, long offset, int x);

//获得给定对象对象的int值,使用volatile语义

public native int     getIntVolatile(Object o, long offset);

//和putIntVolatile()一样,但是它要求被操作字段就是volatile类型的

public native void putOrderedInt(Object o, long offset, int x);

  ConcurrentLinkedQueue中的Node的一些CAS操作也都是使用Unsafe类来实现的。

应用程序无法直接使用Unsafe类

  这里就可以看到,虽然Java抛弃了指针。但是在关键时刻,类似指针的技术还是必不可少的。这里底层的Unsafe实现就是最好的例子。但是很不幸,JDK的开发人员并不希望大家使用这个类。获得Unsafe实例的方法是调动其工厂方法getUnsafe()。但是,它的实现却是这样:


public static Unsafe getUnsafe() {

    Class cc = Reflection.getCallerClass();

    //下面代码检查调用getUnsafe()函数的类,如果这个类的ClassLoader不为null,就直接抛出异常,拒绝工作。因此,这也使得我们自己的应用程序无法直接使用Unsafe类。它是一个JDK内部使用的专属类

    if (cc.getClassLoader() != null)

        throw new SecurityException("Unsafe");//

    return theUnsafe;

}

注意:根据Java类加载器的工作原理,应用程序的类由App Loader加载。而系统核心类,如rt.jar中的类由Bootstrap类加载器加载。Bootstrap加载器没有Java对象的对象,因此试图获得这个类加载器会返回null。所以,当一个类的类加载器为null时,说明它是由Bootstrap加载的,而这个类也极有可能是rt.jar中的类。

二.无锁的对象引用

AtomicReference

AtomicReference和AtomicInteger非常类似,不同之处就在于AtomicInteger是对整数的封装,而AtomicReference则对应普通的对象引用。也就是它可以保证你在修改对象引用时的线程安全性。

ABA问题

  在介绍AtomicReference的同时,我希望同时提出一个有关原子操作的逻辑上的不足。

  之前我们说过,线程判断被修改对象是否可以正确写入的条件是对象的当前值和期望值是否一致。这个逻辑从一般意义上来说是正确的。但有可能出现一个小小的例外,就是当你获得对象当前数据后,在准备修改为新值前,对象的值被其他线程连续修改了两次,而经过这两次修改后,对象的值又恢复为旧值。这样,当前线程就无法正确判断这个对象究竟是否被修改过。

image.png

两个线程同时从物理内存中取值,但是其中一个线程操作的比较快,利用这段时间差,进行了两次操作,先将A换成了B,后又将B换成了A,这之后,另一个线程进行CAS操作依旧成功。

这个过程,对于线程来说并不是原子操作。

这个CAS只是比较开始和结束这段过程中的两个端的值,只管开头和结尾,比较并交换,没有管中间操作。

虽然说这种情况出现的概率不大,但是依然是有可能出现的。因此,当业务上确实可能出现这种情况时,我们也必须多加防范。体贴的JDK也已经为我们考虑到了这种情况,使用AtomicStampedReference就可以很好地解决这个问题。

带有时间戳的对象引用:AtomicStampedReference

  AtomicStampedReference内部不仅维护了<font color=red>对象值,还维护了一个时间戳(我这里把它称为时间戳,实际上它可以使任何一个整数来表示状态值)</font>。当AtomicStampedReference对应的数值被修改时,除了更新数据本身外,还必须要更新时间戳。当AtomicStampedReference设置对象值时,对象值以及时间戳都必须满足期望值,写入才会成功。因此,即使对象值被反复读写,写回原值,只要时间戳发生变化,就能防止不恰当的写入。


解决ABA的思路

原子引用 + 新增一种机制,那就是修改版本号(类似时间戳),它用来解决ABA问题。

原子引用+时间戳/版本号


AtomicStampedReference的几个API在AtomicReference的基础上新增了有关时间戳的信息:


//比较设置 参数依次为:期望值 写入新值 期望时间戳 新时间戳

public boolean compareAndSet(V expectedReference,V

newReference,int expectedStamp,int newStamp)

//获得当前对象引用

public V getReference()

//获得当前时间戳

public int getStamp()

//设置当前对象引用和时间戳

public void set(V newReference, int newStamp)

使用示例


import java.util.concurrent.TimeUnit;

import java.util.concurrent.atomic.AtomicReference;

import java.util.concurrent.atomic.AtomicStampedReference;

public class ABADemo {

    /**

     * 普通的原子引用包装类

     */

    static AtomicReference<Integer> atomicReference = new AtomicReference<>(100);

    // 传递两个值,一个是初始值,一个是初始版本号

    static AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference<>(100, 1);

    public static void main(String[] args) {

        System.out.println("============以下是ABA问题的产生==========");

        new Thread(() -> {

            // 把100 改成 101 然后在改成100,也就是ABA

            atomicReference.compareAndSet(100, 101);

            atomicReference.compareAndSet(101, 100);

        }, "t1").start();

        new Thread(() -> {

            try {

                // 睡眠一秒,保证t1线程,完成了ABA操作

                TimeUnit.SECONDS.sleep(1);

            } catch (InterruptedException e) {

                e.printStackTrace();

            }

            // 把100 改成 101 然后在改成100,也就是ABA

            System.out.println(atomicReference.compareAndSet(100, 2019) + "\t" + atomicReference.get());

        }, "t2").start();

        /

        try {

            TimeUnit.SECONDS.sleep(2);

        } catch (Exception e) {

            e.printStackTrace();

        }

        /

        System.out.println("============以下是ABA问题的解决==========");

        new Thread(() -> {

            // 获取版本号

            int stamp = atomicStampedReference.getStamp();

            System.out.println(Thread.currentThread().getName() + "\t 第一次版本号" + stamp);

            // 暂停t3一秒钟

            try {

                TimeUnit.SECONDS.sleep(1);

            } catch (InterruptedException e) {

                e.printStackTrace();

            }

            // 传入4个值,期望值,更新值,期望版本号,更新版本号

            atomicStampedReference.compareAndSet(100, 101, atomicStampedReference.getStamp(),

                    atomicStampedReference.getStamp() + 1);

            System.out.println(Thread.currentThread().getName() + "\t 第二次版本号" + atomicStampedReference.getStamp());

            atomicStampedReference.compareAndSet(101, 100, atomicStampedReference.getStamp(),

                    atomicStampedReference.getStamp() + 1);

            System.out.println(Thread.currentThread().getName() + "\t 第三次版本号" + atomicStampedReference.getStamp());

        }, "t3").start();

        new Thread(() -> {

            // 获取版本号

            int stamp = atomicStampedReference.getStamp();

            System.out.println(Thread.currentThread().getName() + "\t 第一次版本号" + stamp);

            // 暂停t4 3秒钟,保证t3线程也进行一次ABA问题

            try {

                TimeUnit.SECONDS.sleep(3);

            } catch (InterruptedException e) {

                e.printStackTrace();

            }

            boolean result = atomicStampedReference.compareAndSet(100, 2019, stamp, stamp + 1);

            System.out.println(Thread.currentThread().getName() + "\t 修改成功否:" + result + "\t 当前最新实际版本号:"

                    + atomicStampedReference.getStamp());

            System.out.println(Thread.currentThread().getName() + "\t 当前实际最新值" + atomicStampedReference.getReference());

        }, "t4").start();

    }

}


============以下是ABA问题的产生==========

true    2019

============以下是ABA问题的解决==========

t3     第一次版本号1

t4     第一次版本号1

t3     第二次版本号2

t3     第三次版本号3

t4     修改成功否:false     当前最新实际版本号:3

t4     当前实际最新值100

三.数组也能无锁:AtomicIntegerArray

除了提供基本数据类型外,JDK还为我们准备了数组等复合结构。当前可用的原子数组有:AtomicIntegerArray、AtomicLongArray和AtomicReferenceArray,分别表示整数数组、long型数组和普通的对象数组。

这里以AtomicIntegerArray为例,展示原子数组的使用方式。

AtomicIntegerArray本质上是对int[]类型的封装,使用Unsafe类通过CAS的方式控制int[]在多线程下的安全性。它提供了以下几个核心API:


//获得数组第i个下标的元素

public final int get(int i)

//获得数组的长度

public final int length()

//将数组第i个下标设置为newValue,并返回旧的值

public final int getAndSet(int i, int newValue)

//进行CAS操作,如果第i个下标的元素等于expect,则设置为update,设置成功返回true

public final boolean compareAndSet(int i, int expect, int update)

//将第i个下标的元素加1

public final int getAndIncrement(int i)

//将第i个下标的元素减1

public final int getAndDecrement(int i)

//将第i个下标的元素增加delta(delta可以是负数)

public final int getAndAdd(int i, int delta)

AtomicIntegerArray的使用示例


01 public class AtomicIntegerArrayDemo {

02     static AtomicIntegerArray arr = new AtomicIntegerArray(10);

03     public static class AddThread implements Runnable{

04         public void run(){

05            for(int k=0;k<10000;k++)

06                arr.getAndIncrement(k%arr.length());

07         }

08     }

09     public static void main(String[] args) throws InterruptedException {

10         Thread[] ts=new Thread[10];

11         for(int k=0;k<10;k++){

12             ts[k]=new Thread(new AddThread());

13         }

14         for(int k=0;k<10;k++){ts[k].start();}

14         for(int k=0;k<10;k++){ts[k].start();}

15         for(int k=0;k<10;k++){ts[k].join();}

16         System.out.println(arr);

17     }

18 }

上述代码第2行,申明了一个内含10个元素的数组。第3行定义的线程对数组内10个元素进行累加操作,每个元素各加1000次。第11行,开启10个这样的线程。因此,可以预测,如果线程安全,数组内10个元素的值必然都是10000。反之,如果线程不安全,则部分或者全部数值会小于10000。

程序的输出结果如下:


[10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000]

这说明AtomicIntegerArray确实合理地保证了数组的线程安全性。

四.让普通变量也享受原子操作:AtomicIntegerFieldUpdater

  有时候,由于初期考虑不周,或者后期的需求变化,一些普通变量可能也会有线程安全的需求。如果改动不大,我们可以简单地修改程序中每一个使用或者读取这个变量的地方。但显然,这样并不符合软件设计中的一条重要原则——开闭原则。也就是系统对功能的增加应该是开放的,而对修改应该是相对保守的。而且,  如果系统里使用到这个变量的地方特别多,一个一个修改也是一件令人厌烦的事情(况且很多使用场景下可能只是只读的,并无线程安全的强烈要求,完全可以保持原样)。

  如果你有这种困扰,在这里根本不需要担心,因为在原子包里还有一个实用的工具类AtomicIntegerFieldUpdater。它可以让你在不改动(或者极少改动)原有代码的基础上,让普通的变量也享受CAS操作带来的线程安全性,这样你可以修改极少的代码,来获得线程安全的保证。这听起来是不是让人很激动呢?

  根据数据类型不同,这个Updater有三种,分别是AtomicIntegerFieldUpdater、AtomicLong- FieldUpdater和AtomicReferenceFieldUpdater。顾名思义,它们分别可以对int、long和普通对象进行CAS修改。

  现在来思考这么一个场景。假设某地要进行一次选举。现在模拟这个投票场景,如果选民投了候选人一票,就记为1,否则记为0。最终的选票显然就是所有数据的简单求和。


01 public class AtomicIntegerFieldUpdaterDemo {

02     public static class Candidate{

03         int id;

04         volatile int score;

05     }

06     public final static AtomicIntegerFieldUpdater<Candidate> scoreUpdater

07         = AtomicIntegerFieldUpdater.newUpdater(Candidate.class, "score");

08     //检查Updater是否工作正确

09     public static AtomicInteger allScore=new AtomicInteger(0);

10     public static void main(String[] args) throws InterruptedException {

11         final Candidate stu=new Candidate();

12         Thread[] t=new Thread[10000];

13         for(int i = 0 ; i < 10000 ; i++) {

14             t[i]=new Thread() {

15                 public void run() {

16                     if(Math.random()>0.4){

17                         scoreUpdater.incrementAndGet(stu);

18                         allScore.incrementAndGet();

19                     }

20                 }

21             };

22             t[i].start();

23         }

24         for(int i = 0 ; i < 10000 ; i++) {  t[i].join();}

25         System.out.println("score="+stu.score);

26         System.out.println("allScore="+allScore);

27     }

28 }

上述代码模拟了这个计票场景,候选人的得票数量记录在Candidate.score中。注意,它是一个普通的volatile变量。而volatile变量并不是线程安全的。第6~7行定义了AtomicIntegerFieldUpdater实例,用来对Candidate.score进行写入。而后续的allScore我们用来检查AtomicIntegerFieldUpdater的正确性。如果AtomicIntegerFieldUpdater真的保证了线程安全,那么最终Candidate.score和allScore的值必然是相等的。否则,就说明AtomicIntegerFieldUpdater根本没有确保线程安全的写入。第12~21行模拟了计票过程,这里假设有大约60%的人投赞成票,并且投票是随机进行的。第17行使用Updater修改Candidate.score(这里应该是线程安全的),第18行使用AtomicInteger计数,作为参考基准。

大家如果运行这段程序,不难发现,最终的Candidate.score总是和allScore绝对相等。这说明AtomicIntegerFieldUpdater很好地保证了Candidate.score的线程安全。

虽然AtomicIntegerFieldUpdater很好用,但是还是有几个注意事项:

  • 第一,<font color=red>Updater只能修改它可见范围内的变量。因为Updater使用反射得到这个变量</font>。如果变量不可见,就会出错。比如如果score申明为private,就是不可行的。

  • 第二,<font color=red>为了确保变量被正确的读取,它必须是volatile类型的</font>。如果我们原有代码中未申明这个类型,那么简单地申明一下就行,这不会引起什么问题。

  • 第三,由于CAS操作会通过对象实例中的偏移量直接进行赋值,因此,它<font color=red>不支持static字段(Unsafe. objectFieldOffset()不支持静态变量)</font>。

相关文章

网友评论

      本文标题:《实战高并发程序设计》读书笔记-无锁

      本文链接:https://www.haomeiwen.com/subject/oobeyltx.html