美文网首页
Redis的基本使用(二) 消息队列

Redis的基本使用(二) 消息队列

作者: 缘木与鱼 | 来源:发表于2021-04-28 10:40 被阅读0次

    Redis的基本使用(二) 消息队列

    使用消息中间件的时候,并非每次都需要非常专业的消息中间件,假如只有一个消息队列,只有一个消费者,那就没有必要去使用专业的消息中间件,这种情况可以直接使用 Redis 来做消息队列。

    Redis 的消息队列不是特别专业,他没有很多高级特性,适用简单的场景,如果对于消息可靠性有着极高的追求,那么不适合使用 Redis 做消息队列。

    1、消息队列

    Redis 做消息队列,使用它里边的 List 数据结构就可以实现,使用 lpush/rpush 操作来实现入队,然后使用 lpop/rpop 来实现出队。

    • lpush将value值从左往右依次插入到列表头部。如果 key 不存在,那么在进行 push 操作前会创建一个空列表。
    • rpush从右往左依次插入到列表头部。
    • rpop移除并返回列表的尾元素。
    • lpop移除并返回列表的头元素。

    在客户端(例如 Java 端),我们会维护一个死循环来不停的从队列中读取消息,并处理,如果队列中有消息,则直

    接获取到,如果没有消息,就会陷入死循环,直到下一次有消息进入,这种死循环会造成大量的资源浪费,这个时候,

    可以使用之前讲的 blpop/brpop 。

    blpop 阻塞式的弹出,相当于 lpop 的阻塞版。

    2、延迟消息队列

    延迟队列可以通过 zset 来实现,因为 zset 中有一个 score,我们可以把时间作为 score,将 value 存到redis 中,然后通过轮询的方式,去不断的读取消息出来。

    首先,如果消息是一个字符串,直接发送即可,如果是一个对象,则需要对对象进行序列化,这里我们
    使用 JSON 来实现序列化和反序列化。

    首先在项目中,添加 JSON 依赖:

    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.11.3</version>
    </dependency>
    
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.12</version>
        <scope>provided</scope>
    </dependency>
    

    接下来,构造一个消息对象:

    @Data
    public class Message {
    
        private String id;
        private Object data;
        
    }
    

    接下来封装一个消息队列:

    public class DelayMsgQueue {
    
        private Jedis jedis;
        private String queue;
    
        public DelayMsgQueue(Jedis jedis, String queue) {
            this.jedis = jedis;
            this.queue = queue;
        }
    
        /**
         * 消息入队
         * @param data 要发送的消息
         */
        public void queue(Object data){
            // 构造一个message
            Message message = new Message();
            message.setId(UUID.randomUUID().toString());
            message.setData(data);
    
            // 序列化
            try {
                String s = new ObjectMapper().writeValueAsString(message);
                System.out.println("message publish:" + s);
    
                // 消息发送, score延迟 5s
                jedis.zadd(queue, System.currentTimeMillis()+5000, s);
            } catch (JsonProcessingException e) {
                e.printStackTrace();
            }
        }
    
    
        /**
         * 消息消费
         */
        public void loop(){
            while (!Thread.interrupted()){
                // 读取 score 在0 到当前时间戳之间的消息
                Set<String> zrange = jedis.zrangeByScore(queue, 0, System.currentTimeMillis(), 0, 1);
    
                if (zrange.isEmpty()){
                    // 如果消息是空的,则休息500 ms 继续读取
                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        break;
                    }
                    continue;
                }
                // 如果读取到了消息,则直接取出消息
                String next = zrange.iterator().next();
                if (jedis.zrem(queue, next)>0){
                    // 抢到了,处理业务
                    try {
                        Message message = new ObjectMapper().readValue(next, Message.class);
                        System.out.println("receive message: " + message);
                    } catch (JsonProcessingException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    
    }
    

    测试:

    public class DelatMsgTest {
    
        public static void main(String[] args) {
            Redis redis = new Redis();
            redis.execute(Jedis -> {
                // 构造一个消息队列
                DelayMsgQueue queue = new DelayMsgQueue(Jedis, "lucky-delay-query");
                // 构造消息生产者
                Thread provider = new Thread(){
                    @Override
                    public void run() {
                        for (int i = 0; i < 5; i++){
                            queue.queue("lucky>>>" + i);
                        }
                    }
                };
    
                // 构造一个消费者
                Thread consumer = new Thread(){
                    @Override
                    public void run() {
                        for (int i = 0; i < 5; i++){
                            queue.loop();
                        }
                    }
                };
    
                // 启动
                provider.start();
                consumer.start();
    
                // 休息7秒后,停止程序
                try {
                    Thread.sleep(7000);
                    consumer.interrupt();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
    
            });
        }
    
    }
    

    相关文章

      网友评论

          本文标题:Redis的基本使用(二) 消息队列

          本文链接:https://www.haomeiwen.com/subject/qhfvrltx.html