美文网首页Java 杂谈
01_Spring集成RabbitMQ之声明式注解

01_Spring集成RabbitMQ之声明式注解

作者: 明天你好向前奔跑 | 来源:发表于2019-08-29 17:10 被阅读0次

    @Author Jacky Wang
    日常积累,转载请注明出处,https://www.jianshu.com/p/b081f1fd1480
    最近新开发的一个项目是由传统的SSM架构进行开发的,之前介绍了SpringBoot集成RabbitMQ的方式,这次特地对Spring集成RabbitMQ做一次记录及介绍。
    如需较详细了解RabbitMQ的相关知识,可参考我的另一篇文章:03_SpringBoot集成RabbitMQ

    由Spring集成RabbitMQ一般采用Xml配置或注解式两种方式来进行集成。由于个人不喜欢过多的xml文件,因此这里仅对注解方式进行记录。

    1. 声明式注解集成RabbitMQ

    1.1 步骤

    1. 引入pom依赖
    2. 创建配置文件,包含mq的基础配置
    3. 创建RabbitMQ监听器
    4. 声明RabbitMQ配置类
    5. 创建消息生产者
    6. 测试

    1.2 集成

    1.2.1 引入Pom依赖
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.6.0</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit</artifactId>
        <version>2.1.5.RELEASE</version>
    </dependency>
    <!-- 解决冲突 -->
    <dependency>
        <groupId>org.codehaus.jackson</groupId>
        <artifactId>jackson-mapper-asl</artifactId>
        <version>1.9.13</version>
    </dependency>
    
    1.2.2 创建配置文件 rabbitmq.properties
    #rabbitmq.host=127.0.0.1
    #rabbitmq服务器
    rabbitmq.host=192.168.3.171
    rabbitmq.port=5672
    rabbitmq.username=guest
    rabbitmq.password=guest
    rabbitmq.virtual.host=/
    
    #交换机
    exchange.mes.com.add=mes.fanout.com.add
    exchange.mes.com.del=mes.fanout.com.delete
    exchange.mes.com.update=mes.fanout.com.update
    exchange.mes.user.add=mes.fanout.user.add
    exchange.mes.user.del=mes.fanout.user.delete
    exchange.mes.user.update=mes.fanout.user.update
    
    #队列
    queue.mes.com.add=mes_company_add
    queue.mes.com.del=mes_company_delete
    queue.mes.com.update=mes_company_update
    queue.mes.user.add=mes_user_add
    queue.mes.user.del=mes_user_delete
    queue.mes.user.update=mes_user_update
    
    1.2.3 创建RabbitMQ监听器
    /**
     * 消费监听类
     */
    @Component
    @Transactional
    public class QueueListener {
    
        public static final Logger logger = LoggerFactory.getLogger(QueueListener.class);
    
        @Autowired
        private AppInterfaceService appInterfaceService;
        @Autowired
        private RabbitMQConfig rabbitMQConfig;
        @Autowired
        private OfficeMapper officeMapper;
        @Autowired
        private SystemService systemService;
        @Autowired
        private UserMapper userMapper;
    
        /**
        * 如果监听队列指定的方法不存在则执行默认方法
        */
        public void onMessage(byte[] msg) {
            try {
                logger.info("onMessage : [{}]", new String(msg, "UTF-8"));
            } catch (Exception e) {
                logger.error("Error : [{}]", e);
            }
        }
    
        /**
         * 公司信息同步
         *
         * @param message
         */
        public void addCompany(byte[] message) {
            logger.info("RabbitMQ Method addCompany Get Msg : [{}]", new String(message));
        }
    
        public void updateCompany(byte[] message) {
            logger.info("RabbitMQ Method updateCompany Get Msg : [{}]", new String(message));
        }
    
        /**
         * 用户信息同步
         *
         * @param message
         */
        public void addUser(byte[] message) {
            logger.info("RabbitMQ Method updateCompany Get Msg : [{}]", new String(message));
        }
    
        public void delUser(byte[] message) {
            logger.info("RabbitMQ Method delUser Get Msg : [{}]", new String(message));
        }
    
        public void updateUser(byte[] message) {
            logger.info("RabbitMQ Method updateUser Get Msg : [{}]", new String(message));
        }
    }
    
    1.2.4 创建RabbitMQ声明配置类
    • 此次使用消费者容器进行消息消费,可支持单类多方法消费不同队列
    • 若无须使用消费者容器,可取消下面 声明消费者监听执行类的注释即可
    • 示例总共创建了六个交换机与六个队列,具体根据实际情况创建即可
    import org.springframework.amqp.core.*;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitAdmin;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
    import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.amqp.support.converter.MessageConverter;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.annotation.PropertySource;
    
    import java.util.HashMap;
    import java.util.Map;
    
    @Configuration
    @PropertySource(value = "classpath:/properties/rabbitmq.properties")
    public class RabbitMQConfig {
        @Autowired
        private QueueListener queueListener;
    
        @Value("${rabbitmq.host}")
        private String host;
        @Value("${rabbitmq.port}")
        private int port;
        @Value("${rabbitmq.username}")
        private String username;
        @Value("${rabbitmq.password}")
        private String password;
        @Value("${rabbitmq.virtual.host}")
        private String vhost;
    
        @Value("${exchange.mes.com.add}")
        private String companyAddExchangeName;
        @Value("${exchange.mes.com.del}")
        private String companyDelExchangeName;
        @Value("${exchange.mes.com.update}")
        private String companyUpdateExchangeName;
        @Value("${exchange.mes.user.add}")
        private String userAddExchangeName;
        @Value("${exchange.mes.user.del}")
        private String userDelExchangeName;
        @Value("${exchange.mes.user.update}")
        private String userUpdateExchangeName;
    
        @Value("${queue.mes.com.add}")
        private String companyAddQueueName;
        @Value("${queue.mes.com.del}")
        private String companyDelQueueName;
        @Value("${queue.mes.com.update}")
        private String companyUpdateQueueName;
        @Value("${queue.mes.user.add}")
        private String userAddQueueName;
        @Value("${queue.mes.user.del}")
        private String userDelQueueName;
        @Value("${queue.mes.user.update}")
        private String userUpdateQueueName;
    
        /**
         * rabbitmq连接配置
         *
         * @return
         */
        @Bean
        public ConnectionFactory rabbitConnectionFactory() {
            CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port);
            connectionFactory.setUsername(username);
            connectionFactory.setPassword(password);
            connectionFactory.setVirtualHost(vhost);
            return connectionFactory;
        }
    
        @Bean
        public RabbitTemplate rabbitTemplate() {
            RabbitTemplate template = new RabbitTemplate(rabbitConnectionFactory());
            template.setMessageConverter(jsonMessageConverter());
            return template;
        }
    
        @Bean
        public MessageConverter jsonMessageConverter() {
            return new Jackson2JsonMessageConverter();
        }
    
        @Bean
        public RabbitAdmin rabbitAdmin() {
            RabbitAdmin admin = new RabbitAdmin(rabbitConnectionFactory());
            admin.setIgnoreDeclarationExceptions(true); //即使有关rabbitmq的bean初始化失败整个web应用还能正常启动
            return admin;
        }
    
        /**
         * 声明交换机Exchange
         *
         * @return
         */
        @Bean
        public FanoutExchange companyAddExchange() {
            return new FanoutExchange(companyAddExchangeName, true, false);
        }
    
        @Bean
        public FanoutExchange companyDelExchange() {
            return new FanoutExchange(companyDelExchangeName, true, false);
        }
    
        @Bean
        public FanoutExchange companyUpdateExchange() {
            return new FanoutExchange(companyUpdateExchangeName, true, false);
        }
    
        @Bean
        public FanoutExchange userAddExchange() {
            return new FanoutExchange(userAddExchangeName, true, false);
        }
    
        @Bean
        public FanoutExchange userDelExchange() {
            return new FanoutExchange(userDelExchangeName, true, false);
        }
    
        @Bean
        public FanoutExchange userUpdateExchange() {
            return new FanoutExchange(userUpdateExchangeName, true, false);
        }
    
        /**
         * 声明队列Queue
         *
         * @return
         */
        @Bean
        public Queue companyAddQueue() {
            return new Queue(companyAddQueueName, true, false, false);
        }
    
        @Bean
        public Queue companyDelQueue() {
            return new Queue(companyDelQueueName, true, false, false);
        }
    
        @Bean
        public Queue companyUpdateQueue() {
            return new Queue(companyUpdateQueueName, true, false, false);
        }
    
        @Bean
        public Queue userAddQueue() {
            return new Queue(userAddQueueName, true, false, false);
        }
    
        @Bean
        public Queue userDelQueue() {
            return new Queue(userDelQueueName, true, false, false);
        }
    
        @Bean
        public Queue userUpdateQueue() {
            return new Queue(userUpdateQueueName, true, false, false);
        }
    
        /**
         * 将队列绑定到指定的交换机
         *
         * @param companyAddQueue
         * @param companyAddExchange
         * @return
         */
        @Bean
        public Binding companyAddBinding(Queue companyAddQueue, FanoutExchange companyAddExchange) {
            return BindingBuilder.bind(companyAddQueue).to(companyAddExchange);
        }
    
        @Bean
        public Binding companyDelBinding(Queue companyDelQueue, FanoutExchange companyDelExchange) {
            return BindingBuilder.bind(companyDelQueue).to(companyDelExchange);
        }
    
        @Bean
        public Binding companyUpdateBinding(Queue companyUpdateQueue, FanoutExchange companyUpdateExchange) {
            return BindingBuilder.bind(companyUpdateQueue).to(companyUpdateExchange);
        }
    
        @Bean
        public Binding userAddBinding(Queue userAddQueue, FanoutExchange userAddExchange) {
            return BindingBuilder.bind(userAddQueue).to(userAddExchange);
        }
    
        @Bean
        public Binding userDelBinding(Queue userDelQueue, FanoutExchange userDelExchange) {
            return BindingBuilder.bind(userDelQueue).to(userDelExchange);
        }
    
        @Bean
        public Binding userUpdateBinding(Queue userUpdateQueue, FanoutExchange userUpdateExchange) {
            return BindingBuilder.bind(userUpdateQueue).to(userUpdateExchange);
        }
    
        /**
         * 声明消费者监听执行类
         * @param receiver
         * @return
         */
        /*@Bean
        public MessageListenerAdapter listenerAdapter(QueueListener receiver) {
            MessageListenerAdapter m = new MessageListenerAdapter(receiver, "process");
            m.setMessageConverter(jsonMessageConverter());
            return m;
        }*/
    
        /**
         * 消费者容器
         * 为不同队列指定不同的执行方法
         * @param rabbitConnectionFactory
         * @return
         */
        @Bean
        SimpleMessageListenerContainer listenerContainer(ConnectionFactory rabbitConnectionFactory) {
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
            container.setConnectionFactory(rabbitConnectionFactory);
            //container.setMessageConverter(jsonMessageConverter());
            //container.setConcurrentConsumers(1);
            //container.setMaxConcurrentConsumers(5);
            //container.setDefaultRequeueRejected(false);
            container.setAcknowledgeMode(AcknowledgeMode.AUTO);
            container.setConsumerTagStrategy(q -> projectKey + "_" + q);
            //container.setQueues(companyAddQueue(), companyDelQueue(), companyUpdateQueue(), userAddQueue(), userDelQueue(), userUpdateQueue());
            container.setQueueNames(companyAddQueueName, companyDelQueueName, companyUpdateQueueName, userAddQueueName, userDelQueueName, userUpdateQueueName);
    
            MessageListenerAdapter listenerAdapter = new MessageListenerAdapter(queueListener);
            listenerAdapter.setDefaultListenerMethod("onMessage");
            listenerAdapter.setMessageConverter(jsonMessageConverter());
            Map<String, String> queueOrTagToMethodName = new HashMap<>();
            queueOrTagToMethodName.put(companyAddQueueName, "addCompany");
            queueOrTagToMethodName.put(companyDelQueueName, "delCompany");
            queueOrTagToMethodName.put(companyUpdateQueueName, "updateCompany");
            queueOrTagToMethodName.put(userAddQueueName, "addUser");
            queueOrTagToMethodName.put(userDelQueueName, "delUser");
            queueOrTagToMethodName.put(userUpdateQueueName, "updateUser");
            listenerAdapter.setQueueOrTagToMethodName(queueOrTagToMethodName);
            container.setMessageListener(listenerAdapter);
            return container;
        }
    }
    
    1.2.5 创建消息生产者
    @Service
    public class RabbitMQSender {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        private final static Logger logger = LoggerFactory.getLogger(RabbitMQSender.class);
    
        public void sendDataToQueue(String exchange, String routingKey, Object object) {
            try {
                rabbitTemplate.setMessagePropertiesConverter(new MessagePropertiesConverter() {
                    @Override
                    public MessageProperties toMessageProperties(AMQP.BasicProperties source, Envelope envelope, String charset) {
                        MessageProperties messageProperties = new MessageProperties();
                        messageProperties.setContentType("application/json");
                        messageProperties.setContentEncoding("UTF-8");
                        return messageProperties;
                    }
    
                    @Override
                    public AMQP.BasicProperties fromMessageProperties(MessageProperties source, String charset) {
                        return null;
                    }
                });
    
                rabbitTemplate.convertAndSend(exchange, routingKey, object);
            } catch (Exception e) {
                logger.error("发送mq消息异常,Cause:[]", e);
            }
    
        }
    }
    
    1.2.6 测试
    自定义Test或自定义Controller测试调用生产者发送消息,查看消费者是否消费即可。
    
    eg:
    
    @Controller
    @RequestMapping("${adminPath}/rabbitmq")
    public class RabbitMQController {
    
        @Autowired
        private RabbitMQSender rabbitMQSender;
    
        @RequestMapping("/sendToCompanyAdd")
        @ResponseBody
        public String sendToCompanyAdd(String id) {
            HashMap<String, String> map = new HashMap<>();
            map.put("id", id);
            rabbitMQSender.sendDataToQueue("mes.fanout.com.add", null, map);
            return "SUCCESS";
        }
    }
    

    3 注意事项

    • 若配置失败,请检查Spring的注解扫描是否能扫描到配置类。
    • 检查RabbitMQ基础配置信息是否有错误

    2. Xml方式集成配置文件参考

    spring-rabbitmq.xml:
    
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:rabbit="http://www.springframework.org/schema/rabbit"
           xmlns:context="http://www.springframework.org/schema/context"
           xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
        http://www.springframework.org/schema/rabbit
        http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
    
        <description>rabbitmq 连接服务配置</description>
    
        <!-- 加载配置属性文件 -->
        <context:property-placeholder ignore-unresolvable="true" location="classpath:/properties/rabbitmq.properties"/>
    
        <!-- 连接配置 -->
        <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" username="${rabbitmq.username}"
                                   password="${rabbitmq.password}" port="${rabbitmq.port}"
                                   virtual-host="${rabbitmq.vhost}"/>
        <rabbit:admin connection-factory="connectionFactory"/>
    
        <!-- spring template声明-->
        <rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
                         message-converter="jsonMessageConverter"/>
    
        <!-- 消息对象json转换类 -->
        <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"/>
    
        <!-- 申明一个消息队列Queue -->
        <rabbit:queue id="testQueueId" name="${rabbitmq.queue}" durable="false" auto-delete="false" exclusive="false"/>
    
        <!-- 定义交换机 -->
        <rabbit:direct-exchange id="testExchangeId" name="${rabbitmq.exchange}" durable="true" auto-delete="false">
            <rabbit:bindings>
                <rabbit:binding queue="testQueueId" key="${rabbitmq.routingKey}"/>
            </rabbit:bindings>
        </rabbit:direct-exchange>
    
        <!-- MQ监听配置 -->
        <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto">
            <rabbit:listener queues="testQueueId" ref="queueListener" method="process"/>
        </rabbit:listener-container>
    </beans>
    

    相关文章

      网友评论

        本文标题:01_Spring集成RabbitMQ之声明式注解

        本文链接:https://www.haomeiwen.com/subject/ewxwectx.html