美文网首页
java两种方式实现生产消费者模式

java两种方式实现生产消费者模式

作者: Xigong | 来源:发表于2019-02-23 11:03 被阅读0次

一.通过线程锁实现

package 多线程;

import java.util.concurrent.atomic.AtomicInteger;

public class 生产消费者2 {


    /**
     * 消费者
     */
    static class Consumer extends Thread {
        private final AtomicInteger dataHolder;
        private final Object consumerLock;
        private final Object producerLock;

        Consumer(AtomicInteger dataHolder, Object consumerLock, Object producerLock) {
            this.dataHolder = dataHolder;
            this.consumerLock = consumerLock;
            this.producerLock = producerLock;
        }

        @Override
        public void run() {
            super.run();
            while (true) {

                // 阻塞,等待生产者唤醒
                try {
                    synchronized (consumerLock) {
                        consumerLock.wait();
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                // 消费数据
                System.out.println("消费->" + dataHolder.get());
                System.out.println();

                // 消费完了,通知生产者开始生产
                synchronized (producerLock) {
                    producerLock.notifyAll();
                }

            }
        }
    }

    /**
     * 消费者
     */
    static class Producer extends Thread {
        private final AtomicInteger dataHolder;
        private final Object consumeLock;
        private final Object productLock;

        Producer(AtomicInteger dataHolder, Object consumeLock, Object productLock) {
            this.dataHolder = dataHolder;
            this.consumeLock = consumeLock;
            this.productLock = productLock;
        }


        @Override
        public void run() {
            super.run();
            while (true) {

                // 生产数据
                System.out.println("生产->" + dataHolder.incrementAndGet());

                // 唤醒消费者,去消费数据
                synchronized (consumeLock) {
                    consumeLock.notifyAll();
                }

                // 阻塞生产者
                synchronized (productLock) {
                    try {
                        productLock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }

                // 睡眠一会儿,不要生产过快
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        final AtomicInteger dataHolder = new AtomicInteger();
        final Object consumeLock = new Object();
        final Object productLock = new Object();
        Producer producer = new Producer(dataHolder, consumeLock, productLock);
        Consumer consumer = new Consumer(dataHolder, consumeLock, productLock);
        consumer.start();
        producer.start();
        consumer.join();
        producer.join();
    }
}

日志

生产->1
消费->1

生产->2
消费->2

生产->3
消费->3

生产->4
消费->4

这个方式简单易懂,关键点是锁时机

二.通过BlockingQueue实现

package 多线程;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class LogThread extends Thread {
    final protected void log(String message) {
        System.out.println(getName() + ":" + message);
    }
}

class ProductThread extends LogThread {
    private final BlockingQueue<Integer> queue;

    ProductThread(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        super.run();
        int i = 0;
        while (true) {
            try {
                queue.put(++i);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log("生产->" + i);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

class ConsumerThread extends LogThread {
    private final BlockingQueue<Integer> queue;

    public ConsumerThread(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        while (true) {
            Integer data = null;
            try {
                data = this.queue.take();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            if (data != null) {
                log("消费->" + data);
                System.out.println();
            }
        }

    }
}

public class 生产消费者 {

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        ProductThread productThread = new ProductThread(queue);
        for (int i = 0; i < 5; i++) {
            ConsumerThread consumerThread = new ConsumerThread(queue);
            consumerThread.start();
        }
        productThread.start();
        productThread.join();
    }

}

日志如下:

Thread-0:生产->1
Thread-1:消费->1

Thread-0:生产->2
Thread-3:消费->2

Thread-0:生产->3
Thread-4:消费->3

Thread-0:生产->4
Thread-2:消费->4

Thread-0:生产->5
Thread-5:消费->5

原理如下:
BlockingQueue.put(v) 如何容器没有空间的,也就是数据没有消费完,则阻塞

BlockingQueue.take()方法定义,如果数据为空,也就是数据没有生产出来,则阻塞。
BlockingQueue 的容量,可以通过构造函数传入,默认是Integer.MAX_VALUE

相关文章

网友评论

      本文标题:java两种方式实现生产消费者模式

      本文链接:https://www.haomeiwen.com/subject/tptuyqtx.html