美文网首页
多线程之并发工具类

多线程之并发工具类

作者: OPice | 来源:发表于2019-09-23 13:06 被阅读0次

    CountDownLatch

    在开发过程中经常会碰到一个任务需要开启多个线程,然后将多个线程的执行结果汇总。比如说查询全量数据,考虑数据量的问题,我们基本上会做分页,这时候就需要多次循环调用。CountDownLatch这个类使一个线程等待其他线程各自执行完毕后再执行。
    执行原理
    CountDownLatch内部通过继承自AQS(AbstractQueuedSynchronizer)的内部类Sync来实现。初始化CountDownLatch的时候,会调用Sync的构造方法将count赋值给AQS的state变量。多个线程调用countDown的时候,使用CAS递减state的值;调用await方法的线程会被放入AQS的阻塞队列中,等计数器减为0时再被唤醒。
    核心方法

    /**
     * Constructor: initializes the counter to {@code count}.
     *
     * @param count the initial counter value
     * @throws IllegalArgumentException if {@code count} is negative
     */
     public CountDownLatch(int count) {
            if (count < 0) throw new IllegalArgumentException("count < 0");
            // Sync is the AQS-based synchronizer (per the original author's note); it receives the initial count
            this.sync = new Sync(count);
        }
    /**
     * Suspends the calling thread; per the original note it waits until the count
     * reaches 0 before continuing. Delegates to {@code sync.acquireSharedInterruptibly(1)}.
     *
     * @throws InterruptedException declared by this method; propagated from the delegate call
     */
    public void await() throws InterruptedException {
            sync.acquireSharedInterruptibly(1);
        }
    /**
     * Suspends the caller for at most {@code timeout}; once that time has elapsed
     * execution continues. The timeout is converted to nanoseconds and passed to
     * {@code sync.tryAcquireSharedNanos}.
     *
     * @param timeout maximum time to wait
     * @param unit    unit of {@code timeout}
     * @return the value returned by {@code sync.tryAcquireSharedNanos}
     *         (presumably {@code true} if the count reached zero in time — confirm against the JDK Javadoc)
     * @throws InterruptedException declared by this method; propagated from the delegate call
     */
     public boolean await(long timeout, TimeUnit unit)
            throws InterruptedException {
            return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
        }
    /**
     * Decrements the count by one. Delegates to {@code sync.releaseShared(1)}.
     */
    public void countDown() {
            sync.releaseShared(1);
        }
    /**
     * Returns the current count, as reported by {@code sync.getCount()}.
     *
     * @return the current count
     */
     public long getCount() {
            return sync.getCount();
        }
    

    走个栗子

    /**
     * Demo: fans a paged query out to a thread pool and uses a shared
     * {@link CountDownLatch} so that {@code main} resumes only after every page
     * task has finished.
     */
    public class CountDownLatchDemo implements Runnable {
        /** Number of pages, and therefore the number of tasks the latch waits for. */
        private static final int PAGE_COUNT = 10;
        /** Rows per page; only used to print the row range of a page. */
        private static final int PAGE_SIZE = 100;

        /** Shared latch; every task counts it down exactly once when it finishes. */
        public static final CountDownLatch countDownLatch = new CountDownLatch(PAGE_COUNT);

        /** Zero-based page index handled by this task. */
        private final Integer count;

        /**
         * @param count zero-based index of the page this task queries
         */
        public CountDownLatchDemo(Integer count) {
            this.count = count;
        }

        /**
         * Prints the row range of this page and counts the latch down.
         * The count-down sits in {@code finally} so a failing page cannot leave
         * the waiting thread blocked forever.
         */
        @Override
        public void run() {
            try {
                System.out.println("分页查询:" + count * PAGE_SIZE + "~" + (count + 1) * PAGE_SIZE);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                countDownLatch.countDown();
            }
        }

        /**
         * Submits one task per page, waits for all of them, then shuts the pool down.
         *
         * @param args unused
         * @throws InterruptedException if the main thread is interrupted while waiting on the latch
         */
        public static void main(String[] args) throws InterruptedException {
            ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                    PAGE_COUNT, PAGE_COUNT, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(PAGE_COUNT));
            try {
                for (int page = 0; page < PAGE_COUNT; page++) {
                    threadPoolExecutor.submit(new CountDownLatchDemo(page));
                }
                countDownLatch.await();
                System.out.println("分页全量数据查询完毕");
            } finally {
                // Core threads are non-daemon and never time out; without this the JVM would not exit.
                threadPoolExecutor.shutdown();
            }
        }
    }
    

    CyclicBarrier

    让一组线程在到达屏障(同步点)时相互等待,直到所有线程都到达屏障后再继续执行。
    原理
    CyclicBarrier是由ReentrantLock可重入锁和Condition共同实现的。
    核心方法

        /** The lock for guarding barrier entry */
        private final ReentrantLock lock = new ReentrantLock();
        /** Condition to wait on until tripped */
        private final Condition trip = lock.newCondition();
        /** The number of parties */
        private final int parties;
        /** The command to run when tripped */
        private final Runnable barrierCommand;
        /** The current generation */
        private Generation generation = new Generation();
        // NOTE(review): the constructor in this excerpt also assigns a field named count,
        // whose declaration is not part of the quoted fields.
      
    /**
     * Creates a CyclicBarrier without a barrier action; delegates to the
     * two-argument constructor with a {@code null} action.
     *
     * @param parties the number of threads the barrier holds back
     */
     public CyclicBarrier(int parties) {
            this(parties, null);
        }
    
    /**
     * Creates a CyclicBarrier for {@code parties} threads. Per the original note,
     * {@code barrierAction} is executed by the last thread to arrive.
     *
     * @param parties       the number of threads the barrier holds back; must be positive
     * @param barrierAction the command stored as {@code barrierCommand}; the one-argument
     *                      constructor passes {@code null} here
     * @throws IllegalArgumentException if {@code parties} is less than 1
     */
    public CyclicBarrier(int parties, Runnable barrierAction) {
            if (parties <= 0) throw new IllegalArgumentException();
            this.parties = parties;
            // count starts equal to parties; getNumberWaiting() reports parties - count
            this.count = parties;
            this.barrierCommand = barrierAction;
        }
    
    /**
     * Waits at the barrier without a timeout by delegating to {@code dowait(false, 0L)}.
     *
     * @return the value returned by {@code dowait}
     * @throws InterruptedException   declared by this method; propagated from {@code dowait}
     * @throws BrokenBarrierException declared by this method; propagated from {@code dowait}
     */
    public int await() throws InterruptedException, BrokenBarrierException {
            try {
                return dowait(false, 0L);
            } catch (TimeoutException toe) {
                // The untimed path is not expected to time out, so this is surfaced as an Error.
                throw new Error(toe); // cannot happen
            }
        }
    
    /**
     * Waits at the barrier with a timeout by delegating to
     * {@code dowait(true, unit.toNanos(timeout))}.
     *
     * @param timeout maximum time to wait
     * @param unit    unit of {@code timeout}
     * @return the value returned by {@code dowait}
     * @throws InterruptedException   declared by this method; propagated from {@code dowait}
     * @throws BrokenBarrierException declared by this method; propagated from {@code dowait}
     * @throws TimeoutException       declared by this method; propagated from {@code dowait}
     */
    public int await(long timeout, TimeUnit unit)
            throws InterruptedException,
                   BrokenBarrierException,
                   TimeoutException {
            return dowait(true, unit.toNanos(timeout));
        }
    
    /**
     * Returns the number of parties currently waiting at the barrier, computed as
     * {@code parties - count} while holding the barrier lock.
     *
     * @return the number of parties currently waiting
     */
    public int getNumberWaiting() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                return parties - count;
            } finally {
                lock.unlock();
            }
        }
    
    /**
     * Reports whether the barrier is in a broken state, i.e. {@code true} means the
     * barrier is broken (not usable). Reads {@code generation.broken} while holding
     * the barrier lock.
     *
     * @return {@code true} if the current generation is marked broken
     */
    public boolean isBroken() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                return generation.broken;
            } finally {
                lock.unlock();
            }
        }
    /**
     * Returns the number of parties taking part in this barrier.
     *
     * @return the value of the {@code parties} field set at construction
     */
    public int getParties() {
            return parties;
        }
    
    

    跑个栗子

    /**
     * A guest who needs {@code time} seconds to arrive and then waits at the shared
     * {@link CyclicBarrier} until all other parties have arrived.
     */
    public class CyclicBarrierTest implements Runnable {
        /** Barrier shared by all guests. */
        private final CyclicBarrier barrier;
        /** Name printed when the guest arrives. */
        private final String name;
        /** Travel time in seconds. */
        private final Long time;

        /**
         * @param barrier barrier shared with the other guests
         * @param name    display name of this guest
         * @param time    travel time in seconds before the guest reaches the barrier
         */
        public CyclicBarrierTest(CyclicBarrier barrier, String name, Long time) {
            this.barrier = barrier;
            this.name = name;
            this.time = time;
        }

        /**
         * Sleeps for the travel time, announces the arrival and waits at the barrier.
         */
        @Override
        public void run() {
            try {
                Thread.sleep(time * 1000);
                System.out.println(name + "到了");
                barrier.await();
            } catch (InterruptedException e) {
                // Restore the flag so the owning executor can still observe the interruption.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }
    
    /**
     * Barrier action and entry point of the dinner example: {@link #run()} is the
     * command handed to the {@link CyclicBarrier}, and {@code main} starts three
     * guests that wait on that barrier.
     */
    public class MainThread implements Runnable {
        /** Announces that everybody has arrived. */
        @Override
        public void run() {
            System.out.println("人到齐了,开饭.....");
        }

        /**
         * Starts the three guests on a pool of three threads.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            CyclicBarrier cyclicBarrier = new CyclicBarrier(3, new MainThread());
            // The pool must have at least as many threads as barrier parties,
            // otherwise the guests could never all be waiting at the same time.
            ExecutorService executorService = Executors.newFixedThreadPool(3);
            try {
                executorService.execute(new CyclicBarrierTest(cyclicBarrier, "小花", 2L));
                executorService.execute(new CyclicBarrierTest(cyclicBarrier, "小红", 1L));
                executorService.execute(new CyclicBarrierTest(cyclicBarrier, "小明", 5L));
            } finally {
                // Already-submitted guests still finish; without this the idle
                // non-daemon pool threads keep the JVM alive forever.
                executorService.shutdown();
            }
        }
    }
    
    cyclicbarriertest.gif

    其他例子

    /**
     * Demo: ten tasks each report one reading and then wait on a shared
     * {@link CyclicBarrier}; the barrier action runs once all ten have arrived.
     */
    public class CyclicBarrierDemo implements Runnable {
        /** Number of tasks that must reach the barrier before it trips. */
        private static final int PARTIES = 10;

        /** Barrier shared by all tasks. */
        CyclicBarrier cyclicBarrier;

        /** Index of the reading this task reports. */
        private final Integer count;

        /**
         * @param cyclicBarrier barrier shared with the other tasks
         * @param count         index of the reading this task reports
         */
        public CyclicBarrierDemo(CyclicBarrier cyclicBarrier, Integer count) {
            this.cyclicBarrier = cyclicBarrier;
            this.count = count;
        }

        /**
         * Prints the reading and waits at the barrier.
         */
        @Override
        public void run() {
            try {
                System.out.println("accept iot device run data:" + count);
                cyclicBarrier.await();
            } catch (InterruptedException e) {
                // Restore the flag so the owning executor can still observe the interruption.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }

        /**
         * Submits {@code PARTIES} tasks to a pool of the same size and shuts the
         * pool down once they are queued.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            // A plain Runnable is enough for the barrier action; no Thread object is needed.
            CyclicBarrier cyclicBarrier = new CyclicBarrier(PARTIES,
                    () -> System.out.println("run data count" + PARTIES + ",开始执行。。。。"));

            // Pool size equals the party count so all tasks can wait at the barrier concurrently.
            ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                    PARTIES, PARTIES, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(PARTIES));
            try {
                for (int i = 0; i < PARTIES; i++) {
                    threadPoolExecutor.submit(new CyclicBarrierDemo(cyclicBarrier, i));
                }
            } finally {
                // Submitted tasks still complete; without this the core threads keep the JVM alive.
                threadPoolExecutor.shutdown();
            }
        }
    }
    

    Semaphore

    Semaphore 信号量,控制并发时线程的数量。

    跑个栗子

    /**
     * A gym with a fixed set of treadmills. A fair {@link Semaphore} limits how many
     * callers may hold a treadmill at the same time, and the {@code use} flags record
     * which concrete treadmill is currently taken.
     *
     * <p>Access to the {@code use} flags is guarded by this object's monitor.
     * Every successful {@link #get()} must be paired with one {@link #release(TreadMill)}.
     */
    public class FitnessRoom {

        /** A single treadmill identified by its number. */
        class TreadMill {
            private Integer num;

            public TreadMill(Integer num) {
                this.num = num;
            }

            public Integer getNum() {
                return num;
            }

            public void setNum(Integer num) {
                this.num = num;
            }
        }

        private final TreadMill[] treadMills = new TreadMill[]{new TreadMill(1), new TreadMill(2), new TreadMill(3), new TreadMill(4)};
        /** {@code use[i]} is true while {@code treadMills[i]} is handed out. */
        private final boolean[] use = new boolean[treadMills.length];

        /** One permit per treadmill; fair so waiting callers are served in arrival order. */
        Semaphore semaphore = new Semaphore(treadMills.length, true);

        /**
         * Takes a treadmill, blocking until a permit is available.
         *
         * @return the treadmill handed out, or {@code null} if a permit was obtained but
         *         no free treadmill was found (only possible when {@link #getAvailable()}
         *         was called directly, bypassing the semaphore)
         * @throws InterruptedException if interrupted while waiting for a permit
         */
        public TreadMill get() throws InterruptedException {
            semaphore.acquire(1);
            TreadMill treadMill = getAvailable();
            if (treadMill == null) {
                // Nothing was handed out, so nobody will call release(); do not leak the permit.
                semaphore.release(1);
            }
            return treadMill;
        }

        /**
         * Scans for a treadmill nobody is using and marks it as taken.
         * Does not acquire a semaphore permit; prefer {@link #get()}.
         *
         * @return a free treadmill, or {@code null} if all are in use
         */
        public synchronized TreadMill getAvailable() {
            for (int i = 0; i < use.length; i++) {
                if (!use[i]) {
                    use[i] = true;
                    return treadMills[i];
                }
            }
            return null;
        }

        /**
         * Releases a treadmill and returns its permit to the semaphore.
         * Unknown treadmills and treadmills that are not currently in use are ignored,
         * so a double release cannot inflate the permit count.
         *
         * @param treadMill the treadmill previously obtained from {@link #get()}
         */
        public void release(TreadMill treadMill) {
            boolean freed = false;
            synchronized (this) {
                for (int i = 0; i < use.length; i++) {
                    if (treadMills[i] == treadMill && use[i]) {
                        use[i] = false;
                        freed = true;
                        break;
                    }
                }
            }
            if (freed) {
                // Hand the permit back so a waiting caller can proceed.
                semaphore.release(1);
            }
        }
    }
    
    /**
     * A gym member who takes a treadmill from the {@code FitnessRoom}, runs for two
     * seconds and hands the treadmill back.
     */
    public class MainThread implements Runnable {

        /** Display name of the member. */
        private final String name;
        /** Gym the member trains in. */
        private final FitnessRoom fitnessRoom;

        /**
         * @param name        display name of the member
         * @param fitnessRoom gym that hands out treadmills
         */
        public MainThread(String name, FitnessRoom fitnessRoom) {
            this.name = name;
            this.fitnessRoom = fitnessRoom;
        }

        /**
         * Takes a treadmill, runs, and always gives the treadmill back — even when
         * the run is cut short by an interrupt.
         */
        @Override
        public void run() {
            FitnessRoom.TreadMill treadMill = null;
            try {
                treadMill = fitnessRoom.get();
                if (null != treadMill) {
                    System.out.println(name + "在" + treadMill.getNum() + "号跑步机上跑步");
                    TimeUnit.SECONDS.sleep(2);
                    System.out.println(name + "跑完了");
                }
            } catch (InterruptedException e) {
                // Restore the flag so the owning executor can still observe the interruption.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } finally {
                if (null != treadMill) {
                    // The treadmill is free again, whether or not the run completed.
                    fitnessRoom.release(treadMill);
                }
            }
        }

        /**
         * Sends twenty members to the gym on a pool of five threads.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            ExecutorService executorService = Executors.newFixedThreadPool(5);
            FitnessRoom fitnessRoom = new FitnessRoom();
            try {
                for (int i = 0; i < 20; i++) {
                    executorService.execute(new MainThread(i + "", fitnessRoom));
                }
            } finally {
                // Queued members still get their turn; without this the idle
                // non-daemon pool threads keep the JVM alive forever.
                executorService.shutdown();
            }
        }
    }
    
    semaphore.gif

    Exchanger

    用于两个线程之间交换数据的工具类。

    原子操作类

    AtomicBoolean
    AtomicInteger
    AtomicLong
    AtomicIntegerArray
    AtomicLongArray
    AtomicReferenceArray
    AtomicReference
    AtomicReferenceFieldUpdater
    AtomicMarkableReference
    AtomicIntegerFieldUpdater
    AtomicLongFieldUpdater
    AtomicStampedReference
    AtomicReferenceFieldUpdater

    集合工具类

    ConcurrentHashMap
    ConcurrentSkipListMap
    CopyOnWriteArrayList
    CopyOnWriteArraySet

    相关文章

      网友评论

          本文标题:多线程之并发工具类

          本文链接:https://www.haomeiwen.com/subject/arwfjctx.html