美文网首页
多线程Query 和 多线程insert

多线程Query 和 多线程insert

作者: 刘小刀tina | 来源:发表于2021-12-11 20:20 被阅读0次

1. ThreadPoolUtil

package com.example.threadpool.utils;

import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;

import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 线程池的工具类
 * 用于进行线程的管理,防止重复创建、杀死线程。
 * <p>
 * 多线程运行期间,如果系统不断的创建、杀死新线程,
 * 会产生过度消耗系统资源,以及过度切换线程的问题,甚至可能导致系统资源的崩溃。
 * 因此需要线程池,对线程进行管理。
 */
@Slf4j
public class ThreadPoolUtil {

    public static final ThreadPoolUtil EXCUTOR = new ThreadPoolUtil();
    //核心线程池的数量,同时能够执行的线程数量
    private int corePoolSize;
    //最大线程池数量,表示当缓冲队列满的时候能继续容纳的等待任务的数量
    private int maxPoolSize;
    //存活时间
    private long keepAliveTime = 1;
    private TimeUnit unit = TimeUnit.HOURS;
    private ThreadPoolExecutor executor;

    private ThreadPoolUtil() {
        //给corePoolSize赋值:当前设备可用处理器核心数*2 + 1,能够让cpu的效率得到最大程度执行(有研究论证的)
        corePoolSize = Runtime.getRuntime().availableProcessors() * 2 + 1;
        maxPoolSize = corePoolSize;
        executor = new ThreadPoolExecutor(
                //当某个核心任务执行完毕,会依次从缓冲队列中取出等待任务
                corePoolSize,
                // 然后new LinkedBlockingQueue<Runnable>(),然后maximumPoolSize,但是它的数量是包含了corePoolSize的
                maxPoolSize,
                //表示的是maximumPoolSize当中等待任务的存活时间
                keepAliveTime,
                unit,
                //缓冲队列,用于存放等待任务,Linked的先进先出
                new LinkedBlockingQueue<Runnable>(),
                new DefaultThreadFactory(Thread.NORM_PRIORITY, "thread-pool-"),
                new ThreadPoolExecutor.AbortPolicy()
        );
        log.info("Class ThreadPoolUtil field corePoolSize ={},maxPoolSize ={} ", corePoolSize, maxPoolSize);
    }

    /**
     * 执行任务 无返回值
     *
     * @param runnable
     */
    public void execute(Runnable runnable) {
        if (runnable != null) {
            executor.execute(runnable);
        }
    }

    /**
     * 执行任务 有返回值
     *
     * @param callable
     */
    public <T> T executeReturn(Callable<T> callable) {
        try {
            return executor.submit(callable).get();
        } catch (Exception e) {
            log.error("Class ThreadPoolUtil executeReturn fail message :{}",e.getMessage());
        }
        return null;
    }




    /**
     * 执行任务 有返回值
     *
     * @param callables
     */
    public <T>List<Future<T>> executeReturn(List<Callable<T>> callables) {
        try {
            return executor.invokeAll(callables);
        } catch (Exception e) {
            log.error("Class ThreadPoolUtil executeReturn fail message :{}",e.getMessage());
        }
        return null;
    }


    /**
     * 移除任务
     *
     * @param runnable
     */
    public void remove(Runnable runnable) {
        if (runnable != null) {
            executor.remove(runnable);
        }
    }

    private static class DefaultThreadFactory implements ThreadFactory {

        //线程池的计数
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        //线程的计数
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final String namePrefix;
        private final int threadPriority;

        DefaultThreadFactory(int threadPriority, String threadNamePrefix) {
            this.threadPriority = threadPriority;
            this.group = Thread.currentThread().getThreadGroup();
            this.namePrefix = threadNamePrefix + poolNumber.getAndIncrement() + "-thread-";
        }

        @Override
        public Thread newThread(@NonNull Runnable r) {
            Thread thread = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
            // 返回True该线程就是守护线程
            // 守护线程应该永远不去访问固有资源,如:数据库、文件等。因为它会在任何时候甚至在一个操作的中间发生中断。
            if (thread.isDaemon()) {
                thread.setDaemon(false);
            }
            thread.setPriority(threadPriority);
            return thread;
        }
    }

}


2. UserController

package com.example.threadpool.controller;

import com.example.threadpool.model.User;
import com.example.threadpool.service.IUservice;
import com.example.threadpool.utils.ThreadPoolUtil;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;

/**
 * @program: demo-threadpool
 * @description
 * @author: tina.liu
 * @create: 2021-12-12 16:53
 **/
@RestController
@Slf4j
public class UserController {

    private static final Long num = 20000L ;
    @Autowired
    private IUservice uservice;


    @PostMapping(value = "/batchInsert")
    public Object batchInsertUsers() {
        uservice.batchInsertUsers();
        ImmutableMultimap<Object, Object> build = ImmutableMultimap.builder()
                .put("code", "200")
                .put("data", "批量添加成功!")
                .put("message", "succesful").build();
        return build;
    }

    @GetMapping("/queryList")
    public Object queryList() {
        Instant start = Instant.now();
        List<User> list = uservice.queryList();
        Duration d = Duration.between(start, Instant.now());
        Map<String, Object> resultMap = Maps.newHashMap();
        resultMap.put("count", list.size());
        resultMap.put("spend", d.getSeconds() + "s");
        return resultMap;
    }


    @GetMapping("/queryListAsync")
    public Object queryListAsync() {
        Instant start = Instant.now();
        List<User> users = Lists.newArrayList();
        List<Callable<List<User>>> callables = this.initQueryUsersTask();
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        try {
            List<Future<List<User>>> futures = executorService.invokeAll(callables);
            for (Future<List<User>> f :futures ) {
                users.addAll(f.get());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            executorService.shutdown();
        }
        Duration d = Duration.between(start, Instant.now());
        return getResultMap(users.size(),d.getSeconds());
    }

    @GetMapping("/queryListAsync2")
    public Object queryListAsync2() {
        Instant start = Instant.now();
        List<User> users = Lists.newArrayList();
        List<Callable<List<User>>> callables = this.initQueryUsersTask();
        try {
            List<Future<List<User>>> futures = ThreadPoolUtil.EXCUTOR.executeReturn(callables);
            for (Future<List<User>> f :futures ) {
                users.addAll(f.get());
            }
        } catch (Exception e) {
            log.error("Class UserController queryListAsync2 fail message :{}",e.getMessage());
        }
        Duration d = Duration.between(start, Instant.now());
        return getResultMap(users.size(),d.getSeconds());
    }


    @GetMapping("queryListOnNotice")
    public Object queryListOnNotice(){
        Instant start = Instant.now();
        ThreadPoolUtil.
                EXCUTOR.
                execute(()->{
                    System.out.println("吃饭");
                    try {
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("喝水");
                });
        Duration d = Duration.between(start, Instant.now());
        return getResultMap(0,d.getSeconds());
    }


    @GetMapping("queryListOnNotice2")
    public Object queryListOnNotice2() throws ExecutionException, InterruptedException {
        Instant start = Instant.now();
        List<Callable<String>> callables = Lists.newArrayList();
        callables.add(()->step1());
        callables.add(()->step2());
        callables.add(()->step3());
        ArrayList<String> result = Lists.newArrayList();
        List<Future<String>> futures = ThreadPoolUtil.EXCUTOR
                .executeReturn(callables);
        for (Future<String> future:futures) {
            result.add(future.get());
        }
        Duration d = Duration.between(start, Instant.now());
        return getResultMap(result.size(),d.getSeconds());
    }

    private String step1(){
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        log.info("current_thread_name:{} invoke step1",Thread.currentThread().getName());
        return "step1";
    }

    private String step2(){
        try {
            Thread.sleep(4000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        log.info("current_thread_name:{} invoke step2",Thread.currentThread().getName());
        return "step2";
    }

    private String step3(){
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        log.info("current_thread_name:{} invoke step3",Thread.currentThread().getName());
        return "step3";
    }

    private List<Callable<List<User>>> initQueryUsersTask() {
        Long counts = uservice.count();
        List<Callable<List<User>>> callables = Lists.newArrayList();
        long nums = counts % num == 0 ? counts / num : counts / num + 1;
        for (int i = 0; i < nums; i++) {
            int rowNum = i * num.intValue();
            int rowSize = num.intValue();
            callables.add(() ->  uservice.queryListLimit(rowNum,rowSize));
        }
        return callables;
    }

    private Map<String, Object> getResultMap(long count,long spend){
        Map<String, Object> resultMap = Maps.newHashMap();
        resultMap.put("count", count);
        resultMap.put("spend", spend + "s");
        return resultMap;
    }


}




    <!-- Maven dependencies for the demo project. Entries without a <version> presumably
         inherit it from a parent/BOM that is not shown here. -->
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>1.3.2</version>
        </dependency>

        <!-- MySQL JDBC driver; only needed at runtime. -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- NOTE(review): "-android" is the Guava flavor aimed at Android; for a server-side
             JDK 8+ application the "-jre" flavor is probably intended. Confirm before changing. -->
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>28.2-android</version>
        </dependency>
        <!-- NOTE(review): Lombok is only needed at compile time; consider marking it
             provided/optional so it is not packaged. -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.22</version>
        </dependency>
    </dependencies>


 /**
  * Inserts all elements of {@code LIST} in batches of 3000 rows, running the batches
  * concurrently on a 10-thread pool created for this call, and waits for them to finish.
  * <p>
  * A batch that throws is logged; the other batches still run. The return value is the
  * same fixed string in every case, as before.
  *
  * @return the literal {@code "success2222"}
  */
 @PostMapping
    public Object batchInsetAsync() {
        // Rows handed to each insert task.
        final int batchSize = 3000;
        final int count = LIST.size();
        // Ceiling division: a trailing partial batch still needs its own task.
        final int threadSize = count % batchSize == 0 ? count / batchSize : (count / batchSize) + 1;
        List<Callable<Boolean>> tasks = new ArrayList<>(threadSize);
        for (int i = 0; i < threadSize; i++) {
            // Batch i covers [i * batchSize, min((i + 1) * batchSize, count)),
            // e.g. [0, 3000), [3000, 6000), [6000, 6001).
            final int start = i * batchSize;
            final int end = Math.min(start + batchSize, count);
            tasks.add(() -> {
                // Bulk-insert this slice of the source list.
                List<Model> models = LIST.subList(start, end);
                modelMapper.batchInsert(models);
                log.info("currentTime:{},threadName:{}", Instant.now(), Thread.currentThread().getName());
                return Boolean.TRUE;
            });
        }
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        try {
            // Fully qualified names: the import list of the enclosing file is not visible here.
            for (java.util.concurrent.Future<Boolean> outcome : executorService.invokeAll(tasks)) {
                try {
                    // Inspect every future so a failed batch is not silently ignored.
                    outcome.get();
                } catch (java.util.concurrent.ExecutionException e) {
                    log.error("batchInsetAsync insert task failed", e.getCause());
                }
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of swallowing the interruption.
            Thread.currentThread().interrupt();
            log.error("batchInsetAsync interrupted while waiting for insert tasks", e);
        } finally {
            // Always release the worker threads, even if an unchecked exception escapes.
            executorService.shutdown();
        }
        return "success2222";
    }


相关文章

  • 多线程Query 和 多线程insert

    1. ThreadPoolUtil 2.UserController

  • iOS多线程技术方案

    多线程技术方案 目录 一、多线程简介 1、多线程的由来 2、耗时操作的模拟试验 3、进程和线程 4、多线程的...

  • iOS面试之多线程模块

    多线程 多线程内容如下: GCD NSOperation NSThread 多线程与锁 1.GCD 同步/异步和串...

  • iOS多线程 NSOperation

    系列文章: 多线程 多线程 pthread、NSThread 多线程 GCD 多线程 NSOperation 多线...

  • iOS多线程 pthread、NSThread

    系列文章: 多线程 多线程 pthread、NSThread 多线程 GCD 多线程 NSOperation 多线...

  • iOS多线程: GCD

    系列文章: 多线程 多线程 pthread、NSThread 多线程 GCD 多线程 NSOperation 多线...

  • iOS多线程运用

    系列文章: 多线程 多线程 pthread、NSThread 多线程 GCD 多线程 NSOperation 多线...

  • iOS多线程基础

    系列文章: 多线程 多线程 pthread、NSThread 多线程 GCD 多线程 NSOperation 多线...

  • java 多线程

    多线程: 说到多线程,最先提到的就是Thread和Runnable。实现多线程可以通过继承Thread 或者 实现...

  • iOS多线程技术方案

    目录 一、多线程简介 1、多线程的由来2、耗时操作的模拟试验3、进程和线程4、多线程的概念及原理5、多线程的优缺点...

网友评论

      本文标题:多线程Query 和 多线程insert

      本文链接:https://www.haomeiwen.com/subject/lmqdfrtx.html