JMM
JMM:java内存模型(Java Memory Model简称JMM),JMM本身是一种规范,并不真实存在,就像是十二生肖一样 也是一种概念。
JMM关于同步的规定
- 1 线程解锁前,必须把共享变量刷新回主内存
- 2 线程加锁前,必须读取主内存共享变量的最新值到自己的工作内存中
-
3 加锁和解锁是同一把锁。
JVM运行程序的单元是线程,每个线程创建时 JVM都会为其创建一把工作内存(有些地方也称为栈空间),工作内存是每个线程的私有空间,而JMM中规定所有共享变量等都存在于主内存中,主内存是共享内存区域,所有线程都可以访问。但是线程对变量的操作(读写)必须要在工作内存中进行的,首先将从主内存复制到工作内存中,操作完成后再将变量写回主内存,不能直接操作主内存的变量。各个线程之间无法对方的工作内存,因此线程间的通信是通过主内存来完成的。
image.png
为什么说volatile是轻量级的同步机制?
JMM对于同步的要求有三个:
- 1 可见性
- 2 原子性
- 3 有序性
volatile三个特性 - 1 保证可见性
- 2 不保证原子性
- 3 禁止指令重排
volatile可以保证可见性和有序性(禁止重排可以保证有序性),但是不保证原子性,所以是轻量化的同步机制
volatile保证可见性demo
public class Test1 {
//验证volatile的可见性
public static void main(String[] args) {
Mydata mydata = new Mydata();
new Thread(()->{
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"线程进入");
mydata.addTO10();
},"a").start();
while (mydata.num==0){//如果一直为0就一直卡在这里
}
System.out.println("main线程结束");
}
}
class Mydata{
volatile int num;
public void addTO10(){
num = 10;
}
}
如果num不加volatile 修饰,那么运行main函数后 结果是A线程跑完,main线程一直卡在while循环里面,因为主线程无法得到主内存中的num的 最新数据
image.png
加了volatile ,由于volatile 修改了主内存的数据后,会及时通知其他工作线程(例子里面的是main线程)刷新数据到工作内存里,这样main线程就能拿到最新数据了,main线程也就跑完程序了
image.png
volatile不保证原子性Demo
public class Test1 {
//验证volatile的不保证原子性
public static void main(String[] args) {
Mydata mydata = new Mydata();
for (int i = 0; i < 20; i++) {
new Thread(()->{
for (int j = 0; j < 1000; j++) {
mydata.addPlusPlus();
}
},""+i).start();
}
while (Thread.activeCount()>2){//等上面的线程跑完, 两个线程是 main 和gc线程,
Thread.yield();
}
System.out.println("20个线程各自执行1000次++后的结果是"+mydata.num);
}
}
class Mydata{
volatile int num;
public void addPlusPlus(){
num ++;
}
}
运行结果,小于20*1000=20000这个数
分析
i++ 分为三步
- 1 将主内存的数据 读到工作内存里面
- 2 将数据+1
- 3 将结果写入到主内存里面
volatile不保证原子性的分析:在一个线程t1将自己的工作内存的更新后的数据写入到主内存的之后,此时如果其他线程t2如果发生读操作(也就是上面的第一步), t2会去主内存加载最新数据到工作内存,但是 如果此时t2线程已经处于第二步了,那么t2得到的数据就是错误的,造成最终的数据不一致,这就是volatile不保证原子性的原因。
解决volatile不保证原子性的方法是使用automic类,为什么原子类能解决这个问题? 后面会有
public class Test1 {
//验证volatile的不保证原子性
public static void main(String[] args) {
Mydata mydata = new Mydata();
for (int i = 0; i < 20; i++) {
new Thread(()->{
for (int j = 0; j < 1000; j++) {
mydata.addPlusPlus();
mydata.atomicAdd();
}
},""+i).start();
}
while (Thread.activeCount()>2){
Thread.yield();
}
System.out.println("20个线程各自执行1000次++后-普通变量-的结果是"+mydata.num);
System.out.println("20个线程各自执行1000次++后-atomic-的结果是"+mydata.num2);
}
}
class Mydata{
volatile int num;
public void addPlusPlus(){
num ++;
}
AtomicInteger num2 = new AtomicInteger();
public void atomicAdd(){
num2.getAndIncrement();//获取并且自增
}
}
执行结果
指令重排
计算机在执行程序时,为了提高性能,编译器和处理器常常会对指令做重新拍下的操作,指令重排一般分为以下三种:
image.png
处理器在进行重排时必须遵守的规则是数据的依赖性: 比如数据b 依赖于数据a的值,那么b的初始化肯定是要在a之后的
多线程环境下 线程交替执行,有序重排序的存在,两个线程的变量能否保住一致性是无法确定的,结果也是无法预测的
比如以下例子:
public void reSort(){
int x = 11;//语句1
int y = 12;//语句2
x = x+5;//语句3
y = x * x;//语句4
// 最终的执行命令顺序可能是1234,2134,1324(注意无论怎么排,4都不肯是第一步,违背的数据依赖性原则)
}
多线程下的情况
volatile 实现禁止指令重排优化,从而避免多线程环境下的乱序执行现象。
内存屏障 Memory Brrrier: 也称为内存栅栏,它是一个CPU指令。作用有两项
image.png
1 保证特定操作的执行顺序。
2 保证某些变量的可见性(volatile 就是利用该特性实现可见性的)
如果来指令间插入Memory Brrrier,它就会告诉编译器和CPU,不管什么指令都不能和这条Memory Brrrier指令重排序,Memory Brrrier的另外一个作用就是强制刷出各种CPU的缓存数据,因此CPU的线程都能读取到数据的最新版本。
原子类为什么能够解决volatile原子性问题?
答案:CAS自旋
compareAndSet方法:与主内存比较并设置
public static void main(String[] args) {
//原子基本变量
AtomicInteger atomicInteger = new AtomicInteger(10);//对象存于堆内,堆是共享区域,初始化为10
//当期待值10与原始值10相等,则设置成功
System.out.println(atomicInteger.compareAndSet(10,50)+",当前值"+atomicInteger.get());
//当期待值10与原始值50不等,则设置失败
System.out.println(atomicInteger.compareAndSet(10,1000)+",当前值"+atomicInteger.get());
//原子引用
User zhangsan = new User("张三",20);
User lisi = new User("李四",25);
AtomicReference<User> atomicReference = new AtomicReference<>();
atomicReference.set(zhangsan);
//如果主内存中是张三 那么改为李四 true
System.out.println(atomicReference.compareAndSet(zhangsan,lisi)+",当前值"+atomicReference.get());
//false
System.out.println(atomicReference.compareAndSet(zhangsan,lisi)+",当前值"+atomicReference.get());
}
cas保证原子性的原理:
atomicInteger.getAndIncrement();
//这里调用的是unsafe对象的方法
public final int getAndIncrement() {
//this 是当前对象 valueOffset是当前对象所处的内存地址偏移量,1位增量值
return unsafe.getAndAddInt(this, valueOffset, 1);
}
//原子性 靠的是unsafe来保证的 var1是上面一步传入的this对象,var2是上面传来的地址偏移量,var4是要增加的值
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
//根据this对象和地址偏移量获取主内存的值
var5 = this.getIntVolatile(var1, var2);//这里获取的值,其实是volatile修饰的,这样能保证每次获取的都是主存中的最新值
// private volatile int value;
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));//使用var5去主内存中的值比较,如果一致则更新为var5 + var4,否则轮训
return var5;
}
CAS的全称是compare-and-swap,它是CPU的一条系统并发原语,这一过程是原子的,在这一过程中不会造成数据不一致的问题。CAS的核心类是Unsafe,由于java无法访问底层系统,需要通过本地方法(native)来访问,Unsafe类内大量的native方法就是来访问底层的,可以直接访问内存中的数据,Unsafe位于JDK中的rt.jar的sun.misc包下。多个线程访问的例子
1 假设现在有两个线程A,B来执行getAndAddInt(上面的方法)操作(分别泡在不同的CPU上)
2 atomicInteger的原始值为3,即主内存AtomicInteger的value值为3,根据JMM模型:线程A,B 均持有一份值为3的副本到自己的工作内存,
3 线程A通过 this.getIntVolatile(var1, var2)获取到的主内存值为3,这时线程A被挂起
4 线程A通过 this.getIntVolatile(var1, var2)获取到的主内存值为3,刚好这时B未被挂起,B执行compareAndSwapInt(),执行成功,主内存中值被修改为4,线程B操作完成,收工。
5 这时A恢复,执行compareAndSwapInt()方法,发现自己手里的3和主内存中的4不一致,说明这个值被别人提前动过了,所以线程A此次操作失败。
6 线程A现在重新再来,this.getIntVolatile(var1, var2)获取到的主内存值为4(这里因为AtomicInteger中的value是volatile修饰的,所以线程B修改后,线程A能感知到,所以这里获取的是4),拿这个4再去和主内存比较,比较成功 操作成功
这里为什么不使用synchonized:因为synchonized同一时刻只允许一个线程访问,cas这里允许多个访问,即保证了一致性也保证了并发性
CAS的缺点
- 1 如果cas失败,需要循环cas ,极端情况下,可能cpu开销大
- 2 cas 只能保证一个变量的原子操作
- 3 ABA问题
ABA问题:就以上面的多个线程访问的例子来说吧,如果B将3修改成4之后又将4修改成3了,那么此时A线程依然会执行成功,虽然可以执行成功,但是这并不意外这一切OK,一切都没问题
ABA问题的解决思路:
加版本号,就好比 在value之外维护一个自增变量,每次操作主内存数据后,这个自增变量都自增1,然后当其他线程来访问的时候 既需要比较value,也需要比较版本号。
public class Test2_2 {
static AtomicReference<Integer> atomicReference = new AtomicReference<>(100);//100是初始值
static AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference<>(100,1);//100是初始值 1是初始版本号
public static void main(String[] args) {
System.out.println("*******************下面操作会有ABA问题的出现********************");
new Thread(()->{
atomicReference.compareAndSet(100,101);//A->B
atomicReference.compareAndSet(101,100);//B->A
},"t1").start();
new Thread(()->{
//睡眠一秒 等待上面的ABA操作完成
try {TimeUnit.SECONDS.sleep(1);}catch (Exception e){}
atomicReference.compareAndSet(100,2019);//B->A
System.out.println("atomicReference最终的值是\t"+atomicReference.get());
},"t2").start();
//睡眠3秒不然结果输出太混乱
try {TimeUnit.SECONDS.sleep(2);}catch (Exception e){}
System.out.println("*******************下面操作会有ABA问题的出现********************");
new Thread(()->{
int stamp = atomicStampedReference.getStamp();//最初的版本号是1
System.out.println("t3第一次获取的版本号是\t"+stamp);
//睡眠1秒 等待t4获取最初版本号
try {TimeUnit.SECONDS.sleep(1);}catch (Exception e){}
atomicStampedReference.compareAndSet(100,101,stamp,stamp+1);
stamp = atomicStampedReference.getStamp();
System.out.println("t3第二次获取的版本号是\t"+stamp);
atomicStampedReference.compareAndSet(101,100,stamp,stamp+1);
System.out.println("t3第三次获取的版本号是\t"+atomicStampedReference.getStamp());
},"t3").start();
new Thread(()->{
int stamp = atomicStampedReference.getStamp();//最初的版本号是1
System.out.println("t4第一次获取的版本号是\t"+stamp);
//睡眠三秒 等待上面的ABA操作完成
try {TimeUnit.SECONDS.sleep(3);}catch (Exception e){}
boolean result = atomicStampedReference.compareAndSet(100,2019,stamp,stamp+1);
System.out.println("最后的操作结果\t"+result+",最终的值是\t"+atomicStampedReference.getReference().longValue()+"最终的 版本号是"+atomicStampedReference.getStamp());
},"t4").start();
}
}
结果
image.png
集合类的线程不安全
public static void listNotSafe() {
List<String> list = new ArrayList<>();
// List<String> list = new Vector<>();
// List<String> list = Collections.synchronizedList(new ArrayList<>());
// CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
for (int i = 0; i < 30; i++) {
new Thread(() -> {
list.add(UUID.randomUUID().toString().replace("-", "").substring(8));
System.out.println(list);
}, "t" + i).start();
}
}
/**
* 1 故障描述 多线程环境下会报错 java.util.ConcurrentModificationException
*
* 2 报错原因
* 一个线程正在写,另外一个线程抢夺了资源,导致数据不一致异常,也就是并发修改异常
*
* 3 解决方法
* 3.1 使用 new Vector<>()(效率低 使用synchronized来实现同步)
* 3.2 使用 Collections.synchronizedList(new ArrayList<>());(效率也低,也是使用synchronized来实现同步)
* 3.3 (推荐)使用 new CopyOnWriteArrayList<>();
* CopyOnWriteArrayList 写时复制,读的时候不复制,写的时候复制一个新数组,读的时候还是读老数组,这样既能保证安全也能保证效率,读写分离的思想
* CopyOnWriteArrayList add方法源码
* public void add(int index, E element) {
* final ReentrantLock lock = this.lock;
* lock.lock();//首先加锁
* try {
* Object[] elements = getArray();
* int len = elements.length;
* if (index > len || index < 0)
* throw new IndexOutOfBoundsException("Index: "+index+
* ", Size: "+len);
* Object[] newElements;
* int numMoved = len - index;
* if (numMoved == 0)
* newElements = Arrays.copyOf(elements, len + 1);
* else {
* newElements = new Object[len + 1];
* System.arraycopy(elements, 0, newElements, 0, index);
* System.arraycopy(elements, index, newElements, index + 1,
* numMoved);
* }
* newElements[index] = element;//在复制后的数组尾部加入数据
* setArray(newElements);
* } finally {
* lock.unlock();//解锁
* }
* }
*
* 并发修改异常也会出现在 set和 Map中 ,都可以使用CopyOnWriteArraySet和ConcurrentHashMap 来获得一个线程安全的版本
* 其中Set的JUC解决方案中用 CopyOnWriteArraySet,值得注意的是 CopyOnWriteArraySet中维护的是CopyOnWriteArrayList
* Map的JUC解决方案中用 ConcurrentHashMap 来解决
*/
公平锁和非公平锁
并发包中的ReentrantLock创建可以在构造函数中指定为公平锁和非公平锁,默认为非公平锁
关于二者的区别
- 公平锁:在并发环境中,每个线程在获取锁时会先查看这个锁维护的等待队列,如果为空,或者当前线程是等待队列的第一个,那么当前线程就占有这个锁,否则就加入到等待队列中,以后会按照FIFO规则中排队。
- 非公平锁:公平锁比较粗鲁,上来就尝试占有锁,如果尝试失败,再采用公平锁的那种方式,非公平锁的吞吐量比公平锁要大,synchronized就是一种非公平锁。
可重入锁(也叫递归锁)
指同一个线程,如果在外层函数获得锁之后,内层递归函数依然可以获取该锁的代码,也就是说在外层方法获取锁之后,可以进入任何一个他已经拥有锁的同步代码或者代码块。
ReentrantLock和Synchronized就是典型的可重入锁,可重入锁的最大作用是避免死锁。
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
class Phone implements Runnable{
public synchronized void sendSMS(){
System.out.println(Thread.currentThread().getName()+"进入sendSMS方法");
sendEmail();
}
public synchronized void sendEmail(){
System.out.println(Thread.currentThread().getName()+"进入sendEmail方法");
}
@Override
public void run() {
get();
}
Lock lock = new ReentrantLock();
public void get(){
lock.lock();
try {
System.out.println(Thread.currentThread().getName()+"进入get方法");
set();
}finally {
lock.unlock();
}
}
public void set(){
lock.lock();
try {
System.out.println(Thread.currentThread().getName()+"进入set方法");
}finally {
lock.unlock();
}
}
}
public class Test4 {
public static void main(String[] args) {
System.out.println("******************synchronized版本的可重入锁********************");
Phone phone = new Phone();
new Thread(()->{
phone.sendSMS();
},"t1").start();
new Thread(()->{
phone.sendSMS();
},"t2").start();
try {TimeUnit.SECONDS.sleep(3);}catch (Exception e){}
System.out.println("******************ReentrantLock版本的可重入锁********************");
Thread t3 = new Thread(phone,"t3");
Thread t4 = new Thread(phone,"t4");
t3.start();
t4.start();
}
}
image.png
无论执行多少次,结果都是连续的,t1 和 t2之间不会插队,t3 和 t4之间不会插队。也就说明了重入锁的情况,在sendSMS()中调用sendEmail(),是无需等待锁资源的。get()中调用set()也一样,因为都是一把锁。
另外:加锁与解锁必须要成双成对才行,即使重复加锁也需要成双成对。
image.png
image.png
自旋锁
尝试获取锁的线程如果获取失败不会立即阻塞,而是采用循环的方式继续去尝试获取锁,这样的好处是减少线程上下文的切换,缺点是消耗CPU
/**
* @Desc 通过CAS操作完成自旋锁,A线程先进来调用myLock获取锁,持有锁3秒,
* B随后进来后发现AtomicReference中已被线程A占有,不是null,所以线程B只能自旋等待,直到A线程释放所资源B随后抢到
*/
public class MyRotateLock {
AtomicReference<Thread> atomicReference = new AtomicReference<>();
public void myLock(){
Thread thread = Thread.currentThread();
System.out.println(thread.getName()+"进入myLock方法");
//如果获取失败,就一直循环去获取锁
while (!atomicReference.compareAndSet(null,thread)){
// System.out.println(thread.getName()+"尝试获取锁");
}
}
public void myUnLock(){
Thread thread = Thread.currentThread();
//解锁
atomicReference.compareAndSet(thread,null);
System.out.println(thread.getName()+"进入myUnLock方法,解锁成功");
}
public static void main(String[] args) {
MyRotateLock myRotateLock = new MyRotateLock();
new Thread(()->{
myRotateLock.myLock();
//持有锁3秒时间
try {TimeUnit.SECONDS.sleep(3);}catch (Exception e){}
myRotateLock.myUnLock();
},"AA").start();
try {TimeUnit.MICROSECONDS.sleep(300);}catch (Exception e){}
new Thread(()->{
myRotateLock.myLock();
try {TimeUnit.SECONDS.sleep(1);}catch (Exception e){}
myRotateLock.myUnLock();
},"BB").start();
}
}
读写锁
public class MyReadWriteLock1 {
private volatile Map<String,Object> map = new HashMap<>();
public void put(String key,Object value){
System.out.println(Thread.currentThread().getName()+"正在写入数据,写入的key是"+key);
//模拟写过程
try {TimeUnit.MICROSECONDS.sleep(300);}catch (Exception e){}
map.put(key,value);
System.out.println(Thread.currentThread().getName()+"写入完成");
}
public void get(String key){
System.out.println(Thread.currentThread().getName()+"正在读取:"+key);
//模拟读过程
try {TimeUnit.MICROSECONDS.sleep(300);}catch (Exception e){}
Object result = map.get(key);
System.out.println(Thread.currentThread().getName()+"读取完成:"+result);
}
public static void main(String[] args) {
//模拟5个线程同时去写,五个线程同时去读
MyReadWriteLock1 myReadWriteLock1 = new MyReadWriteLock1();
for (int i = 1; i <=5 ; i++) {
final int tempInt = i;
new Thread(()->{
myReadWriteLock1.put(tempInt+"",tempInt+"");
},i+"").start();
}
for (int i = 1; i <=5 ; i++) {
final int tempInt = i;
new Thread(()->{
myReadWriteLock1.get(tempInt+"");
},i+"").start();
}
}
}
image.png
上面的代码无论怎么运行,都有错误,因为写的时候有其他线程去读,数据会错误,
多个线程去读一个资源是没问题的,但是如果有一个线程去写共享资源,不不能让其他线程来读或者写这个资源了,也就是写-独占,读-共享。写操作时必须保证写线程独占这个资源,并且写操作必须是原子的,不允许过程被打断。
读--读 可以同时进行 加读锁
读--写 不可以同时进行 加写锁
写--写 不可以同时进行 加写锁
public class MyReadWriteLock2 {
private volatile Map<String,Object> map = new HashMap<>();
//读写锁资源,默认非公平
private ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
//写操作 加写锁
public void put(String key,Object value){
reentrantReadWriteLock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName()+"正在写入数据,写入的key是"+key);
//模拟写过程
try {TimeUnit.MICROSECONDS.sleep(300);}catch (Exception e){}
map.put(key,value);
System.out.println(Thread.currentThread().getName()+"写入完成");
}catch (Exception e){
}finally {
reentrantReadWriteLock.writeLock().unlock();
}
}
//读操作 加读锁
public void get(String key){
reentrantReadWriteLock.readLock().lock();
try {
System.out.println(Thread.currentThread().getName()+"正在读取:"+key);
//模拟读过程
try {TimeUnit.MICROSECONDS.sleep(300);}catch (Exception e){}
Object result = map.get(key);
System.out.println(Thread.currentThread().getName()+"读取完成:"+result);
}catch (Exception e){
}finally {
reentrantReadWriteLock.readLock().unlock();
}
}
public static void main(String[] args) {
//模拟5个线程同时去写,五个线程同时去读
MyReadWriteLock2 myReadWriteLock1 = new MyReadWriteLock2();
for (int i = 1; i <=5 ; i++) {
final int tempInt = i;
new Thread(()->{
myReadWriteLock1.put(tempInt+"",tempInt+"");
},i+"").start();
}
for (int i = 1; i <=5 ; i++) {
final int tempInt = i;
new Thread(()->{
myReadWriteLock1.get(tempInt+"");
},i+"").start();
}
}
}
CountDownLatch 计数器闭锁
让一些线程阻塞到另外一些线程完成操作后才被唤醒,CountDownLatch主要有两个方法 ,所有调用await()方法的线程会被阻塞,直到计数器为才被唤醒。调用countDown()方法的线程不会被阻塞,countDown()方法会让计数器减1。
这种现象只会出现一次,因为计数器不能被重置,如果业务上需要一个可以重置计数次数的版本,可以考虑使用CycliBarrier
import lombok.AllArgsConstructor;
import lombok.Getter;
import java.util.concurrent.CountDownLatch;
public class Test2 {
public static void main(String[] args) throws Exception{
CountDownLatch countDownLatch = new CountDownLatch(6);
for (int i = 1; i <= 6 ; i++) {
new Thread(()->{
System.out.println(Thread.currentThread().getName()+"国灭亡");
countDownLatch.countDown();
},Country.list_Contry(i).getName()).start();
}
countDownLatch.await();
System.out.println("战国时代结束,秦统一天下");
}
}
@AllArgsConstructor
@Getter
enum Country{//使用枚举,枚举就相当于一个小型的简单数据库
ONE(1,"赵"),TWO(2,"韩"),ThREE(3,"楚"),FOUR(4,"燕"),FIVE(5,"齐"),SIX(6,"魏");
private Integer code;
private String name;
public static Country list_Contry(int index){
Country[] list = Country.values();
for (Country country : list) {
if(index==country.getCode()){
return country;
}
}
return null;
}
}
image.png
CyclicBarrier(字面意思:可循环使用的屏障)
CyclicBarrier是让一组线程到达的一个屏障点,直到最后一个线程到达屏障时,屏障才会开门,开门执行CyclicBarrier初始化的线程操作,这个操作完成之后,所有被拦截的线程才会被唤醒。
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
public class Test3 {
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(7,()->{
System.out.println("召唤神龙~~~");
try {TimeUnit.SECONDS.sleep(3);}catch (Exception e){}
System.out.println("召唤完成~~~");
}); //数字达到7 就输出召唤神龙
for (int i = 1; i <= 7 ; i++) {//这里可以弄成14,cyclicBarrier 会阻塞两次,因为它可以重用
final int tempInt = i;
new Thread(()->{
System.out.println("收集到第"+tempInt+"龙珠");
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("第"+tempInt+"龙珠继续运行");
},i+"").start();
}
}
}
image.png
Semaphore (可以理解为简单版的阻塞队列)
信号量主要用于两个目的,一个是控制并发线程的数量,一个是多个共享资源互斥使用。
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
public class Test4 {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(3);//模拟三个停车位
// 模拟6部车
for (int i = 1; i <= 6 ; i++) {
new Thread(()->{
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName()+"抢到了车位");
//占有车位三秒
try {TimeUnit.SECONDS.sleep(3);}catch (Exception e){}
System.out.println(Thread.currentThread().getName()+"离开了车位");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
semaphore.release();
}
},i+"").start();
}
}
}
image.png
阻塞队列 BlockingQueue
阻塞队列的特点:从空的阻塞队列中获取元素操作会被阻塞,同样,往满的阻塞队列中丢一个元素操作 也会被阻塞。
image.png
为什么需要阻塞队列:在生产者消费者模型中,一个比较难的重点是唤醒其他线程 这往往是很麻烦的,阻塞队列就不存在这个问题,唤醒线程的时机BlockingQueue给程序员包办了
可以用在一下三个方面,生产者消费者模式,线程池,消息队列。
从继承结构上来看阻塞队列BlockingQueue和我们CRUD工程师经常使用的ArrayLIst,LinkedList一样,属于同一个层级,只是大部分的业务比较简单,用不上BlockingQueue而已,BlockingQueue下有7个常用的子类,ArrayBlockingQueue,LinkedBlockingQueue,SynchronousQueue,DelayQueue,PriorityBlockingQueue,LinkedTransferQueue,LinkedBlockingDeque,前三个比较常用。
ArrayBlockingQueue 由数组组成的有界阻塞队列(界限在构造函数上手动指定)。
LinkedBlockingQueue 由链表组成的有界队列(默认长度是Integer.MAX_VALUE,这个值其实也就相当于无界了)
SynchronousQueue 只存储一个元素的有界队列(这个可以做一个自旋锁的版本不?)
PriorityBlockingQueue 支持优先级的无界阻塞队列
DelayQueue 使用优先级队列实现的延迟无界阻塞队列
LinkedTransferQueue 由链表组成的无界阻塞队列
LinkedBlockingDeque 由链表组成的双向队列
阻塞队列的四类常用方法
方法类型 | 抛出异常 | 特殊值 | 阻塞 | 超时 |
---|---|---|---|---|
插入 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除 | remove() | poll() | take() | poll(time,unit) |
检查 | element() | peek() | 无 | 无 |
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
public class Test1 {
public static void main(String[] args)throws Exception {
// test1();
// test2();
// test3();
test4();
}
// 测试 --- 抛异常
public static void test1(){
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.add("1"));
System.out.println(blockingQueue.add("2"));
System.out.println(blockingQueue.add("3"));
//比较暴力:前面已经放入了三个元素进去 这里再放一个就会抛异常java.lang.IllegalStateException: Queue full
// blockingQueue.add("4");
//返回队首第一个元素
System.out.println(blockingQueue.element());
System.out.println("*************");
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
//同样的:前面已经弹出三个元素出来 这里再弹一个就会抛异常ava.util.NoSuchElementException
// System.out.println(blockingQueue.remove());
}
// 测试--- 特殊值
public static void test2(){
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.offer("1"));
System.out.println(blockingQueue.offer("2"));
System.out.println(blockingQueue.offer("3"));
//这里比较温和一点:放不进去就不放了 返回false
System.out.println(blockingQueue.offer("4"));
//返回队首第一个元素
System.out.println(blockingQueue.peek());
System.out.println("*************");
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
//同样的:这里没元素返回了,返回null
System.out.println(blockingQueue.poll());
}
// 测试---阻塞
public static void test3() throws Exception{
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
blockingQueue.put("1");
blockingQueue.put("2");
blockingQueue.put("3");
System.out.println("*************");
//这里放不进去就一直等待,一直阻塞,直到放进去为止
// blockingQueue.put("3");
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
//同样的:一直等待,一直阻塞,直到弹出元素为止
System.out.println(blockingQueue.take());
}
// 测试---超时阻塞
public static void test4() throws Exception{
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.offer("1", 2, TimeUnit.SECONDS));
System.out.println(blockingQueue.offer("2", 2, TimeUnit.SECONDS));
System.out.println(blockingQueue.offer("3", 2, TimeUnit.SECONDS));
System.out.println("*************");
//2秒钟内数据都没放进去就返回false
System.out.println(blockingQueue.offer("4", 2, TimeUnit.SECONDS));
System.out.println(blockingQueue.poll(2,TimeUnit.SECONDS));
System.out.println(blockingQueue.poll(2,TimeUnit.SECONDS));
System.out.println(blockingQueue.poll(2,TimeUnit.SECONDS));
//同样的:2秒钟内数据都没取出来就返回null
System.out.println(blockingQueue.poll(2,TimeUnit.SECONDS));
}
}
SynchronousQueue
每个put操作之后必须要等take完,才能进行下一次put
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
public class Test2 {
public static void main(String[] args) {
BlockingQueue<String> blockingQueue = new SynchronousQueue<>();
new Thread(()->{
String threadName = Thread.currentThread().getName();
try {
System.out.println(threadName+"放入了元素1");
blockingQueue.put("1");
System.out.println(threadName+"放入了元素2");
blockingQueue.put("2");
System.out.println(threadName+"放入了元素3");
blockingQueue.put("3");
}catch (Exception e){
}
},"AAA").start();
new Thread(()->{
String threadName = Thread.currentThread().getName();
try {
TimeUnit.SECONDS.sleep(2);//模拟取数据的过程
System.out.println(threadName+"取出了元素"+blockingQueue.take());
TimeUnit.SECONDS.sleep(2);
System.out.println(threadName+"取出了元素"+blockingQueue.take());
TimeUnit.SECONDS.sleep(2);
System.out.println(threadName+"取出了元素"+blockingQueue.take());
}catch (Exception e){
}
},"BBB").start();
}
}
image.png
生产者消费者模型
image.png
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
//线程操作资源类
class ShareData{
private volatile int num =0;//保证资源可见性
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
public void increment()throws Exception{
lock.lock();
try {
while (num != 0){//当num不为0 生产者阻塞 这里一定要用while,用if就死定了
condition.await();
}
num++;
System.out.println("生产者生产了一次,num为:"+num);
condition.signalAll();//唤醒其他线程去消费
}catch (Exception e){
}finally {
lock.unlock();
}
}
public void derement()throws Exception{
lock.lock();
try {
while (num == 0){//当num == 0 消费者阻塞 这里一定要用while,用if就死定了
condition.await();
}
num --;
System.out.println("消费者消费了一次,num为:"+num);
condition.signalAll();//唤醒其他线程去生产
}catch (Exception e){
}finally {
lock.unlock();
}
}
}
public class Test3 {
public static void main(String[] args) {
ShareData shareData = new ShareData();
new Thread(()->{
try {
for (int i = 0; i <5 ; i++) {
shareData.increment();
}
}catch (Exception e){
}
},"AAA").start();
new Thread(()->{
try {
for (int i = 0; i <5 ; i++) {
shareData.derement();
}
}catch (Exception e){
}
},"BBB").start();
new Thread(()->{
try {
for (int i = 0; i <5 ; i++) {
shareData.increment();
}
}catch (Exception e){
}
},"CCC").start();
new Thread(()->{
try {
for (int i = 0; i <5 ; i++) {
shareData.derement();
}
}catch (Exception e){
}
},"DDD").start();
}
}
synchronized 和 Lock的区别
- 1
synchronized属于JVM层面的东西,是关键字。其底层实现是通过监视器(monitor,其实notify/wait也是通过监视器来的,只有同步块。同步方法才能调用wait/notify)对象来完成的。
Lock是具体类,是API层面的锁。 - 2 使用方法
synchronized不需要程序员去手动释放锁,系统会自动释放锁
ReentrantLock 是需要手动去释放锁的,没有主动释放就会造成死锁
3 是否可中断
synchronized不可中断,要么正常完成 退出方法,要么出现异常 异常退出,java -p 中monitor就是一个monitorenter对应两个monitorexit ,其中一个是正常退出,一个是异常退出。
ReentrantLock 是可以中断的,可以通过interrupt()方法中断或者调用的时候指定时间 tryLock(lone timeout,TimeUint unit)
4 公平性
synchronized 是非公平的锁
synchronized 可以通过构造函数指定是否公平
5 锁绑定多个条件的Condition
synchronized 没有这个功能
ReentrantLock 用来实现分组唤醒线程,可以精确唤醒,而不像notify()/notifyAll() 要么随机唤醒一个,要么全部唤醒。下面的代码就可以体现这个优势
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* 有三个线程 实现A 打印3次A,B打印3次B,C打印3次C,交替打印,,,按照这样顺序执行10轮
*/
class Data{
private int num = 1; //A -1,B -2,C-3
private Lock lock = new ReentrantLock();
private Condition condition1 = lock.newCondition();
private Condition condition2 = lock.newCondition();
private Condition condition3 = lock.newCondition();
private void print(String str,int count){
for (int i = 0; i <count ; i++) {
System.out.println(str);
}
}
public void Aprint(){
lock.lock();
try {
while (num !=1){
condition1.await();//不是A对应的1 那就在这里等着
}
print("A",3);
num =2;//让B可以去执行
condition2.signal();//唤醒B
}catch (Exception e){
}finally {
lock.unlock();
}
}
public void Bprint(){
lock.lock();
try {
while (num !=2){
condition2.await();
}
print("B",3);
num =3;
condition3.signal();
}catch (Exception e){
}finally {
lock.unlock();
}
}
public void Cprint(){
lock.lock();
try {
while (num !=3){
condition3.await();
}
print("C",3);
num =1;
condition1.signal();
}catch (Exception e){
}finally {
lock.unlock();
}
}
}
public class Test4 {
public static void main(String[] args) {
Data data = new Data();
new Thread(()->{
try {
for (int i = 0; i <10 ; i++) {
data.Aprint();
}
}catch (Exception e){
}
},"AAA").start();
new Thread(()->{
try {
for (int i = 0; i <10 ; i++) {
data.Bprint();
}
}catch (Exception e){
}
},"BBB").start();
new Thread(()->{
try {
for (int i = 0; i <10 ; i++) {
data.Cprint();
}
data.Cprint();
}catch (Exception e){
}
},"CCC").start();
}
}
volatile + atomicInteger +BlockingQueue 实现的高效生产者消费模型
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
class MyResource{
private volatile boolean FLAG = true;
private AtomicInteger atomicInteger = new AtomicInteger();
BlockingQueue<String> blockingQueue = null;
public MyResource(BlockingQueue<String> blockingQueue) {
this.blockingQueue = blockingQueue;
System.out.println("注入类型是"+blockingQueue.getClass().getName());
}
public void produce() throws Exception{
//不要在while里大量创建引用
String threadName = Thread.currentThread().getName();
String data = null;
boolean result = false;
while (FLAG){
data = atomicInteger.incrementAndGet()+"";
result = blockingQueue.offer(data,2,TimeUnit.SECONDS);
if(result){
System.out.println(threadName+"线程放入元素"+data+"成功");
}else{
System.out.println(threadName+"线程放入元素"+data+"失败");
}
}
System.out.println("开关关闭,生产动作结束");
}
public void consume() throws Exception{
//不要在while里大量创建引用
String threadName = Thread.currentThread().getName();
String result = null;
while (FLAG){
result = blockingQueue.poll(2,TimeUnit.SECONDS);
if(result==null ||"".equals(result)){
// FLAG = false;
System.out.println("消费者2秒都没取到数据,消费者退出");
}
System.out.println(threadName+"线程取到元素元素:"+result+"------------");
}
System.out.println("开关关闭,消费动作结束");
}
public void stop() {
this.FLAG = false;
}
}
public class Test5 {
public static void main(String[] args) {
MyResource myResource = new MyResource(new ArrayBlockingQueue<>(3));
new Thread(()->{
try {
myResource.produce();
}catch (Exception e){
}
},"AAA").start();
new Thread(()->{
try {
myResource.produce();
}catch (Exception e){
}
},"CCC").start();
new Thread(()->{
try {
myResource.consume();
}catch (Exception e){
}
},"BBB").start();
try {TimeUnit.SECONDS.sleep(2); }catch (Exception e){}
System.out.println("关闭开关--结束");
myResource.stop();
System.out.println(myResource.blockingQueue);
}
}
Callable与FutureTask
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
class MyTHreahd implements Callable<Integer>{
@Override
public Integer call() throws Exception{
TimeUnit.SECONDS.sleep(2);
System.out.println("进入call方法");
return 100;
}
}
public class Test1 {
public static void main(String[] args) throws Exception{
FutureTask<Integer> futureTask = new FutureTask<>(new MyTHreahd());
new Thread(futureTask).start();//这里使用了适配器思想FutureTask 间接的实现了Runnable接口, futureTask的run()方法调用了call()方法,所以后面能获取到结果
// int result2 = futureTask.get(); //get是获取计算结果,一旦调用那么当前线程就有可能阻塞,直到futureTask计算完,所以一般这句都是放到最后
int result1 = 50;
System.out.println("******************main******************");
while (!futureTask.isDone()){
// System.out.println("futureTask 正在计算");
}
int result2 = futureTask.get();
System.out.println("计算结果"+(result1+result2));
}
}
线程池
线程池的特点就是 线程复用,控制最大并发数,管理线程
- 降低资源消耗:通过利用已经创建的线程降低 创建线程,销毁线程的消耗
- 提高线程响应速度:任务到达是,线程池中的线程可以立即执行任务,不需要等待线程的创建时间
- 管理线程:线程是稀缺资源,使用线程池可以统一分配,调优,监控线程
典型的三种线程池(真实情况不这样创建线程池)
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class Test2{
public static void main(String[] args) {
// ExecutorService threadPool = Executors.newFixedThreadPool(5);//一池5线程
// ExecutorService threadPool = Executors.newSingleThreadExecutor();//一池1线程
ExecutorService threadPool = Executors.newCachedThreadPool();//一池N线程 线程数可以动态分配,如果任务多,执行时间长,就可能启动多个线程来执行任务
try {
for (int i = 0; i <10 ; i++) {
threadPool.execute(()->{
try{TimeUnit.SECONDS.sleep(1);}catch (Exception e){}
System.out.println(Thread.currentThread().getName()+":执行任务");
});
}
}catch (Exception e){
}finally {
threadPool.shutdown();
}
}
}
线程池核心构造方法
public ThreadPoolExecutor(int corePoolSize,//常驻核心线程数
int maximumPoolSize,//线程池能够容纳最大的线程数
long keepAliveTime,//当线程池线程数量超过corePoolSize时,空闲线程的空闲时间达到keepAliveTime时,多余的空闲时间会被销毁到剩下corePoolSize个线程为止
TimeUnit unit,//keepAliveTime的单位
BlockingQueue<Runnable> workQueue,//任务缓冲队列:已经提交但是还未被执行的任务
ThreadFactory threadFactory,//生成工作线程的线程工厂,一般使用默认即可
RejectedExecutionHandler handler) {//拒绝策略,当线程数达到最大线程数,并且任务队列已经满的时候时,线程池拒绝新的任务提交策略
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
//四种拒绝策略
ThreadPoolExecutor.AbortPolicy; //默认策略:直接抛出异常
ThreadPoolExecutor.CallerRunsPolicy//调用者运行:既不抛异常,也不丢任务,而是将任务返回给调用者
ThreadPoolExecutor.DiscardOldestPolicy//抛弃任务队列中等待最久的队列,然后把当前任务加入到任务队列
ThreadPoolExecutor.DiscardPolicy//直接抛弃任务
}
线程池底层工作原理
- 1 创建线程池后,等待提交过来的任务请求
- 2 当调用execute()方法添加一个请求任务时,线程池会做一下判断
- 2.1:如果正在运行的线程数小于corePoolSize 时,马上创建线程执行这个任务
- 2.2:如果正在运行的线程数大于等于corePoolSize 时,将这个任务放入队列
- 2.3:当队列满了,且运行线程数小于maximumPoolSize 数时,创建核心常驻线程
- 2.4:当队列已满,且运行线程数等于于maximumPoolSize 数时,启动拒绝策略
- 3 当线程完成任务时,会从任务队列中取任务执行
-
4 当线程无事可做时,并且不做事的时间超过keepAliveTime时,如果当前运行线程数大于corePoolSize 这个空闲线程会被停掉,直到线程数量小于等于corePoolSize
image.png
image.png
import java.util.concurrent.*;
public class Test2 {
public static void main(String[] args) {
// ExecutorService threadPool = Executors.newFixedThreadPool(5);//一池5线程
// ExecutorService threadPool = Executors.newSingleThreadExecutor();//一池1线程
// ExecutorService threadPool = Executors.newCachedThreadPool();
// System.out.println(Runtime.getRuntime().availableProcessors()); 获取服务器核心数
//这里 不使用JDK提供的Executors创建线程池的原因是,其设置的LinkedBlockingQueue 长度是Integer.MAX_VALUE 21亿,这简直坑爹,会造成OOM
ExecutorService threadPool = new ThreadPoolExecutor(2,
5,
1,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
try {
for (int i = 0; i <10 ; i++) {
System.out.println(i);
threadPool.execute(()->{
try {TimeUnit.SECONDS.sleep(1); }catch (Exception e){}
System.out.println(Thread.currentThread().getName()+"正在运行");
});
// executorService.submit() 也可以调用submit方法,这个方法返回一个Futher对象,通过Futher对象get方法获取执行结果
}
}catch (Exception e){
e.printStackTrace();
}finally {
try {TimeUnit.SECONDS.sleep(3); }catch (Exception e){}
threadPool.shutdown();
}
}
}
线程数量的配置
CPU密集型任务:如果任务需要大量的计算,而CPU没有阻塞,一直全速运行,尽可能配置少的线程数量,,一般公式是 CPU核心数+1
IO密集型任务,像读DB呀,Redis呀,文件,都属于IO密集型。IO密集型会导致线程等待,浪费了CPU计算能力,所以尽可能多配置线程数 CPU核心数乘2,IO密集型时,大部分任务都阻塞时,参考公式:CPU核心数/(1-阻塞系数),比如8核心的服务器,8/(1-0.9) = 80个线程。
死锁及定位死锁代码
import java.util.concurrent.TimeUnit;
class DeadLock implements Runnable{
private String lockA;
private String LockB;
public DeadLock(String lockA, String lockB) {
this.lockA = lockA;
LockB = lockB;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"进入run方法");
synchronized (lockA){
try { TimeUnit.SECONDS.sleep(2);}catch (Exception e){e.printStackTrace();}
synchronized (LockB){
System.out.println("执行完成");
}
}
}
}
public class Test3 {
public static void main(String[] args) {
String lockA = "lockA";
String lockB = "lockB";
new Thread(new DeadLock(lockA,lockB),"AAA").start();
new Thread(new DeadLock(lockB,lockA),"BBB").start();
}
}
网友评论