ActiveMQ设置异步投递用来解决slow consumer。
1.ActiveMQConfig.java
package com.ctgu.demo.config;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;
import javax.jms.Queue;
import javax.jms.Topic;
import java.util.ArrayList;
import java.util.List;
/**
 * JMS infrastructure for the demo: the destinations, the ActiveMQ connection
 * factory, one listener container factory per messaging domain, and the
 * {@link JmsTemplate} used for sending.
 */
@Configuration
@EnableJms
public class ActiveMQConfig {

    /**
     * Packages whose classes may be deserialized from {@code ObjectMessage}
     * payloads. Kept deliberately small instead of trusting every package,
     * because Java deserialization of arbitrary classes is a known attack vector.
     * NOTE(review): ActiveMQ is documented to treat entries as package prefixes,
     * so "com.ctgu.demo" should also cover com.ctgu.demo.entity - confirm
     * against the ActiveMQ version in use.
     */
    private static final List<String> TRUSTED_PACKAGES = new ArrayList<>();

    static {
        TRUSTED_PACKAGES.add("java.lang");
        TRUSTED_PACKAGES.add("java.util");
        TRUSTED_PACKAGES.add("com.ctgu.demo");
    }

    @Value("${my-queue}")
    private String myQueue;

    @Value("${my-delay-queue}")
    private String myDelayQueue;

    @Value("${my-topic}")
    private String myTopic;

    @Value("${spring.activemq.user}")
    private String userName;

    @Value("${spring.activemq.password}")
    private String password;

    @Value("${spring.activemq.broker-url}")
    private String brokerURL;

    /** Queue named by the {@code my-queue} property. */
    @Bean
    public Queue queue() {
        return new ActiveMQQueue(myQueue);
    }

    /** Queue named by the {@code my-delay-queue} property. */
    @Bean
    public Queue delayQueue() {
        return new ActiveMQQueue(myDelayQueue);
    }

    /** Topic named by the {@code my-topic} property. */
    @Bean
    public Topic topic() {
        return new ActiveMQTopic(myTopic);
    }

    /**
     * Builds the fully configured connection factory.
     *
     * <p>The deserialization whitelist and the redelivery policy are applied
     * here, where the factory is created, so that every consumer of this bean
     * (listener containers and the template alike) sees the same settings
     * regardless of the order in which the other beans are initialized.
     *
     * @return the ActiveMQ connection factory shared by all JMS beans
     */
    @Bean
    public ActiveMQConnectionFactory connectionFactory() {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(userName, password, brokerURL);

        // Only whitelisted packages may be deserialized; hand over a copy so the
        // factory never shares the static list.
        factory.setTrustedPackages(new ArrayList<>(TRUSTED_PACKAGES));

        // Maximum redeliveries is set to 0, i.e. no redelivery attempts.
        RedeliveryPolicy policy = new RedeliveryPolicy();
        policy.setMaximumRedeliveries(0);
        factory.setRedeliveryPolicy(policy);

        return factory;
    }

    /**
     * Listener container factory for topic (publish/subscribe) listeners.
     *
     * @param connectionFactory the shared connection factory
     * @return a container factory with the pub-sub domain enabled
     */
    @Bean
    public JmsListenerContainerFactory<?> topicListenerFactory(ActiveMQConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
        bean.setPubSubDomain(true);
        bean.setConnectionFactory(connectionFactory);
        return bean;
    }

    /**
     * Listener container factory for queue (point-to-point) listeners.
     *
     * @param connectionFactory the shared connection factory
     * @return a container factory with the pub-sub domain disabled
     */
    @Bean
    public JmsListenerContainerFactory<?> queueListenerFactory(ActiveMQConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
        bean.setPubSubDomain(false);
        bean.setConnectionFactory(connectionFactory);
        return bean;
    }

    /**
     * Template used by the sending service.
     *
     * @param connectionFactory the shared, already configured connection factory
     * @return a {@link JmsTemplate} bound to that factory
     */
    @Bean
    public JmsTemplate jmsTemplate(ActiveMQConnectionFactory connectionFactory) {
        return new JmsTemplate(connectionFactory);
    }
}
2.User.java
package com.ctgu.demo.entity;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
/**
 * Simple user payload sent through JMS as an {@code ObjectMessage}.
 *
 * <p>Lombok generates the accessors, {@code equals}/{@code hashCode}/{@code toString}
 * and both the no-args and all-args constructors.
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class User implements Serializable {

    /**
     * Explicit serial version so that producer and consumer builds stay
     * compatible when the class is recompiled or gains compatible fields.
     * NOTE(review): instances serialized before this field was introduced used
     * the compiler-derived UID and will not deserialize against this version.
     */
    private static final long serialVersionUID = 1L;

    /** Identifier of the user. */
    private Integer id;

    /** Display name of the user. */
    private String name;

    /** Age of the user. */
    private Integer age;
}
3.ActiveMQReceiveService.java
package com.ctgu.demo.service;
import org.joda.time.DateTime;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Service;
import javax.jms.JMSException;
import javax.jms.ObjectMessage;
import javax.jms.TextMessage;
/**
 * JMS listeners for the demo destinations. Each listener prints what it
 * received to standard output.
 */
@Service
public class ActiveMQReceiveService {

    /**
     * Handles text messages from the queue named by {@code my-queue}.
     *
     * @param textMessage the received message
     * @throws JMSException if the message body cannot be read
     */
    @JmsListener(destination = "${my-queue}", containerFactory = "queueListenerFactory")
    public void receiveQueue(TextMessage textMessage) throws JMSException {
        final String body = textMessage.getText();
        final Object receivedAt = DateTime.now().toDate();
        // Body and timestamp are concatenated without a separator, as before.
        System.out.println("*****receive Queue message: " + body + receivedAt);
    }

    /**
     * Handles object messages from the queue named by {@code my-delay-queue}.
     *
     * @param objectMessage the received message
     * @throws JMSException if the payload cannot be read
     */
    @JmsListener(destination = "${my-delay-queue}", containerFactory = "queueListenerFactory")
    public void receiveQueue(ObjectMessage objectMessage) throws JMSException {
        final Object payload = objectMessage.getObject();
        final Object receivedAt = DateTime.now().toDate();
        System.out.println("*****receive Delay Queue message: " + payload + receivedAt);
    }

    /**
     * First subscriber on the topic named by {@code my-topic}.
     *
     * @param textMessage the received message
     * @throws JMSException if the message body cannot be read
     */
    @JmsListener(destination = "${my-topic}", containerFactory = "topicListenerFactory")
    public void receiveTopic(TextMessage textMessage) throws JMSException {
        final String body = textMessage.getText();
        System.out.println("*****receive Topic message: " + body);
    }

    /**
     * Second subscriber on the topic named by {@code my-topic}.
     *
     * @param textMessage the received message
     * @throws JMSException if the message body cannot be read
     */
    @JmsListener(destination = "${my-topic}", containerFactory = "topicListenerFactory")
    public void receiveTopic1(TextMessage textMessage) throws JMSException {
        final String body = textMessage.getText();
        System.out.println("*****receive Topic message1: " + body);
    }
}
4.DefinePostProcessor.java
package com.ctgu.demo.config;
import lombok.Data;
import org.apache.activemq.ScheduledMessage;
import org.springframework.jms.core.MessagePostProcessor;
import org.springframework.util.StringUtils;
import javax.jms.JMSException;
import javax.jms.Message;
import java.io.Serializable;
@Data
public class DefinePostProcessor implements MessagePostProcessor {
private long delay = 0L;
private long period = 0L;
private int repeat = 0;
private String corn = null;
public Message postProcessMessage(Message message) throws JMSException{
if (delay > 0) {
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
}
if (period > 0) {
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);
}
if (repeat > 0) {
message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);
}
if (!StringUtils.isEmpty(corn)) {
message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, corn);
}
return message;
}
}
5.ActiveMQSendService.java
package com.ctgu.demo.service;
import com.ctgu.demo.config.DefinePostProcessor;
import com.ctgu.demo.entity.User;
import org.apache.activemq.ActiveMQMessageProducer;
import org.apache.activemq.AsyncCallback;
import org.apache.activemq.ScheduledMessage;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.jms.JmsProperties;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import javax.jms.*;
/**
 * Sends demo messages to the configured queue, delay queue and topic.
 *
 * <p>Several methods are annotated with {@link PostConstruct}, so each of them
 * runs once when this bean is initialized.
 */
@Service
public class ActiveMQSendService {

    private static final Logger log = LoggerFactory.getLogger(ActiveMQSendService.class);

    @Autowired
    private JmsTemplate jmsTemplate;

    @Qualifier(value = "queue")
    @Autowired
    private Queue queue;

    @Qualifier(value = "delayQueue")
    @Autowired
    private Queue delayQueue;

    @Autowired
    private Topic topic;

    /**
     * Sends a text message to an arbitrary destination.
     *
     * @param destination where to send
     * @param message     the text payload
     */
    public void sendMessage(Destination destination, String message) {
        jmsTemplate.convertAndSend(destination, message);
    }

    /**
     * Sends a text message to the default demo queue.
     *
     * @param message the text payload
     */
    public void sendMessage(String message) {
        jmsTemplate.convertAndSend(queue, message);
    }

    /** Sends one timestamped text message to the demo queue at bean initialization. */
    // @Scheduled(fixedRate = 3000)
    @PostConstruct
    public void sendMessageScheduled() {
        String message = "*****Scheduled:" + DateTime.now().toString();
        jmsTemplate.convertAndSend(queue, message);
        System.out.println("*****send Queue MessageScheduled is OK!");
    }

    /**
     * Sends one timestamped text message to the demo queue through the
     * callback-based producer API at bean initialization.
     *
     * @throws JMSException if the send fails
     */
    // @Scheduled(fixedRate = 3000)
    @PostConstruct
    public void sendAsyncMessageScheduled() throws JMSException {
        String message = "*****AsyncScheduled:" + DateTime.now().toString();
        asyncSendMessage(queue, message);
        System.out.println("*****send Queue AsyncScheduled is OK!");
    }

    /**
     * Sends a {@link User} to the delay queue with ActiveMQ scheduling
     * properties (delay, period, repeat) at bean initialization.
     *
     * <p>The destination is passed explicitly: the template has no default
     * destination configured, so the two-argument
     * {@code convertAndSend(Object, MessagePostProcessor)} overload would fail.
     * NOTE(review): scheduled delivery presumably requires scheduler support to
     * be enabled on the broker - confirm the broker configuration.
     *
     * @throws JMSException declared for interface compatibility
     */
    @PostConstruct
    public void sendDelayMessageScheduled() throws JMSException {
        User user = new User(1, "xx", 20);
        long delay = 5 * 1000;
        long period = 2 * 1000;
        int repeat = 5;

        jmsTemplate.convertAndSend(delayQueue, user, message -> {
            message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
            message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);
            message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);
            return message;
        });
        System.out.println("*****send Delay Queue MessageScheduled is OK!");
    }

    /**
     * Publishes a text message to the demo topic.
     *
     * @param msg the text payload
     */
    public void sendTopic(String msg) {
        jmsTemplate.convertAndSend(topic, msg);
    }

    /** Publishes one timestamped text message to the demo topic at bean initialization. */
    // @Scheduled(fixedRate = 3000)
    @PostConstruct
    public void sendTopicScheduled() {
        String message = "*****Topic Scheduled:" + DateTime.now().toString();
        jmsTemplate.convertAndSend(topic, message);
        System.out.println("*****send Topic MessageScheduled is OK!");
    }

    /**
     * Sends a persistent text message in a transacted session and reports the
     * outcome through an {@link AsyncCallback}.
     *
     * @param destination where to send
     * @param msg         the text payload
     * @throws JMSException if any JMS call fails; the connection is still closed
     */
    private void asyncSendMessage(Destination destination, String msg) throws JMSException {
        Connection connection = jmsTemplate.getConnectionFactory().createConnection();
        try {
            connection.start();
            // For a transacted session the acknowledge mode argument is not used;
            // SESSION_TRANSACTED states that intent explicitly.
            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(destination);
            // Persistent delivery
            producer.setDeliveryMode(JmsProperties.DeliveryMode.PERSISTENT.getValue());
            Message message = session.createTextMessage(msg);
            producer.send(message, new AsyncCallback() {
                @Override
                public void onSuccess() {
                    log.info("消息发送成功!{}", DateTime.now().toDate());
                }

                @Override
                public void onException(JMSException e) {
                    // Pass the exception as the last argument so the cause is logged too.
                    log.error("消息发送异常!{}", DateTime.now().toDate(), e);
                }
            });
            session.commit();
        } finally {
            // Closing the connection also closes its sessions and producers,
            // and happens even when a call above throws.
            connection.close();
        }
    }

    /**
     * Sends a persistent text message carrying an ActiveMQ scheduled-delay property.
     *
     * @param destination where to send
     * @param msg         the text payload
     * @param delayTime   value for {@code AMQ_SCHEDULED_DELAY}
     * @throws JMSException if any JMS call fails; the connection is still closed
     */
    private void delaySendMessage(Destination destination, String msg, long delayTime) throws JMSException {
        Connection connection = jmsTemplate.getConnectionFactory().createConnection();
        try {
            connection.start();
            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(destination);
            producer.setDeliveryMode(JmsProperties.DeliveryMode.PERSISTENT.getValue());
            Message message = session.createTextMessage(msg);
            message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delayTime);
            producer.send(message);
            session.commit();
        } finally {
            // Closing the connection also closes its sessions and producers.
            connection.close();
        }
    }
}
6.application.yml
# HTTP port of the demo application.
server:
  port: 7777

spring:
  activemq:
    # External broker the application connects to.
    broker-url: tcp://localhost:61616
    user: admin
    password: admin
    # NOTE(review): Spring Boot documents in-memory as ignored once an explicit
    # broker-url is set - confirm whether this entry is still needed.
    in-memory: true
    pool:
      # NOTE(review): ActiveMQConfig declares its own ActiveMQConnectionFactory
      # bean; verify these pool settings actually take effect.
      enabled: true
      max-connections: 100
  # jms:
  #   pub-sub-domain: false

# Destination names read via @Value / @JmsListener placeholders.
my-queue: boot-active-queue
my-delay-queue: boot-active-delay-queue
my-topic: boot-active-topic
7.pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<!-- Inherits from the Spring Boot 2.1.4.RELEASE parent; dependencies below that omit a version rely on it. -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.4.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<!-- Coordinates of this demo module. -->
<groupId>com.ctgu</groupId>
<artifactId>boot_mq_produce</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>boot_mq_produce</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Lombok: used by the @Data / constructor annotations in the Java sources. -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-test -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<!-- NOTE(review): both activemq-pool and pooled-jms are declared; confirm that both are needed. -->
<!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-pool -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.15.9</version>
</dependency>
<dependency>
<groupId>org.messaginghub</groupId>
<artifactId>pooled-jms</artifactId>
</dependency>
<!-- Joda-Time: supplies the DateTime class used by the send and receive services. -->
<!-- https://mvnrepository.com/artifact/joda-time/joda-time -->
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>2.10</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
网友评论