美文网首页
《实战高并发程序设计》读书笔记-多线程团队协作:同步控制

《实战高并发程序设计》读书笔记-多线程团队协作:同步控制

作者: 乙腾 | 来源:发表于2021-05-08 00:36 被阅读0次

    synchronized的功能扩展:重入锁

    重入锁可以完全替代synchronized关键字。在JDK 5.0的早期版本中,重入锁的性能远远好于synchronized,但从JDK 6.0开始,JDK在synchronized上做了大量的优化,使得两者的性能差距并不大。
    重入锁使用java.util.concurrent.locks.ReentrantLock类来实现。
    重入锁使用案例

    01 public class ReenterLock implements Runnable{
    02     public static ReentrantLock lock=new ReentrantLock();
    03     public static int i=0;
    04     @Override
    05     public void run() {
    06         for(int j=0;j<10000000;j++){
    07             lock.lock();
    08             try{
    09                 i++;
    10             }finally{
    11                 lock.unlock();
    12             }
    13         }
    14     }
    15     public static void main(String[] args) throws InterruptedException {
    16         ReenterLock tl=new ReenterLock();
    17         Thread t1=new Thread(tl);
    18         Thread t2=new Thread(tl);
    19         t1.start();t2.start();
    20         t1.join();t2.join();
    21         System.out.println(i);
    22     }
    23 }
    

    7~12行,使用重入锁保护临界区资源i,确保多线程对i操作的安全性。重入锁与synchronized相比,重入锁有着显示的操作过程。开发人员必须手动指定何时加锁,何时释放锁。也正因为这样,重入锁对逻辑控制的灵活性要远远好于synchronized。但值得注意的是,在退出临界区时,必须记得释放锁(代码第11行),否则,其他线程就没有机会再访问临界区了。

    重点

    重入锁的特点

    获取锁的线程,可以重复获取该锁,称之为重入锁。
    比如上面代码改成如下更能体现出来重入锁的特点

    lock.lock();
    lock.lock();
    try{
      i++;
    }finally{
        lock.unlock();
        lock.unlock();
    }
    

    在这种情况下,一个线程连续两次获得同一把锁。这是允许的!如果不允许这么操作,那么同一个线程在第2次获得锁时,将会和自己产生死锁。程序就会“卡死”在第2次申请锁的过程中。但需要注意的是,如果同一个线程多次获得锁,那么在释放锁的时候,也必须释放相同次数。如果释放锁的次数多,那么会得到一个java.lang.IllegalMonitorStateException异常,反之,如果释放锁的次数少了,那么相当于线程还持有这个锁,因此,其他线程也无法进入临界区。、

    中断响应

    对于synchronized来说,如果一个线程在等待锁,那么结果只有两种情况,要么它获得这把锁继续执行,要么它就保持等待。而使用重入锁,则提供另外一种可能,那就是线程可以被中断。
    中断响应案例

    01 public class IntLock implements Runnable {
    02     public static ReentrantLock lock1 = new ReentrantLock();
    03     public static ReentrantLock lock2 = new ReentrantLock();
    04     int lock;
    05     /**
    06      * 控制加锁顺序,方便构造死锁
    07      * @param lock
    08      */
    09     public IntLock(int lock) {
    10         this.lock = lock;
    11     }
    12
    13     @Override
    14     public void run() {
    15         try {
    16             if (lock == 1) {
    17                 lock1.lockInterruptibly();
    18                 try{
    19                     Thread.sleep(500);
    20                 }catch(InterruptedException e){}
    21                 lock2.lockInterruptibly();
    22             } else {
    23                 lock2.lockInterruptibly();
    24                 try{
    25                     Thread.sleep(500);
    26                 }catch(InterruptedException e){}
    27                 lock1.lockInterruptibly();
    28             }
    29
    30         } catch (InterruptedException e) {
    31             e.printStackTrace();
    32         } finally {
    33             if (lock1.isHeldByCurrentThread())
    34                 lock1.unlock();
    35             if (lock2.isHeldByCurrentThread())
    36                 lock2.unlock();
    37             System.out.println(Thread.currentThread().getId()+":线程退出");
    38         }
    39     }
    40
    41     public static void main(String[] args) throws InterruptedException {
    42         IntLock r1 = new IntLock(1);
    43         IntLock r2 = new IntLock(2);
    44         Thread t1 = new Thread(r1);
    45         Thread t2 = new Thread(r2);
    46         t1.start();t2.start();
    47         Thread.sleep(1000);
    48         //中断其中一个线程
    49         t2.interrupt();
    50     }
    51 }
    

    线程t1和t2启动后,t1先占用lock1,再占用lock2;t2先占用lock2,再请求lock1。因此,很容易形成t1和t2之间的相互等待。在这里,对锁的请求,统一使用lockInterruptibly()方法。这是一个可以对中断进行响应的锁申请动作,即在等待锁的过程中,可以响应中断。
    在代码第47行,主线程main处于休眠,此时,这两个线程处于死锁的状态,在代码第49行,由于t2线程被中断,故t2会放弃对lock1的申请,同时释放已获得lock2。这个操作导致t1线程可以顺利得到lock2而继续执行下去。
    另外上面通过加锁顺序,构造死锁的思路也值得借鉴。
    执行上述代码,将输出:

    java.lang.InterruptedException
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.
    doAcquireInterruptibly(AbstractQueuedSynchronizer.java:898)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.
    acquireInterruptibly(AbstractQueuedSynchronizer.java:1222)
        at java.util.concurrent.locks.ReentrantLock.lockInterruptibly
    (ReentrantLock.java:335)
        at geym.conc.ch3.synctrl.IntLock.run(IntLock.java:31)
        at java.lang.Thread.run(Thread.java:745)
    9:线程退出
    8:线程退出
    

    可以看到,中断后,两个线程双双退出。但真正完成工作的只有t1。而t2线程则放弃其任务直接退出,释放资源。

    重点

    lock()与lockInterruptibly()的区别

    ReentrantLock.lockInterruptibly允许在等待时由其它线程调用等待线程的Thread.interrupt方法来中断等待线程的等待而直接返回,这时不用获取锁,而会抛出一个InterruptedException
    ReentrantLock.lock方法不允许Thread.interrupt中断,即使检测到Thread.isInterrupted,一样会继续尝试获取锁,失败则继续休眠。只是在最后获取锁成功后再把当前线程置为interrupted状态,然后再中断线程。
    也就是只有通过lockInterruptibly()获取锁,才可以通过线程中断。

    interrupt()不提倡

    这种中断线程的方式并不提倡,后面会解释。

    锁申请等待限时

    除了等待外部通知之外,要避免死锁还有另外一种方法,那就是限时等待。
    限时等待案例

    01 public class TimeLock implements Runnable{
    02     public static ReentrantLock lock=new ReentrantLock();
    03     @Override
    04     public void run() {
    05         try {
    06             if(lock.tryLock(5, TimeUnit.SECONDS)){
    07                 Thread.sleep(6000);
    08             }else{
    09                 System.out.println("get lock failed");
    10             }
    11         } catch (InterruptedException e) {
    12             e.printStackTrace();
    13         }finally{if(lock.isHeldByCurrentThread()) lock.unlock();}
    14     }
    15     public static void main(String[] args) {
    16         TimeLock tl=new TimeLock();
    17         Thread t1=new Thread(tl);
    18         Thread t2=new Thread(tl);
    19         t1.start();
    20         t2.start();
    21     }
    22 }
    

    在这里,tryLock()方法接收两个参数,一个表示等待时长,另外一个表示计时单位。这里的单位设置为秒,时长为5,表示线程在这个锁请求中,最多等待5秒。如果超过5秒还没有得到锁,就会返回false。如果成功获得锁,则返回true。
    在本例中,由于占用锁的线程会持有锁长达6秒,故另一个线程无法在5秒的等待时间内获得锁,因此,请求锁会失败。
    ReentrantLock.tryLock()方法也可以不带参数直接运行。在这种情况下,当前线程会尝试获得锁,如果锁并未被其他线程占用,则申请锁会成功,并立即返回true。如果锁被其他线程占用,则当前线程不会进行等待,而是立即返回false。这种模式不会引起线程等待,因此也不会产生死锁。下面演示了这种使用方式:

    01 public class TryLock implements Runnable {
    02     public static ReentrantLock lock1 = new ReentrantLock();
    03     public static ReentrantLock lock2 = new ReentrantLock();
    04     int lock;
    05
    06     public TryLock(int lock) {
    07         this.lock = lock;
    08     }
    09
    10     @Override
    11     public void run() {
    12         if (lock == 1) {
    13             while (true) {
    14                 if (lock1.tryLock()) {
    15                     try {
    16                         try {
    17                             Thread.sleep(500);
    18                         } catch (InterruptedException e) {
    19                         }
    20                         if (lock2.tryLock()) {
    21                             try {
    22                                 System.out.println(Thread.currentThread()
    23                                         .getId() + ":My Job done");
    24                                 return;
    25                             } finally {
    26                                 lock2.unlock();
    27                             }
    28                         }
    29                     } finally {
    30                         lock1.unlock();
    31                     }
    32                 }
    33             }
    34         } else {
    35             while (true) {
    36                 if (lock2.tryLock()) {
    37                     try {
    38                         try {
    39                             Thread.sleep(500);
    40                         } catch (InterruptedException e) {
    41                         }
    42                         if (lock1.tryLock()) {
    43                             try {
    44                                 System.out.println(Thread.currentThread()
    45                                         .getId() + ":My Job done");
    46                                 return;
    47                             } finally {
    48                                 lock1.unlock();
    49                             }
    50                         }
    51                     } finally {
    52                         lock2.unlock();
    53                     }
    54                 }
    55             }
    56         }
    57     }
    58
    59     public static void main(String[] args) throws InterruptedException {
    60         TryLock r1 = new TryLock(1);
    61         TryLock r2 = new TryLock(2);
    62         Thread t1 = new Thread(r1);
    63         Thread t2 = new Thread(r2);
    64         t1.start();
    65         t2.start();
    66     }
    67 }
    

    输出

    9:My Job done
    8:My Job done
    

    上述代码中,采用了非常容易死锁的加锁顺序。也就是先让t1获得lock1,再让t2获得lock2,接着做反向请求,让t1申请lock2,t2申请lock1。在一般情况下,这会导致t1和t2相互等待,从而引起死锁。
    但是使用tryLock()后,这种情况就大大改善了。由于线程不会傻傻地等待,而是不停地尝试,因此,只要执行足够长的时间,线程总是会得到所有需要的资源,从而正常执行(这里以线程同时获得lock1和lock2两把锁,作为其可以正常执行的条件)。在同时获得lock1和lock2后,线程就打印出标志着任务完成的信息“My Job done”。
    上面代码获取到锁的时机是:一旦有一个线程释放所有资源,此时刚结束sleep状态的线程就可以获取到锁,即同时获取两把锁,结束线程从而释放所有锁,此时另一个线程因为没有了竞争,也可以获取锁,从而结束线程。

    公平锁

    在大多数情况下,锁的申请都是非公平的。非公平锁可能会引发线程饥饿现象,即线程随机获取锁,但是有的线程总也获取不到锁。
    公平锁的一大特点是:它不会产生饥饿现象。只要你排队,最终还是可以等到资源的。如果我们使用synchronized关键字进行锁控制,那么产生的锁就是非公平的。而重入锁允许我们对其公平性进行设置。它有一个如下的构造函数:

    public ReentrantLock(boolean fair)
    

    当参数fair为true时,表示锁是公平的。公平锁看起来很优美,但是要实现公平锁必然要求系统维护一个有序队列,因此公平锁的实现成本比较高,性能相对也非常低下,因此,默认情况下,锁是非公平的。如果没有特别的需求,也不需要使用公平锁。公平锁和非公平锁在线程调度表现上也是非常不一样的。下面的代码可以很好地突出公平锁的特点:

    01 public class FairLock implements Runnable {
    02     public static ReentrantLock fairLock = new ReentrantLock(true);
    03
    04     @Override
    05     public void run() {
    06         while(true){
    07         try{
    08             fairLock.lock();
    09             System.out.println(Thread.currentThread().getName()+" 获得锁");
    10         }finally{
    11             fairLock.unlock();
    12         }
    13         }
    14     }
    15
    16     public static void main(String[] args) throws InterruptedException {
    17         FairLock r1 = new FairLock();
    18         Thread t1=new Thread(r1,"Thread_t1");
    19         Thread t2=new Thread(r1,"Thread_t2");
    20         t1.start();t2.start();
    21     }
    22 }
    

    上述代码第2行,指定锁是公平的。接着,由两个线程t1和t2分别请求这把锁,并且在得到锁后,进行一个控制台的输出,表示自己得到了锁。在公平锁的情况下,得到输出通常如下所示:

    Thread_t1 获得锁
    Thread_t2 获得锁
    Thread_t1 获得锁
    Thread_t2 获得锁
    Thread_t1 获得锁
    Thread_t2 获得锁
    Thread_t1 获得锁
    Thread_t2 获得锁
    Thread_t1 获得锁
    

    公平锁中内部维护了一个队列,所有申请锁的线程按照申请顺序进入队列,按照入队列的顺序获取锁。
    如果不使用公平锁,那么情况会完全不一样,下面是使用非公平锁时的部分输出:

    前面还有一大段t1连续获得锁的输出
    Thread_t1 获得锁
    Thread_t1 获得锁
    Thread_t1 获得锁
    Thread_t1 获得锁
    Thread_t2 获得锁
    Thread_t2 获得锁
    Thread_t2 获得锁
    Thread_t2 获得锁
    Thread_t2 获得锁
    后面还有一大段t2连续获得锁的输出
    

    可以看到,根据系统的调度,一个线程会倾向于再次获取已经持有的锁,这种分配方式是高效的,但是无公平性可言。

    ReentrantLock重要api整理

    lock():获得锁,如果锁已经被占用,则等待。
    lockInterruptibly():获得锁,但优先响应中断。
    tryLock():尝试获得锁,如果成功,返回true,失败返回false。该方法不等待,立即返回。
    tryLock(long time, TimeUnit unit):在给定时间内尝试获得锁。
    unlock():释放锁。

    在重入锁的实现中,主要包含三个要素:

    第一,是原子状态。原子状态使用CAS操作来存储当前锁的状态,判断锁是否已经被别的线程持有。
    第二,是等待队列。所有没有请求到锁的线程,会进入等待队列进行等待。待有线程释放锁后,系统就能从等待队列中唤醒一个线程,继续工作。
    第三,是阻塞原语park()和unpark(),用来挂起和恢复线程。没有得到锁的线程将会被挂起。

    重入锁的好搭档:Condition条件

    Condtion是与重入锁相关联的。通过Lock接口(重入锁就实现了这一接口)的Condition newCondition()方法可以生成一个与当前重入锁绑定的Condition实例。利用Condition对象,我们就可以让线程在合适的时间等待,或者在某一个特定的时刻得到通知,继续执行。

    Condition接口提供的基本方法

    void await() throws InterruptedException;
    void awaitUninterruptibly();
    long awaitNanos(long nanosTimeout) throws InterruptedException;
    boolean await(long time, TimeUnit unit) throws InterruptedException;
    boolean awaitUntil(Date deadline) throws InterruptedException;
    void signal();
    void signalAll();
    
    • await()方法会使当前线程等待,同时释放当前锁,当其他线程中使用signal()或者signalAll()方法时,线程会重新获得锁并继续执行。或者当线程被中断时,也能跳出等待。这和Object.wait()方法很相似。
    • awaitUninterruptibly()方法与await()方法基本相同,但是它并不会在等待过程中响应中断。
    • singal()方法用于唤醒一个在等待中的线程。相对的singalAll()方法会唤醒所有在等待中的线程。这和Obejct.notify()方法很类似。
      Condition案例
    01 public class ReenterLockCondition implements Runnable{
    02     public static ReentrantLock lock=new ReentrantLock();
    03     public static Condition condition = lock.newCondition();
    04     @Override
    05     public void run() {
    06         try {
    07             lock.lock();
    08             condition.await();
    09             System.out.println("Thread is going on");
    10         } catch (InterruptedException e) {
    11             e.printStackTrace();
    12         }finally{
    13             lock.unlock();
    14         }
    15     }
    16     public static void main(String[] args) throws InterruptedException {
    17         ReenterLockCondition tl=new ReenterLockCondition();
    18         Thread t1=new Thread(tl);
    19         t1.start();
    20         Thread.sleep(2000);
    21         //通知线程t1继续执行
    22         lock.lock();
    23         condition.signal();
    24         lock.unlock();
    25     }
    26 }
    

    允许多个线程同时访问:信号量

    号量为多线程协作提供了更为强大的控制方法。广义上说,信号量是对锁的扩展。无论是内部锁synchronized还是重入锁ReentrantLock,一次都只允许一个线程访问一个资源,而信号量却可以指定多个线程,同时访问某一个资源。信号量主要提供了以下构造函数:

    public Semaphore(int permits)
    public Semaphore(int permits, boolean fair)    //第二个参数可以指定是否公平
    

    在构造信号量对象时,必须要指定信号量的准入数,即同时能申请多少个许可。当每个线程每次只申请一个许可时,这就相当于指定了同时有多少个线程可以访问某一个资源。

    信号量的主要逻辑方法有:

    public void acquire()
    public void acquireUninterruptibly()
    public boolean tryAcquire()
    public boolean tryAcquire(long timeout, TimeUnit unit)
    public void release()
    

    acquire()方法尝试获得一个准入的许可。若无法获得,则线程会等待,直到有线程释放一个许可或者当前线程被中断。acquireUninterruptibly()方法和acquire()方法类似,但是不响应中断。tryAcquire()尝试获得一个许可,如果成功返回true,失败则返回false,它不会进行等待,立即返回。release()用于在线程访问资源结束后,释放一个许可,以使其他等待许可的线程可以进行资源访问。
    案例

    01 public class SemapDemo implements Runnable{
    02     final Semaphore semp = new Semaphore(5);
    03     @Override
    04     public void run() {
    05         try {
    06             semp.acquire();
    07             //模拟耗时操作
    08             Thread.sleep(2000);
    09             System.out.println(Thread.currentThread().getId()+":done!");
    10             semp.release();
    11         } catch (InterruptedException e) {
    12             e.printStackTrace();
    13         }
    14     }
    15
    16     public static void main(String[] args) {
    17         ExecutorService exec = Executors.newFixedThreadPool(20);
    18         final SemapDemo demo=new SemapDemo();
    19         for(int i=0;i<20;i++){
    20             exec.submit(demo);
    21         }
    22     }
    23 }
    

    上述代码中,第7~9行为临界区管理代码,程序会限制执行这段代码的线程数。这里在第2行,申明了一个包含5个许可的信号量。这就意味着同时可以有5个线程进入代码段第7~9行。申请信号量使用acquire()操作,在离开时,务必使用release()释放信号量(代码第10行)。这就和释放锁是一个道理。如果不幸发生了信号量的泄露(申请了但没有释放),那么可以进入临界区的线程数量就会越来越少,直到所有的线程均不可访问。在本例中,同时开启20个线程。观察这段程序的输出,你就会发现系统以5个线程一组为单位,依次输出带有线程ID的提示文本。
    Semaphore 经常用来做限流。

    ReadWriteLock读写锁

    ReadWriteLock是JDK5中提供的读写分离锁。读写分离锁可以有效地帮助减少锁竞争,以提升系统性能。用锁分离的机制来提升性能非常容易理解。

    读写锁特性

    • 读-读不互斥:读读之间不阻塞。
    • 读-写互斥:读阻塞写,写也会阻塞读。
    • 写-写互斥:写写阻塞。
      案例
      如果在系统中,读操作次数远远大于写操作,则读写锁就可以发挥最大的功效,提升系统的性能。这里我给出一个稍微夸张点的案例,来说明读写锁对性能的帮助。
    01 public class ReadWriteLockDemo {
    02     private static Lock lock=new ReentrantLock();
    03     private static ReentrantReadWriteLock readWriteLock=new ReentrantReadWriteLock();
    04     private static Lock readLock = readWriteLock.readLock();
    05     private static Lock writeLock = readWriteLock.writeLock();
    06     private int value;
    07
    08     public Object handleRead(Lock lock) throws InterruptedException{
    09         try{
    10             lock.lock();                 //模拟读操作
    11             Thread.sleep(1000);          //读操作的耗时越多,读写锁的优势就越明显
    12             return value;
    13         }finally{
    14         lock.unlock();
    15         }
    16     }
    17
    18     public void handleWrite(Lock lock,int index) throws InterruptedException{
    19         try{
    20             lock.lock();                //模拟写操作
    21             Thread.sleep(1000);
    22             value=index;
    23         }finally{
    24         lock.unlock();
    25         }
    26     }
    27
    28     public static void main(String[] args) {
    29         final ReadWriteLockDemo demo=new ReadWriteLockDemo();
    30         Runnable readRunnale=new Runnable() {
    31             @Override
    32             public void run() {
    33                 try {
    34                     demo.handleRead(readLock);
    35 //                    demo.handleRead(lock);
    36                 } catch (InterruptedException e) {
    37                     e.printStackTrace();
    38                 }
    39             }
    40         };
    41         Runnable writeRunnale=new Runnable() {
    42             @Override
    43             public void run() {
    44                 try {
    45                     demo.handleWrite(writeLock,new Random().nextInt());
    46 //                    demo.handleWrite(lock,new Random().nextInt());
    47                 } catch (InterruptedException e) {
    48                     e.printStackTrace();
    49                 }
    50             }
    51         };
    52
    53         for(int i=0;i<18;i++){
    54             new Thread(readRunnale).start();
    55         }
    56
    57         for(int i=18;i<20;i++){
    58             new Thread(writeRunnale).start();
    59         }
    60     }
    61 }
    

    上述代码中,第11行和第21行分别模拟了一个非常耗时的操作,让线程耗时1秒钟。它们分别对应读耗时和写耗时。代码第34和45行,分别是读线程和写线程。在这里,第34行使用读锁,第35行使用写锁。第53~55行开启18个读线程,第57~59行,开启两个写线程。由于这里使用了读写分离,因此,<font color=red>读线程完全并行,而写会阻塞读,因此,实际上这段代码运行大约2秒多就能结束(写线程之间是实际串行的)。而如果使用第35行代替第34行,使用第46行代替第45行执行上述代码,即,使用普通的重入锁代替读写锁。那么所有的读和写线程之间都必须相互等待,因此整个程序的执行时间将长达20余秒。</font>

    倒计时器:CountDownLatch

    CountDownLatch是一个非常实用的多线程控制工具类。
    CountDownLatch的构造函数接收一个整数作为参数,即当前这个计数器的计数个数。

    public CountDownLatch(int count)
    

    案例

    01 public class CountDownLatchDemo implements Runnable {
    02     static final CountDownLatch end = new CountDownLatch(10);
    03     static final CountDownLatchDemo demo=new CountDownLatchDemo();
    04     @Override
    05     public void run() {
    06         try {
    07             //模拟检查任务
    08             Thread.sleep(new Random().nextInt(10)*1000);
    09             System.out.println("check complete");
    10             end.countDown();
    11         } catch (InterruptedException e) {
    12             e.printStackTrace();
    13         }
    14     }
    15     public static void main(String[] args) throws InterruptedException {
    16         ExecutorService exec = Executors.newFixedThreadPool(10);
    17         for(int i=0;i<10;i++){
    18             exec.submit(demo);
    19         }
    20         //等待检查
    21         end.await();
    22         //发射火箭
    23         System.out.println("Fire!");
    24         exec.shutdown();
    25     }
    26 }
    

    述代码第2行,生成一个CountDownLatch实例。计数数量为10。这表示需要有10个线程完成任务,等待在CountDownLatch上的线程才能继续执行。代码第10行,使用了CountDownLatch.countdown()方法,也就是通知CountDownLatch,一个线程已经完成了任务,倒计时器可以减1啦。第21行,使用CountDownLatch.await()方法,要求主线程等待所有10个检查任务全部完成。待10个任务全部完成后,主线程才能继续执行。
    主线程在CountDownLatch上等待,当所有检查任务全部完成后,主线程方能继续执行。

    循环栅栏:CyclicBarrier

    和CountDownLatch非常类似,它也可以实现线程间的计数等待,但它的功能比CountDownLatch更加复杂且强大。
    CyclicBarrier和CountDownLatch的区别在于:
    CyclicBarrier可以循环使用,而CountDownLatch只能使用一次。
    比CountDownLatch略微强大一些,CyclicBarrier可以接收一个参数作为barrierAction。所谓barrierAction就是当计数器一次计数完成后,系统会执行的动作。如下构造函数,其中,parties表示计数总数,也就是参与的线程总数。

    public CyclicBarrier(int parties, Runnable barrierAction)
    

    案例

    01 public class CyclicBarrierDemo {
    02     public static class Soldier implements Runnable {
    03         private String soldier;
    04         private final CyclicBarrier cyclic;
    05
    06         Soldier(CyclicBarrier cyclic, String soldierName) {
    07             this.cyclic = cyclic;
    08             this.soldier = soldierName;
    09         }
    10
    11         public void run() {
    12             try {
    13                 //等待所有士兵到齐
    14                 cyclic.await();
    15                 doWork();
    16                 //等待所有士兵完成工作
    17                 cyclic.await();
    18             } catch (InterruptedException e) {
    19                 e.printStackTrace();
    20             } catch (BrokenBarrierException e) {
    21                 e.printStackTrace();
    22             }
    23         }
    24
    25         void doWork() {
    26             try {
    27                 Thread.sleep(Math.abs(new Random().nextInt()%10000));
    28             } catch (InterruptedException e) {
    29                 e.printStackTrace();
    30             }
    31             System.out.println(soldier + ":任务完成");
    32         }
    33     }
    34
    35     public static class BarrierRun implements Runnable {
    36         boolean flag;
    37         int N;
    38         public BarrierRun(boolean flag, int N) {
    39             this.flag = flag;
    40             this.N = N;
    41         }
    42
    43         public void run() {
    44             if (flag) {
    45                 System.out.println("司令:[士兵" + N + "个,任务完成!]");
    46             } else {
    47                 System.out.println("司令:[士兵" + N + "个,集合完毕!]");
    48                 flag = true;
    49             }
    50         }
    51     }
    52
    53     public static void main(String args[]) throws InterruptedException {
    54         final int N = 10;
    55         Thread[] allSoldier=new Thread[N];
    56         boolean flag = false;
    57         CyclicBarrier cyclic = new CyclicBarrier(N, new BarrierRun(flag, N));
    58         //设置屏障点,主要是为了执行这个方法
    59         System.out.println("集合队伍!");
    60         for (int i = 0; i < N; ++i) {
    61             System.out.println("士兵 "+i+" 报道!");
    62             allSoldier[i]=new Thread(new Soldier(cyclic, "士兵 " + i));
    63             allSoldier[i].start();
    64         }
    65     }
    66 }
    

    输出

    集合队伍!
    士兵 0 报道!
    //篇幅有限,省略其他几个士兵
    士兵 9 报道!
    司令:[士兵10个,集合完毕!]
    士兵 0:任务完成
    //篇幅有限,省略其他几个士兵
    士兵 4:任务完成
    司令:[士兵10个,任务完成!]
    

    通过上面案例可知,CyclicBarrier可以循环使用。

    程阻塞工具类:LockSupport

    LockSupport的静态方法park()可以阻塞当前线程,类似的还有parkNanos()、parkUntil()等方法。它们实现了一个限时的等待。

    01 public class LockSupportDemo {
    02     public static Object u = new Object();
    03     static ChangeObjectThread t1 = new ChangeObjectThread("t1");
    04     static ChangeObjectThread t2 = new ChangeObjectThread("t2");
    05
    06     public static class ChangeObjectThread extends Thread {
    07         public ChangeObjectThread(String name){
    08             super.setName(name);
    09         }
    10         @Override
    11         public void run() {
    12             synchronized (u) {
    13                 System.out.println("in "+getName());
    14                 LockSupport.park();
    15             }
    16         }
    17     }
    18
    19     public static void main(String[] args) throws InterruptedException {
    20         t1.start();
    21         Thread.sleep(100);
    22         t2.start();
    23         LockSupport.unpark(t1);
    24         LockSupport.unpark(t2);
    25         t1.join();
    26         t2.join();
    27     }
    28 }
    

    按照以前的思想,上面代码23行unpark不一定发生在14行park后,也就有几率导致线程永久挂起。但是执行这段代码,你会发现,它自始至终都可以正常的结束,不会因为park()方法而导致线程永久性的挂起。
    这是因为LockSupport类使用类似信号量的机制。它为每一个线程准备了一个许可,如果许可可用,那么park()函数会立即返回,并且消费这个许可(也就是将许可变为不可用),如果许可不可用,就会阻塞。而unpark()则使得一个许可变为可用(但是和信号量不同的是,许可不能累加,你不可能拥有超过一个许可,它永远只有一个)。
    这个特点使得:即使<font color=red>unpark()操作发生在park()之前,它也可以使下一次的park()操作立即返回。这也就是上述代码可顺利结束的主要原因</font>。
    同时,处于park()挂起状态的线程不会像suspend()那样还给出一个令人费解的Runnable的状态。它会非常明确地给出一个WAITING状态,甚至还会标注是park()引起的:

    "t1" #8 prio=5 os_prio=0 tid=0x00b1a400 nid=0x1994 waiting on condition [0x1619f000]
       java.lang.Thread.State: WAITING (parking)
            at sun.misc.Unsafe.park(Native Method)
            at java.util.concurrent.locks.LockSupport.park(LockSupport.java:304)
            at geym.conc.ch3.ls.LockSupportDemo$ChangeObjectThread.run(LockSupportDemo.java:18)
            - locked <0x048b2680> (a java.lang.Object)
    

    使得分析问题时格外方便。此外,如果你使用park(Object)函数,还可以为当前线程设置一个阻塞对象。这个阻塞对象会出现在线程Dump中。这样在分析问题时,就更加方便了。
    比如,如果我们将上述代码第14行的park()改为:

    LockSupport.park(this);
    

    那么在线程Dump时,你可能会看到如下信息:

    "t1" #8 prio=5 os_prio=0 tid=0x0117ac00 nid=0x2034 waiting on condition [0x15d0f000]
       java.lang.Thread.State: WAITING (parking)
            at sun.misc.Unsafe.park(Native Method)
            - parking to wait for  <0x048b4738> (a geym.conc.ch3.ls.LockSupport-
    Demo$ChangeObjectThread)
            at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
            at geym.conc.ch3.ls.LockSupportDemo$ChangeObjectThread.run
    (LockSupportDemo.java:18)
            - locked <0x048b2808> (a java.lang.Object)
    

    注意,在堆栈中,我们甚至还看到了当前线程等待的对象,这里就是ChangeObjectThread实例
    除了有定时阻塞的功能外,LockSupport.park()还能支持中断影响。但是和其他接收中断的函数很不一样,LockSupport.park()不会抛出InterruptedException异常。它只是会默默的返回,但是我们可以从Thread.interrupted()等方法获得中断标记。

    01 public class LockSupportIntDemo {
    02     public static Object u = new Object();
    03     static ChangeObjectThread t1 = new ChangeObjectThread("t1");
    04     static ChangeObjectThread t2 = new ChangeObjectThread("t2");
    05
    06     public static class ChangeObjectThread extends Thread {
    07         public ChangeObjectThread(String name){
    08             super.setName(name);
    09         }
    10         @Override
    11         public void run() {
    12             synchronized (u) {
    13                 System.out.println("in "+getName());
    14                 LockSupport.park();
    15                 if(Thread.interrupted()){
    16                     System.out.println(getName()+" 被中断了");
    17                 }
    18             }
    19             System.out.println(getName()+"执行结束");
    20         }
    21     }
    22
    23     public static void main(String[] args) throws InterruptedException {
    24         t1.start();
    25         Thread.sleep(100);
    26         t2.start();
    27         t1.interrupt();
    28         LockSupport.unpark(t2);
    29     }
    30 }
    

    输出

    in t1
    t1 被中断了
    t1 执行结束
    in t2
    t2执行结束
    

    注意上述代码在第27行,中断了处于park()状态的t1。之后,t1可以马上响应这个中断,并且返回。之后在外面等待的t2才可以进入临界区,并最终由LockSupport.unpark(t2)操作使其运行结束。

    相关文章

      网友评论

          本文标题:《实战高并发程序设计》读书笔记-多线程团队协作:同步控制

          本文链接:https://www.haomeiwen.com/subject/heuydltx.html