美文网首页
通过阻塞队列实现线程间的通信

通过阻塞队列实现线程间的通信

作者: 溪水散人 | 来源:发表于2019-05-04 00:10 被阅读0次

    话不多说,直接上代码:

    • 资源类:
    /**
     * 资源类
     */
    class DataSource{
        private volatile boolean FLAG=true;
        private AtomicInteger atomicInteger=new AtomicInteger();
    
        BlockingQueue<String> blockingDeque=null;
    
        public DataSource(BlockingQueue blockingDeque){
            this.blockingDeque=blockingDeque;
            System.out.println(blockingDeque.getClass().getName());
        }
    
        public void produce() throws  Exception{
            String data=null;
            boolean retValue;
            while (FLAG) {
                data=atomicInteger.incrementAndGet()+"";
                retValue=blockingDeque.offer(data,2L, TimeUnit.SECONDS);
                if (retValue){
                    System.out.println(Thread.currentThread().getName()+"\t插入队列"+data+"成功");
                }else {
                    System.out.println(Thread.currentThread().getName()+"\t插入队列"+data+"失败");
                }
                TimeUnit.SECONDS.sleep(1);
            }
            System.out.println(Thread.currentThread().getName()+"\t叫停:表示FLAG=false,生产动作结束");
        }
    
        public void consumer() throws Exception{
            String result=null;
            while (true){
                result=blockingDeque.poll(2L,TimeUnit.SECONDS);
                if (result ==null || result.equalsIgnoreCase("")){
                    System.out.println(Thread.currentThread().getName()+"\t超过2S没有取到,退出");
                    return;
                }
                System.out.println(Thread.currentThread().getName()+"\t消费队列"+result+"成功");
            }
    
        }
    
        public void stop(){
            FLAG=false;
        }
    
    • main方法
    public class ProducerConsumerBlockQueue {
        public static void main(String[] args) {
          DataSource dataSource= new DataSource(new SynchronousQueue(true));
          new Thread(() ->{
              System.out.println(Thread.currentThread().getName()+"\t生产线程启动");
              try {
                  dataSource.produce();
              } catch (Exception e) {
                  e.printStackTrace();
              }
          },"producer").start();
    
            new Thread(() ->{
                System.out.println(Thread.currentThread().getName()+"\t消费线程启动");
                try {
                    dataSource.consumer();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            },"consumer").start();
    
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            System.out.println("5s时间到,停止主线程");
            dataSource.stop();
        }
    }
    
    • 结果
    java.util.concurrent.SynchronousQueue
    producer    生产线程启动
    consumer    消费线程启动
    consumer    消费队列1成功
    producer    插入队列1成功
    producer    插入队列2成功
    consumer    消费队列2成功
    producer    插入队列3成功
    consumer    消费队列3成功
    consumer    消费队列4成功
    producer    插入队列4成功
    producer    插入队列5成功
    consumer    消费队列5成功
    5s时间到,停止主线程
    producer    叫停:表示FLAG=false,生产动作结束
    consumer    超过2S没有取到,退出
    
    • 注意:
      传入的参数可以是
    DataSource dataSource= new DataSource(new ArrayBlockingQueue(10));
    

    相关文章

      网友评论

          本文标题:通过阻塞队列实现线程间的通信

          本文链接:https://www.haomeiwen.com/subject/rcytoqtx.html