美文网首页
redis实现消息队列

redis实现消息队列

作者: 爱余星痕 | 来源:发表于2020-06-01 08:37 被阅读0次

消息队列一般都会想到kafka,rabbitmq,Rockermq, 其实,给你印像做缓存的Redis也是能做消息队列.

  1. redis消息队列生产者如下:
@Service
public class MessageQueueRedisProducerServiceImpl implements IMessageQueueProducerService {

    @Autowired
    private StringRedisTemplate  redisTemplate;


    @Override
    public boolean produceMessage(MessageQueueDto messageQueueDto) {
        redisTemplate.convertAndSend(messageQueueDto.getTopic(),messageQueueDto.getMessage());
        return true;
    }

    @Override
    public boolean support(String producerType) {
        return Objects.equals(producerType,"redis");
    }
}

其中,只要调用convertAndSend方法就可以产生队列

2 redis消息队列消费者如下:

public class MessageQueueRedisConsumerListener implements MessageListener {

    private IMessageQueueConsumerService messageQueueConsumerService;

    public MessageQueueRedisConsumerListener(IMessageQueueConsumerService messageQueueConsumerService) {
        this.messageQueueConsumerService = messageQueueConsumerService;
    }


    @Override
    public void onMessage(Message message, byte[] pattern) {
        messageQueueConsumerService.receiveMessage(message.toString());
    }
}

MessageQueueRedisConsumerListener 实现接口MessageListener 的监听,这个主要用于处理获取到的消息数据

@Service
public class MessageQueueRedisConsumerServiceFactory {


    private List<IMessageQueueConsumerService> messageQueueConsumerServices;

    @Autowired
    public MessageQueueRedisConsumerServiceFactory(List<IMessageQueueConsumerService> messageQueueConsumerServiceList) {
        messageQueueConsumerServices = messageQueueConsumerServiceList.stream().filter(messageQueueConsumerService ->
                messageQueueConsumerService.support("redis")).collect(Collectors.toList());
    }

    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {

        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);

        messageQueueConsumerServices.forEach(messageQueueConsumerService -> {
            MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(
                    new MessageQueueRedisConsumerListener(messageQueueConsumerService));
            messageListenerAdapter.afterPropertiesSet();
            container.addMessageListener(messageListenerAdapter, new PatternTopic(messageQueueConsumerService.topic()));

        });

        return container;
    }


}

b. 类MessageQueueRedisConsumerServiceFactory 主要是用于注册监听器,要监听哪种主题,并这种主题使用哪种数据处理类
至此,redis的消息队列已完成.

相关文章

网友评论

      本文标题:redis实现消息队列

      本文链接:https://www.haomeiwen.com/subject/ryfjzhtx.html