1. 无锁和有锁解决线程安全问题
1.1 非线程安全实现
在多线程的环境下,对共享资源的访问存在线程安全问题
public class TestLock {
public static void main(String[] args) {
Account.demo(new AccountUnsafe(10000));
}
}
interface Account{
public void withdraw(Integer amount);
public Integer getBalance();
public static void demo(Account account){
List<Thread> list = new ArrayList<>();
long start = System.nanoTime();
for (int i = 0; i < 1000; i++) {
list.add(new Thread(()->{
account.withdraw(10);
}));
}
list.forEach(Thread::start);
list.forEach(t->{
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
long end = System.nanoTime();
System.out.println(account.getBalance() + " time:" + (end-start));
}
}
class AccountUnsafe implements Account{
private Integer balance;
public AccountUnsafe(Integer balance) {
this.balance = balance;
}
@Override
public void withdraw(Integer amount) {
balance -= amount;
}
@Override
public Integer getBalance() {
return this.balance;
}
@Override
public String toString() {
return "AccountUnsafe{" +
"balance=" + balance +
'}';
}
}
1.2 有锁保障线程安全
给 Account 对象加锁
public class TestLock {
public static void main(String[] args) {
Account.demo(new AccountSafe(10000));
}
}
interface Account{
public void withdraw(Integer amount);
public Integer getBalance();
public static void demo(Account account){
List<Thread> list = new ArrayList<>();
long start = System.nanoTime();
for (int i = 0; i < 1000; i++) {
list.add(new Thread(()->{
account.withdraw(10);
}));
}
list.forEach(Thread::start);
list.forEach(t->{
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
long end = System.nanoTime();
System.out.println(account.getBalance() + " time:" + (end-start));
}
}
class AccountSafe implements Account{
private Integer balance;
public AccountSafe(Integer balance) {
this.balance = balance;
}
@Override
public synchronized void withdraw(Integer amount) {
balance -= amount;
}
@Override
public synchronized Integer getBalance() {
return this.balance;
}
@Override
public String toString() {
return "AccountUnsafe{" +
"balance=" + balance +
'}';
}
}
1.3 无锁保障线程安全
使用JUC工具包下的原子整数类,实现无锁的线程安全保障
public class TestLock {
public static void main(String[] args) {
Account.demo(new AccountSafe(10000));
}
}
interface Account{
public void withdraw(Integer amount);
public Integer getBalance();
public static void demo(Account account){
List<Thread> list = new ArrayList<>();
long start = System.nanoTime();
for (int i = 0; i < 1000; i++) {
list.add(new Thread(()->{
account.withdraw(10);
}));
}
list.forEach(Thread::start);
list.forEach(t->{
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
long end = System.nanoTime();
System.out.println(account.getBalance() + " time:" + (end-start));
}
}
class AccountSafe implements Account{
private AtomicInteger balance;
public AccountSafe(Integer balance) {
this.balance = new AtomicInteger(balance);
}
@Override
public void withdraw(Integer amount) {
while (true){
int prev = balance.get();
int next = prev - amount;
if (balance.compareAndSet(prev,next)){
break;
}
}
// 可以简化为下面的方法
// balance.addAndGet(-1 * amount);
}
@Override
public Integer getBalance() {
return balance.get();
}
@Override
public String toString() {
return "AccountUnsafe{" +
"balance=" + balance +
'}';
}
}
2. CAS原理
2.1 cas解决线程安全的方法图
![](https://img.haomeiwen.com/i13992547/5cd595a00860ac18.png)
2.2 原理细节
- volatile
获取共享变量时,为了保证该变量的可见性,需要使用 volatile 修饰。CAS 必须借助 volatile 才能读取到共享变量的最新值来实现比较并交换的效果。
- CAS底层实现
CAS 的底层是 lock cmpxchg 指令(X86 架构),在单核 CPU 和多核 CPU 下都能够保证比较-交换的原子性。在多核状态下,某个核执行到带 lock 的指令时,CPU 会让总线锁住,当这个核把此指令执行完毕,再开启总线。这个过程中不会被线程的调度机制所打断,保证了多个线程对内存操作的准确性,是原子的。
- 无锁效率高的原因
无锁情况下,即使重试失败,线程始终在高速运行,没有停歇,而 synchronized 会让线程在没有获得锁的时候,发生上下文切换,进入阻塞。
- CAS的特点
结合 CAS 和 volatile 可以实现无锁并发,适用于线程数少、多核 CPU 的场景下。
- CAS 是基于乐观锁的思想:最乐观的估计,不怕别的线程来修改共享变量,就算改了也没关系
- synchronized 是基于悲观锁的思想:最悲观的估计,得防着其它线程来修改共享变量
- CAS 体现的是无锁并发、无阻塞并发
- 因为没有使用 synchronized,所以线程不会陷入阻塞,这是效率提升的因素之一
- 但如果竞争激烈,可以想到重试必然频繁发生,反而效率会受影响
3. 原子整数
例如:AtomicInteger,AtomicBoolean,AtomicLong
//API测试
public class TestLock {
public static void main(String[] args) {
AtomicInteger i = new AtomicInteger();
System.out.println(i.get()); //0
System.out.println(i.getAndIncrement()); //0
System.out.println(i.incrementAndGet()); //2
System.out.println(i.getAndDecrement()); //2
System.out.println(i.decrementAndGet()); //0
System.out.println(i.getAndAdd(5)); //0
System.out.println(i.addAndGet(5)); //10
//getAndUpdate 如果在 lambda 中引用了外部的局部变量,要保证该局部变量是 final 的
System.out.println(i.getAndUpdate(x->x+5)); //10
System.out.println(i.updateAndGet(x->x+5)); //20
//getAndAccumulate 可以通过 参数1 来引用外部的局部变量,但因为其不在 lambda 中因此不必是 final
System.out.println(i.getAndAccumulate(10,(p,x)->p+x)); //20
System.out.println(i.accumulateAndGet(10,(p,x)->p+x)); //40
}
}
4. 原子引用
例如:AtomicReference,AtomicMarkableReference,AtomicStampedReference
4.1 AtomicReference
使用 AtomicReference 实现对大整数的无锁线程安全操作
//模拟银行取款的大整数无锁线程安全操作
public class TestLock {
public static void main(String[] args) {
DecimalAccount.demo(new DecimalAccountSafe(new BigDecimal(10000)));
}
}
interface DecimalAccount{
public void withdraw(BigDecimal amount);
public BigDecimal getBalance();
public static void demo(DecimalAccount account){
List<Thread> list = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
list.add(new Thread(()->{
account.withdraw(BigDecimal.TEN);
}));
}
list.forEach(Thread::start);
list.forEach(t->{
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
System.out.println(account.getBalance());
}
}
class DecimalAccountSafe implements DecimalAccount{
AtomicReference<BigDecimal> balance;
public DecimalAccountSafe(BigDecimal balance) {
this.balance = new AtomicReference<>(balance);
}
@Override
public void withdraw(BigDecimal amount) {
while (true){
BigDecimal prev = balance.get();
BigDecimal next = prev.subtract(amount);
if (balance.compareAndSet(prev,next)){
break;
}
}
}
@Override
public BigDecimal getBalance() {
return balance.get();
}
}
4.2 ABA问题
- 问题演示
如果使用普通的 AtomicReference 会出现仅能判定共享变量的值和初始值是否相同,但是不能感知到该值是否在此之前被修改后,又被修改回来初始值的情况
public class TestLock {
private static AtomicReference<String> ref = new AtomicReference<>("A");
public static void main(String[] args) throws InterruptedException {
String prev = ref.get();
other(); //其他线程修改该值
TimeUnit.MICROSECONDS.sleep(1000);
//依旧可以修改成功
System.out.println("main: " + ref.compareAndSet(prev, "B"));
}
private static void other() throws InterruptedException {
new Thread(() -> {
System.out.println("Thread1: " + ref.compareAndSet(ref.get(), "B"));
}).start();
TimeUnit.MICROSECONDS.sleep(1000);
new Thread(() -> {
System.out.println("Thread2:" + ref.compareAndSet(ref.get(), "A"));
}).start();
}
}
- 使用 AtomicStampedReference 解决 ABA问题
AtomicStampedReference 可以给原子引用加上版本号,追踪原子引用整个的变化过程,可以获得引用变量中途被更改了几次。
public class TestLock {
private static AtomicStampedReference<String> ref = new AtomicStampedReference<>("A",0);
public static void main(String[] args) throws InterruptedException {
String prev = ref.getReference();
int stamp = ref.getStamp();
other();
TimeUnit.MICROSECONDS.sleep(1000);
System.out.println("main: " + ref.compareAndSet(prev,"B",stamp,1));
}
private static void other() throws InterruptedException {
new Thread(() -> {
System.out.println("Thread1: " + ref.compareAndSet(ref.getReference(), "B",0,1));
}).start();
TimeUnit.MICROSECONDS.sleep(1000);
new Thread(() -> {
System.out.println("Thread2:" + ref.compareAndSet(ref.getReference(), "A",1,2));
}).start();
}
}
- 使用 AtomicMarkableReference 解决 ABA问题
AtomicStampedReference 可以给原子引用加上版本号,但是有时候并不关心引用变量更改了几次,只是单纯的关心是否更改过,此时可以使用该类。
public class TestLock {
private static AtomicMarkableReference<String> ref = new AtomicMarkableReference<>("A",true);
public static void main(String[] args) throws InterruptedException {
String prev = ref.getReference();
other();
TimeUnit.MICROSECONDS.sleep(1000);
System.out.println("main: " + ref.compareAndSet(prev,"B",true,false));
}
private static void other() throws InterruptedException {
new Thread(() -> {
System.out.println("Thread1: " + ref.compareAndSet(ref.getReference(), "B",true,false));
}).start();
TimeUnit.MICROSECONDS.sleep(1000);
new Thread(() -> {
System.out.println("Thread2:" + ref.compareAndSet(ref.getReference(), "A",false,false));
}).start();
}
}
5.原子数组
例如:AtomicIntegerArray,AtomicLongArray,AtomicReferenceArray
//分别使用不安全和安全的数组,来保存累计数和
public class TestLock {
public static void main(String[] args) {
//不安全的数组
demo(
() -> new int[10],
array -> array.length,
(array, index) -> array[index]++,
array -> System.out.println(Arrays.toString(array))
);
//安全的数组
demo(
() -> new AtomicIntegerArray(10),
array -> array.length(),
(array, index) -> array.getAndIncrement(index),
array -> System.out.println(array)
);
}
//函数式编程
// supplier 提供者 无中生有 ()->结果
// function 函数 一个参数一个结果 (参数)->结果 , BiFunction (参数1,参数2)->结果
// consumer 消费者 一个参数没结果 (参数)->void, BiConsumer (参数1,参数2)->void
private static <T> void demo(
Supplier<T> arraySupplier,
Function<T, Integer> lengthFun,
BiConsumer<T, Integer> putConsumer,
Consumer<T> printConsumer
) {
List<Thread> list = new ArrayList<>();
T array = arraySupplier.get();
int len = lengthFun.apply(array);
for (int i = 0; i < len; i++) {
list.add(new Thread(() -> {
for (int j = 0; j < 10000; j++) {
putConsumer.accept(array, j % len);
}
}));
}
list.forEach(Thread::start);
list.forEach(t -> {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
printConsumer.accept(array);
}
}
- result
[8239, 8236, 8257, 8248, 8226, 8250, 8262, 8250, 8232, 8246]
[10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000]
6.字段更新器
例如:AtomicReferenceFieldUpdater,AtomicIntegerFieldUpdater,AtomicLongFieldUpdater利用
字段更新器,可以针对对象的某个域(Field)进行原子操作,只能配合 volatile 修饰的字段使用,否则会出现 异常
//更新字段实例
public class TestLock {
private volatile Integer fieldNumber;
public static void main(String[] args) {
AtomicReferenceFieldUpdater fieldUpdater = AtomicReferenceFieldUpdater.newUpdater(TestLock.class,Integer.class,"fieldNumber");
TestLock testLock = new TestLock();
fieldUpdater.compareAndSet(testLock,null,10);
System.out.println(testLock.fieldNumber);
fieldUpdater.compareAndSet(testLock,10,20);
System.out.println(testLock.fieldNumber);
fieldUpdater.compareAndSet(testLock,10,30);
System.out.println(testLock.fieldNumber);
}
}
- result
10
20
20
7.原子累加器
7.1 累加器性能比较
性能提升的原因是在有竞争时,设置多个累加单元,Therad-0 累加 Cell[0],而 Thread-1 累加 Cell[1]... 最后将结果汇总。这样它们在累加时操作的不同的 Cell 变量,因此减少了 CAS 重试失败,从而提高性能。
public class TestLock {
public static void main(String[] args) {
for (int i = 0; i < 5; i++) {
demo(()->new AtomicLong(0),adder->adder.getAndIncrement());
}
System.out.println("+++++++++++++++++");
for (int i = 0; i < 5; i++) {
demo(()->new LongAdder(),adder->adder.increment());
}
}
private static <T>void demo(Supplier<T> adderSupplier,Consumer<T> action){
List<Thread> list = new ArrayList<>();
T adder = adderSupplier.get();
long start = System.nanoTime();
// 40个线程,每个线程累加 5 万
for (int i = 0; i < 40; i++) {
list.add(new Thread(()->{
for (int j = 0; j < 50000; j++) {
action.accept(adder);
}
}));
}
list.forEach(Thread::start);
list.forEach(t->{
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
long end = System.nanoTime();
System.out.println("time:"+ (end - start)/1000_000 + " ms");
}
}
- result
time:115 ms
time:78 ms
time:76 ms
time:75 ms
time:93 ms
+++++++++++++++++
time:26 ms
time:14 ms
time:24 ms
time:18 ms
time:18 ms
7.2 LongAdder原理
8.Unsafe类
Unsafe 对象提供了非常底层的,操作内存、线程的方法,Unsafe 对象不能直接调用,只能通过反射获得
//使用Unsafe完成CAS操作
public class TestLock {
public static void main(String[] args) throws NoSuchFieldException {
Unsafe unsafe = UnsafeAccessor.getUnsafe();
// 获得成员变量
Field id = Student.class.getDeclaredField("id");
Field name = Student.class.getDeclaredField("name");
// 获得成员变量的偏移量
long idOffset = unsafe.objectFieldOffset(id);
long nameOffset = unsafe.objectFieldOffset(name);
Student student = new Student();
// 使用 cas 方法替换成员变量的值
unsafe.compareAndSwapInt(student,idOffset,0,10);
unsafe.compareAndSwapObject(student,nameOffset,null,"yorick");
System.out.println(student);
}
}
@Data
class Student{
volatile int id;
volatile String name;
}
class UnsafeAccessor{
private static Unsafe unsafe;
static {
try {
Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
theUnsafe.setAccessible(true);
unsafe = (Unsafe) theUnsafe.get(null);
} catch (NoSuchFieldException | IllegalAccessException e) {
e.printStackTrace();
}
}
public static Unsafe getUnsafe() {
return unsafe;
}
}
网友评论