什么是原子性
如果把一个事务可看作是一个程序,它要么完整的被执行,要么完全不执行。这种特性就叫原子性
问题1
public class Counter {
volatile int i = 0;
public void add() {
i++;
}
}
public class Demo1_CounterTest {
public static void main(String[] args) throws InterruptedException {
final Counter ct = new Counter ();
// 开启10个线程对Counter对象中的i进行累加
for (int i = 0; i < 10; i++) {
new Thread(new Runnable() {
@Override
public void run() {
for (int j = 0; j < 10000; j++) {
ct.add();
}
System.out.println("done...");
}
}).start();
}
Thread.sleep(6000L);
System.out.println(ct.i);
// 无法达到预期输出结果 100000
}
}
以上代码虽然对实现了变量 i 的可见性,但是并没有实现对 i 的原子操作。其他线程都能读到 i ,但读取过后 i 又发生改变,就会导致读取的 i 是一个失效的值,从而引发原子性问题
原子操作
- 原子操作可以是一个步骤,也可以是多个操作步骤,但是其顺序不可以被打乱,也不可以被切割而只执行其中一部分。
- 将整个操作视作为一个整体,资源在该次操作中保持一致,这是原子性的核心特征
解决原子性问题
- Atomic 原子类
import java.util.concurrent.atomic.AtomicInteger;
public class Counter {
// 使用原子类
AtomicInteger i = new AtomicInteger(0);
public void add() {
i.incrementAndGet();
}
}
- synchronized 同步关键字
public class Counter {
volatile int i = 0;
// 加入同步关键字,即可实现线程安全同步锁,一个线程执行完了才能执行下一个线程
public synchronized void add() {
i++;
}
}
- Lock 锁
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class Counter {
volatile int i = 0;
// 加入锁,在同一时间只会有一个线程抢到锁
Lock lock = new ReentrantLock();
public void add() {
lock.lock();
try {
i++;
}finally {
lock.unlock();
}
}
}
CAS(Compare and swap)
-
Compare and swap 比较和交换。属于硬件同步原语,处理器提供了基本内存操作的原子性保证
-
CAS操作需要输入两个数值,一个旧值(期望操作前的值)和一个新值,在操作期间先对旧值进行比较,若没有发生变化,才交换成新的值,发生了变化则不交换
-
Java中的sun.misc.Unsafe类,提供了compareAndSwaplnt() 和 compareAndSwapLong() 等几个方法实现CAS
CAS操作会用原来的值进行比较,比较后再设置新的值。如果发现比较的值不对,则本次操作失败。进入自旋(重新再执行)
模拟CAS操作
public class Counter {
volatile int i = 0;
private static Unsafe unsafe = null;
private static long valueOffset;
static {
//unsafe = Unsafe.getUnsafe(); // JDK不提供,通过反射来实现
try{
Field field = Unsafe.class.getDeclaredField("theUnsafe");
field.setAccessible(true);
unsafe = (Unsafe) field.get(null);
//获取i字段的offset
Field iField = Counter.class.getDeclaredField("i");
valueOffset = unsafe.objectFieldOffset(iField);
} catch (NoSuchFieldException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
e.printStackTrace();
}
}
public void add() {
while(true) {
// 拿到i字段的当前值
int current = unsafe.getIntVolatile(this, valueOffset);
// 用CAS操作将i加1
if (unsafe.compareAndSwapInt(this, valueOffset, current, current+1)) {
break;
}
}
}
}
AtomicInteger内部实现的就是CAS原子性操作
J.U.C包内的原子操作封装类
类名 | 解释 |
---|---|
AtomicBoolean | 原子更新布尔类型 |
AtomicInteger | 原子更新整型 |
AtomicLong | 原子更新长整型 |
AtomicIntegerArray | 原子更新整型数组里的元素 |
AtomicLongArray | 原子更新长整型数组里的元素 |
AtomicReferenceArray | 原子更新引用类型数组里的元素 |
AtomicIntegerFieldUpdater | 原子更新整型字段的更新器 |
AtomicLongFieldUpdater | 原子更新长整型字段的更新器 |
AtomicReferenceFieldUpdater | 原子更新引用类型的字段 |
AtomicReference | 原子更新引用类型 |
AtomicStampedReference | 原子更新带有版本号的引用类型 |
AtomicMarkableReference | 原子更新带有标记位的引用类型 |
1.8更新:计数器增强版,高并发下性能更好 | 原理:分成多个单元,不同线程更新不同的单元,只有需要汇总的时候才计算所有单元的操作 |
DoubleAccumulator | 更新器 |
LongAccumulator | 更新器 |
DoubleAdder | 计数器 |
LongAdder | 计数器 |
LongAccumulator示例
import java.util.concurrent.atomic.LongAccumulator;
public class Demo_LongAccumulator {
public static void main(String[] args) throws InterruptedException {
/*
LongAccumulator 可以帮我们做到自定义的累加计算
第一个参数为lambda方法,y表示状态值,x表示每次传入的值
第二个参数为状态值,(这里传入0,则y一开始等于0)
*/
LongAccumulator accumulator = new LongAccumulator(
(y,x)->{
System.out.println("x:" + x + ",y:" + y);
// 自定义累加逻辑 (这里为:x + y)
return x + y;
},
0L);
for (int i = 0; i < 3; i++) {
// 把1传入到 LongAccumulator 中
accumulator.accumulate(1);
}
System.out.println("result=" + accumulator.get());
}
}
LongAdder示例
import java.util.concurrent.atomic.LongAdder;
public class Demo_LongAdder {
public static void main(String[] args) throws InterruptedException {
/*
LongAdder类与AtomicLong类的区别在于
高并发时前者将对单一变量的CAS操作
分散为对数组cells中多个元素的CAS操作,取值时进行求和;
而在并发较低时仅对base变量进行CAS操作,与AtomicLong类原理相同
*/
LongAdder lacount = new LongAdder();
for (int i = 0; i < 6; i++) {
new Thread(() -> {
long starttime = System.currentTimeMillis();
while (System.currentTimeMillis() - starttime < 2000) { // 运行两秒
//increment()方法就是对add(long x)的封装
lacount.increment();
}
long endtime = System.currentTimeMillis();
}).start();
}
Thread.sleep(3000);
// 返回的是base和cells数组中所有元素的和,这里的base像是一个初始值的作用
System.out.println(lacount.sum());
}
}
CAS的三个问题
- 循环+CAS,自旋的实现让所有线程都处于高频运行,争抢CPU执行时间的状态。如果长时间不成功,会带来很大的CPU资源消耗
- 仅针对单个变量的操作,不能用于多个变量来实现原子操作
- ABA问题。
ABA问题
ABA问题- 线程1、线程2同时读取到i=0后
- 线程1、线程2都要执行CAS操作
- 假设线程2操作稍后于线程1、则线程1执行成功,线程2执行失败
- 但紧接着线程1又执行了CAS(1,0),将i的值改回0,此时线程2执行,引发ABA问题
AtomicStampedReference 加入版本号的数据比较更新
// 存储在栈里面元素 -- 对象
public class Node {
public final String value;
public Node next;
public Node(String value) {
this.value = value;
}
@Override
public String toString() {
return "value=" + value;
}
}
import java.util.concurrent.atomic.AtomicStampedReference;
import java.util.concurrent.locks.LockSupport;
public class ConcurrentStack {
// top cas无锁修改
//AtomicReference<Node> top = new AtomicReference<Node>();
AtomicStampedReference<Node> top =
new AtomicStampedReference<>(null, 0);
public void push(Node node) { // 入栈
Node oldTop;
int v;
do {
v = top.getStamp();
oldTop = top.getReference();
node.next = oldTop;
}
while (!top.compareAndSet(oldTop, node, v, v+1)); // CAS 替换栈顶
}
// 出栈 -- 取出栈顶 ,为了演示ABA效果, 增加一个CAS操作的延时
public Node pop(int time) {
Node newTop;
Node oldTop;
int v;
do {
v = top.getStamp();
oldTop = top.getReference();
if (oldTop == null) { //如果没有值,就返回null
return null;
}
newTop = oldTop.next;
if (time != 0) { //模拟延时
LockSupport.parkNanos(1000 * 1000 * time); // 休眠指定的时间
}
}
while (!top.compareAndSet(oldTop, newTop, v, v+1)); //将下一个节点设置为top
return oldTop; //将旧的Top作为值返回
}
}
网友评论