美文网首页Springboot收藏
使用单机缓存之王Caffeine实现一个延迟队列

使用单机缓存之王Caffeine实现一个延迟队列

作者: 大哥你先走 | 来源:发表于2022-01-28 10:16 被阅读0次

    延迟队列:顾名思义就是支持将消息按照一定的要求延迟投递的消息队列。生活中需要使用延迟队列最普遍的场景就是订单支付,订单只有在规定的时间内完成支付,交易才算真正的完成,没有在规定时间内完成支付的订单将会被取消。

    Caffeine:一款高性能、接近最优的缓存库。

    Caffeine和延迟队列又有什么联系呢?延迟队列的核心特征就是将消息延迟投递,Caffeine的老化机制刚好可以满足延迟队列的基本要求。Caffeine可以按照时间对存储的值进行老化,不同值的老化时间可以不同,并且Caffeine支持将值的老化信息发送到监听器,利用这一特性就可以实现简单的延迟队列。

    当前已经有很多优秀的延迟队列实现,如果需要延迟队列功能请使用经过考验的软件哦。

    Delay Queue架构

    delay queue.png

    基于Java自带的Blocking Queue和Caffeine实现Delay Queue。Delay Queue只提供两个简单的接口用于读写数据,

    write: 客户端向Delay Queue添加数据时,添加的数据会直接写入Caffeine并根据参数设置老化时间。

    read: 客户端从Delay Queue获取数据,如果有可被获取的数据Delay Queue直接返回位于队列头部的数据,否则将会阻塞客户端直到有可用数据为止。

    event: Caffeine将值的过期事件处理后写入 Blocking Queue。

    接口说明

    write:

    public void write(E element, long delay, TimeUnit unit);
    

    写入element到队列,并设置经过delay的延迟后element才能被read。

    read:

    public E read(long timeout, TimeUnit unit) throws InterruptedException;
    

    尝试从queue读取数据,最多等待timeout时间

    3 代码实现

    3.1 数据封装

    引入写入到queue中的每个数据需要被延迟处理的时间不同,因此再将值写入到Caffeine之前需要简单的封装,封装后的数据包括原始数据和延迟时间等信息。

    private static final class DataWrapper<E> {
        private final E data;
        private final long delay;
        private final TimeUnit unit;
    
        public DataWrapper(E data, long delay, TimeUnit unit) {
            this.data = data;
            this.delay = delay;
            this.unit = unit;
        }
    }
    

    3.2 event处理

    我们需要监听Caffeine对过期数据的处理,并把数据写入到Blocking Queue中。

    private static final class Listener<E> implements RemovalListener<String, DataWrapper<E>> {
        private final BlockingQueue<E> blockingQueue;
    
        public Listener(BlockingQueue<E> blockingQueue) {
            this.blockingQueue = blockingQueue;
        }
    
        @Override
        public void onRemoval(@Nullable String s, @Nullable DataWrapper<E> dataWrapper, @NonNull RemovalCause removalCause) {
            try {
                if (Objects.nonNull(dataWrapper)) {
                    blockingQueue.put(dataWrapper.data);
                }
            } catch (InterruptedException e) {
                e.printStackTrace(); // 仅用于测试
            }
        }
    }
    

    3.3 过期策略设置

    对写入到Caffeine中的每一个数据设置一个过期时间,这可以通过Caffeine中的Expiry接口实现

    private Expiry<String, DataWrapper<E>> expiry = new Expiry<>() {
        @Override
        public long expireAfterCreate(@NonNull String key, @NonNull DataWrapper<E> dataWrapper, long currentTime) {
            return dataWrapper.unit.toNanos(dataWrapper.delay);
        }
    
        @Override
        public long expireAfterUpdate(@NonNull String key, @NonNull DataWrapper<E> dataWrapper, long currentTime, @NonNegative long currentDuration) {
            return dataWrapper.unit.toNanos(dataWrapper.delay);
        }
    
        @Override
        public long expireAfterRead(@NonNull String key, @NonNull DataWrapper<E> dataWrapper, long currentTime, @NonNegative long currentDuration) {
            return dataWrapper.unit.toNanos(dataWrapper.delay);
        }
    };
    

    3.4 CaffeineDelayQueue

    在完成3.1-3.3章节的代码编写后,实现一个基于Caffeine的延迟队列就变得十分简单,关键代码如下:

    public class CaffeineDelayQueue<E> {
        private final BlockingQueue<E> blockingQueue;
        private final Cache<String, DataWrapper<E>> scheduler;
    
        public CaffeineDelayQueue(int size) {
            this.blockingQueue = new ArrayBlockingQueue<>(size);
            this.scheduler = Caffeine.newBuilder()
                .expireAfter(expiry)
                .evictionListener(new Listener<>(blockingQueue))
                .scheduler(Scheduler.systemScheduler())
                .build();
        }
    }
    

    4 使用样例

    CaffeineDelayQueue<Integer> queue = new CaffeineDelayQueue<>(1024);
    queue.write(1, 1, TimeUnit.SECONDS);
    queue.write(2, 3, TimeUnit.SECONDS);
    queue.write(3, 2, TimeUnit.SECONDS);
    
    for (; ; ) {
        Integer data = queue.read(200, TimeUnit.MILLISECONDS);
        if (Objects.nonNull(data)) {
            System.out.println(data);
        }
    }
    

    上面样例程序的输出如下:

    1
    3
    2
    

    相关文章

      网友评论

        本文标题:使用单机缓存之王Caffeine实现一个延迟队列

        本文链接:https://www.haomeiwen.com/subject/adqahrtx.html