美文网首页
Java高并发容器的选择

Java高并发容器的选择

作者: Minority | 来源:发表于2020-03-06 17:21 被阅读0次

Case1(ConcurrentHashMap ):

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;

public class T01_ConcurrentMap {
    public static void main(String[] args) {

        //以下的几种map,对外提供的api用法都是相同的,只不过是底层的实现不同



        Map<String,String> map= new Hashtable<>();
        /*Map<String,String> map= new ConcurrentHashMap<>();        //默认加锁的
        //Map<String,String> map= new ConcurrentSkipListMap<>();    //高并发并且排序,跳表结构
        //Map<String,String> map= new HashMap<>();                  //哈希表实现的,想要加锁就用collection.synchronizedXXX
       //Map<String, String> map1 = new TreeMap<>();              //树实现的,如果想在map中排好顺序,可以使用treemap,其底层是红黑树(AVL tree) */


        Random r=new Random();
        Thread[] ths=new Thread[100];  //线程数组:100个
        CountDownLatch latch=new CountDownLatch(ths.length);  //计数器为100的门闩
        long start=System.currentTimeMillis();   //记录起始时间

        for (int i = 0; i < ths.length; i++) {
            ths[i] = new Thread(()->{
                for (int j = 0; j < 1000000; j++) {
                    //每次装10000个随机数
                    map.put("a"+r.nextInt(100000),"a"+r.nextInt(100000));
                    //门闩减一
                    latch.countDown();
                }
            });
        }

        //线程数组启动
        Arrays.asList(ths).forEach(t->t.start());

      try {
           latch.await();
       } catch (InterruptedException e) {
          e.printStackTrace();
       }

        //计算执行时间
        long end=System.currentTimeMillis();
        System.out.println(end-start);
    }
}

以下的几种map,对外提供的api用法都是相同的,只不过是底层的实现不同,分别使用上面的程序打印时间为:

效率对比:

  • Hashtable:70
  • ConcurrentHashMap:203
  • ConcurrentSkipListMap:61
  • HashMap:线程不安全
  • TreeMap:线程不安全

hashtable 加锁是在整个容器上加锁,ConcurrentHashMap引入了分割(segmentation),不论它变得多么大,仅仅需要锁定map的某个部分,而其它的线程不需要等到迭代完成才能访问map。ConcurrentHashMap对整个容器分段(16段),加锁时只对其中的一段进行加锁。

上面的代码执行结果实际上是ConcurrentHashMap>hashtable的,原因是ConcurrentHashMap抛弃了分段锁??是因为1.8之后,使用cas替换了分段锁。放弃了segment臃肿的设计,取而代之的是node + cas + synchronized

Case2(CopyOnWriteArrayList):

package ConcurrentContainer;


import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;

public class T02_CopyOnWriteList {
    public static void main(String[] args) {
        List<String> lists =  new CopyOnWriteArrayList<>();
        //new ArrayList<>();       //这个会出现并发问题,ArrayList是线程不安全的
        //new Vector<>();           //Vector的方法都是加了锁的
        
        Random r=new Random();
        Thread[] ths=new Thread[100];

        for (int i = 0; i < ths.length; i++) {
            Runnable task=new Runnable() {
                @Override
                public void run() {
                    for (int j = 0; j < 10000; j++) {
                        lists.add("a"+r.nextInt(10000));
                    }
                }
            };
            ths[i]=new Thread(task);
        }
        runAndConputeTime(ths);
        System.out.println(lists.size());
    }

    static void runAndConputeTime(Thread[] ths){
        long s1=System.currentTimeMillis();
        Arrays.asList(ths).forEach(t->t.start());
        Arrays.asList(ths).forEach(t->{
            try {
                t.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        long s2=System.currentTimeMillis();
        System.out.println(s2-s1);
    }
}

写时复制容器:当添加时,把整个容器的内容复制一份,然后再从末尾加上要添加的元素,再把引用指向新的上面。CopyOnWriteArrayList在多线程环境下,写时效率非常低,读时效率非常高。这个容器的业务场景:适合写少,读多的环境(比如事件监听器的并发)

是上面的代码中ArrayList会出现并发问题,ArrayList是线程不安全的。Vector的方法都是加了锁的,所以不会出现线程安全的问题。效率比CopyOnWriteArrayList要高。

Case3(Collections.synchronizedXXX):

package ConcurrentContainer;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

//Collections.synchronizedXXX的功能就是接受一个不加锁的容器,返回一个加了锁的容器

public class T03_SynchronizedList {
    //ArrayList是线程不安全的
    List<String> srts=new ArrayList<>();
    //通过Collections.synchronizedList一包装,返回一个线程安全的List
    List<String> strsSync= Collections.synchronizedList(srts);
    
}

上面提到,ArrayList会出现并发问题,ArrayList是线程不安全的,可以使用工具类Collections.synchronizedXXX对ArrayList进行封装,上例就是Collections.synchronizedList。Collections.synchronizedXXX的功能就是接受一个不加锁的容器,返回一个加了锁的容器。

注意:Collection是接口,Collections是集合工具类。

Case4(ConcurrentQueue):

package ConcurrentContainer;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class T04_ConcurrentQueue {
    public static void main(String[] args) {

        //并发链表队列,无界队列,什么时候内存耗完了,什么时候报错
        Queue<String> strs=new ConcurrentLinkedQueue<>();

        //offer == add ,offer有一个返回值,可以用来判断其加成功了还是没加成功
        for (int i = 0; i < 10; i++) {
            strs.offer("a"+i);
        }

        System.out.println(strs);

        System.out.println(strs.size());


        //poll是拿出来并删掉
        System.out.println(strs.poll());
        System.out.println(strs.size());

        //peek是只拿出来,并不删除
        System.out.println(strs.peek());
        System.out.println(strs.size());

    }
}

/*====================output========================
[a0, a1, a2, a3, a4, a5, a6, a7, a8, a9]
10
a0
9
a1
9
*/

ConcurrentQueue是由队列实现的并发容器,上面的例子使用的是ConcurrentLinkedQueue,底层有链表实现。并发链表队列是无界队列,什么时候内存耗完了,什么时候报错。

知识点:

  • ConcurrentQueue的操作:
    1. offer:尾部添加元素,offer有一个返回值,可以用来判断其加成功了还是没加成功
    2. add:尾部添加元素,add无返回值
    3. poll:删除头元素,poll是返回头元素值并删掉
    4. peek:返回头元素值

Case5(LinkedBlockingQueue):

package ConcurrentContainer;

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class T05_LinkedBlockingQueue {

    static BlockingQueue<String> strs=new LinkedBlockingQueue<>();

    static Random r=new Random();

    public static void main(String[] args) {
        new Thread(()->{
            for (int i = 0; i < 100; i++) {
                try {
                    //阻塞式容器的方法:put = add = offer
                    strs.put("a"+i);    //如果满了,就会等待
                    TimeUnit.MILLISECONDS.sleep(r.nextInt(1000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"p1").start();

        for (int i = 0; i < 5; i++) {
            new Thread(()->{
                //五个线程一直拿
                for (;;){
                    try {
                        //take = poll = remove
                        System.out.println(Thread.currentThread().getName()+" take -"+strs.take()); //如果空了,就会等待
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            },"c"+i).start();
        }
    }
}

LinkedBlockingQueue是无界队列(其实是默认有界,但是界的大小为Integer.MAX_VALUE),除非内存满了,否则可以一直向里面添加,同步阻塞式容器queue类似于生产者消费者模式。其中,由于LinkedBlockingQueue是无界队列,阻塞式的添加方法为put,阻塞式取出方法为take。

Case6(ArrayBlockingQueue):

package ConcurrentContainer;

import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class T06_ArrayBlockingQueue {
   
    static BlockingQueue<String> strs=new ArrayBlockingQueue<>(10);

    static Random r=new Random();

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 10; i++) {
            strs.put("a"+i);
        }

        //strs.put("aaa");  //满了就会等待,程序阻塞,不动了
        //strs.add("aaa");  //add满了会报异常:java.lang.IllegalStateException: Queue full
        //strs.offer("aaa");  //满了不会报异常,但是不会把aaa加进去,返回布尔值
        strs.offer("aaa",1, TimeUnit.SECONDS); //按时间段进行阻塞

        System.out.println(strs);
    }
}

ArrayBlockingQueue与LinkedBlockingQueue的不同之处在于ArrayBlockingQueue是有界队列,长度固定,可以传递参数来设置容器的大小,LinkedBlockingQueue是无界队列。

由于ArrayBlockingQueue是有界队列,所以添加方法有些差异:

  • strs.put("aaa"); //满了就会等待,程序阻塞,不动了
  • strs.add("aaa"); //add满了会报java.lang.IllegalStateException: Queue full异常
  • strs.offer("aaa"); //满了不会报异常,但是不会把aaa加进去,返回布尔值

Case7(DelayQueue):

package ConcurrentContainer;


import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;


public class T07_DelayQueue {

    static BlockingQueue<MyTask> tasks=new DelayQueue<>();

    /*
    *由于取的时候必须知道当前元素还有多长时间可以取出,所以,DelayQueue必须实现Delayed方法,在put的时候可以记录Delay的时间
    * */
    static class MyTask implements Delayed{

        long runningTime;

        MyTask(long rt){
            this.runningTime=rt;
        }

        @Override
        public long getDelay(TimeUnit unit) {

            return unit.convert(runningTime-System.currentTimeMillis(),TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed o) {
            if (this.getDelay(TimeUnit.MILLISECONDS)<o.getDelay(TimeUnit.MILLISECONDS)){
                return -1;
            }else if (this.getDelay(TimeUnit.MILLISECONDS)>o.getDelay(TimeUnit.MILLISECONDS)){
                return 1;
            }
            return 0;
        }

        @Override
        public String toString() {
            return ""+runningTime;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        long now=System.currentTimeMillis();
        MyTask t1=new MyTask(now+1000);
        MyTask t2=new MyTask(now+2000);
        MyTask t3=new MyTask(now+1500);
        MyTask t4=new MyTask(now+2500);
        MyTask t5=new MyTask(now+500);

        tasks.put(t1);
        tasks.put(t2);
        tasks.put(t3);
        tasks.put(t4);
        tasks.put(t5);

        System.out.println(tasks);

        for (int i = 0; i < 5; i++) {
            System.out.println(tasks.take());
        }
    }
}

/**output:
 * [1583484952740, 1583484953240, 1583484953740, 1583484954740, 1583484954240]
 * 1583484952740
 * 1583484953240
 * 1583484953740
 * 1583484954240
 * 1583484954740*/

DelayQueue里面的每个元素都记录着还有多长时间可以被拿走。这个队列默认是排好序的,等待时间最长的先往外拿。DelayQueue用作定时任务。由于取的时候必须知道当前元素还有多长时间可以取出,所以,DelayQueue必须实现Delayed方法,在put的时候可以记录Delay的时间。

Case8(LinkedTransferQueue):

package ConcurrentContainer;

import java.util.concurrent.LinkedTransferQueue;


public class T08_TransferQueue {
    public static void main(String[] args) throws InterruptedException {

        //LinkedTransferQueue提供了一个特殊的方法transfer。其他的和别Queue差不多
        LinkedTransferQueue<String> strs=new LinkedTransferQueue<>();
        strs.add("d");
        //消费者先启动,在这儿等着拿数据
        new Thread(()->{
            try {
                System.out.println(strs.take());
                System.out.println(strs.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

        strs.transfer("aaa");

        /*
        * 如果是生产者先启动,调用transfer,然后再启动消费者,那么下面的代码将不会执行,因为调用transfer会进行阻塞
        * */


        /*LinkedTransferQueue也提供了put方法,调用put就不会阻塞了
        * strs.put("aaa");
        * */

        /*new Thread(()->{
            try {
                System.out.println(strs.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();*/
    }
}

LinkedTransferQueue提供了一个特殊的方法transfer。其他的和别Queue差不多。

transfer的作用:比如启动了n和消费者,都等待着从队列中取,这时,生产者产出的东西直接会给消费者,而不再往队列里面扔啦。

transfer相当于“生产者”,LinkedTransferQueue也提供了put方法,调用put就不会阻塞了。

使用场景:用在更高的并发场景,不用往队列里面扔了,提高了并发的效率。避免生产者产生的消息过度,在游戏服务器转发消息的时候用的比较多。

Case9(SynchronousQueue):

package ConcurrentContainer;


import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;

public class T09_SynchronusQueue {  //容量为0
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> strs=new SynchronousQueue<>();

        new Thread(()->{
            try {
                System.out.println(strs.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

        strs.put("aaa");  //阻塞等待消费者消费,put里面就是用的transfer
        //strs.add("aaa");    //调用add是往队列中加元素,会报错:java.lang.IllegalStateException: Queue full
        System.out.println(strs.size());
    }
}
/*====================output========================
0
aaa
*/

SynchronousQueue是一种特殊的TransferQueue。TransferQueue中如果生产者生产了东西,并没有消费者来消费,这时候如果队列为空,还可以把生产的元素放到队列里面。SynchronousQueue叫做没有容量的队列:即生产者生产的东西必须消费者马上进行消费掉,如果不消费掉就会阻塞生产者。

在Case8中,还可以执行add方法向LinkedTransferQueue中添加元素供消费者消费,但是SynchronousQueue的size为0。只能使用put进行阻塞式添加。

总结:Java提供的高并发容器的选择

1.对于map/set的使用(下面的所有Map都可以换成set,下面的类型只是底层实现不同)
HashMap
TreeMap
LinkedHashMap


//如果并发性不太高的情况下:使用下面这两个
Hashtable
Collections.synchronizedXXX   //Hashtable之前如果加上Collections.synchronized,说明是对Hashtable进行了包装(加锁),然后返回一个加锁后的容器


//如果并发性比较高的情况下,使用ConcurrentHashMap
ConcurrentHashMap

//如果并发性比较高的情况下,并要求排序的情况下,使用ConcurrentSkipListMap
ConcurrentSkipListMap

2.在使用队列的情况下

//并发量比较低,不需要同步的队列
ArrayList
LinkedList

//写的时候非常少,读的时候非常多
CopyOnWriteList

//并发量比较低,需要同步
Collections.synchronizedXXX
Vector

Queue:
    //在高并发的时候可以使用两种队列
    CocurrentLinkedQueue  //内部加锁
    BlockingQueue          //阻塞式队列
        LinkedBlockingQueue   //阻塞式无界队列
        ArrayBlockingQueue    //阻塞式有界队列
        TransferQueue        //直接Productor transfer to consumer
        SynchronusQueue      //队列容量为0的TransferQueue

    DelayQueue执行定时任务

相关文章

网友评论

      本文标题:Java高并发容器的选择

      本文链接:https://www.haomeiwen.com/subject/eenirhtx.html