美文网首页
阻塞队列的写法

阻塞队列的写法

作者: _Kantin | 来源:发表于2017-09-24 21:12 被阅读37次

    基于 LinkedList 实现的 BlockingQueue 写法

        //java中阻塞队列的应用
    /**
     * A minimal bounded FIFO queue whose operations block on this object's
     * intrinsic lock: {@code enqueue} waits while the queue holds {@code limit}
     * elements and {@code dequeue} waits while it is empty.
     */
    public class BlockingQueue {
        private final List<Object> queue = new LinkedList<>();
        private final int limit;

        /**
         * @param limit number of elements at which {@link #enqueue(Object)} starts blocking
         */
        public BlockingQueue(int limit){
            this.limit = limit;
        }

        /**
         * Appends {@code item} to the tail, waiting while the queue is full.
         *
         * @param item element to append
         * @throws Exception in practice an {@link InterruptedException} raised while waiting
         */
        public synchronized void enqueue(Object item) throws Exception{
            // Re-check in a loop: a woken thread must confirm the condition still holds.
            while(this.queue.size() == this.limit){
                wait();
            }
            this.queue.add(item);
            // Producers and consumers wait on the same monitor, so wake them all and
            // let each re-test its own condition; notify() could pick the wrong kind.
            notifyAll();
        }

        /**
         * Removes the head of the queue, waiting while the queue is empty.
         * Kept for callers of the original signature; the removed element is discarded.
         *
         * @param item ignored
         * @throws Exception in practice an {@link InterruptedException} raised while waiting
         */
        public synchronized void dequeue(Object item) throws Exception{
            dequeue();
        }

        /**
         * Removes and returns the head of the queue, waiting while the queue is empty.
         *
         * @return the element that was at the head
         * @throws InterruptedException if the thread is interrupted while waiting
         */
        public synchronized Object dequeue() throws InterruptedException{
            while(this.queue.isEmpty()){
                wait();
            }
            Object head = this.queue.remove(0);
            // A slot was freed; wake any producer blocked in enqueue.
            notifyAll();
            return head;
        }
    }
    
    

    异步消息队列

    原始数据类

    /**
     * Immutable wrapper around a single {@code int}, used as the payload passed
     * from producers to consumers.
     */
    public class PCData {
        private final int intData;

        /**
         * @param d the value to carry
         */
        public PCData (int d ){
            this.intData = d;
        }

        /**
         * @param d decimal text of the value to carry
         * @throws NumberFormatException if {@code d} is not a parsable integer
         */
        public PCData (String d ){
            this(Integer.parseInt(d));
        }

        /**
         * @return the carried value
         */
        public int getData(){
            return this.intData;
        }

        /**
         * @return the text {@code "data: "} followed by the carried value
         */
        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("data: ");
            text.append(this.intData);
            return text.toString();
        }

    }
    
    

    消费者

    /**
     * Task that repeatedly takes a {@code PCData} from the shared queue, prints the
     * square of its value, then sleeps for a random time below {@code SLEEPTIME} ms.
     * The loop ends only when the thread is interrupted.
     */
    public class Consumer  implements Runnable{
        private final BlockingQueue<PCData> queue;
        private static final int SLEEPTIME = 1000;

        /**
         * @param queue queue to take items from
         */
        public Consumer(BlockingQueue<PCData> queue){
            this.queue = queue;
        }

        /**
         * Consumes items until interrupted; on interruption prints the stack trace
         * and restores the thread's interrupt flag before returning.
         */
        @Override
        public void run() {
            System.out.println("start Consumer id :"+Thread.currentThread().getId());
            Random r = new Random();
            try{
                while(true){
                    // take() and remove() both remove and return the head of the queue,
                    // but take() blocks on an empty queue instead of throwing. It never
                    // returns null, so no null check is needed.
                    PCData data = queue.take();
                    // Widen to long so the square cannot overflow for large values.
                    long value = data.getData();
                    long re = value * value;
                    // Plain concatenation: MessageFormat would insert locale grouping
                    // separators (1000 -> "1,000") into the printed numbers.
                    System.out.println("消费者消耗的数值:  " + value + "*" + value + "=" + re);
                    Thread.sleep(r.nextInt(SLEEPTIME));
                }
            }catch (InterruptedException e) {
                e.printStackTrace();
                Thread.currentThread().interrupt();
            }
        }
    }
    
    
    
    

    生产者

    /**
     * Task that, until {@link #stop()} is called or the thread is interrupted,
     * sleeps for a random time below {@code SLEEPTIME} ms, creates a {@code PCData}
     * from a counter shared by all producers, and offers it to the queue.
     */
    public class Producer implements Runnable{
        // volatile so that a stop() call from another thread is seen by the run loop
        private volatile boolean isRunning = true;
        private final BlockingQueue<PCData> queue;
        // shared by every Producer instance, so values are unique across producers
        private static final AtomicInteger count = new AtomicInteger();
        private static final int SLEEPTIME = 1000;

        /**
         * @param queue queue to offer items to
         */
        public Producer( BlockingQueue<PCData> queue){
            this.queue = queue;
        }

        /**
         * Produces items until stopped. Each offer waits up to two seconds for space
         * and reports a failure message if none becomes available.
         */
        @Override
        public void run() {
            Random random = new Random();
            System.out.println("start producting id:" + Thread.currentThread().getId());
            try {
                while(isRunning){
                    Thread.sleep(random.nextInt(SLEEPTIME));
                    PCData data = new PCData(count.incrementAndGet());
                    System.out.println(data + "加入队列");
                    if(!queue.offer(data, 2, TimeUnit.SECONDS)){
                        System.out.println("加入队列失败");
                    }
                }
            } catch (InterruptedException e) {
                // Only an actual interruption should set the interrupt flag again;
                // catching Exception here would mark the thread interrupted for
                // unrelated failures as well.
                e.printStackTrace();
                Thread.currentThread().interrupt();
            }
        }

        /**
         * Asks the run loop to finish after its current iteration.
         */
        public void stop(){
            isRunning = false;
        }

    }
    
    

    主测试方法

    /**
     * Demo driver: runs three producers and three consumers against one bounded
     * queue for ten seconds, stops the producers, allows three more seconds for
     * the queue to drain, then shuts the pool down.
     */
    public class Main {
        /**
         * @param args unused
         * @throws Exception if the main thread is interrupted while sleeping
         */
        public static void main(String[] args) throws Exception {
            BlockingQueue<PCData> queue = new LinkedBlockingDeque<>(10);
            Producer p1 = new Producer(queue);
            Producer p2 = new Producer(queue);
            Producer p3 = new Producer(queue);
            Consumer c1 = new Consumer(queue);
            Consumer c2 = new Consumer(queue);
            Consumer c3 = new Consumer(queue);
            ExecutorService service = Executors.newCachedThreadPool();
            try {
                service.execute(p1);
                service.execute(p2);
                service.execute(p3);
                service.execute(c1);
                service.execute(c2);
                service.execute(c3);
                Thread.sleep(10*1000);
                p1.stop();
                p2.stop();
                p3.stop();
                Thread.sleep(3000);
            } finally {
                // shutdown() only stops new submissions; the consumers would stay
                // blocked in take() and keep the JVM alive. shutdownNow() interrupts
                // the worker threads so the consumer loops can exit.
                service.shutdownNow();
            }
        }
    }
    

    相关文章

      网友评论

          本文标题:阻塞队列的写法

          本文链接:https://www.haomeiwen.com/subject/neslextx.html