美文网首页
Java高并发容器的选择

Java高并发容器的选择

作者: Minority | 来源:发表于2020-03-06 17:21 被阅读0次

    Case1(ConcurrentHashMap ):

    import java.util.*;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentSkipListMap;
    import java.util.concurrent.CountDownLatch;
    
    public class T01_ConcurrentMap {
        public static void main(String[] args) {
    
            //以下的几种map,对外提供的api用法都是相同的,只不过是底层的实现不同
    
    
    
            Map<String,String> map= new Hashtable<>();
            /*Map<String,String> map= new ConcurrentHashMap<>();        //默认加锁的
            //Map<String,String> map= new ConcurrentSkipListMap<>();    //高并发并且排序,跳表结构
            //Map<String,String> map= new HashMap<>();                  //哈希表实现的,想要加锁就用collection.synchronizedXXX
           //Map<String, String> map1 = new TreeMap<>();              //树实现的,如果想在map中排好顺序,可以使用treemap,其底层是红黑树(AVL tree) */
    
    
            Random r=new Random();
            Thread[] ths=new Thread[100];  //线程数组:100个
            CountDownLatch latch=new CountDownLatch(ths.length);  //计数器为100的门闩
            long start=System.currentTimeMillis();   //记录起始时间
    
            for (int i = 0; i < ths.length; i++) {
                ths[i] = new Thread(()->{
                    for (int j = 0; j < 1000000; j++) {
                        //每次装10000个随机数
                        map.put("a"+r.nextInt(100000),"a"+r.nextInt(100000));
                        //门闩减一
                        latch.countDown();
                    }
                });
            }
    
            //线程数组启动
            Arrays.asList(ths).forEach(t->t.start());
    
          try {
               latch.await();
           } catch (InterruptedException e) {
              e.printStackTrace();
           }
    
            //计算执行时间
            long end=System.currentTimeMillis();
            System.out.println(end-start);
        }
    }
    
    

    以下的几种map,对外提供的api用法都是相同的,只不过是底层的实现不同,分别使用上面的程序打印时间为:

    效率对比:

    • Hashtable:70
    • ConcurrentHashMap:203
    • ConcurrentSkipListMap:61
    • HashMap:线程不安全
    • TreeMap:线程不安全

    hashtable 加锁是在整个容器上加锁,ConcurrentHashMap引入了分割(segmentation),不论它变得多么大,仅仅需要锁定map的某个部分,而其它的线程不需要等到迭代完成才能访问map。ConcurrentHashMap对整个容器分段(16段),加锁时只对其中的一段进行加锁。

    上面的代码执行结果实际上是ConcurrentHashMap>hashtable的,原因是ConcurrentHashMap抛弃了分段锁??是因为1.8之后,使用cas替换了分段锁。放弃了segment臃肿的设计,取而代之的是node + cas + synchronized

    Case2(CopyOnWriteArrayList):

    package ConcurrentContainer;
    
    
    import java.util.*;
    import java.util.concurrent.CopyOnWriteArrayList;
    
    public class T02_CopyOnWriteList {
        public static void main(String[] args) {
            List<String> lists =  new CopyOnWriteArrayList<>();
            //new ArrayList<>();       //这个会出现并发问题,ArrayList是线程不安全的
            //new Vector<>();           //Vector的方法都是加了锁的
            
            Random r=new Random();
            Thread[] ths=new Thread[100];
    
            for (int i = 0; i < ths.length; i++) {
                Runnable task=new Runnable() {
                    @Override
                    public void run() {
                        for (int j = 0; j < 10000; j++) {
                            lists.add("a"+r.nextInt(10000));
                        }
                    }
                };
                ths[i]=new Thread(task);
            }
            runAndConputeTime(ths);
            System.out.println(lists.size());
        }
    
        static void runAndConputeTime(Thread[] ths){
            long s1=System.currentTimeMillis();
            Arrays.asList(ths).forEach(t->t.start());
            Arrays.asList(ths).forEach(t->{
                try {
                    t.join();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
    
            long s2=System.currentTimeMillis();
            System.out.println(s2-s1);
        }
    }
    
    

    写时复制容器:当添加时,把整个容器的内容复制一份,然后再从末尾加上要添加的元素,再把引用指向新的上面。CopyOnWriteArrayList在多线程环境下,写时效率非常低,读时效率非常高。这个容器的业务场景:适合写少,读多的环境(比如事件监听器的并发)

    是上面的代码中ArrayList会出现并发问题,ArrayList是线程不安全的。Vector的方法都是加了锁的,所以不会出现线程安全的问题。效率比CopyOnWriteArrayList要高。

    Case3(Collections.synchronizedXXX):

    package ConcurrentContainer;
    
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    
    //Collections.synchronizedXXX的功能就是接受一个不加锁的容器,返回一个加了锁的容器
    
    public class T03_SynchronizedList {
        //ArrayList是线程不安全的
        List<String> srts=new ArrayList<>();
        //通过Collections.synchronizedList一包装,返回一个线程安全的List
        List<String> strsSync= Collections.synchronizedList(srts);
        
    }
    

    上面提到,ArrayList会出现并发问题,ArrayList是线程不安全的,可以使用工具类Collections.synchronizedXXX对ArrayList进行封装,上例就是Collections.synchronizedList。Collections.synchronizedXXX的功能就是接受一个不加锁的容器,返回一个加了锁的容器。

    注意:Collection是接口,Collections是集合工具类。

    Case4(ConcurrentQueue):

    package ConcurrentContainer;
    
    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;
    
    public class T04_ConcurrentQueue {
        public static void main(String[] args) {
    
            //并发链表队列,无界队列,什么时候内存耗完了,什么时候报错
            Queue<String> strs=new ConcurrentLinkedQueue<>();
    
            //offer == add ,offer有一个返回值,可以用来判断其加成功了还是没加成功
            for (int i = 0; i < 10; i++) {
                strs.offer("a"+i);
            }
    
            System.out.println(strs);
    
            System.out.println(strs.size());
    
    
            //poll是拿出来并删掉
            System.out.println(strs.poll());
            System.out.println(strs.size());
    
            //peek是只拿出来,并不删除
            System.out.println(strs.peek());
            System.out.println(strs.size());
    
        }
    }
    
    /*====================output========================
    [a0, a1, a2, a3, a4, a5, a6, a7, a8, a9]
    10
    a0
    9
    a1
    9
    */
    

    ConcurrentQueue是由队列实现的并发容器,上面的例子使用的是ConcurrentLinkedQueue,底层有链表实现。并发链表队列是无界队列,什么时候内存耗完了,什么时候报错。

    知识点:

    • ConcurrentQueue的操作:
      1. offer:尾部添加元素,offer有一个返回值,可以用来判断其加成功了还是没加成功
      2. add:尾部添加元素,add无返回值
      3. poll:删除头元素,poll是返回头元素值并删掉
      4. peek:返回头元素值

    Case5(LinkedBlockingQueue):

    package ConcurrentContainer;
    
    import java.util.Random;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;
    
    public class T05_LinkedBlockingQueue {
    
        static BlockingQueue<String> strs=new LinkedBlockingQueue<>();
    
        static Random r=new Random();
    
        public static void main(String[] args) {
            new Thread(()->{
                for (int i = 0; i < 100; i++) {
                    try {
                        //阻塞式容器的方法:put = add = offer
                        strs.put("a"+i);    //如果满了,就会等待
                        TimeUnit.MILLISECONDS.sleep(r.nextInt(1000));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            },"p1").start();
    
            for (int i = 0; i < 5; i++) {
                new Thread(()->{
                    //五个线程一直拿
                    for (;;){
                        try {
                            //take = poll = remove
                            System.out.println(Thread.currentThread().getName()+" take -"+strs.take()); //如果空了,就会等待
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                },"c"+i).start();
            }
        }
    }
    
    

    LinkedBlockingQueue是无界队列(其实是默认有界,但是界的大小为Integer.MAX_VALUE),除非内存满了,否则可以一直向里面添加,同步阻塞式容器queue类似于生产者消费者模式。其中,由于LinkedBlockingQueue是无界队列,阻塞式的添加方法为put,阻塞式取出方法为take。

    Case6(ArrayBlockingQueue):

    package ConcurrentContainer;
    
    import java.util.Random;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeUnit;
    
    public class T06_ArrayBlockingQueue {
       
        static BlockingQueue<String> strs=new ArrayBlockingQueue<>(10);
    
        static Random r=new Random();
    
        public static void main(String[] args) throws InterruptedException {
            for (int i = 0; i < 10; i++) {
                strs.put("a"+i);
            }
    
            //strs.put("aaa");  //满了就会等待,程序阻塞,不动了
            //strs.add("aaa");  //add满了会报异常:java.lang.IllegalStateException: Queue full
            //strs.offer("aaa");  //满了不会报异常,但是不会把aaa加进去,返回布尔值
            strs.offer("aaa",1, TimeUnit.SECONDS); //按时间段进行阻塞
    
            System.out.println(strs);
        }
    }
    

    ArrayBlockingQueue与LinkedBlockingQueue的不同之处在于ArrayBlockingQueue是有界队列,长度固定,可以传递参数来设置容器的大小,LinkedBlockingQueue是无界队列。

    由于ArrayBlockingQueue是有界队列,所以添加方法有些差异:

    • strs.put("aaa"); //满了就会等待,程序阻塞,不动了
    • strs.add("aaa"); //add满了会报java.lang.IllegalStateException: Queue full异常
    • strs.offer("aaa"); //满了不会报异常,但是不会把aaa加进去,返回布尔值

    Case7(DelayQueue):

    package ConcurrentContainer;
    
    
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.DelayQueue;
    import java.util.concurrent.Delayed;
    import java.util.concurrent.TimeUnit;
    
    
    public class T07_DelayQueue {
    
        static BlockingQueue<MyTask> tasks=new DelayQueue<>();
    
        /*
        *由于取的时候必须知道当前元素还有多长时间可以取出,所以,DelayQueue必须实现Delayed方法,在put的时候可以记录Delay的时间
        * */
        static class MyTask implements Delayed{
    
            long runningTime;
    
            MyTask(long rt){
                this.runningTime=rt;
            }
    
            @Override
            public long getDelay(TimeUnit unit) {
    
                return unit.convert(runningTime-System.currentTimeMillis(),TimeUnit.MILLISECONDS);
            }
    
            @Override
            public int compareTo(Delayed o) {
                if (this.getDelay(TimeUnit.MILLISECONDS)<o.getDelay(TimeUnit.MILLISECONDS)){
                    return -1;
                }else if (this.getDelay(TimeUnit.MILLISECONDS)>o.getDelay(TimeUnit.MILLISECONDS)){
                    return 1;
                }
                return 0;
            }
    
            @Override
            public String toString() {
                return ""+runningTime;
            }
        }
    
        public static void main(String[] args) throws InterruptedException {
            long now=System.currentTimeMillis();
            MyTask t1=new MyTask(now+1000);
            MyTask t2=new MyTask(now+2000);
            MyTask t3=new MyTask(now+1500);
            MyTask t4=new MyTask(now+2500);
            MyTask t5=new MyTask(now+500);
    
            tasks.put(t1);
            tasks.put(t2);
            tasks.put(t3);
            tasks.put(t4);
            tasks.put(t5);
    
            System.out.println(tasks);
    
            for (int i = 0; i < 5; i++) {
                System.out.println(tasks.take());
            }
        }
    }
    
    /**output:
     * [1583484952740, 1583484953240, 1583484953740, 1583484954740, 1583484954240]
     * 1583484952740
     * 1583484953240
     * 1583484953740
     * 1583484954240
     * 1583484954740*/
    

    DelayQueue里面的每个元素都记录着还有多长时间可以被拿走。这个队列默认是排好序的,等待时间最长的先往外拿。DelayQueue用作定时任务。由于取的时候必须知道当前元素还有多长时间可以取出,所以,DelayQueue必须实现Delayed方法,在put的时候可以记录Delay的时间。

    Case8(LinkedTransferQueue):

    package ConcurrentContainer;
    
    import java.util.concurrent.LinkedTransferQueue;
    
    
    public class T08_TransferQueue {
        public static void main(String[] args) throws InterruptedException {
    
            //LinkedTransferQueue提供了一个特殊的方法transfer。其他的和别Queue差不多
            LinkedTransferQueue<String> strs=new LinkedTransferQueue<>();
            strs.add("d");
            //消费者先启动,在这儿等着拿数据
            new Thread(()->{
                try {
                    System.out.println(strs.take());
                    System.out.println(strs.take());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
    
            strs.transfer("aaa");
    
            /*
            * 如果是生产者先启动,调用transfer,然后再启动消费者,那么下面的代码将不会执行,因为调用transfer会进行阻塞
            * */
    
    
            /*LinkedTransferQueue也提供了put方法,调用put就不会阻塞了
            * strs.put("aaa");
            * */
    
            /*new Thread(()->{
                try {
                    System.out.println(strs.take());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();*/
        }
    }
    
    

    LinkedTransferQueue提供了一个特殊的方法transfer。其他的和别Queue差不多。

    transfer的作用:比如启动了n和消费者,都等待着从队列中取,这时,生产者产出的东西直接会给消费者,而不再往队列里面扔啦。

    transfer相当于“生产者”,LinkedTransferQueue也提供了put方法,调用put就不会阻塞了。

    使用场景:用在更高的并发场景,不用往队列里面扔了,提高了并发的效率。避免生产者产生的消息过度,在游戏服务器转发消息的时候用的比较多。

    Case9(SynchronousQueue):

    package ConcurrentContainer;
    
    
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.SynchronousQueue;
    
    public class T09_SynchronusQueue {  //容量为0
        public static void main(String[] args) throws InterruptedException {
            BlockingQueue<String> strs=new SynchronousQueue<>();
    
            new Thread(()->{
                try {
                    System.out.println(strs.take());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
    
            strs.put("aaa");  //阻塞等待消费者消费,put里面就是用的transfer
            //strs.add("aaa");    //调用add是往队列中加元素,会报错:java.lang.IllegalStateException: Queue full
            System.out.println(strs.size());
        }
    }
    /*====================output========================
    0
    aaa
    */
    

    SynchronousQueue是一种特殊的TransferQueue。TransferQueue中如果生产者生产了东西,并没有消费者来消费,这时候如果队列为空,还可以把生产的元素放到队列里面。SynchronousQueue叫做没有容量的队列:即生产者生产的东西必须消费者马上进行消费掉,如果不消费掉就会阻塞生产者。

    在Case8中,还可以执行add方法向LinkedTransferQueue中添加元素供消费者消费,但是SynchronousQueue的size为0。只能使用put进行阻塞式添加。

    总结:Java提供的高并发容器的选择

    1.对于map/set的使用(下面的所有Map都可以换成set,下面的类型只是底层实现不同)
    HashMap
    TreeMap
    LinkedHashMap
    
    
    //如果并发性不太高的情况下:使用下面这两个
    Hashtable
    Collections.synchronizedXXX   //Hashtable之前如果加上Collections.synchronized,说明是对Hashtable进行了包装(加锁),然后返回一个加锁后的容器
    
    
    //如果并发性比较高的情况下,使用ConcurrentHashMap
    ConcurrentHashMap
    
    //如果并发性比较高的情况下,并要求排序的情况下,使用ConcurrentSkipListMap
    ConcurrentSkipListMap
    
    2.在使用队列的情况下
    
    //并发量比较低,不需要同步的队列
    ArrayList
    LinkedList
    
    //写的时候非常少,读的时候非常多
    CopyOnWriteList
    
    //并发量比较低,需要同步
    Collections.synchronizedXXX
    Vector
    
    Queue:
        //在高并发的时候可以使用两种队列
        CocurrentLinkedQueue  //内部加锁
        BlockingQueue          //阻塞式队列
            LinkedBlockingQueue   //阻塞式无界队列
            ArrayBlockingQueue    //阻塞式有界队列
            TransferQueue        //直接Productor transfer to consumer
            SynchronusQueue      //队列容量为0的TransferQueue
    
        DelayQueue执行定时任务
    

    相关文章

      网友评论

          本文标题:Java高并发容器的选择

          本文链接:https://www.haomeiwen.com/subject/eenirhtx.html