美文网首页
redis实现延时队列

redis实现延时队列

作者: echo不扣 | 来源:发表于2019-04-11 16:57 被阅读0次

    需求是想用redis做一个延时的队列,每次内容必须在一定时间后才能被取出,

    比如说:有未支付订单要在一定时间内关闭,假设为30秒,存入的时候我们使用redis的有序集合进行添加,用当前时间戳加上30秒来排序(zadd),然后每次消费者轮询的时候就只取出开始时间0到当前时间这个时间段(zrangeByScore)

    1.生产类 Producer.java

      import redis.clients.jedis.Jedis;
    
      import redis.clients.jedis.JedisPool;
    
      import redis.clients.jedis.JedisPoolConfig;
    
      import redis.clients.jedis.Transaction;
    
      public class Producer {
    
        static final String QueueName = "delay-queue";
    
        public static void main(String[] args)throws InterruptedException {
    
            JedisPool pool = new JedisPool(new JedisPoolConfig(), "localhost", 6379, 2000, "123456", 1);
    
            Jedis jedis = pool.getResource();
    
            try {
    
                int count = 0;
    
                while (true) {
    
                    String message = "Message #" + count;
    
                    String key = "foobar:" + count;
    
                    System.out.println("Queueing message: " + message);
    
                    queueMessage(jedis, QueueName, key, message, 5);
    
                   // delete every 5th Action
    
                    if (count != 0 && count % 5 == 0) {
    
                        System.out.println("Deleting msg with id " + count);
    
                        jedis.del(key);
    
                    }
    
                    count += 1;
    
                   Thread.sleep(3000L);
    
                }
    
            } finally {
    
                jedis.close();
    
                pool.destroy();
    
            }
    
        }
    
        private static void queueMessage(Jedis jedis, String queue, String key, String message, Integer delay) {
    
            long time = System.currentTimeMillis() / 1000 + delay;//当前时间的秒数加上要延时的秒数
    
            Transaction t = jedis.multi();
            t.zadd(queue, time, key);
            t.set(key, message);
            t.exec();
    
    }
    

    }

    2.消费者类 Consumer.java

    代码如下:

    public class Consumer {
    
    public static void main(String[] args) throws InterruptedException {
    
        JedisPool pool = new JedisPool(new JedisPoolConfig(), "localhost", 6379, 2000, "123456", 1);
    
        Jedis jedis = pool.getResource();
    
        try {
    
            while (true) {
    
                getMessages(jedis, Producer.QueueName);
    
                Thread.sleep(1000L);
    
            }
    
        } finally {
    
            jedis.close();
    
            pool.destroy();
    
        }
    
    }
    
    private static void getMessages(Jedis jedis, String queue) {
    
        int startTime = 0;
    
        long endTime = System.currentTimeMillis() / 1000;
    
        Transaction t = jedis.multi();
    
    
        Response<Set<String>> setResponse = t.zrangeByScore(queue, startTime, endTime);//在startTime和endTime之间的数
    
        t.zremrangeByScore(queue, startTime, endTime);//移除所有startTime-endTime中的所有成员
    
        t.exec();
    
        List<String> keys = new ArrayList();
    
        keys.addAll(setResponse.get());//将所有的key添加到list中
    
        String[] keyArray = keys.toArray(new String[keys.size()]);//然后转换成数组
    
        if (keyArray.length > 0) {
    
            Transaction tMessage = jedis.multi();
    
            Response<List<String>>  messageResponse = tMessage.mget(keyArray);//获取多个键值对
    
            tMessage.del(keyArray);
    
            tMessage.exec();
    
            List<String> messages = messageResponse.get();
    
            for (int i = 0; i < messages.size(); i++) {
    
                String key = keys.get(i);
    
                String message = messages.get(i);
    
                System.out.print("Received key: " + key + ". ");
    
                if (message == null) {
    
                    System.out.println("Message for key " + key + " is gone!");
    
                } else {
    
                    System.out.println("Message for key " + key + " is " + message);
    
                }
    
            }
    
        }
    
    }
    

    }

    相关文章

      网友评论

          本文标题:redis实现延时队列

          本文链接:https://www.haomeiwen.com/subject/ckucbqtx.html