美文网首页
通用的消息队列(redis,kafka,rabbitmq)--消

通用的消息队列(redis,kafka,rabbitmq)--消

作者: 爱余星痕 | 来源:发表于2020-06-18 14:48 被阅读0次

    上篇我写了一个通用的消息队列(redis,kafka,rabbitmq)--生产者篇,这次写一个消费者篇.
    1.消费者的通用调用类:

    
    /**
     * 消息队列处理的handle
     * @author starmark
     * @date 2020/5/1  上午10:56
     */
    public interface IMessageQueueConsumerService {
    
    
        /**
         * 处理消息队列的消息
         * @param message 消息
         */
        void receiveMessage(String message);
    
        /**
         * 返回监听的topic
         * @return 主题
         */
        String topic();
    
        /**
         *
         * @param consumerType 消费者类型
         * @return 是否支持该消费者类者
         */
        boolean support(String consumerType);
    }
    

    只要实现该类的接口就可以实现监听,
    redis的消费端,有两个类,如下:

    
    /**
     * @author starmark
     * @date 2020/5/2  下午3:05
     */
    public class MessageQueueRedisConsumerListener implements MessageListener {
    
        private IMessageQueueConsumerService messageQueueConsumerService;
    
        public MessageQueueRedisConsumerListener(IMessageQueueConsumerService messageQueueConsumerService) {
            this.messageQueueConsumerService = messageQueueConsumerService;
        }
    
    
        @Override
        public void onMessage(Message message, byte[] pattern) {
            messageQueueConsumerService.receiveMessage(message.toString());
        }
    }
    
    /**
     * 消息队列服务端的监听
     *
     * @author starmark
     * @date 2020/5/1  上午10:55
     */
    @Service
    public class MessageQueueRedisConsumerServiceFactory {
    
    
        private List<IMessageQueueConsumerService> messageQueueConsumerServices;
    
        @Autowired
        public MessageQueueRedisConsumerServiceFactory(List<IMessageQueueConsumerService> messageQueueConsumerServiceList) {
            messageQueueConsumerServices = messageQueueConsumerServiceList.stream().filter(messageQueueConsumerService ->
                    messageQueueConsumerService.support("redis")).collect(Collectors.toList());
        }
    
        @Bean
        RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
    
            RedisMessageListenerContainer container = new RedisMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
    
            messageQueueConsumerServices.forEach(messageQueueConsumerService -> {
                MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(
                        new MessageQueueRedisConsumerListener(messageQueueConsumerService));
                messageListenerAdapter.afterPropertiesSet();
                container.addMessageListener(messageListenerAdapter, new PatternTopic(messageQueueConsumerService.topic()));
    
            });
    
            return container;
        }
    
    
    }
    

    kafka消费者也有两个类,如下:

    
    /**
     * @author starmark
     * @date 2020/5/2  下午3:05
     */
    public class MessageQueueKafkaConsumerListener implements MessageListener<String,String> {
    
        private final IMessageQueueConsumerService messageQueueConsumerService;
    
        public MessageQueueKafkaConsumerListener(IMessageQueueConsumerService messageQueueConsumerService) {
            this.messageQueueConsumerService = messageQueueConsumerService;
        }
    
    
        @Override
        public void onMessage(ConsumerRecord<String, String> data) {
            messageQueueConsumerService.receiveMessage(data.value());
        }
    }
    
    /**
     * 消息队列服务端的监听
     *
     * @author starmark
     * @date 2020/5/1  上午10:55
     */
    @Component
    public class MessageQueueKafkaConsumerServiceFactory  implements InitializingBean {
    
        @Autowired
        KafkaProperties kafkaProperties;
    
        private final List<IMessageQueueConsumerService> messageQueueConsumerServices;
    
        @Autowired
        public MessageQueueKafkaConsumerServiceFactory(List<IMessageQueueConsumerService> messageQueueConsumerServiceList) {
            messageQueueConsumerServices = messageQueueConsumerServiceList.stream().filter(messageQueueConsumerService ->
                    messageQueueConsumerService.support("kafka")).collect(Collectors.toList());
        }
    
    
    
    
        private KafkaMessageListenerContainer<Integer, String> createContainer(
                ContainerProperties containerProps) {
            Map<String, Object> props = kafkaProperties.buildConsumerProperties();
            DefaultKafkaConsumerFactory<Integer, String> cf =
                    new DefaultKafkaConsumerFactory<>(props);
            return new KafkaMessageListenerContainer<>(cf, containerProps);
        }
    
    
        @Override
        public void afterPropertiesSet() {
            messageQueueConsumerServices.forEach(messageQueueConsumerService -> {
                ContainerProperties containerProps = new ContainerProperties(messageQueueConsumerService.topic());
    
                containerProps.setMessageListener(new MessageQueueKafkaConsumerListener(messageQueueConsumerService)
                );
                KafkaMessageListenerContainer<Integer, String> container = createContainer(containerProps);
                container.setBeanName(messageQueueConsumerService.topic() + "kafkaListener");
    
                container.start();
    
            });
    
        }
    }
    
    

    这些类都是实现动态监听某个主题.

    rabbitmq就有点复杂,因为他要求建了queue才能实现监听,我现在这个代码,如果生产者没有创建队列,会自动帮生产者创建该主题的队列。其实这是不对的,但不这么做,无法实现监听.

    
    /**
     * @author starmark
     * @date 2020/5/2  下午3:05
     */
    public class MessageQueueRabbitmqConsumerListener implements MessageListener  {
    
        private final IMessageQueueConsumerService messageQueueConsumerService;
    
        public MessageQueueRabbitmqConsumerListener(IMessageQueueConsumerService messageQueueConsumerService) {
            this.messageQueueConsumerService = messageQueueConsumerService;
        }
    
    
        @Override
        public void onMessage(Message message) {
    
            messageQueueConsumerService.receiveMessage(new String(message.getBody()));
        }
    
    }
    
    @Component
    public class MessageQueueRabbitmqConsumerServiceFactory implements InitializingBean {
    
        //自动注入RabbitTemplate模板类
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        private final ConfigurableApplicationContext applicationContext;
        private final List<IMessageQueueConsumerService> messageQueueConsumerServices;
        private final ConnectionFactory connectionFactory;
    
        @Autowired
        public MessageQueueRabbitmqConsumerServiceFactory(List<IMessageQueueConsumerService> messageQueueConsumerServiceList, ConfigurableApplicationContext applicationContext, ConnectionFactory connectionFactory) {
            messageQueueConsumerServices = messageQueueConsumerServiceList.stream().filter(messageQueueConsumerService ->
                    messageQueueConsumerService.support("rabbitmq")).collect(Collectors.toList());
            this.applicationContext = applicationContext;
            this.connectionFactory = connectionFactory;
    
        }
    
    
        @Override
        public void afterPropertiesSet() {
            messageQueueConsumerServices.forEach(messageQueueConsumerService -> {
    
                this.registerBean(messageQueueConsumerService.topic(), messageQueueConsumerService.topic());
                SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
                container.setConnectionFactory(connectionFactory);
                container.setConsumerStartTimeout(6000L);
            ;
                //设置监听的队列名,
                String[] types = {messageQueueConsumerService.topic()};
                container.setQueueNames(types);
                container.setMessageListener(new MessageQueueRabbitmqConsumerListener(messageQueueConsumerService));
                container.start();
            });
    
        }
    
    
        private void registerBean(String name, Object... args) {
            if (applicationContext.containsBean(name)) {
                return;
            }
            BeanDefinitionBuilder beanDefinitionBuilder = BeanDefinitionBuilder.genericBeanDefinition(Queue.class);
            if (args.length > 0) {
                for (Object arg : args) {
                    beanDefinitionBuilder.addConstructorArgValue(arg);
                }
            }
            BeanDefinition beanDefinition = beanDefinitionBuilder.getRawBeanDefinition();
    
            BeanDefinitionRegistry beanFactory = (BeanDefinitionRegistry) applicationContext.getBeanFactory();
            beanFactory.registerBeanDefinition(name, beanDefinition);
    
        }
    }
    

    至此,通用的消息队列已完成,这个只能满足一般情况的使用 .
    如果要更高端的使用,直接使用其原生的api会更好.

    相关文章

      网友评论

          本文标题:通用的消息队列(redis,kafka,rabbitmq)--消

          本文链接:https://www.haomeiwen.com/subject/egduxktx.html