美文网首页
多线程实现生产者消费者

多线程实现生产者消费者

作者: tanbin_tech | 来源:发表于2020-02-23 16:23 被阅读0次

    共享的buffer容器对象要处理两个问题

    • 取:如果buffer里没有元素,那么就释放锁去等待
    • 加:如果buffer里容量已经满了,那么也要释放锁去等待

    经典的实现

    import java.util.LinkedList;
    import java.util.Queue;
    
    public class SimpleCP {
        public static void main(String[] args) throws InterruptedException{
            Buffer buffer = new Buffer(2);
            Thread producer =new Thread(()->{
                try {
                    int i = 0;
                    while(true){
                        buffer.add(i);
                        System.out.println("product the value: " + i);
                        i++;
                        Thread.sleep(1000);
                    }
                } catch(InterruptedException e){
                    e.printStackTrace();
                }
    
            });
    
            Thread consumer = new Thread(()->{
                try {
                    while(true){
                        int value = buffer.get();
                        System.out.println("consumer the value: " + value);
                        Thread.sleep(1000);
                    }
                }catch (InterruptedException e){
                    e.printStackTrace();
                }
            });
    
            producer.start();
            consumer.start();
    
           //主线程等待子线程完成再往下执行
            producer.join();
            consumer.join();
        }
    
        static class Buffer {
            int size;
            Queue<Integer> buffer;
    
            public Buffer(int size) {
                this.size = size;
                this.buffer = new LinkedList<>();
            }
    
            public void add(int value) throws InterruptedException {
                synchronized (this) {
                  //经典的搭配,和wait配合的循环99%是while
                 //获得锁之后还要检查一次是否满足条件,存在仍然不满足的可能
                    while (buffer.size() >= size) {
                        wait(); //释放锁
                    }
                    buffer.add(value);
                    notify(); //唤醒一个等待线程
                }
            }
    
            public int get() throws InterruptedException {
    
                synchronized (this) {
                    while (buffer.size() == 0) {
                        wait();
                    }
                    int value = buffer.poll();
                    notify();
                    return value;
                }
            }
        }
    }
    
    

    使用BlockingQueue 实现

    相对于自己实现同步的容器对象,java已经提供了一种成熟的容器接口BlockingQueue,我们使用它的实现类 LinkedBlockingDeque

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingDeque;
    
    public class BlockingCP {
        public static void main(String[] args) throws InterruptedException{
            BlockingQueue<Integer> buffer = new LinkedBlockingDeque<>(2);
            Thread producer = new Thread(()->{
                try {
                    int i = 0;
                    while(true){
                        buffer.put(i);
                        System.out.println("product the value: " + i);
                        i++;
                        Thread.sleep(1000);
                    }
                } catch (InterruptedException e){
                    e.printStackTrace();
                }
            });
    
            Thread consumer = new Thread(()->{
               try {
                   while(true){
                       int value = buffer.take();  //(1)
                       System.out.println("consume the value: " + value);
                       Thread.sleep(1000);
                   }
               } catch (InterruptedException e){
                   e.printStackTrace();
               }
            });
    
            consumer.start();
            producer.start();
    
            consumer.join();
            producer.join();
        }
    }
    
    

    (1)take方法和poll的区别

    • take():取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到Blocking有新的对象被加入为止
    • poll(time):取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,取不到时返回null

    (2)BlockingQueue添加元素的几个方法:

    • add:插入成功则返回true,否则抛出IllegalStateException异常
    • offer:插入成功返回true,否则返回false
    • offer(E e, long timeout, TimeUnit unit):插入元素成功则返回true,如果空间不足就等待指定时间,超时返回false
    • put:插入元素,如果没有容器没有剩余空间则等待(阻塞)

    相关文章

      网友评论

          本文标题:多线程实现生产者消费者

          本文链接:https://www.haomeiwen.com/subject/jsmjqhtx.html