多线程之并发工具类(七)

作者: Real_man | 来源:发表于2018-04-09 22:03 被阅读41次

    “工欲善其事必先利其器”,有了这些并发工具,多线程控制变得So easy。

    与文无关

    JDK中已经给我们内置了很多并发工具,都属于应用类型,知道具体如何使用就好,主要讲以下几个类:

    • CountDownLatch
    • CyclicBarrier
    • Semaphore
    • LockSupport
    • BlockingQueue

    这次的几个案例都需要实际运行,看运行效果才明白怎么回事,代码可以直接复制粘贴。

    CountDownLatch

    多线程控制类,计数器栅栏,当计数器满足条件的时候,再开始执行接下来的操作。

    public class CountDownLatchTest {
        static final int THREAD_COUNT = 10;
        static final CountDownLatch end = new CountDownLatch(THREAD_COUNT);
    
        public static void main(String[] args) throws InterruptedException {
            Runnable demo = new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("检查完成");
                    end.countDown();
                }
            };
            
            //线程池内有5个线程方便看效果
            ExecutorService executorService = Executors.newFixedThreadPool(5);
            for (int i = 0; i < THREAD_COUNT; i++) {
                executorService.submit(demo);
            }
    
            end.await();
            System.out.println("一切就绪");
            executorService.shutdown();
        }
    }
    
    CountDownLatch 运行效果

    CyclicBarrier

    循环栅栏,可以看做CountDownLatch的重复利用。当满足一定的条件时候,才开始执行某线程。

    // 当线程的数量满足条件时候,才开始执行。
    public class CyclicBarrierTest {
    
        public static void main(String[] args) {
            CyclicBarrier cyclicBarrier = new CyclicBarrier(4, new Runnable() {
                @Override
                public void run() {
                    System.out.println("一切就绪,准备出发");
                }
            });
    
            Runnable task = new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(1000);
                        System.out.println(Thread.currentThread().getId() + ":就绪");
                        cyclicBarrier.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                }
            };
    
            ExecutorService executorService = Executors.newFixedThreadPool(4);
            for (int i = 0; i < 4; i++) {
                executorService.submit(task);
            }
    
    
            executorService.shutdown();
    
        }
    }
    
    CyclicBarrier运行结果

    所有的线程都在等待,当等待的线程达到一定的数量,然后开始执行接下来的操作。

    Semaphore

    Semaphore,也是控制线程的一种手段,可以控制并发线程的数量,某些时候我们线程数过多,在访问有限的资源时候,可以使用Semaphore来控制线程的数量。

    public class SemaphoreDemo implements Runnable {
        Semaphore mSemaphore = new Semaphore(5);
    
        @Override
        public void run() {
            try {
                mSemaphore.acquire();
                Thread.sleep(2000);
                System.out.println(Thread.currentThread().getId() + " done!");
                mSemaphore.release();
                
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        public static void main(String[] args) {
            ExecutorService executorService = Executors.newFixedThreadPool(20);
            SemaphoreDemo demo = new SemaphoreDemo();
            for (int i = 0; i < 20; i++) {
                executorService.submit(demo);
            }
            executorService.shutdown();
            ;
        }
    }
    
    Semaphore 运行案例

    LockSupport

    LockSupport提供了一些静态方法用于阻塞线程,和唤醒线程的功能。
    处于park()挂起状态的线程是Waiting状态,park()方法阻塞的线程还支持中断,不抛出中断异常的同时设置中断标志位,然后我们可以通过中断标志位来检查。

    public class LockDemo implements Runnable{
        public static Object sObject = new Object();
    
        @Override
        public void run() {
            synchronized (sObject){
                System.out.println("当前线程名称:" + Thread.currentThread().getName());
                LockSupport.park();
    
                if (Thread.currentThread().isInterrupted()){
                    System.out.println( Thread.currentThread().getName() +  "被中断了");
                }
                System.out.println("执行结束");
            }
        }
    
        public static void main(String[] args) throws InterruptedException {
            LockDemo demo = new LockDemo();
            Thread t1 = new Thread(demo,"t1");
            Thread t2 = new Thread(demo,"t2");
            t1.start();
            Thread.sleep(3000);
            t2.start();
            t1.interrupt();
            LockSupport.unpark(t2);
        }
    }
    
    LockSupport的demo

    BlockingQueue

    Java的Queue也是面试中经常提到的知识点,这次因为我们只涉及到并发相关知识,所以只提一些并发相关的Queue,关于Queue的具体分析等后面的数据结构系列的时候再详细解说。

    BlockingQueue是Java中的阻塞队列,JDK中提供了7个阻塞队列

    • ArrayBlockingQueue : 数组实现的有界队列,对元素进行FIFO(先进先出)的原则排序。
    • LinkedBlockingQueue: 链表组成的有界队列,长度默认最大值为Integer.MAX_VALUE,元素按FIFO原则排序,性能好于ArrayBlockingQueue。
    • PriorityBlockingQueue:支持优先级的无界阻塞队列。
    • DelayQueue: 支持延迟获取元素的无界阻塞队列
    • SynchronousQueue:不存储元素的阻塞队列。每一个put操作必须等待take操作,否则不能继续添加元素。
    • LinkedTransferQueue:链表组成的无界传输队列
    • LinkedBlockingDeque:由链表组成的双向阻塞队列。可以从两段插入和移除元素。

    带大家看一下LinkedBlockingQueue的几个关键方法:

       //LinkedBlockingQueue 方法探索
      // 添加元素
        public boolean offer(E e) {
            if (e == null) throw new NullPointerException();
            //如果队列满了,直接返回false
            final AtomicInteger count = this.count;
            if (count.get() == capacity)
                return false;
            // 创建新的节点
            int c = -1;
            Node<E> node = new Node<E>(e);
            final ReentrantLock putLock = this.putLock;
            putLock.lock();
            try {
                // 如果队列不满的话,就让元素加入队列。
                //然后判断,当前队列元素是否满了,不满的话,通知notFull条件。
                if (count.get() < capacity) {
                    enqueue(node);
                    c = count.getAndIncrement();
                    if (c + 1 < capacity)
                        notFull.signal();
                }
            } finally {
                putLock.unlock();
            }
            // 假如添加的是第一个元素,通知队列不为空了。
            if (c == 0)
                signalNotEmpty();
            return c >= 0;
        }
    
        public void put(E e) throws InterruptedException {
            if (e == null) throw new NullPointerException();
           
            int c = -1;
            Node<E> node = new Node<E>(e);
            final ReentrantLock putLock = this.putLock;
            final AtomicInteger count = this.count;
            putLock.lockInterruptibly();
            try {
                // 当队列满的时候进行等待。若不满入队
                while (count.get() == capacity) {
                    notFull.await();
                }
                enqueue(node);
                c = count.getAndIncrement();
              
               // 同offer
                if (c + 1 < capacity)
                    notFull.signal();
            } finally {
                putLock.unlock();
            }
            // 同offer
            if (c == 0)
                signalNotEmpty();
        }
    

    可以看出添加元素上:

    • 当队列满的时候,offer不添加元素,立刻返回。put则会阻塞操作,直到队列为不满。
    • 还有一个带参数的offer方法,和put的唯一区别就是有超时时间,在一段时间内队列还不空的话,就返回。
       //LinkedBlockingQueue 方法探索
      // 移除
        public E poll() {
            final AtomicInteger count = this.count;
            // 队列为空,返回null
            if (count.get() == 0)
                return null;
            E x = null;
            int c = -1;
            final ReentrantLock takeLock = this.takeLock;
            takeLock.lock();
            try {
                //队列有元素的话,取出元素
                //取出元素后如果队列是不为空,发出不为空的信号。
                if (count.get() > 0) {
                    x = dequeue();
                    c = count.getAndDecrement();
                    if (c > 1)
                        notEmpty.signal();
                }
            } finally {
                takeLock.unlock();
            }
            //如果取出元素之前,队列是满的,因为取出了元素,现在发出不满的信号
            if (c == capacity)
                signalNotFull();
            return x;
        }
    
        public E take() throws InterruptedException {
            E x;
            int c = -1;
            final AtomicInteger count = this.count;
            final ReentrantLock takeLock = this.takeLock;
            takeLock.lockInterruptibly();
            try {
               // 队列为空的话,就等待队列不为空。
                while (count.get() == 0) {
                    notEmpty.await();
                }
                x = dequeue();
                c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            } finally {
                takeLock.unlock();
            }
            if (c == capacity)
                signalNotFull();
            return x;
        }
    

    可以看出LinkedBlockingQueue的移除操作poll和take方法:

    • poll不阻塞,take会阻塞
    • poll(long timeout, TimeUnit unit),当队列为空的时候,等待指定的时间,如果队列扔为空,那么就返回。

    这次是带领大家一起看了下LinkedBlockingQueue的关键方法,其它的队列的操作也都类似,望大家自行查看,JDK中Queue的实现并不难理解。

    最后

    这次主要介绍了几个并发中可能会用到的工具类,最后说了下JDK并发包中的阻塞队列,阻塞队列相对比较重要,就简单的分析了其实现。
    希望能帮助到大家。

    参考

    • 《Java并发实战》
    • 《Java高并发程序设计》
    • 《并发编程的艺术》

    相关文章

      网友评论

        本文标题:多线程之并发工具类(七)

        本文链接:https://www.haomeiwen.com/subject/yroihftx.html