美文网首页
Java中的并发工具类

Java中的并发工具类

作者: 全栈未遂工程师 | 来源:发表于2016-08-13 16:55 被阅读516次

    CountDownLatch等待多线程完成

    CountDownLatch允许一个或多个线程等待其他线程完成操作。

    譬如:解析一个excel,一个线程解析一个sheet页,当所有线程解析完成之后,提示解析完成。可以使用join来实现,也可以用CountDownLatch

    使用join
    join让当前执行线程等待join线程执行结束。

    package com.thread;
    
    public class JoinCountDownLatchTest {
        public static void main(String[] args) {
            Thread parser1 = new Thread(new Runnable(){
                @Override
                public void run() {
                    
                }
            });
            Thread parser2 = new Thread(new Runnable(){
                @Override
                public void run() {
                    try {
                        Thread.sleep(1000*2);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("parser2 finished!");
                }
            });
            parser1.start();
            parser2.start();
            try {
                parser1.join();
                parser2.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("All parser finished!");
        }
    }
    

    使用CountDownLatch
    CountDownLatch传入一个N当做计数器,每次执行countDown的时候N就会减1,CountDownLatch的await方法就会阻塞当前线程,直到N变成零。countDown可以是一个线程中的N个步骤或者是N个线程。

    一个线程调用countDown方法,一个线程调用await方法。

    package com.thread;
    
    import java.util.concurrent.CountDownLatch;
    
    public class CountDownLatchTest {
        public static CountDownLatch cdl = new CountDownLatch(2);
        
        public static void main(String[] args) {
            new Thread(new Runnable(){
                @Override
                public void run() {
                    System.out.println(1);
                    cdl.countDown();
                    System.out.println(2);
                    cdl.countDown();
                }
            }).start();
            try {
                cdl.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(3);
        }
    }
    

    CyclicBarrier同步屏障

    让一组线程到达一个屏障(或者是同步点)的时候被阻塞,直到最后一个线程到达屏障,屏障才会打开,所有的线程继续往下执行。

    package com.thread;
    
    import java.util.concurrent.CyclicBarrier;
    
    public class CyclicBarrierTest {
        static CyclicBarrier c = new CyclicBarrier(2);
        
        public static void main(String[] args) throws Exception {
            new Thread(new Runnable(){
                @Override
                public void run() {
                    try {
                        Thread.sleep(3000);
                        System.out.println(2);
                        c.await();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }).start();
            System.out.println(1);
            c.await();
            System.out.println(3);
        }
    }
    /*
    1
    2
    3
    */
    

    ** CyclicBarrier升级版

    高级的构造方法CyclicBarrier(int parties, Runnable barrierAction):当所有线程到达同步点之后,优先执行barrierAction,等待该线程执行完之后,再继续执行await后面的方法。

    package com.thread;
    
    import java.util.concurrent.CyclicBarrier;
    
    public class CyclicBarrierTest {
        static CyclicBarrier c = new CyclicBarrier(2, new A());
        
        public static void main(String[] args) throws Exception {
            new Thread(new Runnable(){
                @Override
                public void run() {
                    try {
                        Thread.sleep(3000);
                        System.out.println(2);
                        c.await();
                        System.out.println(2.1);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }).start();
            System.out.println(1);
            c.await();
            System.out.println(1.1);
        }
        
        static class A implements Runnable{
            @Override
            public void run() {
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(3);
            }
        }
    }
    /*
    1
    2
    3
    2.1
    1.1
    */
    

    CyclicBarrier应用场景

    CyclicBarrierCountDownLatch的区别

    CountDownBatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset()方法重置。因此CyclicBarrier可以实现更加复杂的功能。例如:处理计算错误,可以重置计数器,让线程重新执行一次。

    CyclicBarrier的其他方法:

    • getNumberWaiting:获取阻塞的线程数量。
    • isBroken()用来了解阻塞的线程是否被中断。
    package com.thread;
    
    import java.util.concurrent.CyclicBarrier;
    
    public class CyclicBarrierTest2 {
        static CyclicBarrier c = new CyclicBarrier(2);
        
        public static void main(String[] args) {
            Thread t1 = new Thread(new Runnable(){
                @Override
                public void run() {
                    try {
                        c.await();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }       
            });
            t1.start();
            t1.interrupt();
            try {
                c.await();
            } catch (Exception e) {//这里为什么会抛出异常呢???不明白
                System.out.println(c.isBroken());
                e.printStackTrace();
            }
        }
    }
    

    控制并发线程数的Semaphore

    Semaphore(信号量)用来控制同时访问特定资源的线程数量,通过协调各个线程,以保证合理的使用公共资源。

    • int availablePermits:返回此信号量中当前可用的许可证数。
    • int getQueueLength():返回正在等待获取许可证的线程数。
    • boolean hasQueueThreads:是否有线程正在等待获取许可证
    • void reducePermits(int reduction):减少reduction个许可证。
    • Collection getQueuedThreads():返回所有等待获取许可证的线程集合

    下面同时开启了30个线程,都进入了run方法内,但是同时运行在s.acquire();***s.release();之间的只能有10个线程。

    package com.thread;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;
    
    public class SemaphoreTest {
        private static final int THREAD_COUNT = 30;
        private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);
        private static Semaphore s = new Semaphore(10);
        
        public static void main(String[] args) {
            for(int i=0; i<THREAD_COUNT; i++){
                threadPool.execute(new MyThread(i, s));
            }
            threadPool.shutdown();
        }
        
    }
    class MyThread implements Runnable{
        int c = 0;
        Semaphore s;
        public MyThread(int c, Semaphore s) {
            this.c = c;
            this.s = s;
        }
        @Override
        public void run() {
            try {
                System.out.println(c + " begin:");
                s.acquire();
                System.out.println("saveDate=" + c);
                Thread.sleep(3000);
                s.release();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    /*
    0 begin:
    1 begin:
    saveDate=0
    saveDate=1
    2 begin:
    saveDate=2
    8 begin:
    6 begin:
    7 begin:
    saveDate=7
    5 begin:
    saveDate=5
    3 begin:
    4 begin:
    saveDate=3
    11 begin:
    saveDate=11
    9 begin:
    saveDate=8
    saveDate=6
    12 begin:
    10 begin:
    saveDate=4
    16 begin:
    20 begin:
    14 begin:
    19 begin:
    15 begin:
    13 begin:
    25 begin:
    29 begin:
    27 begin:
    23 begin:
    17 begin:
    21 begin:
    28 begin:
    24 begin:
    22 begin:
    18 begin:
    26 begin:
    //下面是3s之后
    saveDate=9
    saveDate=12
    saveDate=10
    saveDate=16
    saveDate=20
    saveDate=14
    saveDate=19
    saveDate=15
    saveDate=13
    saveDate=25
    //下面是6s之后
    saveDate=29
    saveDate=27
    saveDate=23
    saveDate=17
    saveDate=21
    saveDate=28
    saveDate=24
    saveDate=22
    saveDate=18
    saveDate=26
    */
    

    线程之间交换数据Exchanger

    Exchanger是个用于线程间协作的工具类,用于线程之间的数据交换。
    它提供一个同步点,在这个同步点,两个线程可以交换彼此的数据。第一个线程先执行exchange()方法,第二个线程也执行exchange()方法,当两个线程同时到达同步点,这两个线程就可以交换数据。

    如果一个线程一直没有执行exchange()方法,那么会一直等下去,如果担心特殊情况,可以使用exchange(V v,longtimeout, TimeUnit unit)设置最大等待时间。

    package com.thread;
    
    import java.util.concurrent.Exchanger;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class ExchangerTest {
        private static final Exchanger<String> exgr = new Exchanger<String>();
        private static ExecutorService threadPool = Executors.newFixedThreadPool(2);
        
        public static void main(String[] args) {
            threadPool.execute(new Runnable(){
                @Override
                public void run() {
                    String a = "银行流水A";
                    try {
                        String b = exgr.exchange(a);
                        System.out.println("a中数据交换完毕.a=" + a+";b="+b);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
            threadPool.execute(new Runnable(){
                @Override
                public void run() {
                    String b = "银行流水B";
                    try {
                        Thread.sleep(3000);
                        String a = exgr.exchange(b);//传递b数据并获得a的数据
                        System.out.println("b中数据交换完毕.a=" + a+";b="+b);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }
    /*
    b中数据交换完毕.a=银行流水A;b=银行流水B
    a中数据交换完毕.a=银行流水A;b=银行流水B
    */
    

    相关文章

      网友评论

          本文标题:Java中的并发工具类

          本文链接:https://www.haomeiwen.com/subject/rwtysttx.html