- 当多个线程访问某个类时,不管运行环境时采用何种调度方式或者这些进程进行如何交替执行,并且在主调代码中不需要任何额外的同步或协同,这个类都能表现出正确的行为,那么这个类就是线程安全的。
- 原子性:提供了互斥访问,同一时刻只能有一个线程对它进行操作
- 可见性:一个线程对主内存的修改可以及时的被其他线程观察到
- 有序性:一个线程观察其他线程中的指令执行顺序,由于指令重排序的存在,该观察结果一般杂乱无序。
原子性
图片.pngAtomicInteger
AtomicInteger类中的incrementAndGet方法,调用CompareAndSetInt 简称CAS
有native修饰说明是底层的类,不是用java实现的
- ,AtomicLong的原理是依靠底层的cas来保障原子性的更新数据,在要添加或者减少的时候,会使用死循环不断地cas到特定的值,从而达到更新数据的目的。工作内存中的值与主存中进行比较。
- 实现原理底层采用while
循环方式,调用较多时可能会受到影响
package com.alan.concurrency.example.atomic;
import com.alan.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
@ThreadSafe
public class AtomicExample1 {
//请求数1000
public static int clientTotal = 5000;
//同时并发执行的线程数
public static int threadTotal = 200;
//count里面的值相当于工作内存,有可能和底层也就是主内存的值不一致
public static AtomicInteger count = new AtomicInteger(0);
private static void add(){
//用到unsafe类
count.incrementAndGet();
}
public static void main(String[] args) throws InterruptedException {
//定义线程池ExecutorService接口
ExecutorService executorService = Executors.newCachedThreadPool();
//定义信号量,传入并发线程数 final修饰不允许重新赋值
final Semaphore semaphore = new Semaphore(threadTotal);
//定义计数器闭锁。传入请求总数
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal; i++) {
//通过匿名内部类方式
executorService.execute(new Runnable() {
@Override
public void run() {
try {
//semaphore控制并发数量
semaphore.acquire();
add();
semaphore.release();
} catch (InterruptedException e) {
log.error("exception",e);
}
//每次执行计数器减掉一个
countDownLatch.countDown();
}
});
}
countDownLatch.await();
executorService.shutdown();
log.info("count:{}",count.get());
}
}
LongAdder
- LongAdder在AtomicLong的基础上将单点的更新压力分散到各个节点,在低并发的时候通过对base的直接更新可以很好的保障和AtomicLong的性能基本保持一致,而在高并发的时候通过分散提高了性能。
- 缺点是LongAdder在统计的时候如果有并发更新,可能导致统计的数据有误差。
package com.alan.concurrency.example.atomic;
import com.alan.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
@Slf4j
@ThreadSafe
public class AtomicExample3 {
//请求数1000
public static int clientTotal = 5000;
//同时并发执行的线程数
public static int threadTotal = 200;
//count里面的值相当于工作内存,有可能和底层也就是主内存的值不一致
public static LongAdder count = new LongAdder();
private static void add(){
//用到unsafe类
count.increment();
}
public static void main(String[] args) throws InterruptedException {
//定义线程池ExecutorService接口
ExecutorService executorService = Executors.newCachedThreadPool();
//定义信号量,传入并发线程数 final修饰不允许重新赋值
final Semaphore semaphore = new Semaphore(threadTotal);
//定义计数器闭锁。传入请求总数
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal; i++) {
//通过匿名内部类方式
executorService.execute(new Runnable() {
@Override
public void run() {
try {
//semaphore控制并发数量
semaphore.acquire();
add();
semaphore.release();
} catch (InterruptedException e) {
log.error("exception",e);
}
//每次执行计数器减掉一个
countDownLatch.countDown();
}
});
}
countDownLatch.await();
executorService.shutdown();
log.info("count:{}",count);
}
}
AtomicReference
package com.alan.concurrency.example.atomic;
import com.alan.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
@Slf4j
@ThreadSafe
public class AtomicExample4 {
private static AtomicReference<Integer> count = new AtomicReference<>(0);
public static void main(String[] args) {
count.compareAndSet(0,2);
count.compareAndSet(0,1);
count.compareAndSet(1,3);
count.compareAndSet(2,4);
count.compareAndSet(3,5);
log.info("count:{}",count.get());
}
}
AtomicIntegerFieldUpdater
package com.alan.concurrency.example.atomic;
import com.alan.concurrency.annoations.ThreadSafe;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;
@Slf4j
@ThreadSafe
public class AtomicExample5 {
//这里必须要使用volatile修饰符,并且非static才可以。通过lombok添加注解
@Getter
public volatile int count = 100;
private static AtomicIntegerFieldUpdater<AtomicExample5> update = AtomicIntegerFieldUpdater.newUpdater(AtomicExample5.class,"count");
public static void main(String[] args) {
AtomicExample5 example5 = new AtomicExample5();
if(update.compareAndSet(example5,100,120)){
log.info("update success,{}",example5.getCount());
}
if(update.compareAndSet(example5,100,120)){
log.info("update success,{}",example5.getCount());
}else {
log.info("update failed,{}",example5.getCount());
}
}
}
AtomicStampedReference
-ABA问题 线程1准备用CAS将变量的值由A替换为B,在此之前,线程2将变量的值由A替换为C,又由C替换为A,然后线程1执行CAS时发现变量的值仍然为A,所以CAS成功。但实际上这时的现场已经和最初不同了,尽管CAS成功,但可能存在潜藏的问题, AtomicStampedReference可以解决此问题
AtomicBoolean
- 让某一段代码只执行一次,绝对不会重复
package com.alan.concurrency.example.atomic;
import com.alan.concurrency.annoations.ThreadSafe;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
@Slf4j
@ThreadSafe
public class AtomicExample6 {
//请求数5000
public static int clientTotal = 5000;
//同时并发执行的线程数
public static int threadTotal = 200;
private static AtomicBoolean isHappened = new AtomicBoolean(false);
public static void main(String[] args) throws InterruptedException { //定义线程池ExecutorService接口
ExecutorService executorService = Executors.newCachedThreadPool();
//定义信号量,传入并发线程数 final修饰不允许重新赋值
final Semaphore semaphore = new Semaphore(threadTotal);
//定义计数器闭锁。传入请求总数
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal; i++) {
//通过匿名内部类方式
executorService.execute(new Runnable() {
@Override
public void run() {
try {
//semaphore控制并发数量
semaphore.acquire();
test();
semaphore.release();
} catch (InterruptedException e) {
log.error("exception",e);
}
//每次执行计数器减掉一个
countDownLatch.countDown();
}
});
}
countDownLatch.await();
executorService.shutdown();
log.info("isHappened:{}",isHappened.get());
}
//让某一段代码只执行一次,绝对不会重复
private static void test(){
if (isHappened.compareAndSet(false, true)) {
log.info("executed");
}
}
}
图片.png
synchronized
- 修饰代码块:大括号括起来的代码,作用于调用的对象
- 修饰方法:整个方法,作用于调用的对象
- 修饰静态方法:整个静态方法,作用于所有对象
- 修饰类:括号括起来的部分,作用于所有对象
子类继承父类,父类的方法中含有synchronized关键字,子类继承此方法是不包含synchronized关键字的
package com.alan.concurrency.example.sync;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Slf4j
public class SynchronizedExample1 {
//修饰一个代码块。作用范围是大括号括起的代码
public void test1(int j){
synchronized (this){
for (int i = 0; i < 10; i++) {
log.info("test1 {}- {}",j,i);
}
}
}
//修饰一个方法。作用范围是整个方法
public synchronized void test2(int j){
for (int i = 0; i < 10; i++) {
log.info("test2 {} - {}",j,i);
}
}
public static void main(String[] args) {
SynchronizedExample1 example1 = new SynchronizedExample1();
SynchronizedExample1 example2 = new SynchronizedExample1();
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(()->{
example1.test2(1);
});
executorService.execute(()->{
example2.test2( 2);
});
}
}
- 修饰静态方法和修饰类
package com.alan.concurrency.example.sync;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Slf4j
public class SynchronizedExample2 {
//修饰一个类。作用范围是所有对象
public void test1(int j){
synchronized (SynchronizedExample2.class){
for (int i = 0; i < 10; i++) {
log.info("test1 {}- {}",j,i);
}
}
}
//修饰一个静态方法。作用范围是所有对象
public static synchronized void test2(int j){
for (int i = 0; i < 10; i++) {
log.info("test2 {} - {}",j,i);
}
}
public static void main(String[] args) {
SynchronizedExample2 example1 = new SynchronizedExample2();
SynchronizedExample2 example2 = new SynchronizedExample2();
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(()->{
example1.test1(1);
});
executorService.execute(()->{
example2.test1( 2);
});
}
}
对比
图片.png可见性
- 导致共享变量在线程中不可见的原因
1、线程交叉执行
2、重排序结合线程交叉执行
3、共享变量更新后的值没有在工作内存与主存间及时更新
可见性-synchronized方式
图片.png可见性-volatile方式 (volatile不具备原子性)
通过加入内存屏障和禁止重排序优化来实现
-
对volatile变量写操作时,会在写操作后加入一条store屏障指令,将本地内存中的共享变量值刷新到主内存
-
对volatile变量读操作时,会在读操作前加入一条load屏障指令,从主内存中读取共享变量。
-
volatile写
图片.png -
volatile读
图片.png
volatile 一般用于保证变量状态
图片.png
volatile 也可以用于double check
有序性
- Java内存模型中,允许编译器和处理器对指令进行重排序,但是重排序过程不会影响到单线程程序的执行,确会影响到多线程并发执行的正确性。
- volatile、synchronized、lock
有序性原则
图片.png图片.png
图片.png
图片.png
-
总结
图片.png
网友评论