美文网首页
Java高并发系列——检视阅读(七)

Java高并发系列——检视阅读(七)

作者: 卡斯特梅的雨伞 | 来源:发表于2021-10-02 17:03 被阅读0次

    Java高并发系列——集合

    JUC中常见的集合

    JUC集合框架图

    img

    图可以看到,JUC的集合框架也是从Map、List、Set、Queue、Collection等超级接口中继承而来的。所以,大概可以知道JUC下的集合包含了一一些基本操作,并且变得线程安全。

    Map

    ConcurrentHashMap

    功能和HashMap基本一致,内部使用红黑树实现的。

    java8的ConcurrentHashMap为何放弃分段锁,为什么要使用CAS+Synchronized取代Segment+ReentrantLock...

    特性:

    1. 迭代结果和存入顺序不一致
    2. key和value都不能为空,否则会空指针,这里和HashMap不一样,HashMap的key和value都可以为空。
    3. 线程安全的
    ConcurrentSkipListMap

    内部使用跳表实现的,放入的元素会进行排序,排序算法支持2种方式来指定:

    1. 通过构造方法传入一个Comparator
    2. 放入的元素实现Comparable接口

    上面2种方式必选一个,如果2种都有,走规则1。

    特性:

    1. 迭代结果和存入顺序不一致
    2. 放入的元素会排序
    3. key和value都不能为空
    4. 线程安全的

    ConcurrentSkipListMap分析和使用

    List

    CopyOnWriteArrayList

    实现List的接口的,一般我们使用ArrayList、LinkedList、Vector,其中只有Vector是线程安全的,可以使用Collections静态类的synchronizedList方法对ArrayList、LinkedList包装为线程安全的List,不过这些方式在保证线程安全的情况下性能都不高。

    CopyOnWriteArrayList是线程安全的List,内部使用数组存储数据集合中多线程并行操作一般存在4种情况:读读、读写、写写、写读,这个只有在写写操作过程中会导致其他线程阻塞,其他3种情况均不会阻塞,所以读取的效率非常高。

    可以看一下这个类的名称:CopyOnWrite,意思是在写入操作的时候,进行一次自我复制,换句话说,当这个List需要修改时,并不修改原有内容(这对于保证当前在读线程的数据一致性非常重要),而是在原有存放数据的数组上产生一个副本,在副本上修改数据,修改完毕之后,用副本替换原来的数组,这样也保证了写操作不会影响读。

    注意:当然,做写操作的时候也是会用Lock加锁保证同步的,因为每次添加都会加锁且重新复制一个数组出来存储,因此,在保证读多写少的情况下,在进行写操作特别是初始化时也尽量调用addAll这些方法来一次性添加一个集合数据来避免不断地复制。

    特性:

    1. 迭代结果和存入顺序一致
    2. 元素不重复
    3. 元素可以为空
    4. 线程安全的
    5. 读读、读写、写读3种情况不会阻塞;写写会阻塞
    6. 无界的

    Set

    ConcurrentSkipListSet

    有序的Set,内部基于ConcurrentSkipListMap实现的,放入的元素会进行排序,排序算法支持2种方式来指定:

    1. 通过构造方法传入一个Comparator
    2. 放入的元素实现Comparable接口

    上面2种方式需要实现一个,如果2种都有,走规则1

    特性:

    1. 迭代结果和存入顺序不一致
    2. 放入的元素会排序
    3. 元素不重复
    4. 元素不能为空
    5. 线程安全的
    6. 无界的
    CopyOnWriteArraySet

    内部使用CopyOnWriteArrayList实现的,将所有的操作都会转发给CopyOnWriteArrayList。

    注意:因为底层是用CopyOnWriteArrayList实现,因此CopyOnWriteArraySet实现元素不重复是用CopyOnWriteArrayList的indexOf方法从头到尾一个一个比较来实现的。

      public boolean addIfAbsent(E e) {
            Object[] snapshot = getArray();
            return indexOf(e, snapshot, 0, snapshot.length) >= 0 ? false :
                addIfAbsent(e, snapshot);
        }
    

    特性:

    1. 迭代结果和存入顺序不一致
    2. 元素不重复
    3. 元素可以为空
    4. 线程安全的
    5. 读读、读写、写读 不会阻塞;写写会阻塞
    6. 无界的

    Queue

    Queue接口中的方法,我们再回顾一下:

    操作类型 抛出异常 返回特殊值
    插入 add(e) offer(e)
    移除 remove() poll()
    检查 element() peek()

    3种操作,每种操作有2个方法,不同点是队列为空或者满载时,调用方法是抛出异常还是返回特殊值,大家按照表格中的多看几遍,加深记忆。

    ConcurrentLinkedQueue

    高效并发队列,内部使用链表实现的。

    特性:

    1. 线程安全的
    2. 迭代结果和存入顺序一致
    3. 元素可以重复
    4. 元素不能为空
    5. 线程安全的
    6. 无界队列
    Deque

    先介绍一下Deque接口,双向队列(Deque)是Queue的一个子接口双向队列是指该队列两端的元素既能入队(offer)也能出队(poll),如果将Deque限制为只能从一端入队和出队,则可实现栈的数据结构。对于栈而言,有入栈(push)和出栈(pop),遵循先进后出原则。

    一个线性 collection,支持在两端插入和移除元素。名称 deque 是“double ended queue(双端队列)”的缩写,通常读为“deck”。大多数 Deque 实现对于它们能够包含的元素数没有固定限制,但此接口既支持有容量限制的双端队列,也支持没有固定大小限制的双端队列。

    此接口定义在双端队列两端访问元素的方法。提供插入、移除和检查元素的方法。每种方法都存在两种形式:一种形式在操作失败时抛出异常,另一种形式返回一个特殊值(nullfalse,具体取决于操作)。插入操作的后一种形式是专为使用有容量限制的 Deque 实现设计的;在大多数实现中,插入操作不能失败。

    下表总结了上述 12 种方法:

    img

    此接口扩展了 Queue接口。在将双端队列用作队列时,将得到 FIFO(先进先出)行为。将元素添加到双端队列的末尾,从双端队列的开头移除元素。从 Queue 接口继承的方法完全等效于 Deque 方法,如下表所示:

    Queue 方法 等效 Deque 方法
    add(e) addLast(e)
    offer(e) offerLast(e)
    remove() removeFirst()
    poll() pollFirst()
    element() getFirst()
    peek() peekFirst()
    ConcurrentLinkedDeque

    实现了Deque接口,内部使用链表实现的高效的并发双端队列

    特性:

    1. 线程安全的
    2. 迭代结果和存入顺序一致
    3. 元素可以重复
    4. 元素不能为空
    5. 线程安全的
    6. 无界队列
    BlockingQueue

    关于阻塞队列,上一篇有详细介绍。

    疑问:

    Q:跳表是什么?

    跳表,是基于链表实现的一种类似“二分”的算法。它可以快速的实现增,删,改,查操作。跳跃列表的平均查找和插入时间复杂度都是O(logn)。跳表是通过维护一个多层次的链表来构建多层索引结构,是一种以空间来换时间的数据结构,多层链表中的每一层链表元素是前一层链表元素的子集。一开始时,算法在最稀疏的层次进行搜索,直至需要查找的元素在该层两个相邻的元素中间。这时,算法将跳转到下一个层次,重复刚才的搜索,直到找到需要查找的元素为止。

    之所以要用跳表,我们可以与单链表进行比较,单链表中查找某个数据的时候需要的时间复杂度为O(n).而我们多个跳表构建多层索引,使得查询效率提升到O(logn)。

    数据结构与算法——跳表

    跳表

    接口性能提升实战篇

    需求:电商app的商品详情页,需要给他们提供一个接口获取商品相关信息:

    1. 商品基本信息(名称、价格、库存、会员价格等)
    2. 商品图片列表
    3. 商品描述信息(描述信息一般是由富文本编辑的大文本信息)

    普通接口实现伪代码如下:

    public Map<String,Object> detail(long goodsId){
        //创建一个map
        //step1:查询商品基本信息,放入map
        map.put("goodsModel",(select * from t_goods where id = #gooldsId#));
        //step2:查询商品图片列表,返回一个集合放入map
        map.put("goodsImgsModelList",(select * from t_goods_imgs where goods_id = #gooldsId#));
        //step3:查询商品描述信息,放入map
        map.put("goodsExtModel",(select * from t_goods_ext where goods_id = #gooldsId#));
        return map;
    }
    

    上面这种写法应该很常见,代码很简单,假设上面每个步骤耗时200ms,此接口总共耗时>=600毫秒

    整个过程是按顺序执行的,实际上3个查询之间是没有任何依赖关系,所以说3个查询可以同时执行,那我们对这3个步骤采用多线程并行执行实现如下:

    示例:

    public class GetProductDetailTest {
    
        //自定义包含策略
        private ThreadPoolExecutor executor = new ThreadPoolExecutor(20, 20, 60,
                TimeUnit.SECONDS, new LinkedBlockingQueue<>(5),
                new DemoThreadFactory("订单创建组"), new ThreadPoolExecutor.AbortPolicy());
    
        /**
         * 获取商品基本信息
         *
         * @param goodsId 商品id
         * @return 商品基本信息
         * @throws InterruptedException
         */
        public String goodsDetailModel(long goodsId) throws InterruptedException {
            //模拟耗时,休眠200ms
            TimeUnit.MILLISECONDS.sleep(200);
            return "商品id:" + goodsId + ",商品基本信息....";
        }
    
        /**
         * 获取商品图片列表
         *
         * @param goodsId 商品id
         * @return 商品图片列表
         * @throws InterruptedException
         */
        public List<String> goodsImgsModelList(long goodsId) throws InterruptedException {
            //模拟耗时,休眠200ms
            TimeUnit.MILLISECONDS.sleep(200);
            return Arrays.asList("图1", "图2", "图3");
        }
    
        /**
         * 获取商品描述信息
         *
         * @param goodsId 商品id
         * @return 商品描述信息
         * @throws InterruptedException
         */
        public String goodsExtModel(long goodsId) throws InterruptedException {
            //模拟耗时,休眠200ms
            TimeUnit.MILLISECONDS.sleep(200);
            return "商品id:" + goodsId + ",商品描述信息......";
        }
    
        public Map<String,Object> getGoodsDetail(long goodsId) throws ExecutionException, InterruptedException {
            Map<String, Object> result = new HashMap<>();
            Future<String> gooldsDetailModelFuture  = executor.submit(() -> goodsDetailModel(goodsId));
            Future<List<String>> goodsImgsModelFuture = executor.submit(() -> goodsImgsModelList(goodsId));
            //异步获取商品描述信息
            Future<String> goodsExtModelFuture = executor.submit(() -> goodsExtModel(goodsId));
            result.put("gooldsDetailModel", gooldsDetailModelFuture.get());
            result.put("goodsImgsModelList", goodsImgsModelFuture.get());
            result.put("goodsExtModel", goodsExtModelFuture.get());
            return result;
        }
    
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            GetProductDetailTest detailTest = new GetProductDetailTest();
            long starTime = System.currentTimeMillis();
            Map<String, Object> map = detailTest.getGoodsDetail(1L);
            System.out.println(map);
            System.out.println("耗时(ms):" + (System.currentTimeMillis() - starTime));
        }
    }
    

    输出:

    {goodsImgsModelList=[图1, 图2, 图3], gooldsDetailModel=商品id:1,商品基本信息...., goodsExtModel=商品id:1,商品描述信息......}
    耗时(ms):255
    

    可以看出耗时200毫秒左右,性能提升了2倍,假如这个接口中还存在其他无依赖的操作,性能提升将更加显著,上面使用了线程池并行去执行3次查询的任务,最后通过Future获取异步执行结果。

    整个优化过程:

    1. 先列出无依赖的一些操作
    2. 将这些操作改为并行的方式

    总结

    1. 对于无依赖的操作尽量采用并行方式去执行,可以很好的提升接口的性能

    解决微服务日志的痛点

    日志有什么用?

    1. 系统出现故障的时候,可以通过日志信息快速定位问题,修复bug,恢复业务
    2. 提取有用数据,做数据分析使用

    本文主要讨论通过日志来快速定位并解决问题。

    日志存在的痛点

    先介绍一下多数公司采用的方式:目前比较流行的是采用springcloud(或者dubbo)做微服务,按照业务拆分为多个独立的服务,服务采用集群的方式部署在不同的机器上,当一个请求过来的时候,可能会调用到很多服务进行处理,springcloud一般采用logback(或者log4j)输出日志到文件中。当系统出问题的时候,按照系统故障的严重程度,严重的会回退版本,然后排查bug,轻的,找运维去线上拉日志,然后排查问题。

    这个过程中存在一些问题

    1. 日志文件太大太多,不方便查找
    2. 日志分散在不同的机器上,也不方便查找
    3. 一个请求可能会调用多个服务,完整的日志难以追踪(没有完整的链路日志)
    4. 系统出现了问题,只能等到用户发现了,自己才知道(没有报错预警)

    本文要解决上面的几个痛点,构建我们的日志系统,达到以下要求:

    1. 方便追踪一个请求完整的日志
    2. 方便快速检索日志
    3. 系统出现问题自动报警,通知相关人员

    构建日志系统

    方便追踪一个请求完整的日志

    当一个请求过来的时候,可能会调用多个服务,多个服务内部可能又会产生子线程处理业务,所以这里面有两个问题需要解决:

    1. 多个服务之间日志的追踪
    2. 服务内部子线程和主线程日志的追踪,这个地方举个例子,比如一个请求内部需要给10000人发送推送,内部开启10个线程并行处理,处理完毕之后响应操作者,这里面有父子线程,我们要能够找到这个里面所有的日志

    需要追踪一个请求完整日志,我们需要给每个请求设置一个全局唯一编号,可以使用UUID或者其他方式也行。

    多个服务之间日志追踪的问题:当一个请求过来的时候,在入口处生成一个trace_id,然后放在ThreadLocal中,如果内部设计到多个服务之间相互调用,调用其他服务的时,将trace_id顺便携带过去。

    父子线程日志追踪的问题:可以采用InheritableThreadLocal来存放trace_id,这样可以在线程中获取到父线程中的trace_id。

    所以此处我们需要使用InheritableThreadLocal来存储trace_id。

    使用了线程池处理请求的,由于线程池中的线程采用的是复用的方式,所以需要对执行的任务Runable做一些改造 包装。

    public class TraceRunnable implements Runnable {
        private String tranceId;
        private Runnable target;
    
        public TraceRunnable(Runnable target) {
            this.tranceId = TraceUtil.get();
            this.target = target;
        }
    
        @Override
        public void run() {
            try {
                TraceUtil.set(this.tranceId);
                MDC.put(TraceUtil.MDC_TRACE_ID, TraceUtil.get());
                this.target.run();
            } finally {
                MDC.remove(TraceUtil.MDC_TRACE_ID);
                TraceUtil.remove();
            }
        }
    
        public static Runnable trace(Runnable target) {
            return new TraceRunnable(target);
        }
    }
    

    需要用线程池执行的任务使用TraceRunnable封装一下就可以了。

    TraceUtil代码:

    public class TraceUtil {
    
        public static final String REQUEST_HEADER_TRACE_ID = "com.ms.header.trace.id";
        public static final String MDC_TRACE_ID = "trace_id";
    
        private static InheritableThreadLocal<String> inheritableThreadLocal = new InheritableThreadLocal<>();
    
        /**
         * 获取traceid
         *
         * @return
         */
        public static String get() {
            String traceId = inheritableThreadLocal.get();
            if (traceId == null) {
                traceId = IDUtil.getId();
                inheritableThreadLocal.set(traceId);
            }
            return traceId;
        }
    
        public static void set(String trace_id) {
            inheritableThreadLocal.set(trace_id);
        }
    
        public static void remove() {
            inheritableThreadLocal.remove();
        }
    
    }
    

    日志输出中携带上trace_id,这样最终我们就可以通过trace_id找到一个请求的完整日志了。

    方便快速检索日志

    日志分散在不同的机器上,如果要快速检索,需要将所有服务产生的日志汇集到一个地方。

    关于检索日志的,列一下需求:

    1. 我们将收集日志发送到消息中间件中(可以是kafka、rocketmq),消息中间件这块不介绍,选择玩的比较溜的就可以了
    2. 系统产生日志尽量不要影响接口的效率
    3. 带宽有限的情况下,发送日志也尽量不要去影响业务
    4. 日志尽量低延次,产生的日志,尽量在生成之后1分钟后可以检索到
    5. 检索日志功能要能够快速响应

    关于上面几点,我们需要做的:日志发送的地方进行改造,引入消息中间件,将日志异步发送到消息中间件中,查询的地方采用elasticsearch日志系统需要订阅消息中间件中的日志,然后丢给elasticsearch建索引,方便快速检索,咱们来一点点的介绍。

    日志发送端的改造

    日志是由业务系统产生的,一个请求过来的时候会产生很多日志,日志产生时,我们尽量减少日志输出对业务耗时的影响,我们的过程如下:

    1. 业务系统内部引用一个线程池来异步处理日志,线程池内部可以使用一个容量稍微大一点的阻塞队列
    2. 业务系统将日志丢给线程池进行处理
    3. 线程池中将需要处理的日志先压缩一下,然后发送至mq

    线程池的使用可以参考:JAVA线程池,这一篇就够了

    引入mq存储日志

    业务系统将日志先发送到mq中,后面由其他消费者订阅进行消费。日志量比较大的,对mq的要求也比较高,可以选择kafka,业务量小的,也可以选取activemq。

    使用elasticsearch来检索日志

    elasticsearch(以下简称es)是一个全文检索工具,具体详情可以参考其官网相关文档。使用它来检索数据效率非常高。日志系统中需要我们开发一个消费端来拉取mq中的消息,将其存储到es中方便快速检索,关于这块有几点说一下:

    1. 建议按天在es中建立数据库,日志量非常大的,也可以按小时建立数据库。查询的时候,时间就是必选条件了,这样可以快速让es定位到日志库进行检索,提升检索效率
    2. 日志常见的需要收集的信息:trace_id、时间、日志级别、类、方法、url、调用的接口开始时间、调用接口的结束时间、接口耗时、接口状态码、异常信息、日志信息等等,可以按照这些在es中建立索引,方便检索。
    日志监控报警——可自定义配置报警

    日志监控报警是非常重要的,这个必须要有,日志系统中需要开发监控报警功能,这块我们可以做成通过页面配置的方式,支持报警规则的配置,如日志中产生了某些异常、接口响应时间大于多少、接口返回状态码404等异常信息的时候能够报警,具体的报警可以是语音电话、短信通知、钉钉机器人报警等等,这些也做成可以配置的

    日志监控模块从mq中拉取日志,然后去匹配我们启用的一些规则进行报警。

    日志处理结构图如下:
    image.png

    高并发中常见的限流方式

    常见的限流的场景

    1. 秒杀活动,数量有限,访问量巨大,为了防止系统宕机,需要做限流处理
    2. 国庆期间,一般的旅游景点人口太多,采用排队方式做限流处理
    3. 医院看病通过发放排队号的方式来做限流处理。

    常见的限流算法

    1. 通过控制最大并发数来进行限流
    2. 使用漏桶算法来进行限流
    3. 使用令牌桶算法来进行限流

    通过控制最大并发数来进行限流

    以秒杀业务为例,10个iphone,100万人抢购,100万人同时发起请求,最终能够抢到的人也就是前面几个人,后面的基本上都没有希望了,那么我们可以通过控制并发数来实现,比如并发数控制在10个,其他超过并发数的请求全部拒绝,提示:秒杀失败,请稍后重试。

    单机中的JUC中提供了这样的工具类:Semaphore:如果是集群,则可以用redis或者zk代替Semaphore

    示例:

    public class MaxAccessLimiter {
    
        private static Semaphore limiter = new Semaphore(5);
        //自定义包含策略
        private static ThreadPoolExecutor executor = new ThreadPoolExecutor(20, 20, 60,
                TimeUnit.SECONDS, new SynchronousQueue(),
                new DemoThreadFactory("订单创建组"), new ThreadPoolExecutor.AbortPolicy());
    
        public static void main(String[] args) {
            for (int i = 0; i < 20; i++) {
                executor.submit(() -> {
                    boolean flag = false;
                    try {
                        flag = limiter.tryAcquire(100, TimeUnit.MICROSECONDS);
                        if (flag) {
                            //休眠2秒,模拟下单操作
                            System.out.println(Thread.currentThread() + ",尝试下单中。。。。。");
                            TimeUnit.SECONDS.sleep(2);
                        } else {
                            System.out.println(Thread.currentThread() + ",秒杀失败,请稍微重试!");
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        if (flag) {
                            limiter.release();
                        }
                    }
                });
            }
               executor.shutdown();
        }
    }
    

    输出:

    Thread[From DemoThreadFactory's 订单创建组-Worker-1,5,main],尝试下单中。。。。。
    Thread[From DemoThreadFactory's 订单创建组-Worker-2,5,main],尝试下单中。。。。。
    Thread[From DemoThreadFactory's 订单创建组-Worker-3,5,main],尝试下单中。。。。。
    Thread[From DemoThreadFactory's 订单创建组-Worker-4,5,main],尝试下单中。。。。。
    Thread[From DemoThreadFactory's 订单创建组-Worker-5,5,main],尝试下单中。。。。。
    Thread[From DemoThreadFactory's 订单创建组-Worker-9,5,main],秒杀失败,请稍微重试!
    Thread[From DemoThreadFactory's 订单创建组-Worker-14,5,main],秒杀失败,请稍微重试!
    Thread[From DemoThreadFactory's 订单创建组-Worker-16,5,main],秒杀失败,请稍微重试!
    Thread[From DemoThreadFactory's 订单创建组-Worker-17,5,main],秒杀失败,请稍微重试!
    Thread[From DemoThreadFactory's 订单创建组-Worker-18,5,main],秒杀失败,请稍微重试!
    Thread[From DemoThreadFactory's 订单创建组-Worker-20,5,main],秒杀失败,请稍微重试!
    Thread[From DemoThreadFactory's 订单创建组-Worker-12,5,main],秒杀失败,请稍微重试!
    Thread[From DemoThreadFactory's 订单创建组-Worker-11,5,main],秒杀失败,请稍微重试!
    Thread[From DemoThreadFactory's 订单创建组-Worker-7,5,main],秒杀失败,请稍微重试!
    Thread[From DemoThreadFactory's 订单创建组-Worker-8,5,main],秒杀失败,请稍微重试!
    Thread[From DemoThreadFactory's 订单创建组-Worker-6,5,main],秒杀失败,请稍微重试!
    Thread[From DemoThreadFactory's 订单创建组-Worker-10,5,main],秒杀失败,请稍微重试!
    Thread[From DemoThreadFactory's 订单创建组-Worker-19,5,main],秒杀失败,请稍微重试!
    Thread[From DemoThreadFactory's 订单创建组-Worker-15,5,main],秒杀失败,请稍微重试!
    Thread[From DemoThreadFactory's 订单创建组-Worker-13,5,main],秒杀失败,请稍微重试!
    

    使用漏桶算法来进行限流

    国庆期间比较火爆的景点,人流量巨大,一般入口处会有限流的弯道,让游客进去进行排队,排在前面的人,每隔一段时间会放一拨进入景区。排队人数超过了指定的限制,后面再来的人会被告知今天已经游客量已经达到峰值,会被拒绝排队,让其明天或者以后再来,这种玩法采用漏桶限流的方式。

    漏桶算法思路很简单,水(请求)先进入到漏桶里,漏桶以一定的速度出水,当水流入速度过大会直接溢出,可以看出漏桶算法能强行限制数据的传输速率。

    漏桶算法示意图:

    image.png

    示例:代码中BucketLimit.build(10, 60, TimeUnit.MINUTES);创建了一个容量为10,流水为60/分钟的漏桶。

    public class BucketLimitTest {
    
        public static class BucketLimit {
            static AtomicInteger threadNum = new AtomicInteger(1);
            //容量
            private int capcity;
            //流速
            private int flowRate;
            //流速时间单位
            private TimeUnit flowRateUnit;
            private BlockingQueue<Node> queue;
            //漏桶流出的任务时间间隔(纳秒)
            private long flowRateNanosTime;
    
            public BucketLimit(int capcity, int flowRate, TimeUnit flowRateUnit) {
                this.capcity = capcity;
                this.flowRate = flowRate;
                this.flowRateUnit = flowRateUnit;
                this.bucketThreadWork();
            }
    
            //漏桶线程
            public void bucketThreadWork() {
                this.queue = new ArrayBlockingQueue<Node>(capcity);
                //漏桶流出的任务时间间隔(纳秒)
                this.flowRateNanosTime = flowRateUnit.toNanos(1) / flowRate;
                System.out.println(TimeUnit.NANOSECONDS.toSeconds(this.flowRateNanosTime));
                Thread thread = new Thread(this::bucketWork);
                thread.setName("漏桶线程-" + threadNum.getAndIncrement());
                thread.start();
            }
    
            //漏桶线程开始工作
            public void bucketWork() {
                while (true) {
                    Node node = this.queue.poll();
                    if (Objects.nonNull(node)) {
                        //唤醒任务线程
                        LockSupport.unpark(node.thread);
                    }
                    //阻塞当前线程,最长不超过nanos纳秒
                    //休眠flowRateNanosTime
                    LockSupport.parkNanos(this.flowRateNanosTime);
                }
            }
    
            //返回一个漏桶
            public static BucketLimit build(int capcity, int flowRate, TimeUnit flowRateUnit) {
                if (capcity < 0 || flowRate < 0) {
                    throw new IllegalArgumentException("capcity、flowRate必须大于0!");
                }
                return new BucketLimit(capcity, flowRate, flowRateUnit);
            }
    
            //当前线程加入漏桶,返回false,表示漏桶已满;true:表示被漏桶限流成功,可以继续处理任务
            public boolean acquire() {
                Thread thread = Thread.currentThread();
                Node node = new Node(thread);
                if (this.queue.offer(node)) {
                    LockSupport.park();
                    return true;
                }
                return false;
            }
    
            //漏桶中存放的元素
            class Node {
                private Thread thread;
    
                public Node(Thread thread) {
                    this.thread = thread;
                }
            }
        }
        //自定义包含策略
        private static ThreadPoolExecutor executor = new ThreadPoolExecutor(15, 15, 60,
                TimeUnit.SECONDS, new SynchronousQueue(),
                new DemoThreadFactory("订单创建组"), new ThreadPoolExecutor.AbortPolicy());
        public static void main(String[] args) {
            //容量为10,流速为1个/秒,即60/每分钟
            BucketLimit bucketLimit = BucketLimit.build(10, 60, TimeUnit.MINUTES);
            for (int i = 0; i < 15; i++) {
                executor.submit(() -> {
                    boolean acquire = bucketLimit.acquire();
                    System.out.println(Thread.currentThread().getName()+ " ," +System.currentTimeMillis() + " " + acquire);
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                });
            }
        }
    }
    

    输出:

    From DemoThreadFactory's 订单创建组-Worker-11 ,1599545066963 false
    From DemoThreadFactory's 订单创建组-Worker-12 ,1599545066963 false
    From DemoThreadFactory's 订单创建组-Worker-13 ,1599545066963 false
    From DemoThreadFactory's 订单创建组-Worker-14 ,1599545066964 false
    From DemoThreadFactory's 订单创建组-Worker-15 ,1599545066964 false
    From DemoThreadFactory's 订单创建组-Worker-3 ,1599545067961 true
    From DemoThreadFactory's 订单创建组-Worker-1 ,1599545068962 true
    From DemoThreadFactory's 订单创建组-Worker-2 ,1599545069963 true
    From DemoThreadFactory's 订单创建组-Worker-4 ,1599545070964 true
    From DemoThreadFactory's 订单创建组-Worker-5 ,1599545071965 true
    From DemoThreadFactory's 订单创建组-Worker-6 ,1599545072966 true
    From DemoThreadFactory's 订单创建组-Worker-7 ,1599545073966 true
    From DemoThreadFactory's 订单创建组-Worker-8 ,1599545074967 true
    From DemoThreadFactory's 订单创建组-Worker-9 ,1599545075967 true
    From DemoThreadFactory's 订单创建组-Worker-10 ,1599545076968 true
    

    使用令牌桶算法来进行限流

    令牌桶算法的原理是系统以恒定的速率产生令牌,然后把令牌放到令牌桶中,令牌桶有一个容量,当令牌桶满了的时候,再向其中放令牌,那么多余的令牌会被丢弃;当想要处理一个请求的时候,需要从令牌桶中取出一个令牌,如果此时令牌桶中没有令牌,那么则拒绝该请求从原理上看,令牌桶算法和漏桶算法是相反的,一个“进水”,一个是“漏水”。这种算法可以应对突发程度的请求,因此比漏桶算法好。

    令牌桶算法示意图:

    image.png

    限流工具类RateLimiter

    Google开源工具包Guava提供了限流工具类RateLimiter,可以非常方便的控制系统每秒吞吐量.

    示例:RateLimiter.create(5)创建QPS为5的限流对象,后面又调用rateLimiter.setRate(10);将速率设为10,输出中分2段,第一段每次输出相隔200毫秒,第二段每次输出相隔100毫秒,可以非常精准的控制系统的QPS。

    public class RateLimiterTest {
    
        public static void main(String[] args) {
            //permitsPerSecond=1 即QPS=1
            RateLimiter rateLimiter = RateLimiter.create(1);
            for (int i = 0; i < 10; i++) {
                //调用acquire会根据QPS计算需要睡眠多久,返回耗时时间
                double acquire = rateLimiter.acquire();
                System.out.println(System.currentTimeMillis()+"耗时"+acquire);
            }
            System.out.println("----------");
            //可以随时调整速率,我们将qps调整为10
            rateLimiter.setRate(10);
            for (int i = 0; i < 10; i++) {
                //rateLimiter.acquire();
                double acquire = rateLimiter.acquire();
                System.out.println(System.currentTimeMillis()+"耗时"+acquire);
            }
        }
    }
    

    输出:

    1599545866820耗时0.0
    1599545867820耗时0.998552
    1599545868819耗时0.997836
    1599545869820耗时0.999819
    1599545870820耗时0.998723
    1599545871819耗时0.999232
    1599545872819耗时0.999328
    1599545873819耗时1.000024
    1599545874819耗时0.99995
    1599545875820耗时0.999597
    ----------
    1599545876819耗时0.998575
    1599545876920耗时0.099593
    1599545877020耗时0.098779
    1599545877119耗时0.098661
    1599545877220耗时0.099558
    1599545877319耗时0.098965
    1599545877419耗时0.099139
    1599545877520耗时0.099768
    1599545877620耗时0.098729
    1599545877720耗时0.0986
    

    相关文章

      网友评论

          本文标题:Java高并发系列——检视阅读(七)

          本文链接:https://www.haomeiwen.com/subject/dpujnltx.html