美文网首页多线程
Java 并发特性 延迟队列/StampedLock/并发累加器

Java 并发特性 延迟队列/StampedLock/并发累加器

作者: 运气爆棚lsw | 来源:发表于2022-04-25 16:09 被阅读0次

    Java 并发特性 延迟队列/StampedLock/并发累加器/Bit Set/Phaser

    1.延迟队列

    在 Java 中有类型众多的集合。那么你使用过 DelayQueue 吗?它是一个特殊类型的 Java 集合
    允许我们根据元素的延迟时间对其进行排序。尽管 DelayQueue 类是 Java 集合的成员之一,但是它位于 java.util.concurrent 包中。它实现了 BlockingQueue 接口。只有当元素的时间过期时,才能从队列中取出
    要使用这个集合,首先,我们的类需要实现 Delayed 接口的 getDelay 方法。当然,它不一定必须是类,也可以是 Java Record

    package com.example.demo.back.vo;
    
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.concurrent.DelayQueue;
    import java.util.concurrent.Delayed;
    import java.util.concurrent.TimeUnit;
    
    
    /**
     * @author lisanwei24282
     */
    @Slf4j
    public class DelayedEvent implements Delayed {
    
        /**
         * 开始时间
         */
        public long startTime;
        /**
         * 数据参数
         */
        public String msg;
    
        public DelayedEvent(long startTime, String msg) {
            this.startTime = startTime;
            this.msg = msg;
        }
    
        @Override
        public int compareTo(Delayed o) {
            return (int) (this.startTime - ((DelayedEvent) o).startTime);
        }
    
    
        @Override
        public long getDelay(TimeUnit unit) {
            long diff = startTime - System.currentTimeMillis();
            return unit.convert(diff, TimeUnit.MILLISECONDS);
        }
    
        /**
         * 假设我们想要把元素延迟 10 秒钟,那么我们只需要在 DelayedEvent 类上将时间设置成当前时间加上 10 秒钟即可
         *
         * @param args
         * @throws InterruptedException
         */
        public static void main(String[] args) throws InterruptedException {
            long start = System.currentTimeMillis();
    
            final DelayQueue<DelayedEvent> delayQueue = new DelayQueue<>();
            final long timeFirst = System.currentTimeMillis() + 10000;
            delayQueue.offer(new DelayedEvent(timeFirst, "1"));
    
    
            log.info(delayQueue.take().msg);
            long end = System.currentTimeMillis();
            log.info(" ========= 执行完毕 Done,消耗时间:{} ==============", end - start);
        }
    
    
    }
    

    2.StampedLock

    我认为Java Concurrent 是非常棒的 Java 包,当开发人员主要使用 web 框架的时候更是如此。我们有多少人曾经在 Java 中使用过锁呢?锁是一种比 synchronized 块更灵活的线程同步机制。从 Java 8 开始,我们可以使用一种叫做 StampedLock 的新锁。StampedLock 是 ReadWriteLock 的一个替代方案。它允许对读操作进行乐观的锁定。而且,它的性能ReentrantReadWriteLock 更好。

    假设我们有两个线程。第一个线程更新一个余额数据,而第二个线程则读取最新余额的当前值。为了更新余额,我们当然需要先读取其当前值。在这里,我们需要某种同步机制,假设第一个线程在同一时间内多次运行。第二个线程阐述了如何使用乐观锁来进行数据读取

    package com.example.demo.back.vo;
    
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.locks.StampedLock;
    
    public class Tet {
    
        @Data
        @AllArgsConstructor
        @NoArgsConstructor
        public static class Balance {
            private Long amount;
        }
    
        public static void main(String[] args) {
            StampedLock lock = new StampedLock();
    
            /* 对象初始化数据 */
            Balance b = new Balance(10000L);
    
    
            // 读锁
            Runnable w = () -> {
                /* 加锁 */
                long stamp = lock.writeLock();
                try {
                    b.setAmount(b.getAmount() + 1000);
                    System.out.println("Write: " + b.getAmount());
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    /* 释放锁 */
                    lock.unlockWrite(stamp);
                }
            };
    
            // 写锁
            Runnable r = () -> {
                /* 尝试获取锁 */
                long stamp = lock.tryOptimisticRead();
    
                if (!lock.validate(stamp)) {
                    stamp = lock.readLock();
                    try {
                        System.out.println("读取成功============= Read: " + b.getAmount());
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        lock.unlockRead(stamp);
                    }
                } else {
                    System.out.println("Optimistic read fails 正在写: 乐观锁读取失败");
                }
            };
    
    
            // 创建线程池
            ExecutorService executor = Executors.newFixedThreadPool(10);
            for (int i = 0; i < 50; i++) {
                executor.submit(w);
            }
            for (int i = 0; i < 50; i++) {
                executor.submit(r);
            }
            executor.shutdown();
    
        }
    }
    
    

    3.并发累加器

    在 Java Concurrent 包中,有一个并发累加器(concurrent accumulator)。我们也有并发的加法器(concurrent adder),但它们的功能非常类似。LongAccumulator(我们也有 DoubleAccumulator)会使用一个提供给它的函数更新一个值。在很多场景下,它能让我们实现无锁的算法。当多个线程更新一个共同的值的时候,它通常会比 AtomicLong 更合适。

    我们看一下它是如何运行的。要创建它,我们需要在构造函数中设置两个参数。第一个参数是一个用于计算累加结果的函数。
    通常情况下,我们会使用 sum 方法。第二个参数表示累积器的初始值。

    现在,让我们创建一个初始值为 10000 的 LongAccumulator,然后从多个线程调用 accumulate() 方法。最后的结果是什么呢?如果你回想一下的话,我们做的事情和上一节完全一样,但这一次没有任何锁

    package com.example.demo.back.vo;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.LongAccumulator;
    
    /**
     * @author lisanwei
     */
    public class Asd {
    
        public static void main(String[] args) throws InterruptedException {
            LongAccumulator balance = new LongAccumulator(Long::sum, 10000L);
            Runnable w = () -> balance.accumulate(1000L);
    
            ExecutorService executor = Executors.newFixedThreadPool(50);
            for (int i = 0; i < 50; i++) {
                executor.submit(w);
            }
    
            executor.shutdown();
            if (executor.awaitTermination(1000L, TimeUnit.MILLISECONDS))
                System.out.println("Balance: " + balance.get());
            assert balance.get() == 60000L;
        }
    }
    
    

    4.数组的二分查找

    假设我们想在排序的数组中插入一个新的元素。如果数组中已经包含该元素的话,Arrays.binarySearch() 会返回该搜索键的索引,否则,它返回一个插入点,我们可以用它来计算新键的索引:-(insertion point)-1。此外,在 Java 中,binarySearch 方法是在一个有序数组中查找元素的最简单和最有效的方法。

    参考下面例子,如果我们有输入一个数组,其中有四个元素,按升序排列。我们想在这个数组中插入数字 3,下面的代码展示了如何计算插入点的索引

    package com.example.demo.back.vo;
    
    import java.util.Arrays;
    
    /**
     * @author lisanwei24282
     */
    public class ArraysRes {
        public static void main(String[] args) {
            int[] t = new int[]{1, 2, 4, 5};
    
            // 它返回一个插入点
            int x = Arrays.binarySearch(t, 3);
            System.out.println("x = " + x);
            Integer cac1 = cacPoint(x);
            System.out.println("cac = " + cac1);
    
            assert ~x == 2;
    
            // 它返回一个插入点
            int y = Arrays.binarySearch(t, 6);
            System.out.println("y = " + y);
            Integer cac2 = cacPoint(y);
            System.out.println("cac2 = " + cac2);
    
            // 会返回该搜索键的索引
            int z = Arrays.binarySearch(t, 2);
            System.out.println("z = " + z);
    
        }
    
        /**
         * 计算方式
         *
         * @param point 插入点返回
         * @return 结果位置
         */
        public static Integer cacPoint(Integer point) {
            return -(point) - 1;
        }
    }
    
    

    4.Bit Set

    如果我们需要对 bit 数组进行一些操作该怎么办呢?你是不是会使用 boolean[] 来实现呢?其实,有一种更有效、更节省内存的方法来实现。这就是 BitSet 类。BitSet 类允许我们存储和操作 bit 的数组。与 boolean[] 相比,它消耗的内存要少 8 倍。我们可以对数组进行逻辑操作,例如:and、or、xor。

    比方说,有两个 bit 的数组, 我们想对它们执行 xor 操作。为了做到这一点,我们需要创建两个 BitSet 的实例,并在实例中插入样例元素,如下所示。最后,对其中一个 BitSet 实例调用 xor 方法,并将第二个 BitSet 实例作为参数。

    package com.example.demo.back.vo;
    
    import java.util.BitSet;
    
    /**
     * @author lisanwei
     */
    public class ArraysRes {
        public static void main(String[] args) {
            BitSet bs1 = new BitSet();
            bs1.set(0);
            bs1.set(2);
            bs1.set(4);
            System.out.println("bs1 : " + bs1);
    
            BitSet bs2 = new BitSet();
            bs2.set(1);
            bs2.set(2);
            bs2.set(3);
            System.out.println("bs2 : " + bs2);
    
            bs2.xor(bs1);
            System.out.println("xor: " + bs2);
            
            // 打印结果:
            // bs1 : {0, 2, 4}
            // bs2 : {1, 2, 3}
            // xor: {0, 1, 3, 4}
        }
    }
    
    

    BitSet的高阶使用
    可参考我另一篇文章,BitSet的高阶使用

    5.Phaser

    最后我们介绍本文最后一个有趣的 Java 特性。和其他一些样例一样,它也是 Java Concurrent 包的元素,被称为 Phaser。它与更知名的 CountDownLatch 相当相似。然而它提供了一些额外的功能。它允许我们设置在继续执行之前需要等待的线程的动态数量。在 Phaser 中,已定义数量的线程需要在进入下一步执行之前在屏障上等待。得益于此,我们可以协调多个阶段的执行。

    在下面的例子中,我们设置了一个具有 50 个线程的屏障,在进入下一个执行阶段之前,需要到达该屏障。然后,我们创建一个线程,在 Phaser 实例上调用 arriveAndAwaitAdvance() 方法。它会一直阻塞线程,直到所有的 50 个线程都到达屏障。然后,它进入 phase-1,同样会再次调用 arriveAndAwaitAdvance() 方法。

    package com.example.demo.back.vo;
    
    import java.util.BitSet;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Phaser;
    
    /**
     * @author lisanwei
     */
    public class ArraysRes {
        public static void main(String[] args) {
            Phaser phaser = new Phaser(50);
    
            /* 启动执行*/
            Runnable r = () -> {
    
                System.out.println(" 到达第一层屏障 --------- phase-0");
                phaser.arriveAndAwaitAdvance();
    
                System.out.println(" 到达第二层屏障 --------- phase-0");
                System.out.println("phase-1");
                phaser.arriveAndAwaitAdvance();
    
                System.out.println(" 到达第三层屏障 --------- phase-0");
                System.out.println("phase-2");
                phaser.arriveAndDeregister();
            };
    
            ExecutorService executor = Executors.newFixedThreadPool(50);
            for (int i = 0; i < 50; i++) {
                executor.submit(r);
            }
            executor.shutdown();
        }
    }
    
    

    Phaser适合业务场景需要等待线程使用的地方,或者所有线程执行一个任务,只要有一个线程执行成功就得出结果的场景

    相关文章

      网友评论

        本文标题:Java 并发特性 延迟队列/StampedLock/并发累加器

        本文链接:https://www.haomeiwen.com/subject/yfwfyrtx.html