项目中使用springboot 监听beanstalk 队列 ,接收到任务后将其异步投递到 线程池中运行。
1.添加 java操作 beanstalk坐标
<dependency>
<groupId>com.dinstone</groupId>
<artifactId>beanstalkc</artifactId>
<version>2.3.0</version>
<exclusions>
<!-- springboot使用logback,需要排除 log4j -->
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
- 定时任务多线程配置。
由于springboot中所有的定时任务都是单线程执行,就算是多个定时任务在一起也是单线程,所有只要其中的一个定时器造成阻塞,那么其他的所有定时任务都不会执行了。此时就必须配置定时任务的多线程模式运行。 由于定时任务监听 beanstalk,会造成阻塞,会影响到项目中其他的所有定时器,故而需要配置定时任务的线程池。。
package com.configuration;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
import java.util.concurrent.Executors;
@Configuration
public class ScheduledConfiguration implements SchedulingConfigurer /**/{
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
//设定一个长度10的定时任务线程池
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setThreadNamePrefix("scheduled@");
taskRegistrar.setScheduler(Executors.newScheduledThreadPool(10,executor));
}
}
- 使用 Schedule 定时任务线程监听队列。
package com.task;
import com.alibaba.fastjson.JSONObject;
import com.dinstone.beanstalkc.BeanstalkClientFactory;
import com.dinstone.beanstalkc.Configuration;
import com.dinstone.beanstalkc.Job;
import com.dinstone.beanstalkc.JobConsumer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Objects;
@Component
@Slf4j
@Lazy(false)
public class CrontabTask {
@Autowired
SmtpSendMailer smtpSendMailer; // 线程池对象
// 链接 beanstalk队列
private JobConsumer connectBeanstalkConsumer(){
Configuration config = new Configuration();
config.setServiceHost("8.129.0.115");
config.setServicePort(11300);
BeanstalkClientFactory factory = new BeanstalkClientFactory(config);
return factory.createJobConsumer("smtp-mail");
}
@Scheduled(fixedDelay = 1)
public void executeInternal() {
JobConsumer consumer = connectBeanstalkConsumer();
System.out.println("beanstalk连接的线程是:"+SystemUtils.getCurrentThreadName());
while(true){
Job job = null;
try{
job = consumer.reserveJob(3);
}catch(Throwable e){
System.out.println("beanstalk重连一次......");
consumer.close(); // 关闭原来的链接
consumer = connectBeanstalkConsumer();
}
if (Objects.isNull(job)) continue;
String jobString = new String(job.getData());
try{
DataDto dataDto = JSONObject.parseObject(jobString, DataDto.class);
System.out.println(dataDto);
// 投递到异步队列线程池执行
}catch(Throwable e){
System.out.println("非法的消费数据 : " + jobString);
}
consumer.deleteJob(job.getId());
}
}
}
- 配置beanstalk任务线程池
package com.configuration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
@Configuration
@EnableAsync // 开启异步任务
@EnableScheduling // 开启定时任务
public class TaskConfiguration {
@Bean("smtpMailer")
public Executor smtpMailer() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor .setCorePoolSize(10);// 设置最小的线程数量
executor .setMaxPoolSize(50);// 设置最大的线程数量
executor .setQueueCapacity(25);// 等待队列
executor .setKeepAliveSeconds(60);
executor .setThreadNamePrefix("smtpMailer@"); // 设置线程名称
executor.initialize();
return executor ;
}
}
4 . 执行beanstalk 任务投递过来的异步任务
@Async("smtpMailer")
public void sendMailer(String jobString) {
// 得到任务数据。。。。
JSONObject jsonObject= JSONObject.parseObject(jobString);
// TODO.. 继续任务
}
网友评论