1. ThreadPoolUtil
package com.example.threadpool.utils;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 线程池的工具类
* 用于进行线程的管理,防止重复创建、杀死线程。
* <p>
* 多线程运行期间,如果系统不断的创建、杀死新线程,
* 会产生过度消耗系统资源,以及过度切换线程的问题,甚至可能导致系统资源的崩溃。
* 因此需要线程池,对线程进行管理。
*/
@Slf4j
public class ThreadPoolUtil {
public static final ThreadPoolUtil EXCUTOR = new ThreadPoolUtil();
//核心线程池的数量,同时能够执行的线程数量
private int corePoolSize;
//最大线程池数量,表示当缓冲队列满的时候能继续容纳的等待任务的数量
private int maxPoolSize;
//存活时间
private long keepAliveTime = 1;
private TimeUnit unit = TimeUnit.HOURS;
private ThreadPoolExecutor executor;
private ThreadPoolUtil() {
//给corePoolSize赋值:当前设备可用处理器核心数*2 + 1,能够让cpu的效率得到最大程度执行(有研究论证的)
corePoolSize = Runtime.getRuntime().availableProcessors() * 2 + 1;
maxPoolSize = corePoolSize;
executor = new ThreadPoolExecutor(
//当某个核心任务执行完毕,会依次从缓冲队列中取出等待任务
corePoolSize,
// 然后new LinkedBlockingQueue<Runnable>(),然后maximumPoolSize,但是它的数量是包含了corePoolSize的
maxPoolSize,
//表示的是maximumPoolSize当中等待任务的存活时间
keepAliveTime,
unit,
//缓冲队列,用于存放等待任务,Linked的先进先出
new LinkedBlockingQueue<Runnable>(),
new DefaultThreadFactory(Thread.NORM_PRIORITY, "thread-pool-"),
new ThreadPoolExecutor.AbortPolicy()
);
log.info("Class ThreadPoolUtil field corePoolSize ={},maxPoolSize ={} ", corePoolSize, maxPoolSize);
}
/**
* 执行任务 无返回值
*
* @param runnable
*/
public void execute(Runnable runnable) {
if (runnable != null) {
executor.execute(runnable);
}
}
/**
* 执行任务 有返回值
*
* @param callable
*/
public <T> T executeReturn(Callable<T> callable) {
try {
return executor.submit(callable).get();
} catch (Exception e) {
log.error("Class ThreadPoolUtil executeReturn fail message :{}",e.getMessage());
}
return null;
}
/**
* 执行任务 有返回值
*
* @param callables
*/
public <T>List<Future<T>> executeReturn(List<Callable<T>> callables) {
try {
return executor.invokeAll(callables);
} catch (Exception e) {
log.error("Class ThreadPoolUtil executeReturn fail message :{}",e.getMessage());
}
return null;
}
/**
* 移除任务
*
* @param runnable
*/
public void remove(Runnable runnable) {
if (runnable != null) {
executor.remove(runnable);
}
}
private static class DefaultThreadFactory implements ThreadFactory {
//线程池的计数
private static final AtomicInteger poolNumber = new AtomicInteger(1);
//线程的计数
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final String namePrefix;
private final int threadPriority;
DefaultThreadFactory(int threadPriority, String threadNamePrefix) {
this.threadPriority = threadPriority;
this.group = Thread.currentThread().getThreadGroup();
this.namePrefix = threadNamePrefix + poolNumber.getAndIncrement() + "-thread-";
}
@Override
public Thread newThread(@NonNull Runnable r) {
Thread thread = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
// 返回True该线程就是守护线程
// 守护线程应该永远不去访问固有资源,如:数据库、文件等。因为它会在任何时候甚至在一个操作的中间发生中断。
if (thread.isDaemon()) {
thread.setDaemon(false);
}
thread.setPriority(threadPriority);
return thread;
}
}
}
2. UserController
package com.example.threadpool.controller;
import com.example.threadpool.model.User;
import com.example.threadpool.service.IUservice;
import com.example.threadpool.utils.ThreadPoolUtil;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
/**
* @program: demo-threadpool
* @description
* @author: tina.liu
* @create: 2021-12-12 16:53
**/
@RestController
@Slf4j
public class UserController {
private static final Long num = 20000L ;
@Autowired
private IUservice uservice;
@PostMapping(value = "/batchInsert")
public Object batchInsertUsers() {
uservice.batchInsertUsers();
ImmutableMultimap<Object, Object> build = ImmutableMultimap.builder()
.put("code", "200")
.put("data", "批量添加成功!")
.put("message", "succesful").build();
return build;
}
@GetMapping("/queryList")
public Object queryList() {
Instant start = Instant.now();
List<User> list = uservice.queryList();
Duration d = Duration.between(start, Instant.now());
Map<String, Object> resultMap = Maps.newHashMap();
resultMap.put("count", list.size());
resultMap.put("spend", d.getSeconds() + "s");
return resultMap;
}
@GetMapping("/queryListAsync")
public Object queryListAsync() {
Instant start = Instant.now();
List<User> users = Lists.newArrayList();
List<Callable<List<User>>> callables = this.initQueryUsersTask();
ExecutorService executorService = Executors.newFixedThreadPool(10);
try {
List<Future<List<User>>> futures = executorService.invokeAll(callables);
for (Future<List<User>> f :futures ) {
users.addAll(f.get());
}
} catch (Exception e) {
e.printStackTrace();
}finally {
executorService.shutdown();
}
Duration d = Duration.between(start, Instant.now());
return getResultMap(users.size(),d.getSeconds());
}
@GetMapping("/queryListAsync2")
public Object queryListAsync2() {
Instant start = Instant.now();
List<User> users = Lists.newArrayList();
List<Callable<List<User>>> callables = this.initQueryUsersTask();
try {
List<Future<List<User>>> futures = ThreadPoolUtil.EXCUTOR.executeReturn(callables);
for (Future<List<User>> f :futures ) {
users.addAll(f.get());
}
} catch (Exception e) {
log.error("Class UserController queryListAsync2 fail message :{}",e.getMessage());
}
Duration d = Duration.between(start, Instant.now());
return getResultMap(users.size(),d.getSeconds());
}
@GetMapping("queryListOnNotice")
public Object queryListOnNotice(){
Instant start = Instant.now();
ThreadPoolUtil.
EXCUTOR.
execute(()->{
System.out.println("吃饭");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("喝水");
});
Duration d = Duration.between(start, Instant.now());
return getResultMap(0,d.getSeconds());
}
@GetMapping("queryListOnNotice2")
public Object queryListOnNotice2() throws ExecutionException, InterruptedException {
Instant start = Instant.now();
List<Callable<String>> callables = Lists.newArrayList();
callables.add(()->step1());
callables.add(()->step2());
callables.add(()->step3());
ArrayList<String> result = Lists.newArrayList();
List<Future<String>> futures = ThreadPoolUtil.EXCUTOR
.executeReturn(callables);
for (Future<String> future:futures) {
result.add(future.get());
}
Duration d = Duration.between(start, Instant.now());
return getResultMap(result.size(),d.getSeconds());
}
private String step1(){
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("current_thread_name:{} invoke step1",Thread.currentThread().getName());
return "step1";
}
private String step2(){
try {
Thread.sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("current_thread_name:{} invoke step2",Thread.currentThread().getName());
return "step2";
}
private String step3(){
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("current_thread_name:{} invoke step3",Thread.currentThread().getName());
return "step3";
}
private List<Callable<List<User>>> initQueryUsersTask() {
Long counts = uservice.count();
List<Callable<List<User>>> callables = Lists.newArrayList();
long nums = counts % num == 0 ? counts / num : counts / num + 1;
for (int i = 0; i < nums; i++) {
int rowNum = i * num.intValue();
int rowSize = num.intValue();
callables.add(() -> uservice.queryListLimit(rowNum,rowSize));
}
return callables;
}
private Map<String, Object> getResultMap(long count,long spend){
Map<String, Object> resultMap = Maps.newHashMap();
resultMap.put("count", count);
resultMap.put("spend", spend + "s");
return resultMap;
}
}
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-jdbc</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.mybatis.spring.boot</groupId>
        <artifactId>mybatis-spring-boot-starter</artifactId>
        <version>1.3.2</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <!-- Server-side JVM application: use the -jre flavor of Guava.
         The -android flavor targets Android and older Java runtimes. -->
    <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
        <version>28.2-jre</version>
    </dependency>
    <!-- Lombok is an annotation processor needed only at compile time,
         so it is kept out of the packaged application. -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.22</version>
        <scope>provided</scope>
    </dependency>
</dependencies>
/**
 * Inserts every element of {@code LIST} through {@code modelMapper.batchInsert},
 * split into consecutive batches that run in parallel on a fixed pool of 10 threads
 * created for this request.
 * <p>
 * The method waits for all batches to finish. A batch that throws is logged; the
 * return value is the same fixed string either way.
 *
 * @return the literal {@code "success2222"}
 */
@PostMapping
public Object batchInsetAsync() {
    // Maximum number of rows handed to a single batchInsert call.
    final int batchSize = 3000;
    final int count = LIST.size();

    // One task per consecutive slice [from, to); the last slice may be shorter.
    List<Callable<Boolean>> tasks = new ArrayList<>();
    for (int offset = 0; offset < count; offset += batchSize) {
        final int from = offset;
        final int to = from + Math.min(batchSize, count - from);
        tasks.add(() -> {
            List<Model> models = LIST.subList(from, to);
            modelMapper.batchInsert(models);
            log.info("currentTime:{},threadName:{}", Instant.now(), Thread.currentThread().getName());
            return Boolean.TRUE;
        });
    }

    ExecutorService executorService = Executors.newFixedThreadPool(10);
    try {
        // invokeAll returns only after every task has completed, so get() does not block;
        // it is called to surface exceptions thrown inside the tasks.
        // (Types are fully qualified so the method does not rely on additional imports.)
        for (java.util.concurrent.Future<Boolean> outcome : executorService.invokeAll(tasks)) {
            try {
                outcome.get();
            } catch (java.util.concurrent.ExecutionException e) {
                log.error("batchInsetAsync batch failed message :{}", e.getMessage(), e);
            }
        }
    } catch (InterruptedException e) {
        // Restore the flag so the container can observe the interrupt.
        Thread.currentThread().interrupt();
        log.error("e:{}", e.getMessage(), e);
    } finally {
        // Always release the per-request pool, even if invokeAll throws.
        executorService.shutdown();
    }
    return "success2222";
}
网友评论