美文网首页
java并发编程 - 6 - 并发容器

java并发编程 - 6 - 并发容器

作者: cf6bfeab5260 | 来源:发表于2019-05-15 15:14 被阅读0次

我们的容器,分为了List、Set、Map、Queue四大类。并不是所有容器都是线程安全的,比如我们经常用到的HashMap,ArrayList等就不是,java Collections类还专门提供了把非线程安全的容器转换成线程安全的容器的方法:

public  void collectionTest1(){
        //把非线程安全的容器,转换成线程安全的容器
        Map<String,String> map=new HashMap<String,String>();

        map=Collections.synchronizedMap(map);

        //List等其他的用法也一样。
    }

而线程安全的容器,分为了两大类:同步容器并发容器。早期的jdk的线程安全的容器,基本都是同步容器,如Vector、HashTable。同步容器虽然能保证线程安全,但是缺点也很明显,就是效率较低。所以java并发包里提供了并发容器:

image.png
并发容器也是这4大类,对每一类针对不同的场景,提供了不同的实现。
image.png
image.png
image.png
image.png
值得一提的是,非阻塞队列用 cas 替代了锁的方案,效率更高。
基本用法:
@RunWith(SpringRunner.class)
@SpringBootTest
@Slf4j
public class CollectionTest {
    @Test
    public void listTest(){
        CopyOnWriteArrayList list=new CopyOnWriteArrayList();
        list.add("CopyOnWriteArrayList");
        log.info(list.get(0).toString());
    }

    @Test
    public void mapTest(){
        ConcurrentHashMap concurrentHashMap=new ConcurrentHashMap();
        concurrentHashMap.put("concurrentHashMap","concurrentHashMap value");
        log.info(concurrentHashMap.get("concurrentHashMap").toString());


        ConcurrentSkipListMap concurrentSkipListMap=new ConcurrentSkipListMap();
        concurrentSkipListMap.put(2,"concurrentSkipListMap value2");
        concurrentSkipListMap.put(1,"concurrentSkipListMap value1");
        concurrentSkipListMap.put(3,"concurrentSkipListMap value3");

        log.info(concurrentSkipListMap.firstEntry().getValue().toString());

    }

    @Test
    public void setTest(){
        Set copyOnWriteArraySet=new CopyOnWriteArraySet();
        copyOnWriteArraySet.add(2);
        copyOnWriteArraySet.add(1);
        copyOnWriteArraySet.add(3);
        copyOnWriteArraySet.add(22222);
        copyOnWriteArraySet.add(111);
        copyOnWriteArraySet.add(33);
        Iterator it=copyOnWriteArraySet.iterator();
        while(it.hasNext()){
            log.info("copyOnWriteArraySet:"+it.next());
        }

        Set concurrentSkipListSet=new ConcurrentSkipListSet();
        concurrentSkipListSet.add(2);
        concurrentSkipListSet.add(1);
        concurrentSkipListSet.add(3);
        Iterator it1=concurrentSkipListSet.iterator();
        while(it1.hasNext()){
            log.info("concurrentSkipListSet:"+it1.next());
        }

    }


    @Test
    public void linkedBlockingQueueTest() throws InterruptedException {
        LinkedBlockingQueue queue=new LinkedBlockingQueue();

        Thread t1=new Thread(()->{
            //每隔往队列理塞一个元素
            int i=0;
            for(i=0;i<5;i++){
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                try {
                    log.info(i+" 开始加入队列");
                    queue.put(i);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                log.info(i+" 加入队列成功");

            }
        });

        Thread t2=new Thread(()->{
            //取出队列里的元素
            int i=0;
            for(i=0;i<5;i++){
                int n= 0;
                try {
                    n = (int)queue.take();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                log.info("从队列取出:"+n);


            }
        });
        t1.start();
        t2.start();

        t1.join();
        t2.join();
    }


    @Test
    public void arrayBlockingQueueTest() throws InterruptedException {
        ArrayBlockingQueue queue=new ArrayBlockingQueue(10);

        Thread t1=new Thread(()->{
            //每隔往队列理塞一个元素
            int i=0;
            for(i=0;i<5;i++){
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                try {
                    log.info(i+" 开始加入队列");
                    queue.put(i);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                log.info(i+" 加入队列成功");

            }
        });

        Thread t2=new Thread(()->{
            //取出队列里的元素
            int i=0;
            for(i=0;i<5;i++){
                int n= 0;
                try {
                    n = (int)queue.take();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                log.info("从队列取出:"+n);


            }
        });
        t1.start();
        t2.start();

        t1.join();
        t2.join();
    }

    @Test
    public void synchronousQueueTest() throws InterruptedException {
        SynchronousQueue queue=new SynchronousQueue();

        Thread t1=new Thread(()->{
            //每隔往队列理塞一个元素
            int i=0;
            for(i=0;i<5;i++){
                try {
                    log.info(i+" 开始加入队列");
                    queue.put(i);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                log.info(i+" 加入队列成功");

            }
        });

        Thread t2=new Thread(()->{
            //取出队列里的元素
            int i=0;
            for(i=0;i<5;i++){
                int n= 0;
                try {
                    Thread.sleep(5000);
                    n = (int)queue.take();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                log.info("从队列取出:"+n);


            }
        });
        t1.start();
        t2.start();

        t1.join();
        t2.join();
    }


    @Test
    public void linkedTransferQueueTest() throws InterruptedException {
        LinkedTransferQueue queue=new LinkedTransferQueue();

        Thread t1=new Thread(()->{
            //每隔往队列理塞一个元素
            int i=0;
            for(i=0;i<5;i++){
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                log.info(i+" 开始加入队列");
                boolean stat=true;
//                queue.put(i);
                try {
                    queue.transfer(i);//若当前有消费者线程等待,则直接把元素交给消费者,否则加入队列尾部,并计入阻塞状态,直到元素被接走
//                    stat=queue.tryTransfer(i);//若当前有消费者线程等待,则直接把元素交给消费者,并返回true,否则返回false
                    //sta=queue.tryTransfer(i,1, TimeUnit.SECONDS);//同上,只是会等待一段时间再返回true or false

                } catch (Exception e) {
                    e.printStackTrace();
                }
                if(stat){
                    log.info(i+" 加入队列成功");
                }else{
                    log.info(i+" 加入队列失败");
                }


            }
        });

//        Thread t2=new Thread(()->{
//            //取出队列里的元素
//            int i=0;
//            for(i=0;i<5;i++){
//                int n= 0;
//                try {
//                    n = (int)queue.take();
//                } catch (InterruptedException e) {
//                    e.printStackTrace();
//                }
//                log.info("从队列取出:"+n);
//
//
//            }
//        });
        t1.start();
//        t2.start();

        t1.join();
//        t2.join();
    }


    @Test
    public void priorityBlockingQueueTest() throws InterruptedException {
        PriorityBlockingQueue queue=new PriorityBlockingQueue();

        Thread t1=new Thread(()->{
            //每隔往队列理塞一个元素
            int i=5;
            for(i=5;i>0;i--){

                try {
                    log.info(i+" 开始加入队列");
                    queue.put(i);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                log.info(i+" 加入队列成功");

            }
        });

        Thread t2=new Thread(()->{
            //取出队列里的元素
            int i=0;
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            for(i=0;i<5;i++){
                int n= 0;
                try {
                    n = (int)queue.take();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                log.info("从队列取出:"+n);


            }
        });
        t1.start();
        t2.start();

        t1.join();
        t2.join();
    }

    @Test
    public void delayQueueTest() throws InterruptedException {
        DelayQueue queue=new DelayQueue();
        Thread t1=new Thread(()->{
            //每隔往队列理塞一个元素
            int i=0;
            for(i=0;i<5;i++){
                User user=new User(i,5000);//延迟5s
                try {
                    log.info(i+" 开始加入队列");
                    queue.put(user);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                log.info(user+" 加入队列成功");

            }
        });

        Thread t2=new Thread(()->{
            //取出队列里的元素
           while (true){
               User user=null;
               try {
                   user=(User)queue.take();
               } catch (Exception e) {
                   e.printStackTrace();
               }
               log.info("从队列取出:"+user);
           }
        });
        t1.start();
        t2.start();

        t1.join();
        t2.join();
    }

    @Test
    public void linkedBlockingDequeTest() throws InterruptedException {
        LinkedBlockingDeque queue=new LinkedBlockingDeque(10);

        Thread t1=new Thread(()->{
            //每隔往队列理塞一个元素
            int i=0;
            for(i=0;i<5;i++){
                try {
                    log.info(i+" 开始加入队列");
                    queue.putFirst(i);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                log.info(i+" 加入队列成功");

            }
        });

        Thread t2=new Thread(()->{
            //取出队列里的元素
            int i=0;
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            for(i=0;i<5;i++){

                int n= 0;
                try {
                    n = (int)queue.takeFirst();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                log.info("从队列取出:"+n);


            }
        });
        t1.start();
        t2.start();

        t1.join();
        t2.join();
    }

    @Test
    public void concurrentLinkedQueueTest() throws InterruptedException {
        ConcurrentLinkedQueue queue=new ConcurrentLinkedQueue();

        Thread t1=new Thread(()->{
            //每隔往队列理塞一个元素
            int i=0;
            for(i=0;i<5;i++){
                try {
                    log.info(i+" 开始加入队列");
                    queue.add(i);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                log.info(i+" 加入队列成功");

            }
        });

        Thread t2=new Thread(()->{
            //取出队列里的元素
            int i=0;
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            for(i=0;i<6;i++){

                int n= 0;
                try {
                    n = (int)queue.poll();
                } catch (Exception e) {
                    e.printStackTrace();
                }
                log.info("从队列取出:"+n);


            }
        });
        t1.start();
        t2.start();

        t1.join();
        t2.join();
    }
}

下一章 java并发编程 - 7 -线程池

相关文章

网友评论

      本文标题:java并发编程 - 6 - 并发容器

      本文链接:https://www.haomeiwen.com/subject/dldeaqtx.html