美文网首页
用BlockingQueue实现生产者和消费者

用BlockingQueue实现生产者和消费者

作者: 就这些吗 | 来源:发表于2019-12-26 20:42 被阅读0次

这次不多比比,直接上代码,发现网上很多Demo都没有去测一下offer、add、put的方法的区别就乱用,完全没用到blockingqueue的特性,相当于用了一个普通queue而已。这里简单贴一下

offer:如果队列满,会返回false,
add:如果队列满,会报错,
put:如果队列满,会阻塞该线程,直到队列有位置放

remove:如果队列为空,会报错
poll:如果队列为空,返回null
take:如果队列为空,阻塞线程,直到队列中有值

知道了这些我们再来贴代码就好理解啦
实体类:

public class Data {
    int num;

    public int getNum() {
        return num;
    }

    public Data(int num) {
        super();
        this.num = num;
    }

}


生产者:

public class Producer implements Runnable {

    private volatile boolean isRunning = true;
    private  BlockingQueue<Data> queue;
    private static AtomicInteger count = new AtomicInteger();
    private static final int SLEEPTIME = 1000;

    public Producer(BlockingQueue<Data> queue) {
        super();
        this.queue = queue;
    }

    @Override
    public void run() {
        // TODO Auto-generated method stub
        System.out.println("线程id为" + Thread.currentThread().getId() + "的生产者启动了");
        Data data = null;
        Random r = new Random();
        while (isRunning) {
            try {
                //这边的睡就是用来模仿生产者生产过程啦,实际业务中哪有瞬间生产完的
                Thread.sleep(r.nextInt(SLEEPTIME));
                //多线程的情况下当然要用原子类来操作啦
                data = new Data(count.incrementAndGet());
                //这边只能用put(),用其他add,offer的话队列满了不会阻塞线程。
                queue.put(data);
    System.out.println("生产者" + Thread.currentThread().getId() + "把 data为:" 
                + data.getNum() + "BlockingQueue里去啦,现在队列里还有"+queue.size()+"个");
                
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
                
            }

        }

    }
    //这是外部用来调用停止生产的方法
    public void stop() {
        this.isRunning=false;
    }
}

消费者:

public class Consumer implements Runnable {
    
    private BlockingQueue <Data> queue;
    private static final int SLEEPTIME =1000;
    
    public Consumer(BlockingQueue<Data> queue) {
        super();
        this.queue = queue;
    }


    @Override
    public void run() {
        // TODO Auto-generated method stub
        System.out.println("线程id为"+Thread.currentThread().getId()+"的消费者启动了");
        Random r=new Random();
        while(true) {
            Data data;
            try {
                //注意哦 这里只能用take()方法,用其他的话如果队列空了BlockingQueue是不会阻塞线程的。
                data = queue.take();
                //模仿消费者工作,这里就是简单的把数组拿出来算出平方,然后随便睡一会。
                System.out.println("消费者"+Thread.currentThread().getId()+"处理数据"+Math.pow(data.getNum(), 2)
                +"现在队列里还有"+queue.size()+"个");
                Thread.sleep(r.nextInt(SLEEPTIME));
                
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }
    

 
} 

主方法:

public class start {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Data> queue = new LinkedBlockingQueue<>(1);
        Producer p1 = new Producer(queue);
        Producer p2 = new Producer(queue);
        Producer p3 = new Producer(queue);
        Consumer c1 = new Consumer(queue);
        Consumer c2 = new Consumer(queue);
        Consumer c3 = new Consumer(queue);
        ExecutorService service = Executors.newCachedThreadPool();
        service.execute(p1);
        service.execute(p2);
        service.execute(p3);
        service.execute(c1);
        service.execute(c2);
        service.execute(c3);
        //这边的sleep是停止主线程,也就是main方法执行的这个,是为了待会再关掉生产者
        //否则主线程一路执行下来,咱们生产者还没造出来东西呢,就给stop了
        Thread.sleep(3 * 1000);
        p1.stop();
        p2.stop();
        p3.stop();
        //这边是为了给消费者时间去消费队列里的东西。
        Thread.sleep(1 * 1000);
        //线程池使用完后如果不调用shutdown会导致线程池资源一直不会被释放,记得释放哦
        service.shutdown();

    }

}

相关文章

网友评论

      本文标题:用BlockingQueue实现生产者和消费者

      本文链接:https://www.haomeiwen.com/subject/jivloctx.html