Implementing a Message Queue with Redis


Author: 爱余星痕 | Published: 2020-06-01 08:37

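    When people think of message queues, Kafka, RabbitMQ, or RocketMQ usually come to mind. Redis, which most people know as a cache, can also serve as a message queue.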

    1. The Redis message queue producer:
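    import java.util.Objects;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.data.redis.core.StringRedisTemplate;
    import org.springframework.stereotype.Service;
    
    @Service
    public class MessageQueueRedisProducerServiceImpl implements IMessageQueueProducerService {
    
        @Autowired
        private StringRedisTemplate redisTemplate;
    
        @Override
        public boolean produceMessage(MessageQueueDto messageQueueDto) {
            // Publish the message to the Redis channel named by the topic.
            redisTemplate.convertAndSend(messageQueueDto.getTopic(), messageQueueDto.getMessage());
            return true;
        }
    
        // Marks this implementation as the "redis" producer.
        @Override
        public boolean support(String producerType) {
            return Objects.equals(producerType, "redis");
        }
    }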
    

    Calling convertAndSend is all the producer needs to do: it publishes the message to the Redis channel named by the topic.
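The article does not show `MessageQueueDto` or `IMessageQueueProducerService`. The sketch below is only an assumption about their shape, inferred from the calls in the producer above (`getTopic()`, `getMessage()`, `produceMessage`, `support`). `RecordingProducer`, `ProducerRegistry`, and the `"kafka"` type string are hypothetical stand-ins so the example runs without Redis or Spring. It illustrates how a `support(String)` check might be used to pick the Redis producer among several implementations.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

// Assumed shape of the DTO: only getTopic() and getMessage() appear in the article.
class MessageQueueDto {
    private final String topic;
    private final String message;

    MessageQueueDto(String topic, String message) {
        this.topic = topic;
        this.message = message;
    }

    String getTopic() { return topic; }
    String getMessage() { return message; }
}

// Assumed shape of the producer interface, inferred from the two overridden methods.
interface IMessageQueueProducerService {
    boolean produceMessage(MessageQueueDto messageQueueDto);
    boolean support(String producerType);
}

// Hypothetical stand-in for a real producer: records "topic:message" instead of publishing.
class RecordingProducer implements IMessageQueueProducerService {
    private final String type;
    final List<String> sent = new ArrayList<>();

    RecordingProducer(String type) { this.type = type; }

    @Override
    public boolean produceMessage(MessageQueueDto dto) {
        sent.add(dto.getTopic() + ":" + dto.getMessage());
        return true;
    }

    @Override
    public boolean support(String producerType) {
        return Objects.equals(producerType, type);
    }
}

// Hypothetical helper: picks the first producer whose support() accepts the type.
class ProducerRegistry {
    private final List<IMessageQueueProducerService> producers;

    ProducerRegistry(List<IMessageQueueProducerService> producers) {
        this.producers = producers;
    }

    Optional<IMessageQueueProducerService> find(String producerType) {
        return producers.stream().filter(p -> p.support(producerType)).findFirst();
    }

    // Returns false when no registered producer supports the requested type.
    boolean send(String producerType, MessageQueueDto dto) {
        return find(producerType).map(p -> p.produceMessage(dto)).orElse(false);
    }
}

class ProducerSketchDemo {
    public static void main(String[] args) {
        RecordingProducer redis = new RecordingProducer("redis");
        ProducerRegistry registry = new ProducerRegistry(List.of(redis));
        boolean accepted = registry.send("redis", new MessageQueueDto("orders", "hello"));
        System.out.println(accepted + " " + redis.sent); // prints: true [orders:hello]
    }
}
```

In the real application Spring would inject the list of producers, and the Redis implementation would call `convertAndSend` where this sketch records the message.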

    2. The Redis message queue consumer:

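    import java.nio.charset.StandardCharsets;
    
    import org.springframework.data.redis.connection.Message;
    import org.springframework.data.redis.connection.MessageListener;
    
    public class MessageQueueRedisConsumerListener implements MessageListener {
    
        private final IMessageQueueConsumerService messageQueueConsumerService;
    
        public MessageQueueRedisConsumerListener(IMessageQueueConsumerService messageQueueConsumerService) {
            this.messageQueueConsumerService = messageQueueConsumerService;
        }
    
        @Override
        public void onMessage(Message message, byte[] pattern) {
            // Decode the payload explicitly as UTF-8 (the encoding StringRedisTemplate writes)
            // instead of relying on Message.toString().
            String body = new String(message.getBody(), StandardCharsets.UTF_8);
            messageQueueConsumerService.receiveMessage(body);
        }
    }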
    

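    MessageQueueRedisConsumerListener implements the MessageListener interface. It handles each message received from the subscribed topic and passes the payload to the consumer service.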
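`IMessageQueueConsumerService` is not shown in the article either. The sketch below assumes a shape inferred from how it is called (`receiveMessage(...)`, `support("redis")`, `topic()`); the `void` return type is a guess. `RawMessage`, `DecodingListener`, and `CollectingConsumer` are hypothetical stand-ins (Spring's `Message` exposes `getBody()` and `getChannel()` as byte arrays) so the delegation step can be tried without a Redis server.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Assumed shape of the consumer interface, inferred from the calls in the article.
interface IMessageQueueConsumerService {
    void receiveMessage(String message);
    boolean support(String consumerType);
    String topic();
}

// Hypothetical stand-in for Spring's Message: channel and body are raw bytes.
final class RawMessage {
    private final byte[] channel;
    private final byte[] body;

    RawMessage(String channel, String body) {
        this.channel = channel.getBytes(StandardCharsets.UTF_8);
        this.body = body.getBytes(StandardCharsets.UTF_8);
    }

    byte[] getChannel() { return channel; }
    byte[] getBody() { return body; }
}

// Mirrors the listener's job: decode the payload, then delegate to the service.
class DecodingListener {
    private final IMessageQueueConsumerService delegate;

    DecodingListener(IMessageQueueConsumerService delegate) {
        this.delegate = delegate;
    }

    void onMessage(RawMessage message) {
        delegate.receiveMessage(new String(message.getBody(), StandardCharsets.UTF_8));
    }
}

// Hypothetical consumer that simply collects what it receives.
class CollectingConsumer implements IMessageQueueConsumerService {
    final List<String> received = new ArrayList<>();

    @Override
    public void receiveMessage(String message) { received.add(message); }

    @Override
    public boolean support(String consumerType) { return "redis".equals(consumerType); }

    @Override
    public String topic() { return "orders"; }
}

class ListenerSketchDemo {
    public static void main(String[] args) {
        CollectingConsumer consumer = new CollectingConsumer();
        new DecodingListener(consumer).onMessage(new RawMessage("orders", "order created"));
        System.out.println(consumer.received); // prints: [order created]
    }
}
```

Keeping the business logic behind the consumer interface means the listener itself stays a thin adapter between Redis and the application.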

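    import java.util.List;
    import java.util.stream.Collectors;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.data.redis.connection.RedisConnectionFactory;
    import org.springframework.data.redis.listener.PatternTopic;
    import org.springframework.data.redis.listener.RedisMessageListenerContainer;
    import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
    import org.springframework.stereotype.Service;
    
    @Service
    public class MessageQueueRedisConsumerServiceFactory {
    
        private final List<IMessageQueueConsumerService> messageQueueConsumerServices;
    
        // Keep only the consumer services that declare support for "redis".
        @Autowired
        public MessageQueueRedisConsumerServiceFactory(List<IMessageQueueConsumerService> messageQueueConsumerServiceList) {
            messageQueueConsumerServices = messageQueueConsumerServiceList.stream()
                    .filter(messageQueueConsumerService -> messageQueueConsumerService.support("redis"))
                    .collect(Collectors.toList());
        }
    
        @Bean
        RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
            RedisMessageListenerContainer container = new RedisMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
    
            // Subscribe one listener per consumer service, on that service's topic.
            messageQueueConsumerServices.forEach(messageQueueConsumerService -> {
                MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(
                        new MessageQueueRedisConsumerListener(messageQueueConsumerService));
                messageListenerAdapter.afterPropertiesSet();
                container.addMessageListener(messageListenerAdapter, new PatternTopic(messageQueueConsumerService.topic()));
            });
    
            return container;
        }
    }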
    

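    MessageQueueRedisConsumerServiceFactory registers the listeners: it determines which topics to subscribe to and which handler class processes the messages of each topic.
    With that, the Redis message queue is complete.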
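The registration flow can be tried without Redis using a toy model. This is a sketch under stated assumptions, not how Spring or Redis is implemented: `InMemoryListenerContainer`, `ConsumerRegistration`, and `TopicConsumer` are hypothetical, topics are matched by exact name only (no glob patterns), and the consumer interface shape is inferred from the article. It also shows one property of Redis pub/sub worth keeping in mind: a message is delivered only to clients subscribed at publish time, so a publish with no subscriber reaches nobody.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Assumed shape of the consumer interface, inferred from the calls in the article.
interface IMessageQueueConsumerService {
    void receiveMessage(String message);
    boolean support(String consumerType);
    String topic();
}

// Hypothetical in-memory stand-in for a listener container (exact topic names only).
class InMemoryListenerContainer {
    private final Map<String, List<IMessageQueueConsumerService>> listeners = new HashMap<>();

    void addMessageListener(IMessageQueueConsumerService consumer, String topic) {
        listeners.computeIfAbsent(topic, t -> new ArrayList<>()).add(consumer);
    }

    // Delivers to current subscribers only and returns how many received the message.
    int publish(String topic, String message) {
        List<IMessageQueueConsumerService> targets = listeners.getOrDefault(topic, List.of());
        targets.forEach(consumer -> consumer.receiveMessage(message));
        return targets.size();
    }
}

// Mirrors the factory: filter by support("redis"), then register each consumer on its topic.
class ConsumerRegistration {
    static InMemoryListenerContainer register(List<IMessageQueueConsumerService> all) {
        List<IMessageQueueConsumerService> redisConsumers = all.stream()
                .filter(consumer -> consumer.support("redis"))
                .collect(Collectors.toList());
        InMemoryListenerContainer container = new InMemoryListenerContainer();
        redisConsumers.forEach(consumer -> container.addMessageListener(consumer, consumer.topic()));
        return container;
    }
}

// Hypothetical consumer with a configurable type and topic.
class TopicConsumer implements IMessageQueueConsumerService {
    private final String type;
    private final String topic;
    final List<String> received = new ArrayList<>();

    TopicConsumer(String type, String topic) {
        this.type = type;
        this.topic = topic;
    }

    @Override
    public void receiveMessage(String message) { received.add(message); }

    @Override
    public boolean support(String consumerType) { return type.equals(consumerType); }

    @Override
    public String topic() { return topic; }
}

class RegistrationSketchDemo {
    public static void main(String[] args) {
        TopicConsumer orders = new TopicConsumer("redis", "orders");
        InMemoryListenerContainer container = ConsumerRegistration.register(List.of(orders));
        System.out.println(container.publish("orders", "created"));  // prints: 1
        System.out.println(container.publish("payments", "missed")); // prints: 0
    }
}
```

If messages must survive while no consumer is connected, pub/sub alone is not enough; that trade-off is the main difference from a persistent queue.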
