A Simple Async Task Utility Class

Author: guessguess | Published: 2020-07-01 20:43

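In day-to-day code, some people handle an asynchronous task by simply creating a new thread on the spot. That approach has several drawbacks.
First, repeatedly creating and destroying threads wastes system resources.
Second, asynchronous tasks scattered through the code are hard to manage. Changing them all consistently later means hunting through the whole codebase, so we want a single entry point, which also makes maintenance much easier.
Every asynchronous task ultimately enters through a run method, so a thread pool is a natural fit for managing them.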
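
To make the cost argument concrete, here is a minimal sketch (the class name PoolReuseDemo and its method are made up for illustration, not part of the utility in this article). It pushes many small tasks through a fixed-size pool and reports how many distinct worker threads ran them; the point is that the number of threads stays bounded by the pool size no matter how many tasks are submitted.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class PoolReuseDemo {

    // Runs taskCount small tasks on a pool of poolSize threads and returns
    // how many distinct worker threads actually executed them.
    static int distinctWorkers(int poolSize, int taskCount) {
        Set<String> workerNames = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            for (int i = 0; i < taskCount; i++) {
                // Each task only records the name of the thread it ran on.
                pool.execute(() -> workerNames.add(Thread.currentThread().getName()));
            }
        } finally {
            pool.shutdown();
        }
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return workerNames.size();
    }

    public static void main(String[] args) {
        System.out.println("distinct workers: " + distinctWorkers(2, 20));
    }
}
```

With one thread per task, the same 20 tasks would have cost 20 thread creations and teardowns.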

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class AsyncUtils {
    // Number of core threads
    private static final int corePoolSize = 1;
    // Maximum number of threads
    private static final int maximumPoolSize = 10;
    // How long an idle non-core thread is kept alive, in milliseconds
    private static final int keepAliveTime = 60 * 1000;
    // Maximum number of tasks that may pile up in the queue
    private static final int maxTaskNum = 100;
    // Initial value of the counters
    private static final int initialTaskNum = 0;
    private static final AtomicInteger successTaskNum = new AtomicInteger(initialTaskNum);
    private static final AtomicInteger failTaskNum = new AtomicInteger(initialTaskNum);
    private static final AtomicInteger rejectTaskNum = new AtomicInteger(initialTaskNum);

    private static final ThreadPoolExecutor executorService = new ThreadPoolExecutor(corePoolSize, maximumPoolSize,
            keepAliveTime, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(maxTaskNum),
            new RejectedExecutionHandler() {
                @Override
                public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                    // TODO: rejected tasks need a compensation mechanism
                    rejectTaskNum.incrementAndGet();
                }
            });

    public static void execute(final Runnable runnable) {
        // Record the outcome on the worker thread. Blocking on Future.get() here would
        // make the call synchronous, and it would never return for a rejected task.
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    runnable.run();
                    successTaskNum.incrementAndGet();
                } catch (Exception e) {
                    failTaskNum.incrementAndGet();
                }
            }
        });
    }

    public static int getSuccessTaskNum() {
        return successTaskNum.get();
    }

    public static int getFailTaskNum() {
        return failTaskNum.get();
    }

    public static int getRejectTaskNum() {
        return rejectTaskNum.get();
    }
}
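
The rejection handler above only counts rejected tasks and leaves compensation as a TODO. One possible shape for that compensation, sketched under my own assumptions (the class RejectionFallbackDemo, the fallback queue, and the retry method are hypothetical, not the author's design), is to park rejected tasks and run them later:

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class RejectionFallbackDemo {
    static final AtomicInteger rejected = new AtomicInteger();
    // Hypothetical holding area for tasks the pool refused.
    static final Queue<Runnable> fallback = new ConcurrentLinkedQueue<>();

    // Saturates a one-thread pool with a one-slot queue, then submits one task
    // too many. Returns how many tasks ended up in the fallback queue.
    static int saturateAndCount() {
        rejected.set(0);
        fallback.clear();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(1),
                (task, executor) -> {
                    rejected.incrementAndGet();
                    fallback.add(task); // keep the task instead of dropping it
                });
        CountDownLatch release = new CountDownLatch(1);
        pool.execute(() -> {
            try {
                release.await(); // occupy the only worker until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        pool.execute(() -> { }); // fills the single queue slot
        pool.execute(() -> { }); // no room left, so the handler is invoked
        release.countDown();
        pool.shutdown();
        return fallback.size();
    }

    // Simplest possible compensation: run the parked tasks on the calling thread.
    static int retryFallback() {
        int retried = 0;
        Runnable task;
        while ((task = fallback.poll()) != null) {
            task.run();
            retried++;
        }
        return retried;
    }
}
```

One thing to keep in mind with any handler that swallows the rejection: a Future obtained from submit for such a task never completes, so a caller that blocks on get() without a timeout would wait forever.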

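That raises a second question: how do we improve throughput? If each task takes a long time to run, throughput inevitably drops. So the first thing to solve is throughput, and this is where a message queue comes in. The thread pool should only be concerned with sending messages, not with consuming them, and that is what raises throughput.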
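
The hand-off idea can be tried out without a broker. The sketch below uses an in-memory BlockingQueue as a stand-in for the message queue (HandOffDemo and all of its members are illustrative names, and a real MQ sits between processes rather than threads). The sending side only enqueues, while the slow work happens on the consuming side.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

class HandOffDemo {
    // Stand-in for the message queue.
    static final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    static final AtomicInteger consumed = new AtomicInteger();

    // Sending side: only enqueues, so it returns almost immediately.
    static void send(String payload) {
        queue.offer(payload);
    }

    // Consuming side: takes messages one by one and simulates slow work.
    static Thread startConsumer(int expected, long workMillis) {
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < expected; i++) {
                    queue.take();
                    Thread.sleep(workMillis);
                    consumed.incrementAndGet();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        return consumer;
    }

    // Sends count messages, waits for the consumer to finish, and returns
    // the time in milliseconds that the sending loop itself took.
    static long produceAndDrain(int count, long workMillis) {
        consumed.set(0);
        Thread consumer = startConsumer(count, workMillis);
        long start = System.nanoTime();
        for (int i = 0; i < count; i++) {
            send("task-" + i);
        }
        long sendMillis = (System.nanoTime() - start) / 1_000_000;
        try {
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sendMillis;
    }

    public static void main(String[] args) {
        long sendMillis = produceAndDrain(5, 20);
        System.out.println("sending took " + sendMillis + " ms, consumed " + consumed.get());
    }
}
```

The sending loop finishes long before the consumer has worked through the backlog, which is the effect the message queue is meant to give the thread pool.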

Let's rework the utility into version 2.0. First, though, we need to set up the MQ environment.

Start by adding the dependencies

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.5.RELEASE</version>
    </parent>
    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- Enable hot reloading -->
        <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-devtools -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <optional>true</optional>
        </dependency>

        <!-- Unit testing -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.0.4</version>
        </dependency>
    </dependencies>

The startup class is the usual one

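import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Config {
    public static void main(String[] args) {
        SpringApplication.run(Config.class, args);
    }
}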

Configuration file

rocketmq.producer.group=test_prodoucer_group
rocketmq.consumer.group=test_consume_group
rocketmq.name-server=127.0.0.1:9876
server.port=8081

Hand the MQ producer and consumer over to Spring to manage

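import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MqConfiguration {

    @Value("${rocketmq.producer.group}")
    private String group;
    @Value("${rocketmq.name-server}")
    private String nameServer;
    // The key must match the one in the configuration file (rocketmq.consumer.group)
    @Value("${rocketmq.consumer.group}")
    private String consumeGroup;

    @Bean
    public DefaultMQProducer getDefaultMQProducer() {
        DefaultMQProducer mqProducer = new DefaultMQProducer();
        mqProducer.setProducerGroup(group);
        mqProducer.setNamesrvAddr(nameServer);
        return mqProducer;
    }

    @Bean
    public DefaultMQPushConsumer getDefaultMQPushConsumer() throws MQClientException {
        DefaultMQPushConsumer mqPushConsumer = new DefaultMQPushConsumer();
        mqPushConsumer.setConsumerGroup(consumeGroup);
        mqPushConsumer.setNamesrvAddr(nameServer);
        // Subscribe to every tag of test_topic
        mqPushConsumer.subscribe("test_topic", "*");
        mqPushConsumer.registerMessageListener(new MessageListenerOrderly() {

            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeOrderlyStatus.SUCCESS;
            }

        });
        mqPushConsumer.start();
        return mqPushConsumer;
    }
}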

Add a utility class for obtaining beans

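import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

@Component
public class ApplicationContextUtils implements ApplicationContextAware {
    private ApplicationContext context;
    // Static handle so that non-Spring code (such as static utilities) can reach the context
    public static ApplicationContext APP_CONTEXT;

    public <T> T getBean(Class<T> t) {
        return context.getBean(t);
    }

    public String[] getBeanNames() {
        return context.getBeanDefinitionNames();
    }

    // Called by Spring once the context is available
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.context = applicationContext;
        APP_CONTEXT = applicationContext;
    }

    public ApplicationContext getContext() {
        return context;
    }

    public void setContext(ApplicationContext context) {
        this.context = context;
    }
}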

Now adapt the utility a little so that it sends to the message queue asynchronously

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

public class AsyncUtilsV2 {

    private static final int corePoolSize = 1;
    private static final int maximumPoolSize = 10;
    private static final int keepAliveTime = 60 * 1000;
    // Maximum number of tasks that may pile up in the queue
    private static final int maxTaskNum = 1;
    private static final int initialTaskNum = 0;
    private static final AtomicInteger successTaskNum = new AtomicInteger(initialTaskNum);
    private static final AtomicInteger failTaskNum = new AtomicInteger(initialTaskNum);
    private static final AtomicInteger rejectTaskNum = new AtomicInteger(initialTaskNum);

    private static final ThreadPoolExecutor executorService = new ThreadPoolExecutor(corePoolSize, maximumPoolSize,
            keepAliveTime, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(maxTaskNum),
            new RejectedExecutionHandler() {
                @Override
                public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                    // TODO: rejected tasks need a compensation mechanism
                    rejectTaskNum.incrementAndGet();
                }
            });

    public static void execute(final Message message) {
        // The caller returns immediately; the send and the counting happen on a pool thread.
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    // Look up the producer bean and actually send the message
                    ApplicationContextUtils.APP_CONTEXT.getBean(DefaultMQProducer.class).send(message);
                    successTaskNum.incrementAndGet();
                } catch (Exception e) {
                    failTaskNum.incrementAndGet();
                }
            }
        });
    }

    public static int getSuccessTaskNum() {
        return successTaskNum.get();
    }

    public static int getFailTaskNum() {
        return failTaskNum.get();
    }

    public static int getRejectTaskNum() {
        return rejectTaskNum.get();
    }
}
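
Looking the producer up through a static application context works, but it makes the utility awkward to try out without a running broker. As an alternative sketch (the MessageSender interface and the AsyncSendSketch class are hypothetical and not part of RocketMQ or of the code above), the sending dependency can be passed in, so the same counting logic can be exercised with a fake sender:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class AsyncSendSketch {
    // Hypothetical abstraction over the producer; a real implementation
    // would delegate to the MQ client's send call.
    interface MessageSender {
        void send(String body) throws Exception;
    }

    private final ExecutorService pool = Executors.newFixedThreadPool(2);
    private final MessageSender sender;
    final AtomicInteger success = new AtomicInteger();
    final AtomicInteger fail = new AtomicInteger();

    AsyncSendSketch(MessageSender sender) {
        this.sender = sender;
    }

    // Returns immediately; the send and the bookkeeping run on a pool thread.
    void execute(String body) {
        pool.execute(() -> {
            try {
                sender.send(body);
                success.incrementAndGet();
            } catch (Exception e) {
                fail.incrementAndGet();
            }
        });
    }

    // Stops accepting work and waits for the queued sends to finish.
    void shutdownAndWait() {
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

A fake sender that throws for certain inputs is then enough to check that the success and failure counters behave as intended.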

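Later I realized that part of this was unnecessary. After reading the source, it turns out that putting the values in the configuration file is enough. Only DefaultMQProducer is auto-configured, though; DefaultMQPushConsumer still has to be registered by us.
Because we injected the values ourselves before, the keys in the configuration file could be named anything. Now that Spring does the configuration, the keys have to follow the names Spring expects. The source code further down shows why they must be written this way.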

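rocketmq.producer.group=test_prodoucer_group
rocketmq.nameServer=127.0.0.1:9876

import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MqConfiguration {

    @Value("${rocketmq.producer.group}")
    private String group;
    @Value("${rocketmq.nameServer}")
    private String nameServer;

//  The producer is auto-configured by Spring now, so this @Bean is no longer needed
//  @Bean
//  public DefaultMQProducer getDefaultMQProducer() {
//      DefaultMQProducer mqProducer = new DefaultMQProducer();
//      mqProducer.setProducerGroup(group);
//      mqProducer.setNamesrvAddr(nameServer);
//      return mqProducer;
//  }

    @Bean
    public DefaultMQPushConsumer getDefaultMQPushConsumer() throws MQClientException {
        DefaultMQPushConsumer mqPushConsumer = new DefaultMQPushConsumer();
        // The consumer reuses the producer group name here, since it is the only group configured
        mqPushConsumer.setConsumerGroup(group);
        mqPushConsumer.setNamesrvAddr(nameServer);
        mqPushConsumer.subscribe("test_topic", "*");
        mqPushConsumer.registerMessageListener(new MessageListenerOrderly() {

            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeOrderlyStatus.SUCCESS;
            }

        });
        mqPushConsumer.start();
        return mqPushConsumer;
    }
}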

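Now for the source. The key entry point is RocketMQAutoConfiguration; once you see its defaultMQProducer method, it should be clear how the producer is obtained. RocketMQProperties is a typical configuration properties class. But how does RocketMQAutoConfiguration get found in the first place? For that you need to understand Spring Boot's auto-configuration. After all, this configuration class does not live under the package path scanned from our own Config class, so how does it get registered?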

@Configuration
@EnableConfigurationProperties(RocketMQProperties.class)
@ConditionalOnClass({MQAdmin.class, ObjectMapper.class})
@ConditionalOnProperty(prefix = "rocketmq", value = "name-server", matchIfMissing = true)
@Import({JacksonFallbackConfiguration.class, MessageConverterConfiguration.class, ListenerContainerConfiguration.class, ExtProducerResetConfiguration.class})
@AutoConfigureAfter({JacksonFallbackConfiguration.class, MessageConverterConfiguration.class})
public class RocketMQAutoConfiguration {
    
    @Bean
    @ConditionalOnMissingBean(DefaultMQProducer.class)
    @ConditionalOnProperty(prefix = "rocketmq", value = {"name-server", "producer.group"})
    public DefaultMQProducer defaultMQProducer(RocketMQProperties rocketMQProperties) {
        // Build the DefaultMQProducer from the configuration values that Spring has bound
        RocketMQProperties.Producer producerConfig = rocketMQProperties.getProducer();
        String nameServer = rocketMQProperties.getNameServer();
        String groupName = producerConfig.getGroup();
        Assert.hasText(nameServer, "[rocketmq.name-server] must not be null");
        Assert.hasText(groupName, "[rocketmq.producer.group] must not be null");

        String accessChannel = rocketMQProperties.getAccessChannel();

        DefaultMQProducer producer;
        String ak = rocketMQProperties.getProducer().getAccessKey();
        String sk = rocketMQProperties.getProducer().getSecretKey();
        if (!StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk)) {
            producer = new DefaultMQProducer(groupName, new AclClientRPCHook(new SessionCredentials(ak, sk)),
                rocketMQProperties.getProducer().isEnableMsgTrace(),
                rocketMQProperties.getProducer().getCustomizedTraceTopic());
            producer.setVipChannelEnabled(false);
        } else {
            producer = new DefaultMQProducer(groupName, rocketMQProperties.getProducer().isEnableMsgTrace(),
                rocketMQProperties.getProducer().getCustomizedTraceTopic());
        }

        producer.setNamesrvAddr(nameServer);
        if (!StringUtils.isEmpty(accessChannel)) {
            producer.setAccessChannel(AccessChannel.valueOf(accessChannel));
        }
        producer.setSendMsgTimeout(producerConfig.getSendMessageTimeout());
        producer.setRetryTimesWhenSendFailed(producerConfig.getRetryTimesWhenSendFailed());
        producer.setRetryTimesWhenSendAsyncFailed(producerConfig.getRetryTimesWhenSendAsyncFailed());
        producer.setMaxMessageSize(producerConfig.getMaxMessageSize());
        producer.setCompressMsgBodyOverHowmuch(producerConfig.getCompressMessageBodyThreshold());
        producer.setRetryAnotherBrokerWhenNotStoreOK(producerConfig.isRetryNextServer());

        return producer;
    }
}
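
As for how a class like RocketMQAutoConfiguration is discovered: in Spring Boot 2.0.x, @SpringBootApplication enables auto-configuration, and the candidate classes are read from META-INF/spring.factories files on the classpath, under the key org.springframework.boot.autoconfigure.EnableAutoConfiguration. The starter jar presumably ships such a file naming RocketMQAutoConfiguration. The following is only a rough sketch for peeking at those entries with plain JDK calls (SpringFactoriesPeek is a made-up helper, not how Spring itself is implemented):

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.Properties;

class SpringFactoriesPeek {
    static final String KEY = "org.springframework.boot.autoconfigure.EnableAutoConfiguration";

    // Splits a comma-separated spring.factories value into class names.
    static List<String> splitClassNames(String value) {
        List<String> names = new ArrayList<>();
        if (value == null) {
            return names;
        }
        for (String part : value.split(",")) {
            String name = part.trim();
            if (!name.isEmpty()) {
                names.add(name);
            }
        }
        return names;
    }

    // Collects the auto-configuration class names from every
    // META-INF/spring.factories file visible to the given class loader.
    static List<String> autoConfigurations(ClassLoader loader) {
        List<String> result = new ArrayList<>();
        try {
            Enumeration<URL> urls = loader.getResources("META-INF/spring.factories");
            while (urls.hasMoreElements()) {
                Properties props = new Properties();
                try (InputStream in = urls.nextElement().openStream()) {
                    props.load(in);
                }
                result.addAll(splitClassNames(props.getProperty(KEY)));
            }
        } catch (IOException e) {
            // An unreadable file is treated as "nothing listed" in this sketch.
        }
        return result;
    }

    public static void main(String[] args) {
        autoConfigurations(SpringFactoriesPeek.class.getClassLoader())
                .forEach(System.out::println);
    }
}
```

Run inside the project from this article, the printed list should include the RocketMQ auto-configuration class if the starter registers it this way; run on a bare classpath, it prints nothing.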

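RocketMQProperties is the class that holds the RocketMQ configuration. We only need to supply the corresponding values; inside it you can find the fields behind the rocketmq.nameServer and rocketmq.producer.group keys from our configuration file.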

@SuppressWarnings("WeakerAccess")
@ConfigurationProperties(prefix = "rocketmq")
public class RocketMQProperties {

    /**
     * The name server for rocketMQ, formats: `host:port;host:port`.
     */
    private String nameServer;

    /**
     * Enum type for accesChannel, values: LOCAL, CLOUD
     */
    private String accessChannel;

    private Producer producer;

    /**
     * Configure enable listener or not.
     * In some particular cases, if you don't want the the listener is enabled when container startup,
     * the configuration pattern is like this :
     * rocketmq.consumer.listeners.<group-name>.<topic-name>.enabled=<boolean value, true or false>
     * <p>
     * the listener is enabled by default.
     */
    private Consumer consumer = new Consumer();

    public String getNameServer() {
        return nameServer;
    }

    public void setNameServer(String nameServer) {
        this.nameServer = nameServer;
    }

    public String getAccessChannel() {
        return accessChannel;
    }

    public void setAccessChannel(String accessChannel) {
        this.accessChannel = accessChannel;
    }

    public RocketMQProperties.Producer getProducer() {
        return producer;
    }

    public void setProducer(RocketMQProperties.Producer producer) {
        this.producer = producer;
    }

    public static class Producer {

        /**
         * Group name of producer.
         */
        private String group;

        /**
         * Millis of send message timeout.
         */
        private int sendMessageTimeout = 3000;

        /**
         * Compress message body threshold, namely, message body larger than 4k will be compressed on default.
         */
        private int compressMessageBodyThreshold = 1024 * 4;

        /**
         * Maximum number of retry to perform internally before claiming sending failure in synchronous mode.
         * This may potentially cause message duplication which is up to application developers to resolve.
         */
        private int retryTimesWhenSendFailed = 2;

        /**
         * <p> Maximum number of retry to perform internally before claiming sending failure in asynchronous mode. </p>
         * This may potentially cause message duplication which is up to application developers to resolve.
         */
        private int retryTimesWhenSendAsyncFailed = 2;

        /**
         * Indicate whether to retry another broker on sending failure internally.
         */
        private boolean retryNextServer = false;

        /**
         * Maximum allowed message size in bytes.
         */
        private int maxMessageSize = 1024 * 1024 * 4;

        /**
         * The property of "access-key".
         */
        private String accessKey;

        /**
         * The property of "secret-key".
         */
        private String secretKey;

        /**
         * Switch flag instance for message trace.
         */
        private boolean enableMsgTrace = true;

        /**
         * The name value of message trace topic.If you don't config,you can use the default trace topic name.
         */
        private String customizedTraceTopic = MixAll.RMQ_SYS_TRACE_TOPIC;

        public String getGroup() {
            return group;
        }

        public void setGroup(String group) {
            this.group = group;
        }

        public int getSendMessageTimeout() {
            return sendMessageTimeout;
        }

        public void setSendMessageTimeout(int sendMessageTimeout) {
            this.sendMessageTimeout = sendMessageTimeout;
        }

        public int getCompressMessageBodyThreshold() {
            return compressMessageBodyThreshold;
        }

        public void setCompressMessageBodyThreshold(int compressMessageBodyThreshold) {
            this.compressMessageBodyThreshold = compressMessageBodyThreshold;
        }

        public int getRetryTimesWhenSendFailed() {
            return retryTimesWhenSendFailed;
        }

        public void setRetryTimesWhenSendFailed(int retryTimesWhenSendFailed) {
            this.retryTimesWhenSendFailed = retryTimesWhenSendFailed;
        }

        public int getRetryTimesWhenSendAsyncFailed() {
            return retryTimesWhenSendAsyncFailed;
        }

        public void setRetryTimesWhenSendAsyncFailed(int retryTimesWhenSendAsyncFailed) {
            this.retryTimesWhenSendAsyncFailed = retryTimesWhenSendAsyncFailed;
        }

        public boolean isRetryNextServer() {
            return retryNextServer;
        }

        public void setRetryNextServer(boolean retryNextServer) {
            this.retryNextServer = retryNextServer;
        }

        public int getMaxMessageSize() {
            return maxMessageSize;
        }

        public void setMaxMessageSize(int maxMessageSize) {
            this.maxMessageSize = maxMessageSize;
        }

        public String getAccessKey() {
            return accessKey;
        }

        public void setAccessKey(String accessKey) {
            this.accessKey = accessKey;
        }

        public String getSecretKey() {
            return secretKey;
        }

        public void setSecretKey(String secretKey) {
            this.secretKey = secretKey;
        }

        public boolean isEnableMsgTrace() {
            return enableMsgTrace;
        }

        public void setEnableMsgTrace(boolean enableMsgTrace) {
            this.enableMsgTrace = enableMsgTrace;
        }

        public String getCustomizedTraceTopic() {
            return customizedTraceTopic;
        }

        public void setCustomizedTraceTopic(String customizedTraceTopic) {
            this.customizedTraceTopic = customizedTraceTopic;
        }
    }

    public Consumer getConsumer() {
        return consumer;
    }

    public void setConsumer(Consumer consumer) {
        this.consumer = consumer;
    }

    public static final class Consumer {
        /**
         * listener configuration container
         * the pattern is like this:
         * group1.topic1 = false
         * group2.topic2 = true
         * group3.topic3 = false
         */
        private Map<String, Map<String, Boolean>> listeners = new HashMap<>();

        public Map<String, Map<String, Boolean>> getListeners() {
            return listeners;
        }

        public void setListeners(Map<String, Map<String, Boolean>> listeners) {
            this.listeners = listeners;
        }
    }

}
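
The class above is why both rocketmq.nameServer and rocketmq.name-server can end up in the nameServer field: Spring Boot binds configuration properties with relaxed naming rules. The snippet below is a deliberately simplified approximation of that idea, written for illustration only (RelaxedNameSketch is a made-up class, and the real binding rules cover more cases than this):

```java
class RelaxedNameSketch {

    // Reduces a property key to a comparable form: lower case, with
    // dashes and underscores removed. Dots are kept as separators.
    static String uniform(String key) {
        StringBuilder sb = new StringBuilder();
        for (char c : key.toCharArray()) {
            if (c != '-' && c != '_') {
                sb.append(Character.toLowerCase(c));
            }
        }
        return sb.toString();
    }

    // Two keys are treated as the same property if their uniform forms match.
    static boolean sameProperty(String a, String b) {
        return uniform(a).equals(uniform(b));
    }

    public static void main(String[] args) {
        System.out.println(sameProperty("rocketmq.nameServer", "rocketmq.name-server"));
        System.out.println(sameProperty("rocketmq.producer.group", "rocketmq.consumer.group"));
    }
}
```

Under this approximation the camel-case and kebab-case spellings of the name server key compare equal, while the producer and consumer group keys stay distinct.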

Article link: https://www.haomeiwen.com/subject/rtdcqktx.html