Exchanger使用

作者: 桐桑入梦 | 来源:发表于2020-03-20 12:53 被阅读0次

    生成器:

    public interface Generator<T> {
        T next();
    }
    
    public class BasicGenerator<T> implements Generator<T>{
        private Class<T> type;
        public BasicGenerator(Class<T> type){
            this.type = type;
        }
        public T next(){
            try{
                return type.getDeclaredConstructor().newInstance();
            }catch(Exception e){
                throw new RuntimeException(e);
            }
        }
    
        public static <T> Generator<T> create(Class<T> type){
            return new BasicGenerator<>(type);
        }
    }
    
    class ExchangerProducer<T> implements Runnable{
        //用于创建T类型对象
        private Generator<T> generator;
        //用于交换保存了多个T类型对象容器的Exchanger
        private Exchanger<List<T>> exchanger;
        //保存多个T类型对象的容器
        private List<T> holder;
    
        public ExchangerProducer(Exchanger<List<T>> exchanger, Generator<T> generator, List<T> holder){
            this.generator = generator;
            this.exchanger = exchanger;
            this.holder = holder;
        }
    
        @Override
        public void run(){
            try{
                while(!Thread.interrupted()){
                    //生产n个对象
                    for(int i = 0; i < ExchangerDemo.size; i++)
                        holder.add(generator.next());
                    //交换对象的列表,用满的交换成空的容器
                    holder = exchanger.exchange(holder);
                }
            }catch(InterruptedException e){
                //异常处理
            }
        }
    }
    
    class ExchangerConsumer<T> implements Runnable{
        //用于交换保存了多个T类型对象容器的Exchanger
        private Exchanger<List<T>> exchanger;
        private List<T> holder;
        private volatile T value;
    
        public ExchangerConsumer(Exchanger<List<T>> exchanger, List<T> holder){
            this.exchanger = exchanger;
            this.holder = holder;
        }
    
        @Override
        public void run(){
            try{
                while(!Thread.interrupted()){
                    //进行交换,如果对方没有执行exchange,那么阻塞等待
                    holder = exchanger.exchange(holder);
                    //处理数据
                    System.out.println(holder);
                    for(T item : holder){
                        value = item;
                        holder.remove(item);
                    }
                }
            }catch(InterruptedException e){
                //异常处理
            }
            System.out.println("Final value = " + value);
        }
    }
    
    public class ExchangerDemo {
        public static int size = 10;
        public static int delay = 5;
    
        public static void main(String[] args) throws InterruptedException{
            //设置存储对象的容器的大小和延迟时间
            if(args.length > 0)
                size = Integer.parseInt(args[0]);
            if(args.length > 1)
                delay = Integer.parseInt(args[1]);
    
            //创建线程池
            ExecutorService exec = Executors.newCachedThreadPool();
            //创建Exchanger,用来交换两个List<Fat>对象
            Exchanger<List<Fat>> xc = new Exchanger<List<Fat>>();
            //这种List允许在列表遍历的时候调用remove(),使用redis快照相同的copy on write技术
            List<Fat> producerList = new CopyOnWriteArrayList<Fat>();
            List<Fat> consumerList = new CopyOnWriteArrayList<Fat>();
            //提交生产者和消费者任务
            exec.execute(new ExchangerProducer<Fat>(xc, BasicGenerator.create(Fat.class), producerList));
            exec.execute(new ExchangerConsumer<Fat>(xc, consumerList));
    
            TimeUnit.SECONDS.sleep(delay);
            exec.shutdownNow();
        }
    }
    
    

    Copy-On-Write思想

    相关文章

      网友评论

        本文标题:Exchanger使用

        本文链接:https://www.haomeiwen.com/subject/pundyhtx.html