平时使用的时候,像一些异步任务,有些人会选择直接new一个线程去处理,这样子存在许多弊端。
线程资源频繁的创建还有销毁会浪费许多系统资源。
其次,代码里面的异步任务十分不好管理,哪怕后面想要统一修改也是比较麻烦的。因为需要全面的排查,所以我们这里要做一个统一的入口,这样子维护起来也是十分方便的。
那么异步任务统一的入口其实都是run方法。恰好用线程池去维护就好了。
public class AsyncUtils {
//核心线程数
private static final int corePoolSize = 1;
//最大线程数
private static final int maximumPoolSize = 10;
//线程空闲时间
private static final int keepAliveTime = 60 * 1000;
//任务队列堆积的最大任务数
private static final int maxTaskNum = 100;
//初始化的任务数
private static final int initialTaskNum = 0;
private static final AtomicInteger successaskNum = new AtomicInteger(initialTaskNum);
private static final AtomicInteger failTaskNum = new AtomicInteger(initialTaskNum);
private static final AtomicInteger rejectTaskNum = new AtomicInteger(initialTaskNum);
private static final boolean successFlag = true;
private static final ThreadPoolExecutor executorService = new ThreadPoolExecutor(corePoolSize, maximumPoolSize,
keepAliveTime, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(maxTaskNum),
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// 被拒绝需要有补偿机制--todo
rejectTaskNum.incrementAndGet();
}
});
public static void execute(Runnable runnable) throws InterruptedException, ExecutionException {
try {
if (executorService.submit(runnable, successFlag).get().booleanValue()) {
successaskNum.incrementAndGet();
}
} catch (Exception e) {
failTaskNum.incrementAndGet();
}
}
public static int getSuccessTaskNum() {
return successaskNum.get();
}
public static int getFailTaskNum() {
return failTaskNum.get();
}
public static int getRejectTaskNum() {
return rejectTaskNum.get();
}
}
那么第二个问题来了,怎么样提高吞吐量呢?若每个任务执行的时间过长,必然会导致吞吐量下降。因为首要解决的就是,如何提升吞吐量,那么这里,需要用到的就是消息队列,线程池应该关注的是消息的发送,而不是消息的消费,这样子就可以提高吞吐量了。
那么下面来进行改造一下,升级为2.0版本,但是这里的话,首先得搭建一下mq的容器。
先引入依赖
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.5.RELEASE</version>
</parent>
<dependencies>
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- 开启热部署 -->
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-devtools -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<optional>true</optional>
</dependency>
<!-- 单元测试 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.4</version>
</dependency>
</dependencies>
启动类还是照旧
@SpringBootApplication
public class Config {
public static void main(String args[]) {
SpringApplication.run(Config.class, args);
}
}
配置文件
rocketmq.producer.group=test_prodoucer_group
rocketmq.consumer.group=test_consume_group
rocketmq.name-server=127.0.0.1:9876
server.port=8081
将mq消费者生产者交给spring来管理
@Configuration
public class MqConfiguration {
@Value("${rocketmq.producer.group}")
private String group;
@Value("${rocketmq.name-server}")
private String nameServer;
@Value("${rocketmq.consume.group}")
private String consumeGroup;
@Bean
public DefaultMQProducer getDefaultMQProducer() {
DefaultMQProducer mqProducer = new DefaultMQProducer();
mqProducer.setProducerGroup(group);
mqProducer.setNamesrvAddr(nameServer);
return mqProducer;
}
@Bean
public DefaultMQPushConsumer getDefaultMQPushConsumer() throws MQClientException {
DefaultMQPushConsumer mqushConsumer = new DefaultMQPushConsumer();
mqushConsumer.setConsumerGroup(consumeGroup);
mqushConsumer.setNamesrvAddr(nameServer);
mqushConsumer.subscribe("test_topic", "*");
mqushConsumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeOrderlyStatus.SUCCESS;
}
});
mqushConsumer.start();
return mqushConsumer;
}
}
添加一个工具类,用于获取bean
@Component
public class ApplicationContextUtils implements ApplicationContextAware{
private ApplicationContext context;
public static ApplicationContext APP_CONTEXT;
public <T> T getBean(Class<T> t) {
return context.getBean(t);
}
public String[] getBeanNames() {
return context.getBeanDefinitionNames();
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.context = applicationContext;
APP_CONTEXT = applicationContext;
}
public ApplicationContext getContext() {
return context;
}
public void setContext(ApplicationContext context) {
this.context = context;
}
}
这个时候适当改造一下,改为通过异步发送消息队列
public class AsyncUtilsV2 {
private static final int corePoolSize = 1;
private static final int maximumPoolSize = 10;
private static final int keepAliveTime = 60 * 1000;
private static final int maxTaskNum = 1;
private static final int initialTaskNum = 0;
private static final AtomicInteger successaskNum = new AtomicInteger(initialTaskNum);
private static final AtomicInteger failTaskNum = new AtomicInteger(initialTaskNum);
private static final AtomicInteger rejectTaskNum = new AtomicInteger(initialTaskNum);
private static final boolean successFlag = true;
private static final ThreadPoolExecutor executorService = new ThreadPoolExecutor(corePoolSize, maximumPoolSize,
keepAliveTime, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(maxTaskNum),
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// 被拒绝需要有补偿机制--todo
rejectTaskNum.incrementAndGet();
}
});
public static void execute(Message message) throws InterruptedException, ExecutionException {
try {
if (executorService.submit(new Runnable() {
@Override
public void run() {
ApplicationContextUtils.APP_CONTEXT.getBean(DefaultMQProducer.class);
}
}, successFlag).get().booleanValue()) {
successaskNum.incrementAndGet();
}
} catch (Exception e) {
failTaskNum.incrementAndGet();
}
}
public static int getSuccessTaskNum() {
return successaskNum.get();
}
public static int getFailTaskNum() {
return failTaskNum.get();
}
public static int getRejectTaskNum() {
return rejectTaskNum.get();
}
}
后面发现有个地方多此一举了,看了一下源码,其实对应的,在配置文件配上就可以了,不过只有DefaultMQProducer是可以完成自动配置的,而DefaultMQPushConsumer需要我们自己去注入。
之前的配置文件的key,由于是自己去注入的,所以可以随便写,现在改为spring自动去配置。所以要按照spring的规范来写。为什么要这样子写,直接看源码部分。
rocketmq.producer.group=test_prodoucer_group
rocketmq.nameServer=127.0.0.1:9876
@Configuration
public class MqConfiguration {
@Value("${rocketmq.producer.group}")
private String group;
@Value("${rocketmq.nameServer}")
private String nameServer;
// @Bean通过spring来自动配置,所以不需要了
// public DefaultMQProducer getDefaultMQProducer() {
// DefaultMQProducer mqProducer = new DefaultMQProducer();
// mqProducer.setProducerGroup(group);
// mqProducer.setNamesrvAddr(nameServer);
// return mqProducer;
// }
@Bean
public DefaultMQPushConsumer getDefaultMQPushConsumer() throws MQClientException {
DefaultMQPushConsumer mqushConsumer = new DefaultMQPushConsumer();
mqushConsumer.setConsumerGroup(group);
mqushConsumer.setNamesrvAddr(nameServer);
mqushConsumer.subscribe("test_topic", "*");
mqushConsumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeOrderlyStatus.SUCCESS;
}
});
mqushConsumer.start();
return mqushConsumer;
}
}
源码部分,先来看看关键入口,RocketMQAutoConfiguration,看到defaultMQProducer这个方法相信大家都知道怎么去获取了。看看RocketMQProperties,就是一个很典型的配置类。那么这个RocketMQAutoConfiguration是怎么被找到的呢?就得了解一下springboot的自动装配了。毕竟这个启动类已经不在我们所自定义的config的包路径下了,那么它是怎么被注入的?
@Configuration
@EnableConfigurationProperties(RocketMQProperties.class)
@ConditionalOnClass({MQAdmin.class, ObjectMapper.class})
@ConditionalOnProperty(prefix = "rocketmq", value = "name-server", matchIfMissing = true)
@Import({JacksonFallbackConfiguration.class, MessageConverterConfiguration.class, ListenerContainerConfiguration.class, ExtProducerResetConfiguration.class})
@AutoConfigureAfter({JacksonFallbackConfiguration.class, MessageConverterConfiguration.class})
public class RocketMQAutoConfiguration {
@Bean
@ConditionalOnMissingBean(DefaultMQProducer.class)
@ConditionalOnProperty(prefix = "rocketmq", value = {"name-server", "producer.group"})
public DefaultMQProducer defaultMQProducer(RocketMQProperties rocketMQProperties) {
//通过spring规定好的配置信息去创建DefaultMQProducer
RocketMQProperties.Producer producerConfig = rocketMQProperties.getProducer();
String nameServer = rocketMQProperties.getNameServer();
String groupName = producerConfig.getGroup();
Assert.hasText(nameServer, "[rocketmq.name-server] must not be null");
Assert.hasText(groupName, "[rocketmq.producer.group] must not be null");
String accessChannel = rocketMQProperties.getAccessChannel();
DefaultMQProducer producer;
String ak = rocketMQProperties.getProducer().getAccessKey();
String sk = rocketMQProperties.getProducer().getSecretKey();
if (!StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk)) {
producer = new DefaultMQProducer(groupName, new AclClientRPCHook(new SessionCredentials(ak, sk)),
rocketMQProperties.getProducer().isEnableMsgTrace(),
rocketMQProperties.getProducer().getCustomizedTraceTopic());
producer.setVipChannelEnabled(false);
} else {
producer = new DefaultMQProducer(groupName, rocketMQProperties.getProducer().isEnableMsgTrace(),
rocketMQProperties.getProducer().getCustomizedTraceTopic());
}
producer.setNamesrvAddr(nameServer);
if (!StringUtils.isEmpty(accessChannel)) {
producer.setAccessChannel(AccessChannel.valueOf(accessChannel));
}
producer.setSendMsgTimeout(producerConfig.getSendMessageTimeout());
producer.setRetryTimesWhenSendFailed(producerConfig.getRetryTimesWhenSendFailed());
producer.setRetryTimesWhenSendAsyncFailed(producerConfig.getRetryTimesWhenSendAsyncFailed());
producer.setMaxMessageSize(producerConfig.getMaxMessageSize());
producer.setCompressMsgBodyOverHowmuch(producerConfig.getCompressMessageBodyThreshold());
producer.setRetryAnotherBrokerWhenNotStoreOK(producerConfig.isRetryNextServer());
return producer;
}
」
RocketMQProperties,rockmq配置信息的类,因为我们只需要配置上对应信息就好,里面就可以找到我们配置文件上的rocketmq.nameServer,以及rocketmq.producer.group
@SuppressWarnings("WeakerAccess")
@ConfigurationProperties(prefix = "rocketmq")
public class RocketMQProperties {
/**
* The name server for rocketMQ, formats: `host:port;host:port`.
*/
private String nameServer;
/**
* Enum type for accesChannel, values: LOCAL, CLOUD
*/
private String accessChannel;
private Producer producer;
/**
* Configure enable listener or not.
* In some particular cases, if you don't want the the listener is enabled when container startup,
* the configuration pattern is like this :
* rocketmq.consumer.listeners.<group-name>.<topic-name>.enabled=<boolean value, true or false>
* <p>
* the listener is enabled by default.
*/
private Consumer consumer = new Consumer();
public String getNameServer() {
return nameServer;
}
public void setNameServer(String nameServer) {
this.nameServer = nameServer;
}
public String getAccessChannel() {
return accessChannel;
}
public void setAccessChannel(String accessChannel) {
this.accessChannel = accessChannel;
}
public RocketMQProperties.Producer getProducer() {
return producer;
}
public void setProducer(RocketMQProperties.Producer producer) {
this.producer = producer;
}
public static class Producer {
/**
* Group name of producer.
*/
private String group;
/**
* Millis of send message timeout.
*/
private int sendMessageTimeout = 3000;
/**
* Compress message body threshold, namely, message body larger than 4k will be compressed on default.
*/
private int compressMessageBodyThreshold = 1024 * 4;
/**
* Maximum number of retry to perform internally before claiming sending failure in synchronous mode.
* This may potentially cause message duplication which is up to application developers to resolve.
*/
private int retryTimesWhenSendFailed = 2;
/**
* <p> Maximum number of retry to perform internally before claiming sending failure in asynchronous mode. </p>
* This may potentially cause message duplication which is up to application developers to resolve.
*/
private int retryTimesWhenSendAsyncFailed = 2;
/**
* Indicate whether to retry another broker on sending failure internally.
*/
private boolean retryNextServer = false;
/**
* Maximum allowed message size in bytes.
*/
private int maxMessageSize = 1024 * 1024 * 4;
/**
* The property of "access-key".
*/
private String accessKey;
/**
* The property of "secret-key".
*/
private String secretKey;
/**
* Switch flag instance for message trace.
*/
private boolean enableMsgTrace = true;
/**
* The name value of message trace topic.If you don't config,you can use the default trace topic name.
*/
private String customizedTraceTopic = MixAll.RMQ_SYS_TRACE_TOPIC;
public String getGroup() {
return group;
}
public void setGroup(String group) {
this.group = group;
}
public int getSendMessageTimeout() {
return sendMessageTimeout;
}
public void setSendMessageTimeout(int sendMessageTimeout) {
this.sendMessageTimeout = sendMessageTimeout;
}
public int getCompressMessageBodyThreshold() {
return compressMessageBodyThreshold;
}
public void setCompressMessageBodyThreshold(int compressMessageBodyThreshold) {
this.compressMessageBodyThreshold = compressMessageBodyThreshold;
}
public int getRetryTimesWhenSendFailed() {
return retryTimesWhenSendFailed;
}
public void setRetryTimesWhenSendFailed(int retryTimesWhenSendFailed) {
this.retryTimesWhenSendFailed = retryTimesWhenSendFailed;
}
public int getRetryTimesWhenSendAsyncFailed() {
return retryTimesWhenSendAsyncFailed;
}
public void setRetryTimesWhenSendAsyncFailed(int retryTimesWhenSendAsyncFailed) {
this.retryTimesWhenSendAsyncFailed = retryTimesWhenSendAsyncFailed;
}
public boolean isRetryNextServer() {
return retryNextServer;
}
public void setRetryNextServer(boolean retryNextServer) {
this.retryNextServer = retryNextServer;
}
public int getMaxMessageSize() {
return maxMessageSize;
}
public void setMaxMessageSize(int maxMessageSize) {
this.maxMessageSize = maxMessageSize;
}
public String getAccessKey() {
return accessKey;
}
public void setAccessKey(String accessKey) {
this.accessKey = accessKey;
}
public String getSecretKey() {
return secretKey;
}
public void setSecretKey(String secretKey) {
this.secretKey = secretKey;
}
public boolean isEnableMsgTrace() {
return enableMsgTrace;
}
public void setEnableMsgTrace(boolean enableMsgTrace) {
this.enableMsgTrace = enableMsgTrace;
}
public String getCustomizedTraceTopic() {
return customizedTraceTopic;
}
public void setCustomizedTraceTopic(String customizedTraceTopic) {
this.customizedTraceTopic = customizedTraceTopic;
}
}
public Consumer getConsumer() {
return consumer;
}
public void setConsumer(Consumer consumer) {
this.consumer = consumer;
}
public static final class Consumer {
/**
* listener configuration container
* the pattern is like this:
* group1.topic1 = false
* group2.topic2 = true
* group3.topic3 = false
*/
private Map<String, Map<String, Boolean>> listeners = new HashMap<>();
public Map<String, Map<String, Boolean>> getListeners() {
return listeners;
}
public void setListeners(Map<String, Map<String, Boolean>> listeners) {
this.listeners = listeners;
}
}
}
网友评论