美文网首页
java多线程与高并发(七)并发容器

java多线程与高并发(七)并发容器

作者: 小偷阿辉 | 来源:发表于2021-04-27 09:24 被阅读0次

    1.并发容器

    java容器

    1.1.第一类Collection,也叫做集合

    集合的意思就是说这个容器是什么结构,你都可以把一个元素一个元素的往里面添加。从数据结构的角度来说,这个存储的数据结构也就两种
    连续存储的数组Array,和非连续存储的一个指向另外一个的链表,但是逻辑结构就很多了。Collection又分成了3大类,分别为Set,List,Queue,Queue队列接口就是为了高并发准备的,Set不会有重复的元素。队列最主要的原因就是实现了任务的状态和获取,叫做阻塞队列,其中有一个子接口Deque叫做双端队列,一般的队列只能一端进另外一端出,但是双端队列却可以同进同出。

    1.2.第二类Map,集合特殊的变种,一对一对的

    1.2.1.HashTable

    最开始java1.0的容器里只有两个,Vector和Hashtable,但是这两个容器所有的默认方法都是加了锁synchronized的,这是最早设计不太合理的地方。后来又加了HashMap,它是完全没有加锁的,后来为了适配有锁的场景又加了SynchronizedMap,它是HashMap的加锁版本。
    Vector和Hashtable 自带锁,基本不用

    1.2.2.HashMap

    我们来看看这HashMap,同学们想一下这个HashMap往里头插会不会有问题,因为HashMap没有锁,线程不安全。虽然他速度比较快,但是数据会出问题,并且可能报各种异常。

    主要是因为它内部会把这个变成TreeNode。具体细节不再细究

    package com.learn.thread.six;
    
    import com.learn.thread.six.enums.Constans;
    
    import java.util.HashMap;
    import java.util.Hashtable;
    import java.util.UUID;
    
    public class TestHashMap {
    
        private static HashMap<UUID, UUID> m = new HashMap<>();
    
        static int count = Constans.COUNT;
    
        static UUID[] keys = new UUID[count];
    
        static UUID[] values = new UUID[count];
    
        static final int THREAD_COUNT = Constans.THREAD_COUNT;
    
        static {
            for (int i = 0; i < count; i++) {
                keys[i] = UUID.randomUUID();
                values[i] = UUID.randomUUID();
            }
        }
    
        static class Mythread extends Thread {
    
            /**
             * 开始位置
             */
            int start;
    
            /**
             * 每一个线程放入的个数
             */
            int gap = count/THREAD_COUNT;
    
            public Mythread(int start) {
                this.start = start;
            }
    
            @Override
            public void run() {
                for (int i = start; i < start + gap; i++) {
                    m.put(keys[i], values[i]);
                }
            }
    
            public static void main(String[] args) {
                long start = System.currentTimeMillis();
    
                Thread[] threads = new Thread[THREAD_COUNT];
    
                for (int i = 0; i < threads.length; i++) {
                    threads[i] = new Mythread(i * (count/THREAD_COUNT));
                }
                for (Thread item : threads) {
                    item.start();
                }
    
                for (Thread thread : threads) {
                    try {
                        thread.join();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                long end = System.currentTimeMillis();
    
                System.out.println(end - start);
    
                System.out.println(m.size());
            }
        }
    }
    

    可以看到,HashMap不是线程安全的

    1.2.3.SynchronizedHashMap

    这个是给HashMap手动加了锁,它的源码自己做了一个Object,然后每次都是SynchronizedObject,严格来讲他和那个HashMap效率上区别不大

    package com.learn.thread.six;
    
    import com.learn.thread.six.enums.Constans;
    
    import java.util.*;
    
    public class TestSynchronziedHashMap {
    
      private static Map<UUID, UUID> m = Collections.synchronizedMap(new HashMap<UUID, UUID>());
    
      static int count = Constans.COUNT;
    
      static UUID[] keys = new UUID[count];
    
      static UUID[] values = new UUID[count];
    
      static final int THREAD_COUNT = Constans.THREAD_COUNT;
    
      static {
          for (int i = 0; i < count; i++) {
              keys[i] = UUID.randomUUID();
              values[i] = UUID.randomUUID();
          }
      }
    
      static class Mythread extends Thread {
    
          /**
           * 开始位置
           */
          int start;
    
          /**
           * 每一个线程放入的个数
           */
          int gap = count/THREAD_COUNT;
    
          public Mythread(int start) {
              this.start = start;
          }
    
          @Override
          public void run() {
              for (int i = start; i < start + gap; i++) {
                  m.put(keys[i], values[i]);
              }
          }
    
          public static void main(String[] args) {
              long start = System.currentTimeMillis();
    
              Thread[] threads = new Thread[THREAD_COUNT];
    
              for (int i = 0; i < threads.length; i++) {
                  threads[i] = new Mythread(i * (count/THREAD_COUNT));
              }
              for (Thread item : threads) {
                  item.start();
              }
    
              for (Thread thread : threads) {
                  try {
                      thread.join();
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
              }
              long end = System.currentTimeMillis();
    
              System.out.println(end - start);
    
              System.out.println(m.size());
    
              // ---------------------------------
              start = System.currentTimeMillis();
    
              for (int i = 0; i < threads.length; i++) {
                  threads[i] = new Thread(() -> {
                      for (int j = 0; j < 10; j++) {
                          System.out.println(m.get(keys[10]));
                      }
                  });
              }
    
              for (Thread item : threads) {
                  item.start();
              }
              for (Thread item : threads) {
                  try {
                      item.join();
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
              }
              end = System.currentTimeMillis();
              System.out.println(end - start);
          }
      }
    }
    
    

    1.2.4.ConcurrentHashMap

    ConcurrentHashMap 是多线程里边真正用得到的,这个效率主要体现在读上面,由于它往里边插的时候做了各种判断,本来是链表的,后来改成了红黑树,然后又做了各种各样的CAS判断,所以插入的时候是更低一点。
    HashMap和HashTable层虽然读的效率稍微低一点,但是他往里插的时候检查的东西比较少,就加个锁往后一插。
    所以,各种Map的使用,看你实际当中的需求。下面用个程序看看ConcurrentHashMap的查询速度和插入速度

    package com.learn.thread.six;
    
    import com.learn.thread.six.enums.Constans;
    
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.UUID;
    import java.util.concurrent.ConcurrentHashMap;
    
    public class TestConcurrentHashMap {
    
        private static Map<UUID, UUID> m = new ConcurrentHashMap<>();
    
        static int count = Constans.COUNT;
    
        static UUID[] keys = new UUID[count];
    
        static UUID[] values = new UUID[count];
    
        static final int THREAD_COUNT = Constans.THREAD_COUNT;
    
        static {
            for (int i = 0; i < count; i++) {
                keys[i] = UUID.randomUUID();
                values[i] = UUID.randomUUID();
            }
        }
    
        static class Mythread extends Thread {
    
            /**
             * 开始位置
             */
            int start;
    
            /**
             * 每一个线程放入的个数
             */
            int gap = count/THREAD_COUNT;
    
            public Mythread(int start) {
                this.start = start;
            }
    
            @Override
            public void run() {
                for (int i = start; i < start + gap; i++) {
                    m.put(keys[i], values[i]);
                }
            }
    
            public static void main(String[] args) {
                long start = System.currentTimeMillis();
    
                Thread[] threads = new Thread[THREAD_COUNT];
    
                for (int i = 0; i < threads.length; i++) {
                    threads[i] = new Mythread(i * (count/THREAD_COUNT));
                }
                for (Thread item : threads) {
                    item.start();
                }
    
                for (Thread thread : threads) {
                    try {
                        thread.join();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                long end = System.currentTimeMillis();
    
                System.out.println(end - start);
    
                System.out.println(m.size());
    
                // ---------------------------------
                start = System.currentTimeMillis();
    
                for (int i = 0; i < threads.length; i++) {
                    threads[i] = new Thread(() -> {
                        for (int j = 0; j < 10; j++) {
                            System.out.println(m.get(keys[10]));
                        }
                    });
                }
    
                for (Thread item : threads) {
                    item.start();
                }
                for (Thread item : threads) {
                    try {
                        item.join();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                end = System.currentTimeMillis();
                System.out.println(end - start);
            }
        }
    }
    
    

    1.2.5.ConcurrentSkipListMap

    通过跳表来实现的高并发容器,并且这个Map是有排序的
    跳表: 底层本身存储的是一个链表元素,是排好序的。如果链表是排好序的,往里边插入和取值都变得特别麻烦。因为你得从头到尾的找。这时候跳表就出现了
    跳表在链表的基础上拿出了一些关键元素,在上面做一层链表,如果这一层数据量还是很大,就在这个基础上再抽一些元素做一个链表。所以查找数据的时候是一层一层的往下查找,实现上比TreeMap 又容易了很多。

    下面做一个小程序,试试这几个Map 的性能

    package com.learn.thread.six.map;
    
    import java.util.*;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentSkipListMap;
    import java.util.concurrent.CountDownLatch;
    
    public class TestConcurrentSkipListMap {
        public static void main(String[] args) {
            Map<String, String> map1 = new ConcurrentSkipListMap<>();
            Map<String, String> map2 = new ConcurrentHashMap<>();
            Map<String, String> map3 = new HashMap<>();
            Hashtable<String, String> map4 = new Hashtable<>();
            test(map1);
            test(map2);
            test(map3);
            test(map4);
        }
        public static void test(Map<String, String> map) {
            Random random = new Random();
            Thread[] tds = new Thread[100];
            CountDownLatch count = new CountDownLatch(tds.length);
            long start = System.currentTimeMillis();
            for (int i = 0; i < tds.length; i++) {
                tds[i] = new Thread(() -> {
                    for (int j = 0; j < 10; j++) {
                        map.put(UUID.randomUUID().toString(), "a" + random.nextInt(100000));
                    }
                    count.countDown();
                });
            }
            Arrays.asList(tds).forEach(item -> item.start());
            try {
                count.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            long end = System.currentTimeMillis();
            System.out.println(end - start);
            System.out.println(map.size());
    
    
        }
    }
    

    1.2.6.ArraryList

    我们先来认识一下vector到Queue的发展历程,下面有一个程序展示超卖的现象

    package com.learn.thread.six.list;
    
    import java.util.ArrayList;
    import java.util.List;
    
    public class TestTicketSeller {
    
        private static List<String> tickets = new ArrayList<>();
    
        static {
            for (int i = 0; i < 1000; i++) {
                tickets.add("票编号" + i);
            }
        }
    
        public static void main(String[] args) {
            for (int i = 0; i < 10; i++) {
                new Thread(() -> {
                    while (tickets.size() >0 ){
                        System.out.println("销售了 ---" + tickets.remove(0));
                    }
                }).start();
            }
        }
    }
    
    

    运行一下,我们发现卖了超过1000条

    1.2.7.Vector

    我们来看看最早的容器Vector,内部是带锁的,(add 和remove 都是带锁的),但是我们的代码逻辑中间Vector 是不会带锁的。

    package com.learn.thread.six.list;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Vector;
    
    public class TestSeller2 {
        private static Vector<String> tickets = new Vector<>();
    
        static {
            for (int i = 0; i < 100; i++) {
                tickets.add("票编号" + i);
            }
        }
        public static void main(String[] args) {
            testSeller();
        }
    
        public static void testSeller() {
            for (int i = 0; i < 10; i++) {
                new Thread(() -> {
                    while (tickets.size() >0 ){
                        // 下面这段代码并没有加锁
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        System.out.println("销售了 ---" + tickets.remove(0));
                    }
                }).start();
            }
        }
    }
    
    

    可以看到,中间代码睡眠2秒是存在线程安全问题的,如果把它去掉,整个程序不会报异常。所以还是得最外层加一个锁。如下所示

    public static void testSynchronizedSeller() {
            for (int i = 0; i < 10; i++) {
                new Thread(() -> {
                    // 保证原子性操作
                    synchronized (tickets) {
                        while (tickets.size() >0 ){
                            // 下面这段代码并没有加锁
                            try {
                                Thread.sleep(1000);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                            System.out.println("销售了 ---" + tickets.remove(0));
                        } 
                    }
                }).start();
            }
        }
    

    1.2.8.LinkedList

    LinkList是一个双链表,在添加和删除元素时具有比ArrayList更好的性能.但在get与set方面弱于ArrayList.当然,这些对比都是指数据量很大或者操作很频繁。

    LinkedList并不需要连续的空间,大小不确定

    
    package com.learn.thread.six.list;
    
    import java.util.Vector;
    
    public class TestLinkedList {
        private static Vector<String> tickets = new Vector<>();
    
        static {
            for (int i = 0; i < 10; i++) {
                tickets.add("票编号" + i);
            }
        }
    
        public static void main(String[] args) {
            testSeller();
        }
    
        public static void testSeller() {
            for (int i = 0; i < 10; i++) {
                new Thread(() -> {
                    while (true) {
                        synchronized (tickets) {
                            if (tickets.size() <= 0) {
                                return;
                            }
    
                            try {
                                Thread.sleep(1000);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                            System.out.println("销售了------" + tickets.remove(0));
                        }
                    }
                }).start();
            }
        }
    }
    
    

    1.2.9.Queue

    效率最高的就是这个Queue,这是Java 最新的一个借口,主要目标就是为了高并发使用的,就是为了多线程,所以以后多线程就考虑Queue。
    Queue 使用最多的就是ConcurrentLinkedQueue,然后里边没有加锁,调用一个叫poll的方法去取值,如果取空了就说明里面的值已经没了。

    package com.learn.thread.six.queue;
    
    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedDeque;
    
    public class TestQueue {
        static Queue<String> queue = new ConcurrentLinkedDeque<>();
    
        static {
            for (int i = 0; i < 10; i++) {
                queue.add("票编号" + i);
            }
        }
    
        public static void main(String[] args) {
            testQueue();
        }
        public static void testQueue() {
            for (int i = 0; i < 10; i++) {
                new Thread(() -> {
                    while (true) {
                        String s = queue.poll();
                        if (s == null) {
                            return;
                        }
                        System.out.println("销售了" + s);
                    }
                }).start();
            }
        }
    }
    

    1.2.10.CopyOnWrite

    再来看看并发的时候经常使用的一个类,CopyOnWrite ,CopyOnWirteList,CopyOnWirteSet, CopyOnwirte的意思就是复制,并且是写时复制

    public boolean add(E e) {
        // 添加的时候,上锁
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // 原本的数组
            Object[] elements = getArray();
            // 原本数组的长度
            int len = elements.length;
            // 调用native方法进行复制
            Object[] newElements = Arrays.copyOf(elements, len + 1);
            // 新的元素
            newElements[len] = e;
            // 替换数组
            setArray(newElements);
            // 成功
            return true;
        } finally {
            // 解锁
            lock.unlock();
        }
    }
    
    
    package com.learn.thread.six.queue;
    
    import java.util.ArrayList;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.Vector;
    import java.util.concurrent.CopyOnWriteArrayList;
    
    public class TestCopyOnwirte {
        public static void main(String[] args) {
            List<Integer> list1 = new CopyOnWriteArrayList<>();
            List<Integer> list2 = new ArrayList<>();
            List<Integer> list3 = new Vector<>();
            List<Integer> list4 = new LinkedList<>();
            test(list1);
            test(list2);
            test(list3);
            test(list4);
        }
    
        public static void test(List<Integer> list) {
            long start = System.currentTimeMillis();
            for (int i = 0; i < 10; i++) {
                for (int j = 0; j < 100; j++) {
                    int finalJ = j;
                    new Thread(() -> {
                        list.add(finalJ);
                    }) .start();
                }
            }
            System.out.println(list.size());
            long end = System.currentTimeMillis();
            System.out.println("time" + (end - start));
        }
    }
    

    可以发现CopyOnWirte的写是最慢的,所以这个适用于,多读少写的场景。如果对象很大,CopyOnWirte存在内存溢出的情况,因为写操作都是复制一份,需要开辟另外一个空间

    3.队列

    3.1.BlockingQueue

    这个是后面线程池的时候需要讲的内存,重点就是Block阻塞,顾名思义,阻塞队列,我们可以在其方法上做到线程自动阻塞。
    offer offer是往里头添加,加没加进去会返回一个boolean值
    add add 同样往里头加,但是如果没加进去是会抛出异常的
    poll 提取数据,并且remove掉
    peek 提取数据,但是不remove

    package com.learn.thread.six.queue;
    
    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedDeque;
    
    public class TestBlockingQueue {
    
        public static void main(String[] args) {
            Queue<String> queue = new ConcurrentLinkedDeque<>();
            for (int i = 0; i < 10; i++) {
                queue.offer("a" + i);
            }
            System.out.println(queue);
            System.out.println(queue.size());
            // 队列先进后出,poll是提取最后一个进去的
            System.out.println(queue.poll());
            System.out.println(queue.peek());
            System.out.println(queue.size());
        }
    }
    

    3.2.LinkedBlockingQueue

    LinkedBlockingQueue 是用链表实现的无界队列,一直往里头加,直到内存装满了为止
    LinkedBlockingQueue 在Queue的基础上,添加两个很重要的方法
    put 往队列里边加,加满了,这个线程就会阻塞住。
    take 往外取数据,如果空了,线程也会阻塞住
    所以LinkedBlockingQueue是实现生产者消费者最好的容器。
    下面以一个小程序演示一下一个线程a到i 装数据,没装一个睡一秒,另外启5个线程不断的从里面take,空了我就等着,什么时候新加了我就立马把他取出来

    package com.learn.thread.six.queue;
    
    
    import java.util.Random;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    
    public class TestLinkedBlockingQueue {
        static BlockingQueue<String> queue = new LinkedBlockingQueue<>(3);
    
        static Random random = new Random();
    
        public static void main(String[] args) {
            new Thread(() -> {
                for (int i = 0; i < 5; i++) {
                    // 如果满了就会等待
                    try {
                        queue.put("a" + i);
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
    
                }
            },"p1").start();
    
            for (int i = 0; i < 5; i++) {
                new Thread(() -> {
                    while (true) {
                        try {
                            System.out.println(Thread.currentThread().getName() + queue.take());
                        } catch (Exception ex) {
                        }
                    }
                }).start();
            }
        }
    }
    

    3.3.ArrayBlockingQueue

    ArrayBlockingQueue 是有界的,你可以指定一个固定的值,他的容器就是10,那么当你往里面扔内容的时候,一旦满了put方法就是阻塞住,如果继续add 的话就会抛出异常.
    offer 用返回值来判断是否添加成功,还有另外一个写法,你可以指定一个时间尝试往里边添加,比如一秒钟,如果一秒钟内没有添加成功,他就返回了。
    面试题经常会被问到 Queue和List 的区别是什么
    答:添加了offer\peek\poll\put\take 这些方法对线程友好的阻塞或者等待。

    package com.learn.thread.six.queue;
    
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeUnit;
    
    public class TestArrayLBlockingQueue {
        public static BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
    
        public static void main(String[] args) throws InterruptedException {
            for (int i = 0; i < 10; i++) {
                queue.put("a" + i);
            }
            // 满了就会阻塞
            queue.put("a");
    //        queue.add("a");
            queue.offer("a");
            queue.offer("a", 1, TimeUnit.SECONDS);
            System.out.println(queue);
        }
    }
    

    下面看看几个比较特殊的Queue,这些都是BlockingQueue ,同样也是阻塞的

    3.4.DelayQueue

    DelayQueue 可以实现时间上的排序,这个DelayQueue能实现按里面等待的时间来排序。
    但是要实现Comparable接口重写compareTo来确定是怎么排序的,DelayQueue就是按时间顺序进行任务调度

    package com.learn.thread.six.queue;
    
    import lombok.ToString;
    
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.DelayQueue;
    import java.util.concurrent.Delayed;
    import java.util.concurrent.TimeUnit;
    
    public class TestDelayQueue {
        private static BlockingQueue queue = new DelayQueue<>();
    
        @ToString
        static class MyThread implements Delayed {
            String name;
    
            long runningTime;
    
            public MyThread(String name, long runningTime) {
                this.name = name;
                this.runningTime = runningTime;
            }
    
            /**
             * 根据这个时间去排序
             *
             * @param unit
             * @return
             */
            @Override
            public long getDelay(TimeUnit unit) {
                return unit.convert(runningTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
            }
    
            // 修改排序
            @Override
            public int compareTo(Delayed o) {
                if(this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS)) {
                    return -1;
                } else if(this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) {
                    return 1;
                } else {
                    return 0;
                }
            }
    
            public static void main(String[] args) throws InterruptedException {
                long now = System.currentTimeMillis();
                MyThread t1 = new MyThread("t1", now + 1000);
                MyThread t2 = new MyThread("t2", now + 2000);
                MyThread t3 = new MyThread("t3", now + 1500);
                MyThread t4 = new MyThread("t4", now + 2500);
                MyThread t5 = new MyThread("t5", now + 500);
                queue.put(t1);
                queue.put(t2);
                queue.put(t3);
                queue.put(t4);
                queue.put(t5);
                System.out.println(queue);
                for (int i = 0; i < 5; i++) {
                    System.out.println(queue.take());
                }
            }
        }
    }
    

    DelayQueue本质上是用了个PriorityQueue,PriorityQueue是AbstractQueue继承的。PriorityQueue的特点是它内部往里装的时候不是按顺序的,而是内部进行了一个排序。按照优先级,最小的优先,内部结构是一个二叉树,这个二叉树可以认为是堆排序里面那个最小堆值排在最上面。

    package com.learn.thread.six.queue;
    
    import java.util.PriorityQueue;
    
    public class TestPriorityQueue {
        public static void main(String[] args) {
            PriorityQueue<String> queue = new PriorityQueue<>();
    
            queue.add("a");
            queue.add("e");
            queue.add("b");
            queue.add("c");
            queue.add("f");
            for (int i = 0; i < 5; i++) {
                System.out.println(queue.poll());
            }
        }
    }
    

    3.5.SynchronousQueue

    SynchronousQueue的容量为0,这个容器不是用来装内容的,专门用来两个线程之间传内容的,给线程下达任务的,跟之前讲过的Exchange一样。
    add方法会直接报错,原因是容量为0,不能添加。只能用put调用,并且是要求前面有人拿着这个东西你才可以往里面装
    SynchronousQueue线程池多数情况下会使用,很多线程取任务,互相之间进行任务调度的都是他

    package com.learn.thread.six.queue;
    
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.SynchronousQueue;
    
    public class TestSynchronousQueue {
        public static void main(String[] args) throws InterruptedException {
            BlockingQueue<String> queue = new SynchronousQueue<>();
            new Thread(() -> {
                try {
                    System.out.println(queue.take());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
            // 阻塞等待消费者消费,如果没有被消费会一直等待
            queue.put("aaa");
            System.out.println("niadsasdasd");
            System.out.println(queue.size());
        }
    }
    
    

    3.6.TransferQueue

    Transfer意思为传递,实际上和SynchronousQueue一样,不过SynchronousQueue只能传递一个,它能传递多个,并且它还有一个transfer方法,意思就是装完后就在这里等着被消费。
    一般使用场景,我做了一件事情,这个事情一定要有一个结果,有了结果之后我才可以继续进行我下面的事情。
    比如说我支付了钱,这个订单付款完了,也是要一直等这个付账结果完了了我才可以给客户反馈

    package com.learn.thread.six.queue;
    
    import java.util.concurrent.LinkedTransferQueue;
    
    import static java.lang.Thread.sleep;
    
    public class TestTransferQueue {
        public static void main(String[] args) throws InterruptedException {
            LinkedTransferQueue<String> queue = new LinkedTransferQueue<String>();
            new Thread(() -> {
                try {
                    System.out.println(queue.take());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
            queue.transfer("asdasdasd");
            new Thread(() -> {
                try {
                    System.out.println(queue.take());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
    

    容器这章主要为了后面的线程池

    总结
    从hashTable到ConcurrentHashMap,这不是一个替代关系,而是不同场景下不同的使用
    Vector 到Queue这样一个过程,主要是Vector添加了许多对线程友好的API,offer\peek\poll,它的一个子类叫BlockingQueue又添加了put和take 实现了阻塞操作。

    相关文章

      网友评论

          本文标题:java多线程与高并发(七)并发容器

          本文链接:https://www.haomeiwen.com/subject/xmklrltx.html