美文网首页
【Java并发】如何实现非阻塞式生产者消费者?

【Java并发】如何实现非阻塞式生产者消费者?

作者: itbird01 | 来源:发表于2022-02-20 08:10 被阅读0次

    1.问题描述

    实现Java非阻塞式生产者消费者,用来解决,生产和消费对于资源访问不同步和造成资源冗余的问题

    2.实现思想

    • 针对于同一资源,生产者生产前会前检测资源是否大于0,如果大于0,则生产者线程释放资源锁,进入waiting阶段,如果小于0,则生产者线程持有锁,并且生产资源,生产一定资源之后,通知消费者。
    • 消费者线程,去消费之前,会检测资源是否大于0,如果小于0,则阻塞等待,并且通知生产者生产资源,如果大于0,则消费资源,消费完成,唤醒生产者,生产资源。
    • 这样就实现了每次生产者生产都是生产一定的资源,等待消费者消费完成之后,才去继续生产

    3.代码

    3.1使用wait、notify

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Random;
    
    public class Main {
        public static class Breads {
            private List<Integer> bordIntegers = new ArrayList<Integer>();
            public synchronized void add() {
                if (bordIntegers.size() > 0) {
                    System.out.println("生产者检测到目前资源大于0,不生产");
                    try {
                        wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
    
                System.out.println("生产者检测到目前资源小于0,开始生产");
                for (int i = 0; i < 5; i++) {
                    bordIntegers.add(new Random().nextInt(1000));
                }
    
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                notify();
            }
    
            public synchronized void del() {
                if (bordIntegers.size() <= 0) {
                    System.out.println("消费者检测到目前资源小于0,通知生产者");
                    try {
                        wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
    
                System.out.println("消费者检测到目前资源大于0,开始消费");
                for (int i = 0; i < bordIntegers.size(); i++) {
                    System.out.println(bordIntegers.get(i));
                }
                bordIntegers.clear();
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                notify();
            }
        }
        public static void main(String[] args) {
            Breads borad = new Breads();
            Thread createThread = new Thread(new Runnable() {
    
                @Override
                public void run() {
                    while (true) {
                        borad.add();
                    }
                }
            });
    
            Thread consumethread = new Thread(new Runnable() {
    
                @Override
                public void run() {
                    while (true) {
                        borad.del();
                    }
                }
            });
    
            createThread.start();
            consumethread.start();
            System.out.println("Main Thread exit!");
        }
    }
    

    3.2使用ArrayBlockingQueue

    import java.util.concurrent.ArrayBlockingQueue;
    
    public class Main {
        static ArrayBlockingQueue<Integer> bordBlockingQueue = new ArrayBlockingQueue<Integer>(
                10);
        public static void main(String[] args) {
            Thread createThread = new Thread(new Runnable() {
    
                @Override
                public void run() {
                    while (true) {
                        for (int i = 0; i < 10; i++) {
                            try {
                                bordBlockingQueue.put(i);
                                System.out.println("生产者生产");
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                }
            });
    
            Thread consumethread = new Thread(new Runnable() {
    
                @Override
                public void run() {
                    while (true) {
                        while (!bordBlockingQueue.isEmpty()) {
                            try {
                                System.out.println("消费者消费");
                                bordBlockingQueue.take();
                            } catch (InterruptedException e) {
                                // TODO Auto-generated catch block
                                e.printStackTrace();
                            }
                        }
                    }
                }
            });
    
            createThread.start();
            consumethread.start();
            System.out.println("Main Thread exit!");
        }
    }
    

    相关文章

      网友评论

          本文标题:【Java并发】如何实现非阻塞式生产者消费者?

          本文链接:https://www.haomeiwen.com/subject/iplllrtx.html