美文网首页个人学习
Java并发编程 并发容器

Java并发编程 并发容器

作者: 香沙小熊 | 来源:发表于2020-03-11 13:59 被阅读0次

    1.并发容器概述

    • ConcurrentHashMap:线程安全的HashMap
    • CopyOnWriteArrayList:线程安全的List
    • BlockingQueue:这是一个接口,表示阻塞队列,非常适合用于作为数据共享的通道
    • ConcurrentLinkedQueue:高效的非阻塞并发队列,使用链表实现。可以看做一个线程安全的LinkedList
    • ConcurrentSkipListMap:是一个Map,使用跳表的数据结构进行快速查找

    2.已淘汰并发容器

    • Vector和Hashtable

    3.ConcurrentHashMap

    为什么HashMap是线程不安全的?
    同时put碰撞导致数据丢失
    同时put扩容导致数据丢失
    死循环造成的CPU100%

    1.7的ConcurrentHashMap实现和分析
    • Java 7中的ConcurrentHashMap最外层是多个segment,每个segment的底层数据结构与HashMap类似,任然是数组和链表组成的拉链法
    • 每个segment独立上ReentrantLock锁,每个segment之间互不影响,提高了并发效率
    • ConcurrentHashMap默认有16个Segments,所以最多可以同时支持16个线程并发写(操作分别分布在不同Segment上)。这个默认值可以在初始化,一旦初始化以后,是不可以扩容的。
    1.8的ConcurrentHashMap实现和分析
    putVal流程
    • 判断key value不为空
    • 计算hash值
    • 根据对应位置节点的类型,来赋值,或者helpTransfer,或者增长链表,或者给红黑树增加加点
    • 检查瞒住阈值就“红黑树化”
    • 返回oldVal
    get流程
    • 计算hash值
    • 找到对应的位置,根据情况进行:
    • 直接取值
    • 红黑树里找值
    • 遍历链表取值
    • 返回找到的结果
    组合操作并不保证线程安全

    即get又set

    public class OptionsNotSafe implements Runnable {
    
        private static ConcurrentHashMap<String, Integer> scores = new ConcurrentHashMap<String, Integer>();
    
        public static void main(String[] args) throws InterruptedException {
            scores.put("小明", 0);
            Thread t1 = new Thread(new OptionsNotSafe());
            Thread t2 = new Thread(new OptionsNotSafe());
            t1.start();
            t2.start();
            t1.join();
            t2.join();
            System.out.println(scores);
        }
    
        @Override
        public void run() {
            for (int i = 0; i < 1000; i++) {
                Integer score = scores.get("小明");
                Integer newScore = score + 1;
                scores.put("小明", newScore);
            }
        }
    }
    
    {小明=1019}
    

    解决

    public class OptionsNotSafe implements Runnable {
    
        private static ConcurrentHashMap<String, Integer> scores = new ConcurrentHashMap<String, Integer>();
    
        public static void main(String[] args) throws InterruptedException {
            scores.put("小明", 0);
            Thread t1 = new Thread(new OptionsNotSafe());
            Thread t2 = new Thread(new OptionsNotSafe());
            t1.start();
            t2.start();
            t1.join();
            t2.join();
            System.out.println(scores);
        }
    
    
        @Override
        public void run() {
            for (int i = 0; i < 1000; i++) {
                while (true) {
                    Integer score = scores.get("小明");
                    Integer newScore = score + 1;
                    boolean b = scores.replace("小明", score, newScore);
                    if (b) {
                        break;
                    }
                }
            }
    
        }
    }
    
    {小明=2000}
    

    4.CopyOnWriteArrayList

    • 代替Vector和SynchronizedList,就和ConcurrentHashMap代替SynchronizedMap的原因一样
    • Vector和SynchronizedList的锁的粒度太大,并发效率相对比较低,并且迭代时无法编辑
    • CopyOnWrite并发容器还包括CopyOnWriteArraySet,用来替代同步Set
      CopyOnWriteArrayList适用场景
    • 读操作可以尽快地快,而写即使慢一些也没有太大关系
    • 读多写少:黑名单,每日更新;监听器:迭代操作远多于修改操作
      CopyOnWriteArrayList读写规则
      回顾读写锁:读读共享、其他都互斥(写写互斥、读写互斥、写读互斥)
      读写锁规则的升级:读取是完全不用加锁的,并且更厉害的是,写入也不会阻塞读取操作。只有写入和写入之间需要进行同步等待
    public class CopyOnWriteArrayListDemo {
    
        public static void main(String[] args) {
            CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
    
            list.add("1");
            list.add("2");
            list.add("3");
            list.add("4");
            list.add("5");
    
            Iterator<String> iterator = list.iterator();
    
            while (iterator.hasNext()) {
                System.out.println("list is" + list);
                String next = iterator.next();
                System.out.println(next);
    
                if (next.equals("2")) {
                    list.remove("5");
                }
                if (next.equals("3")) {
                    list.add("3 found");
                }
            }
        }
    }
    
    list is[1, 2, 3, 4, 5]
    1
    list is[1, 2, 3, 4, 5]
    2
    list is[1, 2, 3, 4]
    3
    list is[1, 2, 3, 4, 3 found]
    4
    list is[1, 2, 3, 4, 3 found]
    5
    
    public class CopyOnWriteArrayListDemo {
    
        public static void main(String[] args) throws InterruptedException {
            CopyOnWriteArrayList<Integer> list = new CopyOnWriteArrayList<>(new Integer[]{1, 2, 3});
    
            System.out.println(list);
            Iterator<Integer> itr1 = list.iterator();
            list.add(4);
            System.out.println(list);
            Iterator<Integer> itr2 = list.iterator();
            itr1.forEachRemaining(System.out::println);
            itr2.forEachRemaining(System.out::println);
    
        }
    }
    
    [1, 2, 3]
    [1, 2, 3, 4]
    1
    2
    3
    1
    2
    3
    4
    
    CopyOnWriteArrayList源码
    add
        public boolean add(E e) {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                Object[] elements = getArray();
                int len = elements.length;
                Object[] newElements = Arrays.copyOf(elements, len + 1);
                newElements[len] = e;
                setArray(newElements);
                return true;
            } finally {
                lock.unlock();
            }
        }
    
    remove
    public E remove(int index) {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                Object[] elements = getArray();
                int len = elements.length;
                E oldValue = get(elements, index);
                int numMoved = len - index - 1;
                if (numMoved == 0)
                    setArray(Arrays.copyOf(elements, len - 1));
                else {
                    Object[] newElements = new Object[len - 1];
                    System.arraycopy(elements, 0, newElements, 0, index);
                    System.arraycopy(elements, index + 1, newElements, index,
                                     numMoved);
                    setArray(newElements);
                }
                return oldValue;
            } finally {
                lock.unlock();
            }
        }
    
    
    get
    private E get(Object[] a, int index) {
            return (E) a[index];
        }
    
    CopyOnWrite的缺点

    CopyOnWrite容器有很多优点,但是同时也存在两个问题,即内存占用问题和数据一致性问题。所以在开发的时候需要注意一下。

    内存占用问题。

    因为CopyOnWrite的写时复制机制,所以在进行写操作的时候,内存里会同时驻扎两个对象的内存,旧的对象和新写入的对象(注意:在复制的时候只是复制容器里的引用,只是在写的时候会创建新对象添加到新容器里,而旧容器的对象还在使用,所以有两份对象内存)。如果这些对象占用的内存比较大,比如说200M左右,那么再写入100M数据进去,内存就会占用300M,那么这个时候很有可能造成频繁的Yong GC和Full GC。之前我们系统中使用了一个服务由于每晚使用CopyOnWrite机制更新大对象,造成了每晚15秒的Full GC,应用响应时间也随之变长。

    针对内存占用问题,可以通过压缩容器中的元素的方法来减少大对象的内存消耗,比如,如果元素全是10进制的数字,可以考虑把它压缩成36进制或64进制。或者不使用CopyOnWrite容器,而使用其他的并发容器,如ConcurrentHashMap。

    数据一致性问题。

    CopyOnWrite容器只能保证数据的最终一致性,不能保证数据的实时一致性。所以如果你希望写入的的数据,马上能读到,请不要使用CopyOnWrite容器。

    CopyOnWriteArrayList为什么并发安全且性能比Vector好?

    我知道Vector是增删改查方法都加了synchronized,保证同步,但是每个方法执行的时候都要去获得锁,性能就会大大下降,而CopyOnWriteArrayList 只是在增删改上加锁,但是读不加锁,在读方面的性能就好于Vector,CopyOnWriteArrayList支持读多写少的并发情况。

    5.并发队列Queue(阻塞队列、非阻塞队列)

    为什么要使用队列?
    • 用队列可以在线程间传递数据:生产者消费者模式、银行转账
    • 考虑锁等线程安全问题的重任从“你”转移到了“队列”上
    并发队列简介
    常用队列
    5.1ArrayBlockingQueue
    • 有界
    • 指定容量
    • 公平:还可以指定是需要保证公平,如果想保证公平的话,那么等待了最长时间的线程会被优先处理,不过这会同时带来一定的性能损耗
    public class ArrayBlockingQueueDemo {
    
    
        public static void main(String[] args) {
    
            ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<String>(3);
    
            Interviewer r1 = new Interviewer(queue);
            Consumer r2 = new Consumer(queue);
            new Thread(r1).start();
            new Thread(r2).start();
        }
    }
    
    class Interviewer implements Runnable {
    
        BlockingQueue<String> queue;
    
        public Interviewer(BlockingQueue queue) {
            this.queue = queue;
        }
    
        @Override
        public void run() {
            System.out.println("10个候选人都来啦");
            for (int i = 0; i < 10; i++) {
                String candidate = "Candidate" + i;
                try {
                    queue.put(candidate);
                    System.out.println("安排好了" + candidate);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            try {
                queue.put("stop");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    class Consumer implements Runnable {
    
        BlockingQueue<String> queue;
    
        public Consumer(BlockingQueue queue) {
    
            this.queue = queue;
        }
    
        @Override
        public void run() {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            String msg;
            try {
                while(!(msg = queue.take()).equals("stop")){
                    System.out.println(msg + "到了");
                }
                System.out.println("所有候选人都结束了");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    10个候选人都来啦
    安排好了Candidate0
    安排好了Candidate1
    安排好了Candidate2
    安排好了Candidate3
    Candidate0到了
    Candidate1到了
    Candidate2到了
    Candidate3到了
    安排好了Candidate4
    安排好了Candidate5
    安排好了Candidate6
    安排好了Candidate7
    Candidate4到了
    Candidate5到了
    安排好了Candidate8
    安排好了Candidate9
    Candidate6到了
    Candidate7到了
    Candidate8到了
    Candidate9到了
    所有候选人都结束了
    
    ArrayBlockingQueue源码
     public void put(E e) throws InterruptedException {
            checkNotNull(e);
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                while (count == items.length)
                    notFull.await();
                enqueue(e);
            } finally {
                lock.unlock();
            }
        }
    

    如果当前队列数据等于队里最大容量,就等待,否则,数据加入到队列。

    5.2LinkedBlockingQueue
    • 无界
    • 容量Integer.MAX_VALUE
    • 内部结构:Node、两把锁。分析put方法
    public void put(E e) throws InterruptedException {
            if (e == null) throw new NullPointerException();
            // Note: convention in all put/take/etc is to preset local var
            // holding count negative to indicate failure unless set.
            int c = -1;
            Node<E> node = new Node<E>(e);
            final ReentrantLock putLock = this.putLock;
            final AtomicInteger count = this.count;
            putLock.lockInterruptibly();
            try {
                /*
                 * Note that count is used in wait guard even though it is
                 * not protected by lock. This works because count can
                 * only decrease at this point (all other puts are shut
                 * out by lock), and we (or some other waiting put) are
                 * signalled if it ever changes from capacity. Similarly
                 * for all other uses of count in other wait guards.
                 */
                while (count.get() == capacity) {
                    notFull.await();
                }
                enqueue(node);
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            } finally {
                putLock.unlock();
            }
            if (c == 0)
                signalNotEmpty();
        }
    
    5.3 PriorityBlockingQueue

    PriorityBlockingQueue是一个无界队列,它没有限制,在内存允许的情况下可以无限添加元素;它又是具有优先级的队列,是通过构造函数传入的对象来判断,传入的对象必须实现comparable接口。

    • 支持优先级
    • 自然顺序(而不是先进先出)
    • 无界队列
    • PriorityQueue的线程安全版本
    public class Person implements Comparable<Person>{
        private int id;
        private String name;
        public int getId() {
            return id;
        }
        public void setId(int id) {
            this.id = id;
        }
        public String getName() {
            return name;
        }
        public void setName(String name) {
            this.name = name;
        }
        public Person(int id, String name) {
            super();
            this.id = id;
            this.name = name;
        }
        public Person() {
        }
        @Override
        public String toString() {
            return this.id + ":" + this.name;
        }
        @Override
        public int compareTo(Person person) {
            return this.id > person.getId() ? 1 : ( this.id < person.getId() ? -1 :0);
        }
    }
    
    public class PriorityBlockingQueueDemo {
        public static void main(String[] args) throws InterruptedException {
            PriorityBlockingQueue<Person> pbq = new PriorityBlockingQueue<>();
            pbq.add(new Person(3,"person3"));
            System.err.println("容器为:" + pbq);
            pbq.add(new Person(2,"person2"));
            System.err.println("容器为:" + pbq);
            pbq.add(new Person(1,"person1"));
            System.err.println("容器为:" + pbq);
            pbq.add(new Person(4,"person4"));
            System.err.println("容器为:" + pbq);
            System.err.println("分割线----------------------------------------------------------------" );
    
    
            System.err.println("获取元素 " + pbq.take().getId());
            System.err.println("容器为:" + pbq);
            System.err.println("分割线----------------------------------------------------------------" );
    
            System.err.println("获取元素 " + pbq.take().getId());
            System.err.println("容器为:" + pbq);
            System.err.println("分割线----------------------------------------------------------------" );
    
            System.err.println("获取元素 " + pbq.take().getId());
            System.err.println("容器为:" + pbq);
            System.err.println("分割线----------------------------------------------------------------" );
    
            System.err.println("获取元素 " + pbq.take().getId());
            System.err.println("容器为:" + pbq);
            System.err.println("分割线----------------------------------------------------------------" );
        }
    }
    
    容器为:[3:person3]
    容器为:[2:person2, 3:person3]
    容器为:[1:person1, 3:person3, 2:person2]
    容器为:[1:person1, 3:person3, 2:person2, 4:person4]
    分割线----------------------------------------------------------------
    获取元素 1
    容器为:[2:person2, 3:person3, 4:person4]
    分割线----------------------------------------------------------------
    获取元素 2
    容器为:[3:person3, 4:person4]
    分割线----------------------------------------------------------------
    获取元素 3
    容器为:[4:person4]
    分割线----------------------------------------------------------------
    获取元素 4
    容器为:[]
    分割线----------------------------------------------------------------
    
    PriorityBlockingQueue源码
     public boolean offer(E e) {
            if (e == null)
                throw new NullPointerException();
            final ReentrantLock lock = this.lock;
            lock.lock();
            int n, cap;
            Object[] array;
            while ((n = size) >= (cap = (array = queue).length))
                tryGrow(array, cap);
            try {
                Comparator<? super E> cmp = comparator;
                if (cmp == null)
                    siftUpComparable(n, e, array);
                else
                    siftUpUsingComparator(n, e, array, cmp);
                size = n + 1;
                notEmpty.signal();
            } finally {
                lock.unlock();
            }
            return true;
        }
    
    5.4 SynchronousQueue
    • 它的容量为0
    • 需要注意的是,SynchronousQueue的容量不是1而是0,因为SynchronousQueue不需要去持有元素,它所做的就是直接传递(direct handoff)
    • 效率很高
    public class SynchronousQueueDemo {
        static class SynchronousQueueProducer implements Runnable {
    
            protected BlockingQueue<String> blockingQueue;
            final Random random = new Random();
    
            public SynchronousQueueProducer(BlockingQueue<String> queue) {
                this.blockingQueue = queue;
            }
    
            @Override
            public void run() {
                while (true) {
                    try {
                        String data = UUID.randomUUID().toString();
                        System.out.println("Put: " + data);
                        blockingQueue.put(data);
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
    
        }
    
        static class SynchronousQueueConsumer implements Runnable {
    
            protected BlockingQueue<String> blockingQueue;
    
            public SynchronousQueueConsumer(BlockingQueue<String> queue) {
                this.blockingQueue = queue;
            }
    
            @Override
            public void run() {
                while (true) {
                    try {
                        String data = blockingQueue.take();
                        System.out.println(Thread.currentThread().getName()
                                + " take(): " + data);
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
    
        }
    
        public static void main(String[] args) {
            final BlockingQueue<String> synchronousQueue = new SynchronousQueue<String>();
    
            SynchronousQueueProducer queueProducer = new SynchronousQueueProducer(
                    synchronousQueue);
            new Thread(queueProducer).start();
    
            SynchronousQueueConsumer queueConsumer1 = new SynchronousQueueConsumer(
                    synchronousQueue);
            new Thread(queueConsumer1).start();
    
            SynchronousQueueConsumer queueConsumer2 = new SynchronousQueueConsumer(
                    synchronousQueue);
            new Thread(queueConsumer2).start();
    
        }
    }
    
    Put: 1a26d098-cf95-4419-a8d6-892b12e10c0e
    Thread-2 take(): 1a26d098-cf95-4419-a8d6-892b12e10c0e
    Put: 072c059f-cc6a-48d9-bfad-aa7f867b2f23
    Thread-1 take(): 072c059f-cc6a-48d9-bfad-aa7f867b2f23
    Put: ced8b199-9dc6-479b-97e2-1ef811e21cba
    Thread-2 take(): ced8b199-9dc6-479b-97e2-1ef811e21cba
    Put: 368071a7-3301-4b9e-8634-b480aff2de47
    Thread-1 take(): 368071a7-3301-4b9e-8634-b480aff2de47
    Put: cd9c6d2b-c3f7-4603-a411-6ba44eca09b5
    Thread-2 take(): cd9c6d2b-c3f7-4603-a411-6ba44eca09b5
    ......
    

    插入数据的线程和获取数据的线程,交替执行

    源码解析
    public boolean offer(E e, long timeout, TimeUnit unit)
            throws InterruptedException {
            if (e == null) throw new NullPointerException();
            if (transferer.transfer(e, true, unit.toNanos(timeout)) != null)
                return true;
            if (!Thread.interrupted())
                return false;
            throw new InterruptedException();
        }
    
    
    应用场景

    Executors.newCachedThreadPool()

    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>(),
                                          threadFactory);
        }
    

    使用SynchronousQueue的目的就是保证“对于提交的任务,如果有空闲线程,则使用空闲线程来处理;否则新建一个线程来处理任务”。

    SynchronousQueue注意点
    • SynchronousQueue没有peek等函数,因为peek的含义是取出头结点,但是SynchronousQueue的容量是0,所以连头结点都没有,也就是没有peek方法。同理,没有iterate相关方法
    • 是一个极好的用来直接传递的并发数据结构

    5.5 DelayQueue

    java延迟队列提供了在指定时间才能获取队列元素的功能,队列头元素是最接近过期的元素。没有过期元素的话,使用poll()方法会返回null值,超时判定是通过getDelay(TimeUnit.NANOSECONDS)方法的返回值小于等于0来判断。延时队列不能存放空元素。

    延时队列实现了Iterator接口,但iterator()遍历顺序不保证是元素的实际存放顺序。

    DelayedQuene的优先级队列使用的排序方式是队列元素的compareTo方法,优先级队列存放顺序是从小到大的,所以队列元素的compareTo方法影响了队列的出队顺序。

    若compareTo方法定义不当,会造成延时高的元素在队头,延时低的元素无法出队。

    public class DelayedQueneDemo {
        public static void main(String[] args) throws InterruptedException {
            Item item1 = new Item("item1", 5, TimeUnit.SECONDS);
            Item item2 = new Item("item2", 10, TimeUnit.SECONDS);
            Item item3 = new Item("item3", 15, TimeUnit.SECONDS);
            DelayQueue<Item> queue = new DelayQueue<>();
            queue.put(item1);
            queue.put(item2);
            queue.put(item3);
            System.out.println("begin time:" + LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME));
            for (int i = 0; i < 3; i++) {
                Item take = queue.take();
                System.out.format("name:{%s}, time:{%s}\n", take.name, LocalDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME));
            }
        }
    }
    
    class Item implements Delayed {
        /* 触发时间*/
        private long time;
        String name;
    
        public Item(String name, long time, TimeUnit unit) {
            this.name = name;
            this.time = System.currentTimeMillis() + (time > 0 ? unit.toMillis(time) : 0);
        }
    
        @Override
        public long getDelay(TimeUnit unit) {
            return time - System.currentTimeMillis();
        }
    
        @Override
        public int compareTo(Delayed o) {
            Item item = (Item) o;
            long diff = this.time - item.time;
            if (diff <= 0) {// 改成>=会造成问题
                return -1;
            } else {
                return 1;
            }
        }
    
        @Override
        public String toString() {
            return "Item{" +
                    "time=" + time +
                    ", name='" + name + '\'' +
                    '}';
        }
    }
    
    begin time:2020-03-11T11:19:00.706
    name:{item1}, time:{2020-03-11T11:19:05.659}
    name:{item2}, time:{2020-03-11T11:19:10.659}
    name:{item3}, time:{2020-03-11T11:19:15.659}
    

    5.6 非阻塞并发队列

    ConcurrentLinkedQueue是一个基于链接节点的无界线程安全队列,它采用先进先出的规则对节点进行排序,当我们添加一个元素的时候,它会添加到队列的尾部,当我们获取一个元素时,它会返回队列头部的元素。它采用了“wait-free”算法来实现,该算法在Michael & Scott算法上进行了一些修改。

    6.各并发容器总结

    相关文章

      网友评论

        本文标题:Java并发编程 并发容器

        本文链接:https://www.haomeiwen.com/subject/otfmdhtx.html