美文网首页
springboot2.x监听beanstalk实现异步任务

springboot2.x监听beanstalk实现异步任务

作者: 骑蚂蚁上高速_jun | 来源:发表于2020-11-06 22:37 被阅读0次

    项目中使用springboot 监听beanstalk 队列 ,接收到任务后将其异步投递到 线程池中运行。

    1.添加 java操作 beanstalk坐标

    <dependency>
          <groupId>com.dinstone</groupId>
          <artifactId>beanstalkc</artifactId>
          <version>2.3.0</version>
               
         <exclusions>
                  <!-- springboot使用logback,需要排除 log4j -->
             <exclusion>
                  <groupId>org.slf4j</groupId>
                  <artifactId>slf4j-log4j12</artifactId>
             </exclusion>
        </exclusions>
    </dependency>
    
    1. 定时任务多线程配置。
      由于springboot中所有的定时任务都是单线程执行,就算是多个定时任务在一起也是单线程,所有只要其中的一个定时器造成阻塞,那么其他的所有定时任务都不会执行了。此时就必须配置定时任务的多线程模式运行。 由于定时任务监听 beanstalk,会造成阻塞,会影响到项目中其他的所有定时器,故而需要配置定时任务的线程池。。
    package com.configuration;
    
    import org.springframework.context.annotation.Configuration;
    import org.springframework.scheduling.annotation.SchedulingConfigurer;
    import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
    import org.springframework.scheduling.config.ScheduledTaskRegistrar;
    
    import java.util.concurrent.Executors;
    
    @Configuration
    public class ScheduledConfiguration implements SchedulingConfigurer /**/{
    
        @Override
        public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
            //设定一个长度10的定时任务线程池
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setThreadNamePrefix("scheduled@");
            taskRegistrar.setScheduler(Executors.newScheduledThreadPool(10,executor));
        }
    }
    
    
    1. 使用 Schedule 定时任务线程监听队列。
    package com.task;
    
    import com.alibaba.fastjson.JSONObject;
    import com.dinstone.beanstalkc.BeanstalkClientFactory;
    import com.dinstone.beanstalkc.Configuration;
    import com.dinstone.beanstalkc.Job;
    import com.dinstone.beanstalkc.JobConsumer;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Lazy;
    import org.springframework.scheduling.annotation.Scheduled;
    import org.springframework.stereotype.Component;
    
    import java.io.IOException;
    import java.util.Objects;
    
    @Component
    @Slf4j
    @Lazy(false)
    public class CrontabTask {
    
    
        @Autowired
        SmtpSendMailer smtpSendMailer; // 线程池对象
      // 链接 beanstalk队列
       private JobConsumer connectBeanstalkConsumer(){
            Configuration config = new Configuration();
            config.setServiceHost("8.129.0.115");
            config.setServicePort(11300);
            BeanstalkClientFactory factory = new BeanstalkClientFactory(config);
            return factory.createJobConsumer("smtp-mail");
        }
    
        @Scheduled(fixedDelay = 1)
        public void executeInternal()  {
            JobConsumer consumer = connectBeanstalkConsumer();
            System.out.println("beanstalk连接的线程是:"+SystemUtils.getCurrentThreadName());
            while(true){
                Job job = null;
                try{
                    job = consumer.reserveJob(3);
                }catch(Throwable e){
                    System.out.println("beanstalk重连一次......");
                    consumer.close(); // 关闭原来的链接
                    consumer = connectBeanstalkConsumer();
                }
    
                if (Objects.isNull(job)) continue;
                String jobString = new String(job.getData());
                try{
                    DataDto dataDto = JSONObject.parseObject(jobString,  DataDto.class);
                    System.out.println(dataDto);
                    // 投递到异步队列线程池执行
                }catch(Throwable e){
                    System.out.println("非法的消费数据 : " + jobString);
                }
                consumer.deleteJob(job.getId());
            }
        }
    }
    
    1. 配置beanstalk任务线程池
    package com.configuration;
    
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.scheduling.annotation.EnableAsync;
    import org.springframework.scheduling.annotation.EnableScheduling;
    import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
    
    import java.util.concurrent.Executor;
    
    @Configuration
    @EnableAsync // 开启异步任务
    @EnableScheduling // 开启定时任务
    public class TaskConfiguration {
    
    
        @Bean("smtpMailer")
        public Executor smtpMailer() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor .setCorePoolSize(10);// 设置最小的线程数量
            executor .setMaxPoolSize(50);// 设置最大的线程数量
            executor .setQueueCapacity(25);// 等待队列
            executor .setKeepAliveSeconds(60);
            executor .setThreadNamePrefix("smtpMailer@"); // 设置线程名称
            executor.initialize();
            return executor ;
        }
    
    }
    
    

    4 . 执行beanstalk 任务投递过来的异步任务

    @Async("smtpMailer")
        public void sendMailer(String jobString) {
            // 得到任务数据。。。。
            JSONObject jsonObject= JSONObject.parseObject(jobString);
            // TODO.. 继续任务
        }
    

    相关文章

      网友评论

          本文标题:springboot2.x监听beanstalk实现异步任务

          本文链接:https://www.haomeiwen.com/subject/mfmxbktx.html