美文网首页
线程中的生产者/消费者场景

线程中的生产者/消费者场景

作者: UncleYee | 来源:发表于2014-05-06 21:40 被阅读262次

    生产者-消费者问题可以说是线程中最基础,最经典的场景了。它把并发编程中涉及到的一些常见概念都披露了出来,可以说是线程入门绕不开的场景。什么是'生产者-消费者问题'呢,通俗的定义就是:
    在指定容量的容器中,同时存在两种对象对容器进行生产或者消费的动作,由于容器的容量有限,使得"生产"不能太多(太多没有意义,容器装不下),“消费”不能无限(容器中不一定含有那么多消费量)。具体详情可以参见生产者消费者问题

    在Java的线程模型中,我总结了对于这一问题的3种处理模式,可以分别比较一下。

    1. 普通模式 synchronized+notify+await

    这种模式应该最为普遍,不需要了解JDK1.5以后的相关线程类工具,直接使用内置关键字synchronized保证线程访问的同步性,同时使用继承至Object对象的wait,notify方法可以根据业务需求控制线程的实际访问权限。详情如下,这里业务场景是:一个盘子一次只能装一个鸡蛋,分别有放鸡蛋的线程和取鸡蛋的线程对盘子进行存取操作。

    public class Plate {
    
    
        // 容器
        private List<Egg> eggs = new ArrayList<Egg>();
    
    
        // 取鸡蛋的业务逻辑
        public synchronized void getEgg() {
            while (eggs.size() == 0) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            Egg e = eggs.get(0);
            eggs.clear();
            System.out.println(">>>>>>>>>>get egg:" + e.getName());
            notify();
        }
    
    
        // 放鸡蛋的业务逻辑
        public synchronized void putEgg(Egg egg) {
            while (eggs.size() != 0) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            eggs.add(egg);
            System.out.println(">>>>>>>>>>put egg:" + egg.getName());
            notify();
        }
    
        // 生产者线程
       static class PutThread implements Runnable{
    
           private  Plate plate;
    
           PutThread(Plate plate) {
               this.plate = plate;
           }
    
           @Override
            public void run() {
               plate.putEgg(new Egg("egg[" + RandomUtils.nextInt(10) + "]"));
            }
        }
    
        //消费者线程
        static  class GetThread implements Runnable{
            private  Plate plate;
    
            GetThread(Plate plate) {
                this.plate = plate;
            }
    
            @Override
            public void run() {
                   plate.getEgg();
            }
        }
    
    
        static class Egg {
            private String name;
            String getName() {
                return name;
            }
            void setName(String name) {
                this.name = name;
            }
    
            Egg(String name) {
                this.name = name;
            }
        }
    
    
        public static void main(String [] args){
            Plate p = new Plate();
            while(true)  {
                new Thread(new PutThread(p)).start();
                new Thread(new GetThread(p)).start();
            }
    
        }
    
    }
    

    2. 巧妙模式 Semaphore

    Semaphore 是JDK5推出线程工具类之一,JDK5推出的一系列线程工具类大大简化了并发编程,覆盖了一些常见的业务场景,后面我会有篇文章单独讲讲这些工具类。
    下面看看如何用Semaphore 进行生产者消费者问题的解决思路。

    public class NewPlate {
    
        private Semaphore fullSema = new Semaphore(10); // 定义容器的最大容量条件
        private Semaphore emptySema = new Semaphore(0);// 定义容器的最小容量条件
        private Semaphore mutex = new Semaphore(1);// 这个非常重要,用来控制 消费/生产逻辑一次只有一个线程来访问,说白了就是模拟Synchronized的语义。
        
        private ArrayList<Object> list = new ArrayList<Object>();
    
        public void set(Object data) {
            try {
                // 先判断是否满了,语义就是wait()
                fullSema.acquire();
    
                // 保证一次只有一个线程访问,语义就是synchronized
                mutex.acquire();
                System.out.printf("=====before set , current size:%d\n", list.size());
                list.add(data);
                TimeUnit.SECONDS.sleep(1);
                System.out.printf("=====after set , current size:%d\n", list.size());
                mutex.release();
                // 为空条件释放一个,语义就是notify()
                emptySema.release();
    
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        public Object get() {
            Object ret = null;
            try {
                emptySema.acquire();
                mutex.acquire();
                System.out.printf(">>>>>>before get , current size:%d\n", list.size());
                ret = list.remove(0);
                TimeUnit.SECONDS.sleep(4);
                System.out.printf(">>>>>>after get, current size:%d\n", list.size());
                mutex.release();
                fullSema.release();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return ret;
        }
    
        public static void main(String[] args) {
    
            final NewPlate newPlate = new NewPlate();
            Thread setThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    while (true) {
                        newPlate.set(new Object());
                    }
                }
            });
            Thread getThead = new Thread(new Runnable() {
                @Override
                public void run() {
                    while (true) {
                        newPlate.get();
                    }
                }
            });
    
            setThread.start();
            getThead.start();
    
    
        }
    
    }
    

    3. 高阶模式 Lock + Condition

    Lock 提供了与synchronized相似的语义,但是功能更为强大,我个人认为这种模式从语义上更好理解,更类似人类的语言逻辑。

    public class BoundedBuffer {
    
        
        private int maxSize;// 容器的容量
        private LinkedList<Object> buffer;  // 容器
    
        private Lock lock;// 锁,用来加锁 生产/消费逻辑,保证一次只有一个线程访问
        private Condition notFull;//  非满条件,在容量已满的情况下,控制生产者继续生产
        private Condition notEmpty;// 非空条件,在容量是空的情况下,控制消费线程继续消费
    
        BoundedBuffer() {
            maxSize = 5;
            buffer = new LinkedList<Object>();
            lock = new ReentrantLock();
            notFull = lock.newCondition();
            notEmpty = lock.newCondition();
        }
    
        public void set(Object data) {
            lock.lock();
            try {
                // 容量已满,则生产者等待
                while (buffer.size() == maxSize) {
                    notFull.await();
                }
                buffer.offer(data);
                TimeUnit.SECONDS.sleep(2);
                System.out.printf("set %s, size:%d\n", Thread.currentThread().getName(), buffer.size());
    
                // 生产完毕,提醒所有消费者可以消费了
                notEmpty.signalAll();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    
        public Object get() {
            lock.lock();
            Object ret = null;
            try {
                // 容量为空,则消费者等待
                while (buffer.size() == 0) {
                    notEmpty.await();
                }
                ret = buffer.poll();
                TimeUnit.SECONDS.sleep(5);
                System.out.printf("get %s,get one, current size:%d\n", Thread.currentThread().getName(), buffer.size());
    
                // 消费完毕,提醒所有生产者者可以继续生产
                notFull.signalAll();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
            return ret;
        }
    
        static class Setter implements Runnable {
    
            private BoundedBuffer boundedBuffer;
    
            Setter(BoundedBuffer boundedBuffer) {
                this.boundedBuffer = boundedBuffer;
            }
    
            @Override
            public void run() {
                boundedBuffer.set(new Object());
            }
        }
    
        static class Getter implements Runnable {
            private BoundedBuffer boundedBuffer;
    
            Getter(BoundedBuffer boundedBuffer) {
                this.boundedBuffer = boundedBuffer;
            }
    
            @Override
            public void run() {
                boundedBuffer.get();
            }
        }
    
        public static void main(String[] args) {
    
            BoundedBuffer boundedBuffer = new BoundedBuffer();
            Setter setter = new Setter(boundedBuffer);
            Getter getter = new Getter(boundedBuffer);
            for (int i = 0; i < 10; i++) {
                Thread setThread = new Thread(setter);
                Thread getThread = new Thread(getter);
                setThread.start();
                getThread.start();
            }
        }
    

    相关文章

      网友评论

          本文标题:线程中的生产者/消费者场景

          本文链接:https://www.haomeiwen.com/subject/yxprtttx.html