美文网首页
多线程生产者-消费者模式的实现

多线程生产者-消费者模式的实现

作者: herohua | 来源:发表于2020-01-19 11:38 被阅读0次

    消息实体类:

    public class Message {
    
        private String data;
    
        public Message(String data) {
            this.data = data;
        }
    
        public String getData() {
            return data;
        }
    }
    

    消息队列:

    public class MessageQueue {
    
        private final LinkedList<Message> queue;
    
        private final static int DEFALULT_MAX_LIMIT = 100;
    
        private final int limit;
    
        public MessageQueue() {
            this(DEFALULT_MAX_LIMIT);
        }
    
        public MessageQueue(final int limit) {
            this.limit = limit;
            queue = new LinkedList<>();
        }
    
        /**
         * 向消息队列中放入消息
         */
        public void put(Message message) throws InterruptedException {
            synchronized (queue) {
                while (queue.size() >= limit) {
                    queue.wait();
                }
    
                queue.addLast(message);
                queue.notifyAll();
            }
        }
    
        /**
         * 从消息队列中取出消息
         */
        public Message take() throws InterruptedException {
            synchronized (queue) {
                while (queue.isEmpty()) {
                    queue.wait();
                }
    
                Message message = queue.removeFirst();
                queue.notifyAll();
                return message;
            }
        }
    
        public int getMaxLimit() {
            return this.limit;
        }
    
        public int getMessageSize() {
            synchronized (queue) {
                return queue.size();
            }
        }
    }
    

    生产者线程:

    /**
     * 生产者
     */
    public class ProducerThread extends Thread {
    
        private final MessageQueue queue;
    
        private final static Random random = new Random(System.currentTimeMillis());
    
        private final static AtomicInteger counter = new AtomicInteger();
    
        public ProducerThread(MessageQueue queue, int seq) {
            super("Producer-" + seq);
            this.queue = queue;
        }
    
        @Override
        public void run() {
            while (true) {
                try {
                    Message message = new Message("Message-" + counter.getAndIncrement());
                    queue.put(message);
                    System.out.println(Thread.currentThread().getName() + " put " + message.getData());
    
                    Thread.sleep(random.nextInt(1000));
                } catch (InterruptedException e) {
                    break;
                }
    
            }
        }
    }
    

    消费者线程:

    /**
     * 消费者
     */
    public class ConsumerThread extends Thread {
    
        private final MessageQueue queue;
    
        private final static Random random = new Random(System.currentTimeMillis());
    
        public ConsumerThread(MessageQueue queue, int seq) {
            super("Consumer-" + seq);
            this.queue = queue;
        }
    
        @Override
        public void run() {
            while (true) {
                try {
                    Message message = queue.take();
                    System.out.println(Thread.currentThread().getName() + " take " + message.getData());
    
                    Thread.sleep(random.nextInt(1000));
                } catch (InterruptedException e) {
                    break;
                }
    
            }
        }
    }
    

    测试:

    /**
     * 生产者消费者模式
     */
    public class Client {
    
        public static void main(String[] args) {
    
            final MessageQueue messageQueue = new MessageQueue();
    
            IntStream.range(0, 5).forEach(i -> {new ProducerThread(messageQueue, i).start();});
            IntStream.range(0, 5).forEach(i -> {new ConsumerThread(messageQueue, i).start();});
        }
    }
    

    测试结果:

    测试结果.png

    相关文章

      网友评论

          本文标题:多线程生产者-消费者模式的实现

          本文链接:https://www.haomeiwen.com/subject/jlzkzctx.html