The Art of Concurrent Programming (8)

Author: e86dab508bc1 | Published: 2018-04-24 09:25

    This article covers several utility classes from Java concurrent programming: CountDownLatch, CyclicBarrier, Semaphore, and Exchanger, with a short analysis and a usage example for each.

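    (1) CountDownLatch uses an inner class that extends the abstract class AQS (AbstractQueuedSynchronizer) to let one or more threads wait until a condition is met. The source code keeps the remaining count in the AQS state; when state reaches 0, all waiting threads are released.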

    A CountDownLatch is a versatile synchronization tool and can be used for a number of purposes. A
    CountDownLatch initialized with a count of one serves as a simple on/off latch, or gate: all threads invoking await wait at the gate until it is opened by a thread invoking countDown. A CountDownLatch initialized to N can be used to make one thread wait until N threads have completed some action, or some action has been completed N times.
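
    To make the role of the AQS state more concrete, here is a minimal sketch of a latch built directly on AbstractQueuedSynchronizer. The class name SimpleLatch and its inner Sync are made up for illustration; this is not the JDK source, only the same idea in reduced form: a shared acquire succeeds once state is 0, and each release decrements state with a CAS.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical, simplified latch for illustration; not the JDK implementation.
public class SimpleLatch {

    // Inner synchronizer: the AQS state holds the remaining count.
    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int count) {
            setState(count);
        }

        int count() {
            return getState();
        }

        // A shared acquire succeeds (>= 0) only once the count has reached zero.
        @Override
        protected int tryAcquireShared(int ignored) {
            return getState() == 0 ? 1 : -1;
        }

        // Decrement with CAS; return true only on the transition to zero,
        // which tells AQS to wake the waiting threads.
        @Override
        protected boolean tryReleaseShared(int ignored) {
            for (;;) {
                int c = getState();
                if (c == 0) {
                    return false; // already open, nothing to do
                }
                int next = c - 1;
                if (compareAndSetState(c, next)) {
                    return next == 0;
                }
            }
        }
    }

    private final Sync sync;

    public SimpleLatch(int count) {
        if (count < 0) {
            throw new IllegalArgumentException("count < 0");
        }
        sync = new Sync(count);
    }

    // Blocks until the count reaches zero.
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    public void countDown() {
        sync.releaseShared(1);
    }

    public int getCount() {
        return sync.count();
    }

    public static void main(String[] args) throws InterruptedException {
        SimpleLatch latch = new SimpleLatch(2);
        Thread waiter = new Thread(() -> {
            try {
                latch.await();
                System.out.println("released");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        latch.countDown(); // state = 1
        latch.countDown(); // state = 0, the waiter is released
        waiter.join();
    }
}
```

    Once the count is 0, further countDown calls are no-ops and await returns immediately, which is why a latch of this kind cannot be reset.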

    Example:

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    
    /**
     * 
     * @author Eric
     *
     */
    public class CountDownLatchUsage {
    
        public static void main(String[] args) throws InterruptedException {
    
            CountDownLatch latch = new CountDownLatch(2);
    
            new Thread(new Runnable() {
    
                @Override
                public void run() {
                    try {
                        latch.await();
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                    System.out.println("This is the thread one");
    
                }
    
            }).start();
    
            new Thread(new Runnable() {
    
                @Override
                public void run() {
                    try {
                        latch.await();
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                    System.out.println("This is the thread Two");
    
                }
    
            }).start();
    
            System.out.println("Going to release the latch");
            latch.countDown(); // state = 1, both threads are still blocked in await()
            TimeUnit.SECONDS.sleep(10);
            latch.countDown(); // state = 0, both waiting threads are released
    
            System.out.println("The end");
    
        }
    
    }
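
    The example above uses the latch as a gate. The other use mentioned in the quoted documentation, one thread waiting until N threads have completed some action, could look roughly like the sketch below. The class name WaitForWorkers and the method runWorkers are hypothetical, and the counter increment stands in for real work.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class WaitForWorkers {

    // Starts `workers` threads and blocks until each has called countDown().
    // Returns how many workers finished their (placeholder) task.
    public static int runWorkers(int workers) throws InterruptedException {
        CountDownLatch done = new CountDownLatch(workers);
        AtomicInteger finished = new AtomicInteger();

        for (int i = 0; i < workers; i++) {
            new Thread(() -> {
                try {
                    finished.incrementAndGet(); // stand-in for real work
                } finally {
                    done.countDown(); // always count down, even if the work fails
                }
            }).start();
        }

        done.await(); // returns once the count reaches zero
        return finished.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Workers finished: " + runWorkers(3));
    }
}
```

    Calling countDown in a finally block keeps the waiting thread from hanging when a worker throws.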
    

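    (2) CyclicBarrier lets a group of threads wait for each other at a common barrier point. Every thread blocks until the last one arrives, and once the waiting threads have been released the CyclicBarrier can be reused.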

    Example:

    import java.util.concurrent.BrokenBarrierException;
    import java.util.concurrent.CyclicBarrier;
    
    public class CyclicBarrierUsage {
    
        public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
    
            CyclicBarrier barrier = new CyclicBarrier(2, new Runnable() {
    
                @Override
                public void run() {
                    System.out.println("Let's go!!!");
    
                }
            });
    
            new Thread(new Runnable() {
    
                @Override
                public void run() {
                    System.out.println("Thread 1 is waiting");
                    try {
                        barrier.await();
                        System.out.println("Thread 1 is running");
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    } catch (BrokenBarrierException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
    
                }
    
            }).start();
    
            new Thread(new Runnable() {
    
                @Override
                public void run() {
    
                    System.out.println("Thread 2 is waiting");
                    try {
                        barrier.await();
                        System.out.println("Thread 2 is running");
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    } catch (BrokenBarrierException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
    
                }
    
            }).start();
    
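            // Spin until the worker threads have finished. Joining the threads
            // would be more robust, because activeCount() also counts any other
            // live threads in the same thread group.
            while (Thread.activeCount() > 1) {
                Thread.yield();
            }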
        }
    
    }
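
    The example above only trips the barrier once. To illustrate the reuse described earlier, here is a small sketch in which the same barrier is passed several rounds in a row. The class name CyclicBarrierReuse and the method runRounds are made up for this illustration.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

public class CyclicBarrierReuse {

    // Runs `parties` threads through `rounds` barrier cycles on one barrier
    // and returns how many times the barrier action ran.
    public static int runRounds(int parties, int rounds) throws InterruptedException {
        AtomicInteger trips = new AtomicInteger();
        // The barrier action runs once per cycle, after the last party arrives.
        CyclicBarrier barrier = new CyclicBarrier(parties, () -> trips.incrementAndGet());

        Thread[] threads = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            threads[i] = new Thread(() -> {
                try {
                    for (int r = 0; r < rounds; r++) {
                        barrier.await(); // same barrier object, reused every round
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (BrokenBarrierException e) {
                    // Another party was interrupted or timed out; give up.
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        return trips.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Barrier tripped " + runRounds(3, 4) + " times");
    }
}
```

    No explicit reset is needed between rounds: the barrier starts a new cycle on its own as soon as all parties have been released.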
    

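    (3) Semaphore maintains a set of permits and is often used to limit the number of threads that access a resource at the same time. Internally it also has a Sync class that extends AQS, with two subclasses, NonfairSync and FairSync, for the nonfair and fair modes. The two differ as follows.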

    Fair mode (first checks whether other threads are already waiting in the queue):

    protected int tryAcquireShared(int acquires) {
        for (;;) {
            if (hasQueuedPredecessors())
                return -1;
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
    

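    Nonfair mode: checks directly whether a permit is available. If so, the thread proceeds immediately; otherwise it enters the wait queue.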

    final int nonfairTryAcquireShared(int acquires) {
        for (;;) {
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
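
    From the caller's side, the mode is chosen through the Semaphore constructor. The sketch below shows both constructors and uses tryAcquire() to take permits until none remain, which mirrors the remaining < 0 check in the loops above. The class name SemaphoreFairness and the helper drain are hypothetical.

```java
import java.util.concurrent.Semaphore;

public class SemaphoreFairness {

    // Takes permits one at a time until tryAcquire() fails,
    // and returns how many were taken.
    public static int drain(Semaphore sem) {
        int taken = 0;
        while (sem.tryAcquire()) {
            taken++;
        }
        return taken;
    }

    public static void main(String[] args) {
        Semaphore nonfair = new Semaphore(2);    // nonfair is the default
        Semaphore fair = new Semaphore(2, true); // permits are granted in FIFO order

        System.out.println("nonfair.isFair() = " + nonfair.isFair());
        System.out.println("fair.isFair() = " + fair.isFair());

        System.out.println("permits taken: " + drain(fair));
        System.out.println("permits left: " + fair.availablePermits());
    }
}
```

    One detail worth knowing: the untimed tryAcquire() takes a permit immediately if one is available, even on a fair semaphore, so it does not honor the queue. To respect fairness, the timed variant tryAcquire(0, TimeUnit.SECONDS) can be used instead.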
    

    Example:

    import java.util.concurrent.Semaphore;
    import java.util.concurrent.TimeUnit;
    
    public class SemaphoreUsage {
    
        public static void main(String[] args) {
            // Two permits with fair ordering: at most two threads hold a permit at once.
            Semaphore sem = new Semaphore(2, true);

            // Start six threads that all compete for the two permits.
            for (int i = 1; i <= 6; i++) {
                final int id = i;
                new Thread(new Runnable() {

                    @Override
                    public void run() {
                        try {
                            sem.acquire(1);
                        } catch (InterruptedException e) {
                            // No permit was acquired, so there is nothing to release.
                            Thread.currentThread().interrupt();
                            return;
                        }
                        try {
                            System.out.println("This is Thread " + id + ", get the permit");
                            TimeUnit.SECONDS.sleep(10);
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        } finally {
                            // Release only the permit this thread actually holds.
                            sem.release();
                        }

                        System.out.println("This is Thread " + id);
                    }

                }).start();
            }
        }
    
    }
    
    

    (4) Exchanger: lets two threads swap data at a synchronization point. It is useful for genetic algorithms and pipeline designs.

    A synchronization point at which threads can pair and swap elements
    within pairs. Each thread presents some object on entry to the
    exchange method, matches with a partner thread,
    and receives its partner's object on return. An Exchanger may be
    viewed as a bidirectional form of a SynchronousQueue.
    Exchangers may be useful in applications such as genetic algorithms
    and pipeline designs.

    Core algorithm:

    for (;;) {
      if (slot is empty) {                         // offer
        place item in a Node;
        if (can CAS slot from empty to node) {
          wait for release;
          return matching item in node;
        }
      }
      else if (can CAS slot from node to empty) {  // release
        get the item in node;
        set matching item in node;
        release waiting thread;
      }
      // else retry on CAS failure
    }
    

    Example:

    class FillAndEmpty {
      Exchanger<DataBuffer> exchanger = new Exchanger<DataBuffer>();
      DataBuffer initialEmptyBuffer = ... // DataBuffer is a made-up type
      DataBuffer initialFullBuffer = ...

      // Fills a buffer and swaps it for an empty one whenever it is full.
      class FillingLoop implements Runnable {
        public void run() {
          DataBuffer currentBuffer = initialEmptyBuffer;
          try {
            while (currentBuffer != null) {
              addToBuffer(currentBuffer);
              if (currentBuffer.isFull())
                currentBuffer = exchanger.exchange(currentBuffer);
            }
          } catch (InterruptedException ex) { ... handle ... }
        }
      }

      // Drains a buffer and swaps it for a full one whenever it is empty.
      class EmptyingLoop implements Runnable {
        public void run() {
          DataBuffer currentBuffer = initialFullBuffer;
          try {
            while (currentBuffer != null) {
              takeFromBuffer(currentBuffer);
              if (currentBuffer.isEmpty())
                currentBuffer = exchanger.exchange(currentBuffer);
            }
          } catch (InterruptedException ex) { ... handle ... }
        }
      }

      void start() {
        new Thread(new FillingLoop()).start();
        new Thread(new EmptyingLoop()).start();
      }
    }
    

    Problem case one (the exchange just keeps waiting for a partner; try to find the cause):

    
    import java.util.concurrent.Exchanger;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;
    
    public class ExchangerUsage {
    
        
        private static Exchanger<String> data = new Exchanger<String>();
        
        
        public static void main(String[] args) {
            
            ExchangerServer server = new ExchangerServer();
            ExchangerClient client = new ExchangerClient();
            
            server.run();
            client.run();
        }
        
        
        
        private static class ExchangerServer implements Runnable{
    
            @Override
            public void run() {
                String A = "A";
                        try {
                            try {
                                System.out.println("Data from client" + data.exchange(A,5,TimeUnit.SECONDS));
                            } catch (TimeoutException e) {
                                // TODO Auto-generated catch block
                                e.printStackTrace();
                            }
                        } catch (InterruptedException e) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                    
                }
                
            }
            
            
            
            
        }
        
        
        
        private static class ExchangerClient implements Runnable{
    
            @Override
            public void run() {
    
                String B = "B";
                        try {
                            try {
                                System.out.println("Data from server" + data.exchange(B,5,TimeUnit.SECONDS));
                            } catch (TimeoutException e) {
                                // TODO Auto-generated catch block
                                e.printStackTrace();
                            }
                        } catch (InterruptedException e) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                    
                }
                
            
            }
        }
    
    }
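
    One cause that stands out in the code above: main calls run() directly instead of starting threads, so the two exchange calls execute one after the other on the main thread and can never meet. A minimal sketch of a variant that runs each side on its own thread follows. The class name ExchangerFixed and the helpers swap and exchangeOrNull are made up for this illustration.

```java
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ExchangerFixed {

    // Exchanges "A" and "B" between two threads and returns
    // {value received by the server, value received by the client}.
    public static String[] swap() throws InterruptedException {
        Exchanger<String> exchanger = new Exchanger<>();
        String[] received = new String[2];

        // Each side runs on its own thread, so both can be inside
        // exchange() at the same time and pair up.
        Thread server = new Thread(() -> received[0] = exchangeOrNull(exchanger, "A"));
        Thread client = new Thread(() -> received[1] = exchangeOrNull(exchanger, "B"));
        server.start();
        client.start();
        server.join();
        client.join();
        return received;
    }

    // Returns the partner's item, or null on timeout or interruption.
    private static String exchangeOrNull(Exchanger<String> exchanger, String item) {
        try {
            return exchanger.exchange(item, 5, TimeUnit.SECONDS);
        } catch (TimeoutException e) {
            return null; // no partner arrived in time
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        String[] received = swap();
        System.out.println("Data from client: " + received[0]);
        System.out.println("Data from server: " + received[1]);
    }
}
```

    The key difference is start() versus run(): start() creates a new thread that executes run(), while calling run() directly is an ordinary method call on the current thread.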
    
    
