美文网首页
071-JAVA线程安全的同步锁方案【阻塞】

071-JAVA线程安全的同步锁方案【阻塞】

作者: XAbo | 来源:发表于2022-06-14 23:08 被阅读0次

    1.互斥锁

    使用互斥锁对象保证临界区内代码的原子性。

    • 同步代码块的锁:需要显式指定锁对象。
    • 同步方法的锁:默认锁对象是this;静态方法,锁的就是该Class
    • 临界区的代码执行完后不仅会释放锁,也会唤醒其他等待该锁的线程。
    • 当临界区中的代码发生异常后,则会自动释放锁。

    2 互斥锁的问题

    • 死锁:一个线程需要同时获得多把锁,容易发生死锁。例:哲学家就餐问题。
    • 活锁:两个线程互相改变对方的结束条件,导致都不能结束。
    • 饥饿:线程一直得不到CPU的调度(可能优先级过低),不能执行和结束。

    解决死锁的方式一:手动控制保证加的锁的顺序一致,破坏死锁的依赖性。但是这样也容易造成线程的饥饿现场。

    3 互斥锁的原理

    3.1 对象头

    1. Klass Word:存放该对象所属类型的地址。
    2. Mark Work:存放该对象的标记信息。
    java对象头 Mark Work

    3.2 管程

    Monitor结构

    3.2 重量级锁

    每个JAVA对象都可以关联一个Monitor对象(OS的对象),如果使用synchronized给对象加重量级锁后,该对象头的Mark Work就被设置指向Monitor对象的指针,同时标记为上锁状态(最后两位为10)。Mark Work原有的数据暂存在Monitor对象中。

    线程2占有锁对象后,线程2的指向MonitorOwner其他访问临界区的线程,进入BLOCKED队列。

    线程2的临界区代码执行完后,会从Monitor对象中将Mark Work的数据恢复(解锁)。如果临界区代码执行中发生异常,jvm会将锁解开和唤醒其他线程。

    恢复后,线程2会叫醒BLOCKED队列中的其他线程。

    重量级锁 自旋优化:jdk7后不能控制自旋次数

    3.4 轻量级锁

    重量级锁的消耗太大。jdk6之后使用轻量级降低资源消耗。其实现思想是:

    线程获取锁对象后,该锁对象先不与Monitor关联,即先不上锁(适用于资源经常不被共享)。先在本线程的栈帧中创建锁记录,并用cas替换真正锁对象的Mark Work数据。交换成功后,即加锁(伪锁,并没有与Monitor关联)成功。

    Mark Work数据置换

    本线程下次访问可以识别锁记录,即表明该资源没有被共享过,本线程可以继续使用。如果本线程又尝试给该锁上锁,发生锁重入。

    锁重入 解锁

    当另一个线程过来(进入同步代码块),进行同样的cas替换数据操作,发现锁状态为00,则第二个线程上锁失败后,会将原轻量级锁的Mark Work数据与Monitor进行关联(锁膨胀),第二个线程便去MonitorBLOCKED队列等待。

    锁膨胀:将原轻量级锁的Mark Work数据与Monitor进行关联

    3.5 偏向锁

    比轻量级更轻的锁。偏向锁发生锁重入的时候,还要进行二次的cas操作,也存在效率问题。

    偏向锁:只有第一次使用cas将线程ID设置到Mark Work,之后发现这个线程ID是自己的,表示没有发生竞争,不需要再次进行cas操作。只要不发生竞争这个对象就归本线程持有。

    撤销偏向锁:

    • 设置jvm参数可以禁用偏向锁。
    • 对象调用hashCode方法也会禁用对象的偏向锁(其他锁都有各自存放锁对象Mark Work原来数据的地方,而偏向锁没有)。
    • 其他线程如果进入临界区,会直接将上个线程记录的信息擦除掉。并加上轻量级锁。

    批量重定向:

    当共享资源被标记为偏向锁后,即这些资源几乎不用(包括原来的线程都不用),突然另一个线程过来使用,jvm会将锁对象的线程ID临时更新为第二个线程的ID,第二个线程执行完后,会将锁对象的线程ID变会第一个线程ID;当这样的操作超过20次后,jvm会将偏向锁的线程ID批量更新为第二个线程的ID。

    当批量更新的操作多起来(阈值40)后,偏向锁被废弃,升级为轻量级锁。

    3.6 锁消除

    jvm会对热点代码进行优化,会判断synchronized是否有必要,如果没有必要就不会真正执行synchronized操作。

    4 互斥锁的通信

    4.1 Sleep方式

    public class WaitTest {
        static final  Object room = new Object();
        static boolean hasCigarette = false;
        static boolean hasTakeout = false;
    
        public static void main(String[] args) throws InterruptedException {
    
            new Thread(()->{
                synchronized (room){
                    System.out.println(System.currentTimeMillis()+Thread.currentThread().getName()+":有烟吗? " + hasCigarette);
                    if(!hasCigarette){
                        System.out.println(System.currentTimeMillis()+Thread.currentThread().getName()+":没烟,歇会。" );
                        try {
                           //问题1:烟提前送到,小南线程还要继续休息(这里使用interrupted可以打断)
                            Thread.sleep(2000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    System.out.println(System.currentTimeMillis()+Thread.currentThread().getName()+":有烟吗? " + hasCigarette);
                    if(hasCigarette){
                        System.out.println(System.currentTimeMillis()+Thread.currentThread().getName()+":有烟,干活。");
                    }
              }
            },"小南").start();
           //问题2:小南线程休息期间,其他线程都必须等待。
            for (int i = 0; i < 5; i++) {
                new Thread(()->{
                    synchronized (room){
                  System.out.println(System.currentTimeMillis()+Thread.currentThread().getName()+":干活。");
                    }
                },"打工人").start();
            }
            Thread.sleep(1000);
            new Thread(()->{
                // 问题3:这个线程不能使用synchronized,否则,烟也没法送到。
                // synchronized (room){
                    System.out.println(System.currentTimeMillis()+Thread.currentThread().getName()+":烟来。");
                    hasCigarette = true;
               // }
            },"小女").start();
            
        }
    }
    
    sleep不会释放锁,导致其他线程只能等待

    4.2 wait和notify方式

    wait()和notify()原理
    • wait()和notify()必须由同一个锁对象调用。因为对应的锁对象可以通过notify()唤醒使用同一个锁对象的waitting的状态下的另一个线程。
    • wait()和notify()是属于Object类的方法。
    • wait()和notify()必须在同步代码块或者同步方法中使用(Lock方式除外)。因为必须要通过锁对象调用这两个方法(先获得锁)。
    package com.company.thread;
    
    
    public class WaitTest {
        static final  Object room = new Object();
        static boolean hasCigarette = false;
        static boolean hasTakeout = false;
    
        public static void main(String[] args) throws InterruptedException {
            new Thread(()->{
                synchronized (room){
                    System.out.println(System.currentTimeMillis()+Thread.currentThread().getName()+":有烟吗? " + hasCigarette);
                    if(!hasCigarette){
                        System.out.println(System.currentTimeMillis()+Thread.currentThread().getName()+":没烟,歇会。" );
                        try {
                            room.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    System.out.println(System.currentTimeMillis()+Thread.currentThread().getName()+":有烟吗? " + hasCigarette);
                    if(hasCigarette){
                        System.out.println(System.currentTimeMillis()+Thread.currentThread().getName()+":有烟,干活。");
                    }
              }
            },"小南").start();
    
            for (int i = 0; i < 5; i++) {
                new Thread(()->{
                    synchronized (room){
                        System.out.println(System.currentTimeMillis()+Thread.currentThread().getName()+":干活。");
                    }
                },"打工人").start();
            }
            Thread.sleep(1000);
    
            new Thread(()->{
                synchronized (room){
                    System.out.println(System.currentTimeMillis()+Thread.currentThread().getName()+":烟来。");
                    hasCigarette = true;
                    room.notify();
                }
            },"小女").start();
            
        }
    }
    
    解决了线程等待的问题,但是notify存在虚假唤醒
    public class WaitTest {
        static final  Object room = new Object();
        static boolean hasCigarette = false;
        static boolean hasTakeout = false;
        public static void main(String[] args) throws InterruptedException {
            new Thread(()->{
                synchronized (room){
                    System.out.println(System.currentTimeMillis()+Thread.currentThread().getName()+":有烟吗? " + hasCigarette);
                    if(!hasCigarette){
                        System.out.println(System.currentTimeMillis()+Thread.currentThread().getName()+":没烟,歇会。" );
                        try {
                            room.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    System.out.println(System.currentTimeMillis()+Thread.currentThread().getName()+":有烟吗? " + hasCigarette);
                    if(hasCigarette){
                        System.out.println(System.currentTimeMillis()+Thread.currentThread().getName()+":有烟,干活。");
                    }else {
                        System.out.println(System.currentTimeMillis()+Thread.currentThread().getName()+":接着睡。");
                    }
              }
            },"小南").start();
    
            new Thread(()->{
                synchronized (room){
                    System.out.println(System.currentTimeMillis()+Thread.currentThread().getName()+":有饭吗? " + hasTakeout);
                    if(!hasTakeout){
                        System.out.println(System.currentTimeMillis()+Thread.currentThread().getName()+":没饭,歇会。" );
                        try {
                            room.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    System.out.println(System.currentTimeMillis()+Thread.currentThread().getName()+":有饭吗? " + hasTakeout);
                    if(hasTakeout){
                        System.out.println(System.currentTimeMillis()+Thread.currentThread().getName()+":有饭,干活。");
                    }
                }
            },"大南").start();
    
            Thread.sleep(1000);
    
            new Thread(()->{
                synchronized (room){
                    System.out.println(System.currentTimeMillis()+Thread.currentThread().getName()+":饭来。");
                    hasTakeout = true;
                    room.notify();
                }
            },"小女").start();
            
        }
    }
    
    notify随机叫醒等待的一个线程,造成虚假唤醒

    4.3 wait和notifyAll方式

    package com.company.thread;
    
    
    public class WaitTest {
        static final  Object room = new Object();
        static boolean hasCigarette = false;
        static boolean hasTakeout = false;
    
        public static void main(String[] args) throws InterruptedException {
            new Thread(()->{
                synchronized (room){
                    System.out.println(System.currentTimeMillis()+Thread.currentThread().getName()+":有烟吗? " + hasCigarette);
                    while(!hasCigarette){
                        System.out.println(System.currentTimeMillis()+Thread.currentThread().getName()+":没烟,歇会。" );
                        try {
                            room.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    System.out.println(System.currentTimeMillis()+Thread.currentThread().getName()+":有烟吗? " + hasCigarette);
                    if(hasCigarette){
                        System.out.println(System.currentTimeMillis()+Thread.currentThread().getName()+":有烟,干活。");
                    }else {
                        System.out.println(System.currentTimeMillis()+Thread.currentThread().getName()+":接着睡。");
                    }
              }
            },"小南").start();
    
            new Thread(()->{
                synchronized (room){
                    System.out.println(System.currentTimeMillis()+Thread.currentThread().getName()+":有饭吗? " + hasTakeout);
                    while(!hasTakeout){
                        System.out.println(System.currentTimeMillis()+Thread.currentThread().getName()+":没饭,歇会。" );
                        try {
                            room.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    System.out.println(System.currentTimeMillis()+Thread.currentThread().getName()+":有饭吗? " + hasTakeout);
                    if(hasTakeout){
                        System.out.println(System.currentTimeMillis()+Thread.currentThread().getName()+":有饭,干活。");
                    }
                }
            },"大南").start();
            Thread.sleep(1000);
    
            new Thread(()->{
                synchronized (room){
                    System.out.println(System.currentTimeMillis()+Thread.currentThread().getName()+":饭来。");
                    hasTakeout = true;
                    room.notifyAll();
                }
            },"小女").start();
            
        }
    }
    
    小南线程虽然被叫醒了,但是while使得其又wait

    4.4 保护性暂停

    一个线程等待另一个线程的执行结果。
    1.同步模式:两个线程通过中间变量,进行通信。

    public class GuardedObject {
        private Object result;
    
        public Object get(long timeout){
            synchronized (this){
                //开始时间
                long begin = System.currentTimeMillis();
                //经历时间
                long ing = 0;
                while (result == null){
                    if(ing >= timeout){
                        break;
                    }
                    try {
                        this.wait(timeout-ing);//避免虚假唤醒,等待时间长变长!!!
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                   ing = System.currentTimeMillis()-begin;
                }
                return result;
            }
        }
        public  void complete(Object result){
            synchronized (this){
                this.result = result;
                this.notifyAll();
            }
        }
    
        public static void main(String[] args) {
          GuardedObject guardedObject = new GuardedObject();
          new Thread(()->{
              System.out.println(System.currentTimeMillis()+"我在等待…… ");
              String s = (String)guardedObject.get(3000);
              System.out.println(System.currentTimeMillis()+"结果:"+ s);
    
          },"t1").start();
    
            new Thread(()->{
                try {
                    Thread.sleep(5000);
                    guardedObject.complete("ok");
                    System.out.println(System.currentTimeMillis()+"ok");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            },"t2").start();
        }
    }
    
    超时等待
    2. 同步模式:两个线程通过中间对象(一对一),进行通信。 中间对象接偶
    package com.company.thread;
    
    import java.util.*;
    
    public class GuardedObject {
        private int id;
        public GuardedObject(int id) {
            this.id = id;
        }
        public int getId(){
            return id;
        }
        private Object result;
        public Object get(long timeout){
            synchronized (this){
                //开始时间
                long begin = System.currentTimeMillis();
                //经历时间
                long ing = 0;
                while (result == null){
                    if(ing >= timeout){
                        break;
                    }
                    try {
                        this.wait(timeout-ing);//避免虚假唤醒,等待时间长不对!!!
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                   ing = System.currentTimeMillis()-begin;
                }
                return result;
            }
        }
        public  void complete(Object result){
            synchronized (this){
                this.result = result;
                this.notifyAll();
            }
        }
    
        public static void main(String[] args) {
            for (int i = 0; i < 3; i++) {
                new People().start();
            }
    
            for (Integer id:Mailboexs.getIds()) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
    
                new Postman(id, ",:" + id).start();
            }
    
        }
    }
    
    class  People extends Thread{
        @Override
        public void run() {
            GuardedObject guardedObject = Mailboexs.createGuardedObject();
            System.out.println(System.currentTimeMillis()+Thread.currentThread().getName()+"等待收信ID:"+guardedObject.getId());
            Object mail = guardedObject.get(5000);
            System.out.println(System.currentTimeMillis()+Thread.currentThread().getName()+"收到信ID:"+guardedObject.getId()+"内容"+mail);
        }
    }
    
    class  Postman extends Thread{
        private int id;
        private String mail;
        public  Postman(int id,String mail){
         this.id = id;
         this.mail  = mail;
        }
    
        @Override
        public void run() {
            GuardedObject guardedObject = Mailboexs.getGuardedObject(id);
            guardedObject.complete(mail);
            System.out.println(System.currentTimeMillis()+Thread.currentThread().getName()+"送信ID:"+guardedObject.getId()+"内容"+mail);
        }
    }
    
    class Mailboexs{
        private  static  Map<Integer,GuardedObject> boxes = new HashMap<>();
        private  static int id = 1;
        private  static synchronized  int generateId(){
          return  id++;
        }
        public static GuardedObject createGuardedObject(){
            GuardedObject  go = new GuardedObject(generateId());
            boxes.put(go.getId(),go);
            return go;
        }
        public static GuardedObject getGuardedObject(int id){
            return  boxes.remove(id);
        }
        public static Set<Integer> getIds(){
            return boxes.keySet();
        }
    }
    

    3. 异步模式:生产者消费者模式,进行通信。

    生产者消费者模式
    package com.company.thread;
    
    import java.util.LinkedList;
    
    public class GuardedQueue {
        public static void main(String[] args) {
            MessageQueue queue = new MessageQueue(2);
            for (int i = 0; i <  3; i++) {
                int id = I;
                new Thread(() -> {
                    queue.put(new Message(id,"值"+id));
                },"生产者"+i).start();
            }
    
            new Thread(() -> {
                    while (true){
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        Message message = queue.take();
                    }
            },"消费者").start();
        }
    }
    //消息队列,线程之间通信使用
    class MessageQueue{
        private LinkedList<Message> linkedList = new LinkedList<>();
        private  int capcity;
        public  MessageQueue(int capcity){
            this.capcity = capcity;
        }
        public   Message take(){
          synchronized (linkedList){
              while (linkedList.isEmpty()){
                  try {
                      System.out.println(Thread.currentThread().getName()+"队列为空,消费者线程等待");
                      linkedList.wait();
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
              }
              Message message = linkedList.removeFirst();
              System.out.println(Thread.currentThread().getName()+"已经消费"+message);
              linkedList.notifyAll();
              return message;
          }
    
        }
        public void  put(Message message){
            synchronized (linkedList){
                while (linkedList.size() == capcity){
                    try {
                        System.out.println(Thread.currentThread().getName()+"队列已满,生产者线程等待");
                        linkedList.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                linkedList.addLast(message);
                System.out.println(Thread.currentThread().getName()+"已生产消息"+message);
                linkedList.notifyAll();
            }
        }
    }
    //消息
    final  class Message{
        private int id;
        private Object value;
    
        public Message(int id, Object value) {
            this.id = id;
            this.value = value;
        }
    
        public int getId() {
            return id;
        }
    
        public void setId(int id) {
            this.id = id;
        }
    
        public Object getValue() {
            return value;
        }
    
        public void setValue(Object value) {
            this.value = value;
        }
    
        @Override
        public String toString() {
            return "Message{" +
                    "id=" + id +
                    ", value=" + value +
                    '}';
        }
    }
    
    结果

    相关文章

      网友评论

          本文标题:071-JAVA线程安全的同步锁方案【阻塞】

          本文链接:https://www.haomeiwen.com/subject/lostvrtx.html