美文网首页
ActiveMQ 异步、延迟、定时投递,消息后置处理器

ActiveMQ 异步、延迟、定时投递,消息后置处理器

作者: 笨鸡 | 来源:发表于2020-02-24 19:05 被阅读0次

    ActiveMQ设置异步投递用来解决slow consumer。

    1.ActiveMQConfig.java

    package com.ctgu.demo.config;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.apache.activemq.RedeliveryPolicy;
    import org.apache.activemq.command.ActiveMQQueue;
    import org.apache.activemq.command.ActiveMQTopic;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.jms.annotation.EnableJms;
    import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
    import org.springframework.jms.config.JmsListenerContainerFactory;
    import org.springframework.jms.core.JmsTemplate;
    
    import javax.jms.Queue;
    import javax.jms.Topic;
    import java.util.ArrayList;
    import java.util.List;
    
    @Configuration
    @EnableJms
    public class ActiveMQConfig {
    
        @Value("${my-queue}")
        private String myQueue;
    
        @Value("${my-delay-queue}")
        private String myDelayQueue;
    
        @Value("${my-topic}")
        private String myTopic;
    
        @Value("${spring.activemq.user}")
        private String userName;
    
        @Value("${spring.activemq.password}")
        private String password;
    
        @Value("${spring.activemq.broker-url}")
        private String brokerURL;
    
        private static List<String> trustList = new ArrayList<>();
    
        static {
            trustList.add("java.lang");
            trustList.add("java.util");
            trustList.add("com.ctgu.demo");
        }
    
        @Bean
        public Queue queue() {
            return new ActiveMQQueue(myQueue);
        }
    
        @Bean
        public Queue delayQueue() {
            return new ActiveMQQueue(myDelayQueue);
        }
    
        @Bean
        public Topic topic() {
            return new ActiveMQTopic(myTopic);
        }
    
        @Bean
        public ActiveMQConnectionFactory connectionFactory() {
            return new ActiveMQConnectionFactory(userName, password, brokerURL);
        }
    
        @Bean
        public JmsListenerContainerFactory<?> topicListenerFactory(ActiveMQConnectionFactory connectionFactory) {
            DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
            bean.setPubSubDomain(true);
            bean.setConnectionFactory(connectionFactory);
            return bean;
        }
    
        @Bean
        public JmsListenerContainerFactory<?> queueListenerFactory(ActiveMQConnectionFactory connectionFactory) {
            DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
            bean.setPubSubDomain(false);
            bean.setConnectionFactory(connectionFactory);
            return bean;
        }
    
        @Bean
        public JmsTemplate jmsTemplate(ActiveMQConnectionFactory connectionFactory){
            connectionFactory.setTrustAllPackages(true);
    //        connectionFactory.setTrustedPackages(trustList);
            RedeliveryPolicy policy = new RedeliveryPolicy();
            policy.setMaximumRedeliveries(0);
            connectionFactory.setRedeliveryPolicy(policy);
            return new JmsTemplate(connectionFactory);
        }
    
    
    }
    
    

    2.User.java

    package com.ctgu.demo.entity;
    
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    
    import java.io.Serializable;
    
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public class User implements Serializable {
    
        private Integer id;
    
        private String name;
    
        private Integer age;
    }
    

    3.ActiveMQReceiveService.java

    package com.ctgu.demo.service;
    
    import org.joda.time.DateTime;
    import org.springframework.jms.annotation.JmsListener;
    import org.springframework.stereotype.Service;
    
    import javax.jms.JMSException;
    import javax.jms.ObjectMessage;
    import javax.jms.TextMessage;
    
    @Service
    public class ActiveMQReceiveService {
    
        @JmsListener(destination = "${my-queue}", containerFactory = "queueListenerFactory")
        public void receiveQueue(TextMessage textMessage) throws JMSException {
            System.out.println("*****receive Queue message: " + textMessage.getText()
                    + DateTime.now().toDate());
        }
    
        @JmsListener(destination = "${my-delay-queue}", containerFactory = "queueListenerFactory")
        public void receiveQueue(ObjectMessage objectMessage) throws JMSException {
            System.out.println("*****receive Delay Queue message: " + objectMessage.getObject()
                    + DateTime.now().toDate());
        }
    
        @JmsListener(destination = "${my-topic}", containerFactory = "topicListenerFactory")
        public void receiveTopic(TextMessage textMessage) throws JMSException {
            System.out.println("*****receive Topic message: " + textMessage.getText());
        }
    
        @JmsListener(destination = "${my-topic}", containerFactory = "topicListenerFactory")
        public void receiveTopic1(TextMessage textMessage) throws JMSException {
            System.out.println("*****receive Topic message1: " + textMessage.getText());
        }
    }
    

    4.DefinePostProcessor.java

    package com.ctgu.demo.config;
    
    import lombok.Data;
    import org.apache.activemq.ScheduledMessage;
    
    import org.springframework.jms.core.MessagePostProcessor;
    import org.springframework.util.StringUtils;
    
    import javax.jms.JMSException;
    import javax.jms.Message;
    import java.io.Serializable;
    
    @Data
    public class DefinePostProcessor implements MessagePostProcessor {
    
        private long delay = 0L;
    
        private long period = 0L;
    
        private int repeat = 0;
    
        private String corn = null;
    
        public Message postProcessMessage(Message message) throws JMSException{
            if (delay > 0) {
                message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
            }
            if (period > 0) {
                message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);
            }
            if (repeat > 0) {
                message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);
            }
            if (!StringUtils.isEmpty(corn)) {
                message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, corn);
            }
            return message;
        }
    
    }
    

    5.ActiveMQSendService.java

    package com.ctgu.demo.service;
    
    import com.ctgu.demo.config.DefinePostProcessor;
    import com.ctgu.demo.entity.User;
    import org.apache.activemq.ActiveMQMessageProducer;
    import org.apache.activemq.AsyncCallback;
    import org.apache.activemq.ScheduledMessage;
    import org.joda.time.DateTime;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.boot.autoconfigure.jms.JmsProperties;
    import org.springframework.jms.core.JmsTemplate;
    import org.springframework.stereotype.Service;
    
    import javax.annotation.PostConstruct;
    import javax.jms.*;
    
    @Service
    public class ActiveMQSendService {
    
        private final static Logger log = LoggerFactory.getLogger(ActiveMQSendService.class);
    
        @Autowired
        private JmsTemplate jmsTemplate;
    
        @Qualifier(value = "queue")
        @Autowired
        private Queue queue;
    
        @Qualifier(value = "delayQueue")
        @Autowired
        private Queue delayQueue;
    
        @Autowired
        private Topic topic;
    
        public void sendMessage(Destination destination, String message) {
            jmsTemplate.convertAndSend(destination, message);
        }
    
        public void sendMessage(String message) {
            jmsTemplate.convertAndSend(queue, message);
        }
    
        //    @Scheduled(fixedRate = 3000)
        @PostConstruct
        public void sendMessageScheduled() {
            String message = "*****Scheduled:" + DateTime.now().toString();
            jmsTemplate.convertAndSend(queue, message);
            System.out.println("*****send Queue MessageScheduled is OK!");
        }
    
        //    @Scheduled(fixedRate = 3000)
        @PostConstruct
        public void sendAsyncMessageScheduled() throws JMSException {
            String message = "*****AsyncScheduled:" + DateTime.now().toString();
    
            asyncSendMessage(queue, message);
            System.out.println("*****send Queue AsyncScheduled is OK!");
        }
    
    
        @PostConstruct
        public void sendDelayMessageScheduled() throws JMSException {
            User user = new User(1, "xx", 20);
            long delay = 5 * 1000;
            long period = 2 * 1000;
            int repeat = 5;
            
    //        DefinePostProcessor msgPostProcess = new DefinePostProcessor();
    //        msgPostProcess.setDelay(delay);
    //        msgPostProcess.setPeriod(period);
    //        msgPostProcess.setRepeat(repeat);
    //        jmsTemplate.setDefaultDestination(delayQueue);
    //        jmsTemplate.convertAndSend(user, msgPostProcess);
            
            jmsTemplate.convertAndSend(user, (message -> {
                message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
                message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);
                message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);
                return message;
            }));
    
    //        delaySendMessage(delayQueue, message, 500L);
            System.out.println("*****send Delay Queue MessageScheduled is OK!");
        }
    
        public void sendTopic(String msg) {
            jmsTemplate.convertAndSend(topic, msg);
        }
    
        //    @Scheduled(fixedRate = 3000)
        @PostConstruct
        public void sendTopicScheduled() {
            String message = "*****Topic Scheduled:" + DateTime.now().toString();
            jmsTemplate.convertAndSend(topic, message);
            System.out.println("*****send Topic MessageScheduled is OK!");
        }
    
        private void asyncSendMessage(Destination destination, String msg) throws JMSException {
    
            Connection connection = jmsTemplate.getConnectionFactory().createConnection();
            connection.start();
            Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
    
            ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(destination);
            // 设置持久化
            producer.setDeliveryMode(JmsProperties.DeliveryMode.PERSISTENT.getValue());
            Message message = session.createTextMessage(msg);
    
            producer.send(message, new AsyncCallback() {
                @Override
                public void onSuccess() {
                    log.info("消息发送成功!{}", DateTime.now().toDate());
                }
    
                @Override
                public void onException(JMSException e) {
                    log.error("消息发送异常!{}", DateTime.now().toDate());
                }
            });
            session.commit();
            producer.close();
            session.close();
            connection.close();
        }
    
    
        private void delaySendMessage(Destination destination, String msg, long delayTime) throws JMSException {
            Connection connection = jmsTemplate.getConnectionFactory().createConnection();
            connection.start();
            Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
    
            ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(destination);
            producer.setDeliveryMode(JmsProperties.DeliveryMode.PERSISTENT.getValue());
            Message message = session.createTextMessage(msg);
            message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delayTime);
            producer.send(message);
    
            session.commit();
    
            producer.close();
            session.close();
            connection.close();
        }
    }
    

    6.application.yml

    server:
      port: 7777
    
    spring:
      activemq:
        broker-url: tcp://localhost:61616
        user: admin
        password: admin
        in-memory: true
        pool:
          enabled: true
          max-connections: 100
    #  jms:
    #    pub-sub-domain: false
    
    my-queue: boot-active-queue
    
    my-delay-queue: boot-active-delay-queue
    
    my-topic: boot-active-topic
    

    7.pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.1.4.RELEASE</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
        <groupId>com.ctgu</groupId>
        <artifactId>boot_mq_produce</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>boot_mq_produce</name>
        <description>Demo project for Spring Boot</description>
    
        <properties>
            <java.version>1.8</java.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <optional>true</optional>
            </dependency>
            <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-test -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
    
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-activemq</artifactId>
            </dependency>
            <!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-pool -->
            <dependency>
                <groupId>org.apache.activemq</groupId>
                <artifactId>activemq-pool</artifactId>
                <version>5.15.9</version>
            </dependency>
            <dependency>
                <groupId>org.messaginghub</groupId>
                <artifactId>pooled-jms</artifactId>
            </dependency>
    
            <!-- https://mvnrepository.com/artifact/joda-time/joda-time -->
            <dependency>
                <groupId>joda-time</groupId>
                <artifactId>joda-time</artifactId>
                <version>2.10</version>
            </dependency>
    
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    
    </project>
    

    相关文章

      网友评论

          本文标题:ActiveMQ 异步、延迟、定时投递,消息后置处理器

          本文链接:https://www.haomeiwen.com/subject/knriqhtx.html