美文网首页Redis
Redis实现延迟消息队列

Redis实现延迟消息队列

作者: AbstractCulture | 来源:发表于2020-06-12 17:12 被阅读0次

    消息队列是应用中常用的一个技术点,通常我们可以借助消息队列中间件来实现,但是并不是所有的情况下,都需要使用到MQ。

    • 如果只需要实现简单的消息队列,那么借助Redis即可。
    • 如果对消息有着严格的可靠性等要求,那么建议使用专业的MQ.(RocketMQ,Kafka,RabbitMQ)‘

    Redis实现延迟消息队列的思想

    可以借助zset有序集合来实现延迟消息队列。因为zset有一个score,它是可以按这个score来进行排序的,我们可以把时间戳作为zset的score,让它按时间去排序,然后在Java程序中使用轮询或者定时任务来消费里面的消息。这里使用的是点对点的消费模式。
    以下是代码的展示.
    第一步,先创建一个发送消息的对象

    package com.xjm.redis;
    
    public class RedisMessage {
        private String id;
        private Object message;
    
        @Override
        public String toString() {
            return "RedisMessage{" +
                    "id='" + id + '\'' +
                    ", message=" + message +
                    '}';
        }
    
        public String getId() {
            return id;
        }
    
        public void setId(String id) {
            this.id = id;
        }
    
        public Object getMessage() {
            return message;
        }
    
        public void setMessage(Object message) {
            this.message = message;
        }
    }
    

    第二步,引入序列化工具jackson

    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.10.3</version>
    </dependency>
    

    第三步,编写一个消息队列的工具类,主要包含消息入队和消费功能

    package com.xjm.redis;
    
    import com.fasterxml.jackson.core.JsonProcessingException;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import redis.clients.jedis.Jedis;
    
    import java.util.Date;
    import java.util.Set;
    import java.util.UUID;
    
    /**
     * 延迟消息队列
     */
    public class DelayMsgQueue {
        private Jedis jedis;
        private String queue;
    
        public DelayMsgQueue(Jedis jedis, String queue) {
            this.jedis = jedis;
            this.queue = queue;
        }
    
        /**
         * 消息入队,要发送的消息
         * @param message
         */
        public void queue(Object message){
             RedisMessage redisMessage = new RedisMessage();
             redisMessage.setId(UUID.randomUUID().toString());
             redisMessage.setMessage(message);
            try {
                //序列化
                String s = new ObjectMapper().writeValueAsString(redisMessage);
                System.out.println("Redis发送消息:"+new Date());
                //消息发送,score延迟5秒
                jedis.zadd(queue,System.currentTimeMillis()+5000,s);
            } catch (JsonProcessingException e) {
                e.printStackTrace();
            }
        }
    
        /**
         * 消息出队(消息消费)
         *
         */
        public void loop(){
            /**
             * 轮询,线程被中断时停止
             */
            while (!Thread.interrupted()){
                //读取score时间在0到当前时间戳的之间的消息,一次一条
                Set<String> message = jedis.zrangeByScore(queue, 0, System.currentTimeMillis(), 0, 1);
                if (message.isEmpty()){
                    try {
                        //如果消息为空,则线程休眠一段时间
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        break;
                    }
                    continue;
                }
                //如果读取到了消息,则直接加载
                String next = message.iterator().next();
                if(jedis.zrem(queue,next)>0){
                    //抢到了,接下来处理业务
                    try {
                        RedisMessage redisMessage = new ObjectMapper().readValue(next, RedisMessage.class);
                        System.out.println("抢到了!"+new Date());
                        System.out.println(redisMessage.toString());
                    } catch (JsonProcessingException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
    

    第四步,测试

    package com.xjm.redis;
    
    
    public class DelayMsgTest {
        public static void main(String[] args) {
            Redis redis = new Redis();
            redis.exeute(jedis -> {
                //构造一个消息队列
                DelayMsgQueue delayMsgQueue = new DelayMsgQueue(jedis, "jaymin-delay-queue");
                Runnable producer = new Runnable() {
                    @Override
                    public void run() {
                        for (int i=0;i<5;i++){
                            delayMsgQueue.queue("Java>>>>>"+i);
                        }
                    }
                };
                Thread producerThread = new Thread(producer);
                Runnable customer = new Runnable() {
                    @Override
                    public void run() {
                        delayMsgQueue.loop();
                    }
                };
                Thread customerThread = new Thread(customer);
                producerThread.start();
                customerThread.start();
                try {
                    //休息7秒后,停止程序
                    Thread.sleep(7000);
                    customerThread.interrupt();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
    
            });
    
        }
    }
    

    测试结果如下:


    image.png

    在此鸣谢@江南一点雨,感谢他的redis视频解析.

    相关文章

      网友评论

        本文标题:Redis实现延迟消息队列

        本文链接:https://www.haomeiwen.com/subject/xslntktx.html