一、无锁
对于并发控制而言,锁是一种悲观的策略,总是假设每一次进入临界区操作都会产生冲突,如果多线程访问临界区资源,就宁可牺牲性能让线程等待,所以锁会阻塞线程执行。而无锁是一种乐观策略,它会假设对临界区资源的访问是没有冲突的,所有线程都会不停顿的执行,但是如果遇到冲突那该怎么办?无锁策略使用一种叫做比较交换的技术(CAS Compare And Swap)来鉴定线程冲突,一旦检测到冲突,就进行重试直至没有冲突为止。
1.1 比较交换(CAS)
使用无锁的方式没有锁竞争带来的系统开销,也没有线程间上下文频繁调度带来的开销,相对有锁而言具有优越的性能。
CAS算法的过程是这样的:它包含三个参数CAS(V,E,N)。V表示要更新的变量,E表示预期值,N表示新值。仅当V值等于E时,才会将V的值设为N,如果V值和E值不同,则说明已经有其他线程做了更新,则当前线程什么都不做。最后,CAS返回当前V的真实值。CAS操作是抱着乐观的态度进行的,它总认为自己可以完成操作。当多个线程同时使用CAS操作一个变量时,只有一个会胜出,并成功更新,其余均会失败。失败的线程不会被挂起,仅是被告知失败,并且允许再次尝试,当然也允许失败的线程放弃操作。基于这样的原理,CAS操作即使没有锁,也可以发现其他线程对当前线程的干扰,并进行恰当的处理。
1.2 无锁的线程安全整数
JDK并发包中有一个atomic包,里面实现了一些直接使用CAS操作的线程安全的类型。其中,最常用的一个类,应该就是AtomicInteger。可以把它看做是一个整数。但是与Integer不同,它是可变的,并且是线程安全的。对其进行任何修改等操作,都是用CAS指令进行的。AtomicInteger的一些主要方法:
//获取当前的值
public final int get()
//设置当前值
public final void set(int newValue)
//取当前的值,并设置新的值
public final int getAndSet(int newValue)
//如果当前值为expect,则设置为update
public final boolean compareAndSet(int expect,int update)
//获取当前的值,并自增
public final int getAndIncrement()
//获取当前的值,并自减
public final int getAndDecrement()
//,返回当前值,并增加delta
public final int addAndGet(int delta)
//当前值+1,返回新值
public final int incrementAndGet()
//当前值-1,返回新值
public final int decrementAndGet()
AtomicInteger有两个核心字段:
//当前实际取值
private volatile int value;
//value字段在AtomicInteger对象中的偏移量
private static final long valueOffset;
AtomicInteger使用的具体示例如下:
public class AtomicIntegerDemo {
static AtomicInteger i = new AtomicInteger();
public static class AddThred implements Runnable {
@Override
public void run() {
for(int k=0; k<10000; k++) {
i.incrementAndGet();
}
}
}
public static void main(String[] args) throws InterruptedException {
Thread[] ts = new Thread[10];
for (int k = 0; k < 10; k++) {
ts[k] = new Thread(new AddThred());
}
for(int k=0; k<10; k++) {ts[k].start();}
for(int k=0; k<10; k++) {ts[k].join();}
System.out.println(i);
}
}
如果执行这段代码,程序输出100000。说明程序正常执行,没有错误。如果不是线程安全的,i的值应该会小于100000。
incrementAndGet()的内部实现为:
public final int incrementAndGet() {
for (;;) {
int current = get();
int next = current + 1;
if (compareAndSet(current, next))
return next;
}
}
第2行的死循环是因为CAS操作未必是成功的,因此对于不成功的情况,我们就需要进行不断的尝试。第3行的get()取得当前的值,接着加1后得到新值next。这样,得到了CAS必需的两个参数:期望值以及新值。使用compareAndSet()方法将新值next写入,成功的条件是在写入的时刻,当前的值应该要等于刚刚取得的current。如果不是这样,说明AtomicInteger的值在第3行到第5行代码之间,又被其他线程修改了。当前线程看到的状态就是一个过期状态。因此,compareAndSet返回失败,需要下一次重试,直到成功。
和AtomicInteger类似的类还有AtomicLong用来代表long型,AtomicBoolean表示boolean型,AtomicReference表示对象引用。
1.3 无锁的对象引用:AtomicReference
AtomicReference是对普通的对象引用,也就是它可以保证你在修改对象引用时的线程安全性。线程判断被修改对象是否可以正确写入的条件是对象的当前值和期望值是否一致。但有可能出现例外,当获得对象当前数据后,对象的值又恢复为旧值。这样,当前线程就无法正确判断这个对象究竟是否被修改过。有这样一个案例:打一个比方,如果有一家蛋糕店,为了挽留客户,决定为贵宾卡里余额小于20元的客户一次性赠送20元。但条件是,每一位客户只能被赠送一次。
定义用户账户余额:
static AtomicReference<Integer> money = new AtomicReference<Integer>();
money.set(19);
接着,需要若干个后台线程,它们不断扫描数据,并为满足条件的客户充值:
for(int i=0; i<3; i++) {
new Thread() {
public void run() {
while(true) {
while(true) {
Integer m = money.get();
if(m<20) {
if(money.compareAndSet(m, m+20)) {
System.out.println("余额小于20元,充值成功,余额:"+ money.get() + "元");
break;
} else {
break;
}
}
}
}
};
}.start();
此时,如果很不幸,就在赠予金额到账的同时,用户进行了一次消费,正好消费20元,这时的金额又小于20元。这时,后台的赠予就会误以为这个账户还没有赠予。所以,存在多次被赠予的可能。下面模拟了这个消费线程:
new Thread() {
public void run() {
for (int j = 0; j < 100; j++) {
while(true) {
Integer m = money.get();
if(m>10) {
System.out.println("大于10元");
if(money.compareAndSet(m, m-10)) {
System.out.println("成功消费10元,余额:" + money.get());
break;
}
} else {
System.out.println("没有足够金额");
break;
}
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
}
};
}.start();
上述代码,消费者只要贵宾卡里的钱大于10元,就会立即进行一次10元的消费。执行上述代码,得到的输出如下:
余额小于20元,充值成功,余额:39元
大于10元
成功消费10元,余额:29
大于10元
成功消费10元,余额:19
余额小于20元,充值成功,余额:39元
大于10元
成功消费10元,余额:29
大于10元
成功消费10元,余额:39
余额小于20元,充值成功,余额:39元
可以看到,这个账户被先后反复多次充值。其原因正是因为账户余额被反复修改,修改后的值等于原有的值,使得CAS操作无法正确判断当前数据状态。
1.4 带有时间戳的队形引用:AtomicStampedReference
之所以出现上面的这种情况,是因为AtomicReference在对象修改过程中,丢失了状态信息。AtomicStampedReference则解决了上述问题,其内部不仅维护了对象值,还维护了一个时间戳。当AtomicStampedReference对应的数值被修改时,除了更新数据本身外,还必须要更新时间戳。当AtomicStampedReference设置对象值时,对象值以及时间戳都必须满足期望值,写入才会成功。因此,即使对象值被反复读写,写回原值,只要时间戳发生变化,就能防止不恰当的写入。
AtomicStampedReference的几个API在AtomicReference的基础上新增了有关时间戳的信息:
//比较设置参数依次为:期望值 写入新值 期望时间戳 新时间戳
public boolean compareAndSet(V expectedReference, V newReference,int expectedStamp,int newStamp)
//获得当前对象引用
public V getReference()
//获得当前时间戳
public int getStamp()
//设置当前对象引用和时间戳
public void set(V newReference, int newStamp)
我们使用AtomicStampedReference解决上面反复充值的问题:
public class AtomicStampedReferenceDemo {
static AtomicStampedReference<Integer> money = new AtomicStampedReference<Integer>(19,0);
public static void main(String[] args) {
for(int i=0; i<3; i++) {
final int timestamp = money.getStamp();
new Thread() {
public void run() {
while(true) {
while(true) {
Integer m = money.getReference();
if(m<20) {
if(money.compareAndSet(m, m+20,timestamp,timestamp+1)) {
System.out.println("余额小于20元,充值成功,余额:"+ money.getReference() + "元");
break;
} else {
break;
}
}
}
}
};
}.start();
}
new Thread() {
public void run() {
for (int j = 0; j < 100; j++) {
while(true) {
int timestamp = money.getStamp();
Integer m = money.getReference();
if(m>10) {
System.out.println("大于10元");
if(money.compareAndSet(m, m-10,timestamp,timestamp+1)) {
System.out.println("成功消费10元,余额:" + money.getReference());
break;
}
} else {
System.out.println("没有足够金额");
break;
}
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// TODO: handle exception
}
}
};
}.start();
}
}
结果输出如下:
余额小于20元,充值成功,余额:39元
大于10元
成功消费10元,余额:29
大于10元
成功消费10元,余额:19
大于10元
成功消费10元,余额:9
没有足够金额
1.5 数组也能无锁:AtomicIntegerArray
当前可用的原子数组有:AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray,分别表示整数数组、long型数组和普通对象数组。以AtomicIntegerArray为例,展示原子数组的使用方式。AtomicIntegerArray本质上是对int[]类型的封装,使用Unsafe类通过CAS的方式控制int[]在多线程的安全性。它提供以下几个核心API:
//获得数组第i个下标的元素
public final int get(int i)
//获得数组的长度
public final int length()
//将数组第i个下标设置为newValue,并返回旧的值
public final int getAndSet(int i, int newValue)
//进行CAS操作,如果第i个下标的元素等于expect,则设置为update,设置成功返回true
public final boolean compareAndSet(int i, int expect, int update)
//将第i个下标元素加 1
public final int getAndIncrement(int i)
//将第i个下标的元素减1
public final int getAndDecrement(int i)
//将第i个下标的元素增加delta(delta可以是负数)
public final int getAndAdd(int i, int delta)
下面展示AtomicIntegerArray的使用:
public class AtomicIntegerArrayDemo {
static AtomicIntegerArray arr = new AtomicIntegerArray(10);
public static class AddThread implements Runnable {
@Override
public void run() {
for(int k=0; k<10000; k++) {
arr.getAndIncrement(k%arr.length());
}
}
}
public static void main(String[] args) throws InterruptedException {
Thread[] ts = new Thread[10];
for(int k=0; k<10; k++) {
ts[k] = new Thread(new AddThread());
}
for(int k=0; k<10; k++) {
ts[k].start();
}
for(int k=0; k<10; k++) {
ts[k].join();
}
System.out.println(arr);
}
}
结果为:
[10000,10000,10000,10000,10000,10000,10000,10000,10000,10000]
如果线程安全,数组内10个元素的值必然都是10000。反之,如果线程不安全,则部分或者全部数值会小于10000。
1.6 让普通变量也享受原子操作:AtomicIntegerFieldUpdater
可能由于初期考虑不周,一些普通变量会有线程安全的需求,原子包中的一个实用工具类AtomicIntegerFieldUpdater
可以让普通变量享受CAS原子操作带来的线程安全性。根据类型的不同,这个Updater
有三种:AtomicIntegerFieldUpdater
,AtomicLongFieldUpdater
和AtomicReferenceFieldUpdater
。分别可以对int
,long
和普通对象进行CAS修改。
现在思考一个场景。假设某地要进行一次选举。现在模拟这个投票场景,如果选民投了候选人一票,就记为1,否则记为0。最终的选票显然就是所有数据的简单求和。
public class AtomicIntegerFieldUpdaterDemo {
public static class Candidate {
int id;
volatile int score;
}
public final static AtomicIntegerFieldUpdater<Candidate> scoreUpdater
= AtomicIntegerFieldUpdater.newUpdater(Candidate.class, "score");
public static AtomicInteger allScore = new AtomicInteger(0);
public static void main(String[] args) throws InterruptedException {
final Candidate stu = new Candidate();
Thread[] t = new Thread[10000];
for(int i=0; i<10000; i++) {
t[i] = new Thread() {
@Override
public void run() {
if(Math.random() > 0.4) {
scoreUpdater.incrementAndGet(stu);
allScore.incrementAndGet();
}
}
};
t[i].start();
}
for(int i=0; i<10000; i++) {t[i].join();}
System.out.println("score=" + stu.score);
System.out.println("allScore=" + allScore);
}
}
运行这段程序,最终的Candidate.score
总是和allScore
绝对相等。这说明AtomicIntegerFieldUpdater
很好地保证了Candidate.score
的线程安全。
注意:
- Updater只能修改它可见范围的变量。因为Updater是通过反射得到的这个变量。如果变量不可见,会出错。比如score申明为private,就不行;
- 为了确保变量被正确的读取,必须是volatile修饰;
- 由于CAS操作会通过对象实例中的偏移量直接进行赋值,因此,它不支持static字段(public native long objectFieldOffset()不支持静态变量) 。
二、有关死锁的问题
死锁:两个或者多个线程,相互占用对方需要的资源,而都不进行释放,导致彼此之间都相互等待对方释放资源,产生了无限制等待的现象。死锁一旦发生,如果没有外力接入,这种等待将永远存在,从而对线程产生严重的影响。
下面用一个简单的例子模拟死锁问题:
public class DeadLock extends Thread {
protected Object tool;
static Object fork1 = new Object();
static Object fork2 = new Object();
public DeadLock(Object obj) {
this.tool = obj;
if(tool == fork1) {
this.setName("A");
}
if(tool == fork2) {
this.setName("B");
}
}
@Override
public void run() {
if(tool == fork1) {
synchronized (fork1) {
try {
Thread.sleep(500);
} catch (Exception e) {
e.printStackTrace();
}
synchronized (fork2) {
System.out.println("哲学家A开始吃饭了");
}
}
}
if(tool == fork2) {
synchronized (fork2) {
try {
Thread.sleep(500);
} catch (Exception e) {
e.printStackTrace();
}
synchronized (fork1) {
System.out.println("哲学家B开始吃饭了");
}
}
}
}
public static void main(String[] args) throws InterruptedException {
DeadLock A = new DeadLock(fork1);
DeadLock B = new DeadLock(fork2);
A.start();
B.start();
Thread.sleep(1000);
}
}
上述代码模拟了两个哲学家互相等待对方的叉子。如果吃饭必须用两个叉子,哲学家A先占用叉子1,哲学家B占用叉子2,接着他们就互相等待,都没有办法获得两个叉子用餐,这就是死锁。
如果在实际环境中,遇到了这种情况,通常的表现就是相关的进程不再工作,并且CPU占用率为0(因为死锁的线程不占用CPU)。想确认问题,需要使用JDK提供的工具。首先,可以使用jps命令得到java进程的进程ID,接着使用jstack命令得到线程的线程堆栈:
jps
获取进程。
jstack [id]
查看指定进程的堆栈信息。
网友评论