美文网首页
java并发编程 - 6 - 并发容器

java并发编程 - 6 - 并发容器

作者: cf6bfeab5260 | 来源:发表于2019-05-15 15:14 被阅读0次

    我们的容器,分为了List、Set、Map、Queue四大类。并不是所有容器都是线程安全的,比如我们经常用到的HashMap,ArrayList等就不是,java Collections类还专门提供了把非线程安全的容器转换成线程安全的容器的方法:

    public  void collectionTest1(){
            //把非线程安全的容器,转换成线程安全的容器
            Map<String,String> map=new HashMap<String,String>();
    
            map=Collections.synchronizedMap(map);
    
            //List等其他的用法也一样。
        }
    

    而线程安全的容器,分为了两大类:同步容器并发容器。早期的jdk的线程安全的容器,基本都是同步容器,如Vector、HashTable。同步容器虽然能保证线程安全,但是缺点也很明显,就是效率较低。所以java并发包里提供了并发容器:

    image.png
    并发容器也是这4大类,对每一类针对不同的场景,提供了不同的实现。
    image.png
    image.png
    image.png
    image.png
    值得一提的是,非阻塞队列用 cas 替代了锁的方案,效率更高。
    基本用法:
    @RunWith(SpringRunner.class)
    @SpringBootTest
    @Slf4j
    public class CollectionTest {
        @Test
        public void listTest(){
            CopyOnWriteArrayList list=new CopyOnWriteArrayList();
            list.add("CopyOnWriteArrayList");
            log.info(list.get(0).toString());
        }
    
        @Test
        public void mapTest(){
            ConcurrentHashMap concurrentHashMap=new ConcurrentHashMap();
            concurrentHashMap.put("concurrentHashMap","concurrentHashMap value");
            log.info(concurrentHashMap.get("concurrentHashMap").toString());
    
    
            ConcurrentSkipListMap concurrentSkipListMap=new ConcurrentSkipListMap();
            concurrentSkipListMap.put(2,"concurrentSkipListMap value2");
            concurrentSkipListMap.put(1,"concurrentSkipListMap value1");
            concurrentSkipListMap.put(3,"concurrentSkipListMap value3");
    
            log.info(concurrentSkipListMap.firstEntry().getValue().toString());
    
        }
    
        @Test
        public void setTest(){
            Set copyOnWriteArraySet=new CopyOnWriteArraySet();
            copyOnWriteArraySet.add(2);
            copyOnWriteArraySet.add(1);
            copyOnWriteArraySet.add(3);
            copyOnWriteArraySet.add(22222);
            copyOnWriteArraySet.add(111);
            copyOnWriteArraySet.add(33);
            Iterator it=copyOnWriteArraySet.iterator();
            while(it.hasNext()){
                log.info("copyOnWriteArraySet:"+it.next());
            }
    
            Set concurrentSkipListSet=new ConcurrentSkipListSet();
            concurrentSkipListSet.add(2);
            concurrentSkipListSet.add(1);
            concurrentSkipListSet.add(3);
            Iterator it1=concurrentSkipListSet.iterator();
            while(it1.hasNext()){
                log.info("concurrentSkipListSet:"+it1.next());
            }
    
        }
    
    
        @Test
        public void linkedBlockingQueueTest() throws InterruptedException {
            LinkedBlockingQueue queue=new LinkedBlockingQueue();
    
            Thread t1=new Thread(()->{
                //每隔往队列理塞一个元素
                int i=0;
                for(i=0;i<5;i++){
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    try {
                        log.info(i+" 开始加入队列");
                        queue.put(i);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    log.info(i+" 加入队列成功");
    
                }
            });
    
            Thread t2=new Thread(()->{
                //取出队列里的元素
                int i=0;
                for(i=0;i<5;i++){
                    int n= 0;
                    try {
                        n = (int)queue.take();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    log.info("从队列取出:"+n);
    
    
                }
            });
            t1.start();
            t2.start();
    
            t1.join();
            t2.join();
        }
    
    
        @Test
        public void arrayBlockingQueueTest() throws InterruptedException {
            ArrayBlockingQueue queue=new ArrayBlockingQueue(10);
    
            Thread t1=new Thread(()->{
                //每隔往队列理塞一个元素
                int i=0;
                for(i=0;i<5;i++){
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    try {
                        log.info(i+" 开始加入队列");
                        queue.put(i);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    log.info(i+" 加入队列成功");
    
                }
            });
    
            Thread t2=new Thread(()->{
                //取出队列里的元素
                int i=0;
                for(i=0;i<5;i++){
                    int n= 0;
                    try {
                        n = (int)queue.take();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    log.info("从队列取出:"+n);
    
    
                }
            });
            t1.start();
            t2.start();
    
            t1.join();
            t2.join();
        }
    
        @Test
        public void synchronousQueueTest() throws InterruptedException {
            SynchronousQueue queue=new SynchronousQueue();
    
            Thread t1=new Thread(()->{
                //每隔往队列理塞一个元素
                int i=0;
                for(i=0;i<5;i++){
                    try {
                        log.info(i+" 开始加入队列");
                        queue.put(i);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    log.info(i+" 加入队列成功");
    
                }
            });
    
            Thread t2=new Thread(()->{
                //取出队列里的元素
                int i=0;
                for(i=0;i<5;i++){
                    int n= 0;
                    try {
                        Thread.sleep(5000);
                        n = (int)queue.take();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    log.info("从队列取出:"+n);
    
    
                }
            });
            t1.start();
            t2.start();
    
            t1.join();
            t2.join();
        }
    
    
        @Test
        public void linkedTransferQueueTest() throws InterruptedException {
            LinkedTransferQueue queue=new LinkedTransferQueue();
    
            Thread t1=new Thread(()->{
                //每隔往队列理塞一个元素
                int i=0;
                for(i=0;i<5;i++){
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    log.info(i+" 开始加入队列");
                    boolean stat=true;
    //                queue.put(i);
                    try {
                        queue.transfer(i);//若当前有消费者线程等待,则直接把元素交给消费者,否则加入队列尾部,并计入阻塞状态,直到元素被接走
    //                    stat=queue.tryTransfer(i);//若当前有消费者线程等待,则直接把元素交给消费者,并返回true,否则返回false
                        //sta=queue.tryTransfer(i,1, TimeUnit.SECONDS);//同上,只是会等待一段时间再返回true or false
    
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    if(stat){
                        log.info(i+" 加入队列成功");
                    }else{
                        log.info(i+" 加入队列失败");
                    }
    
    
                }
            });
    
    //        Thread t2=new Thread(()->{
    //            //取出队列里的元素
    //            int i=0;
    //            for(i=0;i<5;i++){
    //                int n= 0;
    //                try {
    //                    n = (int)queue.take();
    //                } catch (InterruptedException e) {
    //                    e.printStackTrace();
    //                }
    //                log.info("从队列取出:"+n);
    //
    //
    //            }
    //        });
            t1.start();
    //        t2.start();
    
            t1.join();
    //        t2.join();
        }
    
    
        @Test
        public void priorityBlockingQueueTest() throws InterruptedException {
            PriorityBlockingQueue queue=new PriorityBlockingQueue();
    
            Thread t1=new Thread(()->{
                //每隔往队列理塞一个元素
                int i=5;
                for(i=5;i>0;i--){
    
                    try {
                        log.info(i+" 开始加入队列");
                        queue.put(i);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    log.info(i+" 加入队列成功");
    
                }
            });
    
            Thread t2=new Thread(()->{
                //取出队列里的元素
                int i=0;
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                for(i=0;i<5;i++){
                    int n= 0;
                    try {
                        n = (int)queue.take();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    log.info("从队列取出:"+n);
    
    
                }
            });
            t1.start();
            t2.start();
    
            t1.join();
            t2.join();
        }
    
        @Test
        public void delayQueueTest() throws InterruptedException {
            DelayQueue queue=new DelayQueue();
            Thread t1=new Thread(()->{
                //每隔往队列理塞一个元素
                int i=0;
                for(i=0;i<5;i++){
                    User user=new User(i,5000);//延迟5s
                    try {
                        log.info(i+" 开始加入队列");
                        queue.put(user);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    log.info(user+" 加入队列成功");
    
                }
            });
    
            Thread t2=new Thread(()->{
                //取出队列里的元素
               while (true){
                   User user=null;
                   try {
                       user=(User)queue.take();
                   } catch (Exception e) {
                       e.printStackTrace();
                   }
                   log.info("从队列取出:"+user);
               }
            });
            t1.start();
            t2.start();
    
            t1.join();
            t2.join();
        }
    
        @Test
        public void linkedBlockingDequeTest() throws InterruptedException {
            LinkedBlockingDeque queue=new LinkedBlockingDeque(10);
    
            Thread t1=new Thread(()->{
                //每隔往队列理塞一个元素
                int i=0;
                for(i=0;i<5;i++){
                    try {
                        log.info(i+" 开始加入队列");
                        queue.putFirst(i);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    log.info(i+" 加入队列成功");
    
                }
            });
    
            Thread t2=new Thread(()->{
                //取出队列里的元素
                int i=0;
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                for(i=0;i<5;i++){
    
                    int n= 0;
                    try {
                        n = (int)queue.takeFirst();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    log.info("从队列取出:"+n);
    
    
                }
            });
            t1.start();
            t2.start();
    
            t1.join();
            t2.join();
        }
    
        @Test
        public void concurrentLinkedQueueTest() throws InterruptedException {
            ConcurrentLinkedQueue queue=new ConcurrentLinkedQueue();
    
            Thread t1=new Thread(()->{
                //每隔往队列理塞一个元素
                int i=0;
                for(i=0;i<5;i++){
                    try {
                        log.info(i+" 开始加入队列");
                        queue.add(i);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    log.info(i+" 加入队列成功");
    
                }
            });
    
            Thread t2=new Thread(()->{
                //取出队列里的元素
                int i=0;
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                for(i=0;i<6;i++){
    
                    int n= 0;
                    try {
                        n = (int)queue.poll();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    log.info("从队列取出:"+n);
    
    
                }
            });
            t1.start();
            t2.start();
    
            t1.join();
            t2.join();
        }
    }
    

    下一章 java并发编程 - 7 -线程池

    相关文章

      网友评论

          本文标题:java并发编程 - 6 - 并发容器

          本文链接:https://www.haomeiwen.com/subject/dldeaqtx.html