我们的容器,分为了List、Set、Map、Queue四大类。并不是所有容器都是线程安全的,比如我们经常用到的HashMap,ArrayList等就不是,java Collections类还专门提供了把非线程安全的容器转换成线程安全的容器的方法:
public void collectionTest1(){
//把非线程安全的容器,转换成线程安全的容器
Map<String,String> map=new HashMap<String,String>();
map=Collections.synchronizedMap(map);
//List等其他的用法也一样。
}
而线程安全的容器,分为了两大类:同步容器和并发容器。早期的jdk的线程安全的容器,基本都是同步容器,如Vector、HashTable。同步容器虽然能保证线程安全,但是缺点也很明显,就是效率较低。所以java并发包里提供了并发容器:
并发容器也是这4大类,对每一类针对不同的场景,提供了不同的实现。
image.png
image.png
image.png
image.png
值得一提的是,非阻塞队列用 cas 替代了锁的方案,效率更高。
基本用法:
@RunWith(SpringRunner.class)
@SpringBootTest
@Slf4j
public class CollectionTest {
@Test
public void listTest(){
CopyOnWriteArrayList list=new CopyOnWriteArrayList();
list.add("CopyOnWriteArrayList");
log.info(list.get(0).toString());
}
@Test
public void mapTest(){
ConcurrentHashMap concurrentHashMap=new ConcurrentHashMap();
concurrentHashMap.put("concurrentHashMap","concurrentHashMap value");
log.info(concurrentHashMap.get("concurrentHashMap").toString());
ConcurrentSkipListMap concurrentSkipListMap=new ConcurrentSkipListMap();
concurrentSkipListMap.put(2,"concurrentSkipListMap value2");
concurrentSkipListMap.put(1,"concurrentSkipListMap value1");
concurrentSkipListMap.put(3,"concurrentSkipListMap value3");
log.info(concurrentSkipListMap.firstEntry().getValue().toString());
}
@Test
public void setTest(){
Set copyOnWriteArraySet=new CopyOnWriteArraySet();
copyOnWriteArraySet.add(2);
copyOnWriteArraySet.add(1);
copyOnWriteArraySet.add(3);
copyOnWriteArraySet.add(22222);
copyOnWriteArraySet.add(111);
copyOnWriteArraySet.add(33);
Iterator it=copyOnWriteArraySet.iterator();
while(it.hasNext()){
log.info("copyOnWriteArraySet:"+it.next());
}
Set concurrentSkipListSet=new ConcurrentSkipListSet();
concurrentSkipListSet.add(2);
concurrentSkipListSet.add(1);
concurrentSkipListSet.add(3);
Iterator it1=concurrentSkipListSet.iterator();
while(it1.hasNext()){
log.info("concurrentSkipListSet:"+it1.next());
}
}
@Test
public void linkedBlockingQueueTest() throws InterruptedException {
LinkedBlockingQueue queue=new LinkedBlockingQueue();
Thread t1=new Thread(()->{
//每隔往队列理塞一个元素
int i=0;
for(i=0;i<5;i++){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
log.info(i+" 开始加入队列");
queue.put(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info(i+" 加入队列成功");
}
});
Thread t2=new Thread(()->{
//取出队列里的元素
int i=0;
for(i=0;i<5;i++){
int n= 0;
try {
n = (int)queue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("从队列取出:"+n);
}
});
t1.start();
t2.start();
t1.join();
t2.join();
}
@Test
public void arrayBlockingQueueTest() throws InterruptedException {
ArrayBlockingQueue queue=new ArrayBlockingQueue(10);
Thread t1=new Thread(()->{
//每隔往队列理塞一个元素
int i=0;
for(i=0;i<5;i++){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
log.info(i+" 开始加入队列");
queue.put(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info(i+" 加入队列成功");
}
});
Thread t2=new Thread(()->{
//取出队列里的元素
int i=0;
for(i=0;i<5;i++){
int n= 0;
try {
n = (int)queue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("从队列取出:"+n);
}
});
t1.start();
t2.start();
t1.join();
t2.join();
}
@Test
public void synchronousQueueTest() throws InterruptedException {
SynchronousQueue queue=new SynchronousQueue();
Thread t1=new Thread(()->{
//每隔往队列理塞一个元素
int i=0;
for(i=0;i<5;i++){
try {
log.info(i+" 开始加入队列");
queue.put(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info(i+" 加入队列成功");
}
});
Thread t2=new Thread(()->{
//取出队列里的元素
int i=0;
for(i=0;i<5;i++){
int n= 0;
try {
Thread.sleep(5000);
n = (int)queue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("从队列取出:"+n);
}
});
t1.start();
t2.start();
t1.join();
t2.join();
}
@Test
public void linkedTransferQueueTest() throws InterruptedException {
LinkedTransferQueue queue=new LinkedTransferQueue();
Thread t1=new Thread(()->{
//每隔往队列理塞一个元素
int i=0;
for(i=0;i<5;i++){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info(i+" 开始加入队列");
boolean stat=true;
// queue.put(i);
try {
queue.transfer(i);//若当前有消费者线程等待,则直接把元素交给消费者,否则加入队列尾部,并计入阻塞状态,直到元素被接走
// stat=queue.tryTransfer(i);//若当前有消费者线程等待,则直接把元素交给消费者,并返回true,否则返回false
//sta=queue.tryTransfer(i,1, TimeUnit.SECONDS);//同上,只是会等待一段时间再返回true or false
} catch (Exception e) {
e.printStackTrace();
}
if(stat){
log.info(i+" 加入队列成功");
}else{
log.info(i+" 加入队列失败");
}
}
});
// Thread t2=new Thread(()->{
// //取出队列里的元素
// int i=0;
// for(i=0;i<5;i++){
// int n= 0;
// try {
// n = (int)queue.take();
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// log.info("从队列取出:"+n);
//
//
// }
// });
t1.start();
// t2.start();
t1.join();
// t2.join();
}
@Test
public void priorityBlockingQueueTest() throws InterruptedException {
PriorityBlockingQueue queue=new PriorityBlockingQueue();
Thread t1=new Thread(()->{
//每隔往队列理塞一个元素
int i=5;
for(i=5;i>0;i--){
try {
log.info(i+" 开始加入队列");
queue.put(i);
} catch (Exception e) {
e.printStackTrace();
}
log.info(i+" 加入队列成功");
}
});
Thread t2=new Thread(()->{
//取出队列里的元素
int i=0;
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
for(i=0;i<5;i++){
int n= 0;
try {
n = (int)queue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("从队列取出:"+n);
}
});
t1.start();
t2.start();
t1.join();
t2.join();
}
@Test
public void delayQueueTest() throws InterruptedException {
DelayQueue queue=new DelayQueue();
Thread t1=new Thread(()->{
//每隔往队列理塞一个元素
int i=0;
for(i=0;i<5;i++){
User user=new User(i,5000);//延迟5s
try {
log.info(i+" 开始加入队列");
queue.put(user);
} catch (Exception e) {
e.printStackTrace();
}
log.info(user+" 加入队列成功");
}
});
Thread t2=new Thread(()->{
//取出队列里的元素
while (true){
User user=null;
try {
user=(User)queue.take();
} catch (Exception e) {
e.printStackTrace();
}
log.info("从队列取出:"+user);
}
});
t1.start();
t2.start();
t1.join();
t2.join();
}
@Test
public void linkedBlockingDequeTest() throws InterruptedException {
LinkedBlockingDeque queue=new LinkedBlockingDeque(10);
Thread t1=new Thread(()->{
//每隔往队列理塞一个元素
int i=0;
for(i=0;i<5;i++){
try {
log.info(i+" 开始加入队列");
queue.putFirst(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info(i+" 加入队列成功");
}
});
Thread t2=new Thread(()->{
//取出队列里的元素
int i=0;
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
for(i=0;i<5;i++){
int n= 0;
try {
n = (int)queue.takeFirst();
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("从队列取出:"+n);
}
});
t1.start();
t2.start();
t1.join();
t2.join();
}
@Test
public void concurrentLinkedQueueTest() throws InterruptedException {
ConcurrentLinkedQueue queue=new ConcurrentLinkedQueue();
Thread t1=new Thread(()->{
//每隔往队列理塞一个元素
int i=0;
for(i=0;i<5;i++){
try {
log.info(i+" 开始加入队列");
queue.add(i);
} catch (Exception e) {
e.printStackTrace();
}
log.info(i+" 加入队列成功");
}
});
Thread t2=new Thread(()->{
//取出队列里的元素
int i=0;
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
for(i=0;i<6;i++){
int n= 0;
try {
n = (int)queue.poll();
} catch (Exception e) {
e.printStackTrace();
}
log.info("从队列取出:"+n);
}
});
t1.start();
t2.start();
t1.join();
t2.join();
}
}
网友评论