美文网首页
生产者消费者问题

生产者消费者问题

作者: Stringer | 来源:发表于2016-12-16 17:55 被阅读35次

    基于wait和notify的实现

    package shengchanzhexiaofeizhe;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.UUID;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class Test01 {
        public static void main(String[] args){
            List<Task> buffer=new ArrayList<>(Constants.MAX_BUFFER_SIZE);
            ExecutorService es=Executors.newFixedThreadPool(Constants.NUM_OF_CONSUMER+Constants.NUM_OF_PRODUCER);
            for(int i=1;i<=Constants.NUM_OF_PRODUCER;++i){
                es.execute(new Producer(buffer));
            }
            for(int i=1;i<=Constants.NUM_OF_CONSUMER;++i){
                es.execute(new Consumer(buffer));
            }
        }
    }
    
    /**
     * 公共常量
     * @author ZHANGTIANCHENG
     *
     */
    class Constants{
        public static final int MAX_BUFFER_SIZE = 10;
        public static final int NUM_OF_PRODUCER = 2;
        public static final int NUM_OF_CONSUMER = 3;
    }
    
    /**
     * 工作任务
     * @author ZHANGTIANCHENG
     *
     */
    class Task{
        private String id;
        
        public Task(){
            id=UUID.randomUUID().toString();
        }
        
        @Override
        public String toString(){
            return "Task["+id+"]";
        }
    }
    
    class Consumer implements Runnable{
        private List<Task> buffer;
        
        public Consumer(List<Task> buffer){
            this.buffer=buffer;
        }
    
        @Override
        public void run() {
            while(true){
                synchronized(buffer){
                    while(buffer.isEmpty()){
                        try{
                            buffer.wait();
                        }catch(InterruptedException e){
                            e.printStackTrace();
                        }
                    }
                    Task task=buffer.remove(0);
                    buffer.notifyAll();
                    System.out.println("Consumer:"+Thread.currentThread().getName()+"]got"+task);
                }
            }
        }
        
    }
    
    class Producer implements Runnable{
        private List<Task> buffer;
        
        public Producer(List<Task> buffer){
            this.buffer=buffer;
        }
        
        @Override
        public void run() {
            while(true){
                synchronized(buffer){
                    while(buffer.size()>=Constants.MAX_BUFFER_SIZE){
                        try{
                            buffer.wait();
                        }catch(InterruptedException e){
                            e.printStackTrace();
                        }
                    }
                    Task task=new Task();
                    buffer.add(task);
                    buffer.notifyAll();
                    System.out.println("Producer["+Thread.currentThread().getName()+"]put"+task);
                }
            }
        }
        
    }
    

    基于BlockingQueue的实现

    package shengchanzhexiaofeizhe;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.UUID;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.LinkedBlockingQueue;
    
    public class Test01 {
        public static void main(String[] args){
            BlockingQueue<Task> buffer=new LinkedBlockingQueue<>(Constants.MAX_BUFFER_SIZE);
            ExecutorService es=Executors.newFixedThreadPool(Constants.NUM_OF_CONSUMER+Constants.NUM_OF_PRODUCER);
            for(int i=1;i<=Constants.NUM_OF_PRODUCER;++i){
                es.execute(new Producer(buffer));
            }
            for(int i=1;i<=Constants.NUM_OF_CONSUMER;++i){
                es.execute(new Consumer(buffer));
            }
        }
    }
    
    /**
     * 公共常量
     * @author ZHANGTIANCHENG
     *
     */
    class Constants{
        public static final int MAX_BUFFER_SIZE = 10;
        public static final int NUM_OF_PRODUCER = 2;
        public static final int NUM_OF_CONSUMER = 3;
    }
    
    /**
     * 工作任务
     * @author ZHANGTIANCHENG
     *
     */
    class Task{
        private String id;
        
        public Task(){
            id=UUID.randomUUID().toString();
        }
        
        @Override
        public String toString(){
            return "Task["+id+"]";
        }
    }
    
    class Consumer implements Runnable{
        private BlockingQueue<Task> buffer;
        
        public Consumer(BlockingQueue<Task> buffer){
            this.buffer=buffer;
        }
        
        @Override
        public void run() {
            while(true){
                try{
                    Task task=buffer.take();
                    System.out.println("Consumer["+Thread.currentThread().getName()+"]got"+task);
                }catch(InterruptedException e){
                    e.printStackTrace();
                }
            }
        }
        
    }
    
    class Producer implements Runnable{
        private BlockingQueue<Task> buffer;
        
        public Producer(BlockingQueue<Task> buffer){
            this.buffer=buffer;
        }
    
        @Override
        public void run() {
            while(true){
                try{
                    Task task=new Task();
                    buffer.put(task);
                    System.out.println("Producer["+Thread.currentThread().getName()+"]put"+task);
                }catch(InterruptedException e){
                    e.printStackTrace();
                }
            }
        }
    }
    
    
    

    相关文章

      网友评论

          本文标题:生产者消费者问题

          本文链接:https://www.haomeiwen.com/subject/fekgmttx.html