美文网首页redis
Redis做轻量级消息队列的3种玩法

Redis做轻量级消息队列的3种玩法

作者: 肥兔子爱豆畜子 | 来源:发表于2022-01-11 14:11 被阅读0次

    Redis也可以做轻量级的消息队列:基于List的队列模式、PubSub多播的发布订阅模式、以及5.0之后提供的Stream。
    特别是Stream,可以消息持久化、高可用、消息可以指定offset进行反复消费、支持发布订阅按消费组多播、使用消费者的pending_ids机制保证消息传递的可靠性等等,基本已经具有了真正的消息队列中间件的功能了。而PubSub模式在一些对消息传递可靠性和可追溯性不严格的场景(比如内网非一致性的消息通知)也有一定的使用价值。

    一、快速入门

    1、用list类型模拟队列

    典型的队列模式,rpush右边放入,lpop左边取出。举例:

    127.0.0.1:7001> rpush queue A
    -> Redirected to slot [13011] located at 122.51.112.187:7003
    (integer) 1
    122.51.112.187:7003> rpush queue B
    (integer) 2
    122.51.112.187:7003> rpush queue C
    (integer) 3
    122.51.112.187:7003> lpop queue
    "A"
    122.51.112.187:7003> lpop queue
    "B"
    122.51.112.187:7003> lpop queue
    "C"
    122.51.112.187:7003> lpop queue
    (nil)
    
    2、PUB|SUB

    PubSub解决了list做队列1个消息只能被单个消费者消费的问题,可以1个消息被多个消费组的消费组收到,即所谓的发布订阅模式。

    用redis-cli先来用一下:

    127.0.0.1:7001> subscribe testTopic
    Reading messages... (press Ctrl-C to quit)
    1) "subscribe"
    2) "testTopic"
    3) (integer) 1
    

    返回subscript testTopic 1意思是订阅testTopic成功,然后阻塞等待消息,再开一个终端连接进行publish:

    127.0.0.1:7001> publish testTopic msg1
    (integer) 1
    

    然后订阅者侧连接收到消息:

    1) "message"
    2) "testTopic"
    3) "msg1"
    

    message testTopic msg1,意思是收到了message,主题是testTopic,内容是msg1

    可以同时订阅多个主题,

    subscribe testTopic1 testTopic2 testTopic3
    

    也可以按照模式匹配的方式来订阅主题psubscribe,比如订阅所有以test开头的主题

    psubscribe test*
    

    PubSub的消息没有持久化,发布消息之后如果没有消费者、那么消息会直接丢弃,如果消费者刚好此时宕机、重启后也不会收到宕机时发布的消息,所以PubSub来做消息队列的场景十分有限。

    Redis5.0之后提供了更强大的Redis Stream数据结构,解决了上述问题。

    3、Redis Stream

    发布订阅模式,一个消息发布到stream里,多个消费组订阅了这个stream的话都可以收到这个消息,且消息是持久化的。消费组在stream里用last_delivered_id来定位偏移量、也就是消费到哪个消息了。从设计上看确实跟kafka非常像,Redis作者也说借鉴了kafka。

    从数据结构上来说,stream是个链表,节点是消息,消息有ID,消息内容是一系列的k-v对。stream也有消费组和消费者的概念,每个消费者组用last_delivered_id游标指向链表中的节点、来表示消费到哪个ID的消息了,类似kafka里的offset偏移量,每个消费者组里可以有多个消费者,一个消息只会被投递给消费者组里的一个消费者,可以类比RocketMQ里的集群消息。另外,每个消费者内部还有一个 pending_ids数组,它记录着这个消费者已经被客户端读取了的、但客户端没有回复ACK的消息,由此确保消息至少被消费1次。

    添加消息和直接读取消息命令:

    xadd testTopic * name douchuzi  #向testTopic这个stream添加"name douchuzi"这个消息,*表示消息ID由Redis自动生成
    

    消息ID可以由Redis自动生成,生成的ID类似"1641816045243-0",表示1641816045243这个时间戳这1ms生成的第0个消息。当然消息ID也可以由客户端自己生成。

    xlen testTopic #查看有多少消息
    xrange testTopic - + #从头到尾返回所有消息
    xread count 1 streams testTopic 0 #从ID大于0处开始读取1个消息
    xread count 1 block 0 streams testTopic $ #阻塞的从ID大于$处开始读取1个消息,$表示最后一个消息的ID
    

    消费组命令:

    xgroup create testTopic group1 $ #创建消费组group1,其last_delivered_id是$
    

    xgroup的完整玩法:

    xgroup [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
    
    #让消费者consumer1-1从group1消费者组的testTopic stream中拿最新的、并且没有被发送给其他消费者处理的entry
    xreadgroup group group1 consumer1-1 count 1 block 0 streams testTopic >
    

    发送ACK,刚才阻塞xreadgroup之后从别的客户端xadd了1条消息,然后xreadgroup阻塞结束,消息收到。这时候我们看下consumer1-1的pending_ids:

    127.0.0.1:7001> xpending testTopic group1 - + 1000  consumer1-1
    1) 1) "1641819182032-0"
       2) "consumer1-1"
       3) (integer) 695470
       4) (integer) 1
    

    xpending查询消费组group1中消费者consumer1-1的1000条未收到客户端回复的消息。

    接着我们xack回复一下,再看看pending_ids:

    127.0.0.1:7001> xack testTopic group1 1641819182032-0
    (integer) 1
    127.0.0.1:7001> xpending testTopic group1 - + 1000  consumer1-1
    (empty array)
    

    可见,xack之后,consumer1-1的pending_ids为空了,Redis Stream用这个办法来确保消息一定被投递。

    二、实战开发:基于SpringBoot开发Redis消息队列

    下面具体实战一下,用SpringBoot来做Redis的消息队列开发,笔者使用的SpringBoot版本是2.3.7.RELEASE,其默认的客户端是lettuce 5.3.5.RELEASE,测试所用的Redis为6节点Cluster,可以参照笔者的文章Redis分布式缓存搭建 - 简书 (jianshu.com)进行搭建。

    1、PubSub

    发布消息,比较简单,直接用RedisTemplate:

    @Service
    public class RedisDao {
        
        @Autowired
        RedisTemplate<String, Object> redisTemplate;
        
        /**
         * PubSub发布消息
         * */
        public void publishMessage(ChannelTopic channelTopic, String message) {
            redisTemplate.convertAndSend(channelTopic.getTopic(), message);
        }
    }
    

    订阅消息,使用Spring提供的消息容器RedisMessageListenerContainer以及消息到达监听接口MessageListener

    //配置PubSub消息容器
    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer() {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(redisConnectionFactory());
        
        /**这里可以使用自定义注解来发现所有的MessageHandler,
         * 然后循环container.addMessageListener来达到自动配置消息订阅者的目的
         * 这样开发只需要编写MessageHandler的实现类就可以了
         */
        MessageHandler handler = new MessageHandlerImpl();
        container.addMessageListener(new MessageListener() {
    
            @Override
            public void onMessage(Message message, byte[] pattern) {
                handler.handleMsg(message);
            }
            
        }, handler.getChannelTopic());
        
        return container;
    }
    

    MessageHandler接口及其实现是MessageHandlerImpl业务层代码:

    /**
     * PubSub订阅消息
     * 
     * */
    public interface MessageHandler {
        
        //订阅消息到达后的逻辑处理
        public void handleMsg(Message msg);
        
        //消息的Topic
        public ChannelTopic getChannelTopic();
    }
    
    @Slf4j
    public class MessageHandlerImpl implements MessageHandler{
    
        @Override
        public void handleMsg(Message msg) {
            
            try {   
                String msgChannel = new String(msg.getChannel(), "utf-8");
                String msgBody = new String(msg.getBody(), "utf-8");
                log.info("收到消息:");
                log.info("Message channel : {}" , msgChannel);
                log.info("Message body : {}" , msgBody);
                
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }
    
        @Override
        public ChannelTopic getChannelTopic() {
            return new ChannelTopic("testPubSub");
        }
    
    }
    

    另外需要注意,Redis Cluster模式下如果客户端用的是Lettuce,需要配置客户端自适应刷新,在集群主备故障切换的时候、客户端能够自动切换到故障主节点对应的从节点去。详见笔者的文章Redis分布式缓存搭建 - 简书 (jianshu.com)

    好了,测试一下:

    @Slf4j
    @SpringBootApplication
    public class RedismqApplication {
    
        public static void main(String[] args) {
            ConfigurableApplicationContext context = SpringApplication.run(RedismqApplication.class, args);
            RedisDao redisDao = context.getBean(RedisDao.class);
            redisDao.publishMessage(new ChannelTopic("testPubSub"), "肥兔子爱豆畜子");
        }
    
    }
    

    收到消息:
    Message channel : testPubSub
    Message body : 肥兔子爱豆畜子

    2、Redis Stream

    跟PubSub类似,也是需要消息容器MessageContainer、Listener这俩东西。

    /**
     * 发送消息到指定stream
     * */
    public void publishStreamMessage(String stream, Object message) {
        ObjectRecord<String, String> record =                 StreamRecords.newRecord().ofObject(JSON.toJSONString(message)).withStreamKey(stream);
        RecordId recordId = stringRedisTemplate.opsForStream().add(record);
        log.info("消息已发送,消息ID:{}" , recordId.getValue());
    }
    

    消息监听,实现StreamListener接口:

    import org.springframework.data.redis.connection.stream.ObjectRecord;
    import org.springframework.data.redis.connection.stream.RecordId;
    import org.springframework.data.redis.core.StringRedisTemplate;
    import org.springframework.data.redis.stream.StreamListener;
    
    import lombok.extern.slf4j.Slf4j;
    
    @Slf4j
    public class StreamMessageListener implements StreamListener<String, ObjectRecord<String,String>>{
        
        private StringRedisTemplate redisTemplate;
    
        public StreamMessageListener(StringRedisTemplate stringRedisTemplate) {
            redisTemplate = stringRedisTemplate;
        }
        
        @Override
        public void onMessage(ObjectRecord<String, String> message) {
    
            RecordId id = message.getId();
            String topic = message.getStream();
            String msgBody = message.getValue();
            
            log.info("收到主题{}消息ID={}, 消息内容{}", topic, id.getValue(), msgBody);
            
            String group = "some-service"; //消费组,使用服务名
            redisTemplate.opsForStream().acknowledge(topic, group, id.getValue());
        }
    
    }
    

    配置消息容器,将StreamListener的实现注册到消息容器StreamMessageListenerContainer

    @Configuration
    public class RedisStreamListenerContainerConfig {
        
        @Autowired
        private RedisConnectionFactory redisConnectionFactory;
        
        @Autowired
        private StringRedisTemplate stringRedisTemplate;
        
        @Bean
        public StreamMessageListenerContainer redisStreamListenerContainer() {
            StreamMessageListenerContainerOptions options = 
                    StreamMessageListenerContainerOptions.builder()
                                                        .batchSize(100)
                                                        .pollTimeout(Duration.ZERO)
                                                        .targetType(String.class)
                                                        .build();
            StreamMessageListenerContainer container = StreamMessageListenerContainer.create(redisConnectionFactory, options);
            
            String GroupName = "some-service";  //消费组命名一般用服务名
            String consumerName = "127.0.0.1:8080"; //消费者命名一般用服务集群下每个节点的ip:port,可以区分是哪个节点消费
            String stream = "testTopic"; //stream名称,即topic
            
            container.receive(Consumer.from(GroupName, consumerName), 
                                StreamOffset.create(stream, ReadOffset.lastConsumed()),
                                new StreamMessageListener(stringRedisTemplate)); //将Listener添加到监听容器
            
            container.start(); //启动消息容器
            
            return container;
        }
    }
    

    测试:

    消息已发送,消息ID:1641872849111-0

    收到主题testTopic消息ID=1641872849111-0, 消息内容{"name":"stream-肥兔子爱豆畜子"}

    总结说明:

    1、为了方便消息的格式笔者统一用了String类型,用fastjson做序列化以后进行传输。

    2、StreamListener.onMessage收到消息进行处理以后,手工调用ack进行回复,不然服务端给当前消费者缓存的pending_ids会越来越大、占用内存。

    3、消费组笔者一般用服务名来区分,服务下挂多个节点,那么每个节点可以用ip:port作为唯一标识,所以用ip:port作为消费组下的消费组名称。

    参考:《Redis深度历险:核心原理与应用实践》

           [Redis 的发布订阅功能在 SpringBoot 中的应用 - 知乎 (zhihu.com)](https://zhuanlan.zhihu.com/p/59065399)
    
           [Stream消息队列在SpringBoot中的实践与踩坑 | (lolico.me)](https://lolico.me/2020/06/28/Using-stream-to-implement-message-queue-in-springboot/)
    
          [redis — 基于Spring Boot实现Redis stream实时流事件处理_Haqiu.Hwang的博客-CSDN博客](https://blog.csdn.net/qq_38658567/article/details/109376888)
    

    相关文章

      网友评论

        本文标题:Redis做轻量级消息队列的3种玩法

        本文链接:https://www.haomeiwen.com/subject/govbcrtx.html