美文网首页
显示锁(Lock)和阻塞对列(BlockingQueue)

显示锁(Lock)和阻塞对列(BlockingQueue)

作者: SilenceDut | 来源:发表于2016-07-18 20:20 被阅读545次

    synchronized是不错,但它并不完美。它有一些功能性的限制:比如它无法中断一个正在等候获得锁的线程;

    显示锁LocK

    java.util.concurrent.lock 中的Lock 框架是锁定的一个抽象,它允许把锁定的实现作为 Java 类,而不是作为语言的特性来实现。这就为Lock 的多种实现留下了空间,各种实现可能有不同的调度算法、性能特性或者锁定语义。

    ReentrantLock

    ReentrantLock 类实现了Lock ,它拥有与synchronized 相同的并发性和内存语义,但是添加了类似锁投票、定时锁等候和可中断锁等候的一些特性。此外,它还提供了在激烈争用情况下更佳的性能。(换句话说,当许多线程都想访问共享资源时,JVM 可以花更少的时候来调度线程,把更多时间用在执行线程上。)

    class LockStudy {            
        private Lock lock = new ReentrantLock();// 锁对象            
        public void output(String name) {                              
            lock.lock();      // 得到锁                
            try {                   
                //doSomething            
            } finally {                  
                lock.unlock();// 释放锁                
            }            
        }        
    }   
    

    需要注意的是,用synchronized修饰的方法或者语句块在代码执行完之后锁自动释放,而是用Lock需要我们手动释放锁,所以为了保证锁最终被释放(发生异常情况),要把互斥区放在try内,释放锁放在finally内。

    Condition

    ReentrantLock里有个函数newCondition(),该函数得到一个锁上的"条件",用于实现线程间的通信,条件变量很大一个程度上是为了解决Object.wait/notify/notifyAll难以使用的问题。
    Condition拥有await(),signal(),signalAll(),await对应于Object.waitsignal对应于Object.notifysignalAll对应于Object.notifyAll。特别说明的是Condition的接口改变名称就是为了避免与Object中的wait/notify/notifyAll的语义和使用上混淆,因为Condition同样有wait/notify/notifyAll方法()因为任何类都拥有这些方法。
    每一个Lock可以有任意数据的Condition对象,Condition是与Lock绑定的,所以就有Lock的公平性特性:如果是公平锁,线程为按照FIFO的顺序从Condition.await中释放,如果是非公平锁,那么后续的锁竞争就不保证FIFO顺序了。下面是一个用Lock和Condition实现的一个生产者消费者的模式:

    import java.util.concurrent.locks.Condition;  
    import java.util.concurrent.locks.Lock;  
    import java.util.concurrent.locks.ReentrantLock;
    public class ProductQueue<T> {  
    
        private final T[] items;  
    
        private final Lock lock = new ReentrantLock();  
    
        private Condition notFull = lock.newCondition();  
    
        private Condition notEmpty = lock.newCondition();  
    
    
        private int head, tail, count;  
    
        public ProductQueue(int maxSize) {  
            items = (T[]) new Object[maxSize];  
        }  
    
        public ProductQueue() {  
            this(10);  
        }  
    
        public void put(T t) throws InterruptedException {  
            lock.lock();  
            try {  
                while (count == getCapacity()) {  
                    notFull.await();  
                }  
                items[tail] = t;  
                if (++tail == getCapacity()) {  
                    tail = 0;  
                }  
                ++count;  
                notEmpty.signalAll();  
            } finally {  
                lock.unlock();  
            }  
        }  
    
        public T take() throws InterruptedException {  
            lock.lock();  
            try {  
                while (count == 0) {  
                    notEmpty.await();  
                }  
                T ret = items[head];  
                items[head] = null;//GC  
          
                if (++head == getCapacity()) {  
                    head = 0;  
                }  
                --count;  
                notFull.signalAll();  
                return ret;  
            } finally {  
                lock.unlock();  
            }  
        }  
    
        public int getCapacity() {  
            return items.length;  
        }  
    
        public int size() {  
            lock.lock();  
            try {  
                return count;  
            } finally {  
                lock.unlock();  
            }  
        }
    }  
    

    这就是多个Condition的强大之处,假设缓存队列中已经存满,那么阻塞的肯定是写线程,唤醒的肯定是读线程,相反,阻塞的肯定是读线程,唤醒的肯定是写线程,那么假设只有一个Condition会有什么效果呢,缓存队列中已经存满,这个Lock不知道唤醒的是读线程还是写线程了,如果唤醒的是读线程,皆大欢喜,如果唤醒的是写线程,那么线程刚被唤醒,又被阻塞了,这时又去唤醒,这样就浪费了很多时间。

    读写锁ReadWriteLock

    读-写锁定允许对共享数据进行更高级别的并发访问。虽然一次只有一个线程(writer 线程)可以修改共享数据,但在许多情况下,任何数量的线程可以同时读取共享数据(reader 线程)。

    阻塞队列——BlockingQueue

    阻塞队列(BlockingQueue)是java.util.concurrent下的主要用来控制线程同步的工具。如果BlockQueue是空的,从BlockingQueue取东西的操作将会被阻断进入等待状态,直到BlockingQueue进了东西才会被唤醒。同样,如果BlockingQueue是满的,任何试图往里存东西的操作也会被阻断进入等待状态,直到BlockingQueue里有空间才会被唤醒继续操作。
    阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。BlockingQueue是具体实现的接口。具体的实现类有LinkedBlockingQueue,ArrayBlockingQueued等。一般其内部的都是通过Lock和Condition来实现阻塞和唤醒。下面是一个通过BlockingQueue实现的生产者消费者的例子。

    生产者:

    public class Producer implements Runnable {  
        BlockingQueue<String> queue;  
    
        public Producer(BlockingQueue<String> queue) {  
            this.queue = queue;  
        }  
    
        @Override  
        public void run() {  
            try {  
                String temp = "A Product, 生产线程:"  
                    + Thread.currentThread().getName(    );  
                System.out.println("I have made a product:"  
                    + Thread.currentThread().getName());  
                queue.put(temp);//如果队列是满的话,会阻塞当前线程  
            } catch (InterruptedException e) {  
                e.printStackTrace();  
            }  
        }  
    }  
    

    消费者:

    public class Consumer implements Runnable{  
        BlockingQueue<String> queue;  
      
        public Consumer(BlockingQueue<String> queue){  
            this.queue = queue;  
        }  
      
        @Override  
        public void run() {  
            try {  
                String temp = queue.take();//如果队列为空,会阻塞当前线程  
                System.out.println(temp);  
            } catch (InterruptedException e) {  
                e.printStackTrace();  
            }  
        }  
    }  
    

    测试类:

    public class ProducerConsumeTest{  
    
        public static void main(String[] args) {  
            BlockingQueue<String> queue = new LinkedBlockingQueue<String>(2);  
            // BlockingQueue<String> queue = new LinkedBlockingQueue<String>();  
            //不设置的话,LinkedBlockingQueue默认大小为Integer.MAX_VALUE  
          
            // BlockingQueue<String> queue = new ArrayBlockingQueue<String>(2);  
            Consumer consumer = new Consumer(queue);  
            Producer producer = new Producer(queue);  
            for (int i = 0; i < 5; i++) {  
                new Thread(producer, "Producer" + (i + 1)).start();  
                new Thread(consumer, "Consumer" + (i + 1)).start();  
            }  
        }  
    }
    

    参考:Java线程(九):Condition-线程通信更高效的方式

    相关文章

      网友评论

          本文标题:显示锁(Lock)和阻塞对列(BlockingQueue)

          本文链接:https://www.haomeiwen.com/subject/ndxmjttx.html