Case1(ConcurrentHashMap ):
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
public class T01_ConcurrentMap {
public static void main(String[] args) {
//以下的几种map,对外提供的api用法都是相同的,只不过是底层的实现不同
Map<String,String> map= new Hashtable<>();
/*Map<String,String> map= new ConcurrentHashMap<>(); //默认加锁的
//Map<String,String> map= new ConcurrentSkipListMap<>(); //高并发并且排序,跳表结构
//Map<String,String> map= new HashMap<>(); //哈希表实现的,想要加锁就用collection.synchronizedXXX
//Map<String, String> map1 = new TreeMap<>(); //树实现的,如果想在map中排好顺序,可以使用treemap,其底层是红黑树(AVL tree) */
Random r=new Random();
Thread[] ths=new Thread[100]; //线程数组:100个
CountDownLatch latch=new CountDownLatch(ths.length); //计数器为100的门闩
long start=System.currentTimeMillis(); //记录起始时间
for (int i = 0; i < ths.length; i++) {
ths[i] = new Thread(()->{
for (int j = 0; j < 1000000; j++) {
//每次装10000个随机数
map.put("a"+r.nextInt(100000),"a"+r.nextInt(100000));
//门闩减一
latch.countDown();
}
});
}
//线程数组启动
Arrays.asList(ths).forEach(t->t.start());
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
//计算执行时间
long end=System.currentTimeMillis();
System.out.println(end-start);
}
}
以下的几种map,对外提供的api用法都是相同的,只不过是底层的实现不同,分别使用上面的程序打印时间为:
效率对比:
- Hashtable:70
- ConcurrentHashMap:203
- ConcurrentSkipListMap:61
- HashMap:线程不安全
- TreeMap:线程不安全
hashtable 加锁是在整个容器上加锁,ConcurrentHashMap引入了分割(segmentation),不论它变得多么大,仅仅需要锁定map的某个部分,而其它的线程不需要等到迭代完成才能访问map。ConcurrentHashMap对整个容器分段(16段),加锁时只对其中的一段进行加锁。
上面的代码执行结果实际上是ConcurrentHashMap>hashtable的,原因是ConcurrentHashMap抛弃了分段锁??是因为1.8之后,使用cas替换了分段锁。放弃了segment臃肿的设计,取而代之的是node + cas + synchronized
Case2(CopyOnWriteArrayList):
package ConcurrentContainer;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
public class T02_CopyOnWriteList {
public static void main(String[] args) {
List<String> lists = new CopyOnWriteArrayList<>();
//new ArrayList<>(); //这个会出现并发问题,ArrayList是线程不安全的
//new Vector<>(); //Vector的方法都是加了锁的
Random r=new Random();
Thread[] ths=new Thread[100];
for (int i = 0; i < ths.length; i++) {
Runnable task=new Runnable() {
@Override
public void run() {
for (int j = 0; j < 10000; j++) {
lists.add("a"+r.nextInt(10000));
}
}
};
ths[i]=new Thread(task);
}
runAndConputeTime(ths);
System.out.println(lists.size());
}
static void runAndConputeTime(Thread[] ths){
long s1=System.currentTimeMillis();
Arrays.asList(ths).forEach(t->t.start());
Arrays.asList(ths).forEach(t->{
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
long s2=System.currentTimeMillis();
System.out.println(s2-s1);
}
}
写时复制容器:当添加时,把整个容器的内容复制一份,然后再从末尾加上要添加的元素,再把引用指向新的上面。CopyOnWriteArrayList在多线程环境下,写时效率非常低,读时效率非常高。这个容器的业务场景:适合写少,读多的环境(比如事件监听器的并发)
是上面的代码中ArrayList会出现并发问题,ArrayList是线程不安全的。Vector的方法都是加了锁的,所以不会出现线程安全的问题。效率比CopyOnWriteArrayList要高。
Case3(Collections.synchronizedXXX):
package ConcurrentContainer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
//Collections.synchronizedXXX的功能就是接受一个不加锁的容器,返回一个加了锁的容器
public class T03_SynchronizedList {
//ArrayList是线程不安全的
List<String> srts=new ArrayList<>();
//通过Collections.synchronizedList一包装,返回一个线程安全的List
List<String> strsSync= Collections.synchronizedList(srts);
}
上面提到,ArrayList会出现并发问题,ArrayList是线程不安全的,可以使用工具类Collections.synchronizedXXX对ArrayList进行封装,上例就是Collections.synchronizedList。Collections.synchronizedXXX的功能就是接受一个不加锁的容器,返回一个加了锁的容器。
注意:Collection是接口,Collections是集合工具类。
Case4(ConcurrentQueue):
package ConcurrentContainer;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
public class T04_ConcurrentQueue {
public static void main(String[] args) {
//并发链表队列,无界队列,什么时候内存耗完了,什么时候报错
Queue<String> strs=new ConcurrentLinkedQueue<>();
//offer == add ,offer有一个返回值,可以用来判断其加成功了还是没加成功
for (int i = 0; i < 10; i++) {
strs.offer("a"+i);
}
System.out.println(strs);
System.out.println(strs.size());
//poll是拿出来并删掉
System.out.println(strs.poll());
System.out.println(strs.size());
//peek是只拿出来,并不删除
System.out.println(strs.peek());
System.out.println(strs.size());
}
}
/*====================output========================
[a0, a1, a2, a3, a4, a5, a6, a7, a8, a9]
10
a0
9
a1
9
*/
ConcurrentQueue是由队列实现的并发容器,上面的例子使用的是ConcurrentLinkedQueue,底层有链表实现。并发链表队列是无界队列,什么时候内存耗完了,什么时候报错。
知识点:
- ConcurrentQueue的操作:
- offer:尾部添加元素,offer有一个返回值,可以用来判断其加成功了还是没加成功
- add:尾部添加元素,add无返回值
- poll:删除头元素,poll是返回头元素值并删掉
- peek:返回头元素值
Case5(LinkedBlockingQueue):
package ConcurrentContainer;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
public class T05_LinkedBlockingQueue {
static BlockingQueue<String> strs=new LinkedBlockingQueue<>();
static Random r=new Random();
public static void main(String[] args) {
new Thread(()->{
for (int i = 0; i < 100; i++) {
try {
//阻塞式容器的方法:put = add = offer
strs.put("a"+i); //如果满了,就会等待
TimeUnit.MILLISECONDS.sleep(r.nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"p1").start();
for (int i = 0; i < 5; i++) {
new Thread(()->{
//五个线程一直拿
for (;;){
try {
//take = poll = remove
System.out.println(Thread.currentThread().getName()+" take -"+strs.take()); //如果空了,就会等待
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"c"+i).start();
}
}
}
LinkedBlockingQueue是无界队列(其实是默认有界,但是界的大小为Integer.MAX_VALUE),除非内存满了,否则可以一直向里面添加,同步阻塞式容器queue类似于生产者消费者模式。其中,由于LinkedBlockingQueue是无界队列,阻塞式的添加方法为put,阻塞式取出方法为take。
Case6(ArrayBlockingQueue):
package ConcurrentContainer;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
public class T06_ArrayBlockingQueue {
static BlockingQueue<String> strs=new ArrayBlockingQueue<>(10);
static Random r=new Random();
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 10; i++) {
strs.put("a"+i);
}
//strs.put("aaa"); //满了就会等待,程序阻塞,不动了
//strs.add("aaa"); //add满了会报异常:java.lang.IllegalStateException: Queue full
//strs.offer("aaa"); //满了不会报异常,但是不会把aaa加进去,返回布尔值
strs.offer("aaa",1, TimeUnit.SECONDS); //按时间段进行阻塞
System.out.println(strs);
}
}
ArrayBlockingQueue与LinkedBlockingQueue的不同之处在于ArrayBlockingQueue是有界队列,长度固定,可以传递参数来设置容器的大小,LinkedBlockingQueue是无界队列。
由于ArrayBlockingQueue是有界队列,所以添加方法有些差异:
strs.put("aaa");
//满了就会等待,程序阻塞,不动了strs.add("aaa");
//add满了会报java.lang.IllegalStateException: Queue full异常strs.offer("aaa");
//满了不会报异常,但是不会把aaa加进去,返回布尔值
Case7(DelayQueue):
package ConcurrentContainer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
public class T07_DelayQueue {
static BlockingQueue<MyTask> tasks=new DelayQueue<>();
/*
*由于取的时候必须知道当前元素还有多长时间可以取出,所以,DelayQueue必须实现Delayed方法,在put的时候可以记录Delay的时间
* */
static class MyTask implements Delayed{
long runningTime;
MyTask(long rt){
this.runningTime=rt;
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(runningTime-System.currentTimeMillis(),TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
if (this.getDelay(TimeUnit.MILLISECONDS)<o.getDelay(TimeUnit.MILLISECONDS)){
return -1;
}else if (this.getDelay(TimeUnit.MILLISECONDS)>o.getDelay(TimeUnit.MILLISECONDS)){
return 1;
}
return 0;
}
@Override
public String toString() {
return ""+runningTime;
}
}
public static void main(String[] args) throws InterruptedException {
long now=System.currentTimeMillis();
MyTask t1=new MyTask(now+1000);
MyTask t2=new MyTask(now+2000);
MyTask t3=new MyTask(now+1500);
MyTask t4=new MyTask(now+2500);
MyTask t5=new MyTask(now+500);
tasks.put(t1);
tasks.put(t2);
tasks.put(t3);
tasks.put(t4);
tasks.put(t5);
System.out.println(tasks);
for (int i = 0; i < 5; i++) {
System.out.println(tasks.take());
}
}
}
/**output:
* [1583484952740, 1583484953240, 1583484953740, 1583484954740, 1583484954240]
* 1583484952740
* 1583484953240
* 1583484953740
* 1583484954240
* 1583484954740*/
DelayQueue里面的每个元素都记录着还有多长时间可以被拿走。这个队列默认是排好序的,等待时间最长的先往外拿。DelayQueue用作定时任务。由于取的时候必须知道当前元素还有多长时间可以取出,所以,DelayQueue必须实现Delayed方法,在put的时候可以记录Delay的时间。
Case8(LinkedTransferQueue):
package ConcurrentContainer;
import java.util.concurrent.LinkedTransferQueue;
public class T08_TransferQueue {
public static void main(String[] args) throws InterruptedException {
//LinkedTransferQueue提供了一个特殊的方法transfer。其他的和别Queue差不多
LinkedTransferQueue<String> strs=new LinkedTransferQueue<>();
strs.add("d");
//消费者先启动,在这儿等着拿数据
new Thread(()->{
try {
System.out.println(strs.take());
System.out.println(strs.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
strs.transfer("aaa");
/*
* 如果是生产者先启动,调用transfer,然后再启动消费者,那么下面的代码将不会执行,因为调用transfer会进行阻塞
* */
/*LinkedTransferQueue也提供了put方法,调用put就不会阻塞了
* strs.put("aaa");
* */
/*new Thread(()->{
try {
System.out.println(strs.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();*/
}
}
LinkedTransferQueue提供了一个特殊的方法transfer。其他的和别Queue差不多。
transfer的作用:比如启动了n和消费者,都等待着从队列中取,这时,生产者产出的东西直接会给消费者,而不再往队列里面扔啦。
transfer相当于“生产者”,LinkedTransferQueue也提供了put方法,调用put就不会阻塞了。
使用场景:用在更高的并发场景,不用往队列里面扔了,提高了并发的效率。避免生产者产生的消息过度,在游戏服务器转发消息的时候用的比较多。
Case9(SynchronousQueue):
package ConcurrentContainer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
public class T09_SynchronusQueue { //容量为0
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> strs=new SynchronousQueue<>();
new Thread(()->{
try {
System.out.println(strs.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
strs.put("aaa"); //阻塞等待消费者消费,put里面就是用的transfer
//strs.add("aaa"); //调用add是往队列中加元素,会报错:java.lang.IllegalStateException: Queue full
System.out.println(strs.size());
}
}
/*====================output========================
0
aaa
*/
SynchronousQueue是一种特殊的TransferQueue。TransferQueue中如果生产者生产了东西,并没有消费者来消费,这时候如果队列为空,还可以把生产的元素放到队列里面。SynchronousQueue叫做没有容量的队列:即生产者生产的东西必须消费者马上进行消费掉,如果不消费掉就会阻塞生产者。
在Case8中,还可以执行add方法向LinkedTransferQueue中添加元素供消费者消费,但是SynchronousQueue的size为0。只能使用put进行阻塞式添加。
总结:Java提供的高并发容器的选择
1.对于map/set的使用(下面的所有Map都可以换成set,下面的类型只是底层实现不同)
HashMap
TreeMap
LinkedHashMap
//如果并发性不太高的情况下:使用下面这两个
Hashtable
Collections.synchronizedXXX //Hashtable之前如果加上Collections.synchronized,说明是对Hashtable进行了包装(加锁),然后返回一个加锁后的容器
//如果并发性比较高的情况下,使用ConcurrentHashMap
ConcurrentHashMap
//如果并发性比较高的情况下,并要求排序的情况下,使用ConcurrentSkipListMap
ConcurrentSkipListMap
2.在使用队列的情况下
//并发量比较低,不需要同步的队列
ArrayList
LinkedList
//写的时候非常少,读的时候非常多
CopyOnWriteList
//并发量比较低,需要同步
Collections.synchronizedXXX
Vector
Queue:
//在高并发的时候可以使用两种队列
CocurrentLinkedQueue //内部加锁
BlockingQueue //阻塞式队列
LinkedBlockingQueue //阻塞式无界队列
ArrayBlockingQueue //阻塞式有界队列
TransferQueue //直接Productor transfer to consumer
SynchronusQueue //队列容量为0的TransferQueue
DelayQueue执行定时任务
网友评论