美文网首页
Activemq 和 Springboot 2整合

Activemq 和 Springboot 2整合

作者: 兴厚 | 来源:发表于2020-04-12 18:07 被阅读0次

    一. 导包 ( pooled-jms 包用最新的即可, 当前 springboot 版本为2.2.1)

        <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-activemq</artifactId>
            </dependency>
            <dependency>
                <groupId>org.messaginghub</groupId>
                <artifactId>pooled-jms</artifactId>
                <version>1.0.6</version>
            </dependency>
    

    二。 配置项

    
    spring:  
        activemq:
            broker-url: tcp://x.x.x.x:61616
            user: admin
            password: admin
            packages:
                trust-all: true
            pool:
                enabled: true
                max-connections: 50
        jms:
            pub-sub-domain: true
            listener:
                auto-startup: true
                concurrency: 2
                max-concurrency: 2
            template:
                delivery-mode: persistent
                priority: 100
                qos-enabled: true
                receive-timeout: 1000
                time-to-live: 36000
    

    三。代码添加配置

    @EnableJms
    @Configuration
    public class ActiveMQConfig {
    
        @Bean
        public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ConnectionFactory connectionFactory) {
            DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
            bean.setConnectionFactory(connectionFactory);
    //        bean.setSessionAcknowledgeMode(4);  // 需要手动ack的时候,需要这个配置,否则默认是客户端自动ack的。
            return bean;
        }
    }
    

    四。 发送消息

    /**
         * 发送消息底层方法
         * @param serializable 消息对象,需要实现 serializable 接口
         * @param header       传递头,可以为null
         * @param destination  队列名字,  Destination destination = new ActiveMQQueue("队列名");
         */
        private void sendMessage(final Serializable serializable, Map<String, Object> header, Destination destination){
            jmsMessagingTemplate.getJmsTemplate().setExplicitQosEnabled(false);
            jmsMessagingTemplate.convertAndSend(destination, serializable, header);
        }
    

    五。接收消息

    /**
         * 当需要手动ack的时候,这种监听方法非常有效 
         * 加上这个注解,配合这个containerFactory能增大服务的吞吐量。 
         * @param message
         * @throws JMSException
         */
        @JmsListener(destination = "test.queue?consumer.prefetchSize=0", containerFactory = "jmsListenerContainerQueue")
        public void listenHello(Message message) throws JMSException {  // 这个 message 类型是javax.jms.Message
            log.info("收到消息 : {}", message);
            if(message instanceof ActiveMQTextMessage){
                // 字符串消息走这个实现
                ActiveMQTextMessage newMessage = (ActiveMQTextMessage) message;
            }else if(message instanceof ActiveMQObjectMessage){
                // 当消息类型为 Object 的时候,走这个读取消息接口
                ActiveMQObjectMessage newMessage = (ActiveMQObjectMessage) message;
                // testbean为这个队列的消息类型
                TestBean testBean = (TestBean) newMessage.getObject();
            }
            // 手动ack消息,需配合 配置类里的设置才能生效
            message.acknowledge();
        }
    
       /**
         * 当不需要手动ack的时候,这样写能最大化利用服务器资源,加大吞吐量和消息消费效率
         * 加上这个注解,配合这个containerFactory能增大服务的吞吐量。
         * @param testBean
         * @throws JMSException
         */
        @JmsListener(destination = "test.queue", containerFactory = "jmsListenerContainerQueue")
        public void listenHello(@Payload TestBean testBean) throws JMSException {
            // do some service ....
        }
    
        /**
         * 当异步任务执行时间特别长的时候,需要客户端主动去activemq上主动拉取消息的时候,就得用这个接收方式,
         * 这个会阻塞直到activemq有任务。然后才会消费,由于消费的时间会特别长,所以配置自动 ack 即可(默认)。
         */
        public void testQueue(){
            ExecutorService executorService = Executors.newFixedThreadPool(1);
            executorService.submit(() -> {
                while (true) {
                    try {
                        Destination destination = new ActiveMQQueue("test.queue?consumer.prefetchSize=0");
                        jmsMessagingTemplate.getJmsTemplate().setReceiveTimeout(JmsTemplate.RECEIVE_TIMEOUT_INDEFINITE_WAIT);
                        // 会阻塞直到队列中有消息
                        Message<?> message = jmsMessagingTemplate.receive(destination);  // 这里的 Message 是  org.springframework.messaging.Message 类。上面的Message是javax.jms.Message
                        String text = (String) message.getPayload();
                        log.warn("收到消息 : {}, 休眠一分钟,查看ack状态", text);
                        Thread.sleep(20000);
                    }catch (Exception e){
                        log.warn("[Hello] 任务运行过程中出现异常,10秒后重试");
                        Thread.sleep(10000);
                    }
                }
            });
        }
    

    相关文章

      网友评论

          本文标题:Activemq 和 Springboot 2整合

          本文链接:https://www.haomeiwen.com/subject/slebmhtx.html