美文网首页
一个多线程

一个多线程

作者: 你家门口的两朵云 | 来源:发表于2022-01-25 16:49 被阅读0次

依赖

        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>18.0</version>
        </dependency>

写一个工具类

package edu.hgnu.utils;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.List;
import java.util.concurrent.*;

/**
 * 任务数量不要大于2048
 */
public class Paraller {

    private static ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("hbd-core-pool-%d").build();

    private static Semaphore globalSemaphore = new Semaphore(2048);

    //common Thread Pool
    private static volatile ExecutorService pool = null;

    static {
        int corePoolSize = Runtime.getRuntime().availableProcessors();
        //使用SynchronousQueue 每次加入任务后立即从线程池中取一个线程或新增一个线程 , 线程过期策略15秒
        pool = new ThreadPoolExecutor(corePoolSize, 1024 * 20,
                5L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
    }

    /**
     * 大部分情况下不需要调用
     */
    public static void shutDown() {
        if (pool != null) {
            pool.shutdown();
        }
    }

    /**
     * 提交一个任务到线程池中,通过该方法提交的并发任务数量超过1024*2则该方法会阻塞
     */
    public static void execute(ParallerAction action) {
        try {
            globalSemaphore.acquire();
            pool.execute(() -> {
                try {
                    action.action();
                } catch (Throwable ex) {
                    ex.printStackTrace();
                } finally {
                    globalSemaphore.release();
                }
            });
        } catch (Throwable ex) {
            ex.printStackTrace();
        }

    }

    public static void forEach(final List<ParallerAction> actions) {
        forEach(actions, null);
    }

    public static void forEach(final List<ParallerAction> actions, ParallelOptions parallelOptions) {

        if (actions == null || actions.size() < 1) {
            return;
        }

        int corePoolSize = Runtime.getRuntime().availableProcessors();
        int threadCount = 0;
        if (parallelOptions == null || parallelOptions.threadCount <= 0) {
            threadCount = corePoolSize;
        } else {
            threadCount = parallelOptions.threadCount;
        }

        if (threadCount > 1024) {
            threadCount = 1024;
        }

        final CountDownLatch latch = new CountDownLatch(actions.size());
        //限制入池流量
        final Semaphore semaphore = new Semaphore(threadCount);
        for (final ParallerAction item : actions) {
            try {
                //获取到线程许可才可以执行,否则堵塞,执行一次释放一个栅格
                semaphore.acquire();
                pool.execute(() -> {
                    try {
                        item.action();
                    } catch (Throwable ex) {
                        ex.printStackTrace();
                    } finally {
                        semaphore.release();
                        latch.countDown();
                    }
                });
            } catch (Throwable ex) {
                ex.printStackTrace();
            }
        }

        try {
            latch.await();
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    public static class ParallelOptions {
        public ParallelOptions() {

        }

        public ParallelOptions(int threadCount) {
            this.threadCount = threadCount;
        }

        public int getThreadCount() {
            return threadCount;
        }

        public void setThreadCount(int threadCount) {
            this.threadCount = threadCount;
        }

        /**
         * 并发的线程数量
         */
        private int threadCount;

    }

    public interface ParallerAction {
        void action();
    }
}

使用多线程执行任务

public void mulThreadRun(){

        ArrayList<Emp> list = new ArrayList<>();
        list.add(new Emp().setEmpId("444").setName("李六"));
        list.add(new Emp().setEmpId("111").setName("张三"));
        list.add(new Emp().setEmpId("222").setName("Mr z"));
        list.add(new Emp().setEmpId("111").setName("张四"));
        list.add(new Emp().setEmpId("222").setName("Mr c"));
        list.add(new Emp().setEmpId("333").setName("Korean"));
        list.add(new Emp().setEmpId("111").setName("张五"));
        list.add(new Emp().setEmpId("333").setName("3060 TI"));
        list.add(new Emp().setEmpId("111").setName("张六"));
        list.add(new Emp().setEmpId("333").setName("2060 TI"));
        list.add(new Emp().setEmpId("444").setName("李四"));
        list.add(new Emp().setEmpId("333").setName("1080 TI"));
        list.add(new Emp().setEmpId("444").setName("李五"));

        System.out.println(list);

        Map<String, List<Emp>> listMap = list.parallelStream().collect(Collectors.groupingBy(Emp::getEmpId));
        System.out.println(listMap);

        // 分组执行
        List<Paraller.ParallerAction> actions = new ArrayList<>(listMap.size());
        listMap.forEach((key, value) -> {
            actions.add(new Action(key, value));
        });
        // 遍历调用执行
        Paraller.forEach(actions, null);
    }

    /**
     * 每个线程的具体执行
     */
    private class Action implements Paraller.ParallerAction {

        String keyF;
        List<Emp> listF;
        // 构造方法,将需要的数据传进来;
        public Action(String key,List<Emp> list) {
            this.keyF = key;
            this.listF = list;
        }

        // 实际执行动作
        @Override
        public void action() {
            System.out.println(keyF+"-------"+listF.toString());
            Paraller.shutDown();
        }
    }

测试

public class TestMulThread {
    public static void main(String[] args) {

        MulThread mulThread = new MulThread();
        mulThread.mulThreadRun();
    }
}

相关文章

  • 多线程

    创建一个多线程 创建多线程-继承线程类 创建多线程-实现Runnable接口 创建多线程-匿名类code

  • 多线程

    1、你理解的多线程(什么是多线程、多线程的原理、多线程的优缺点)? 首先多线程就是在一个进程里面开启了多条线程同时...

  • mysql.connector

    一、基本操作 二、多线程存取Mysql数据 创建表 (一)多线程存入数据 这是在一个表中多线程插入数据最好是多线程...

  • iOS面试之多线程大全

    多线程 多线程内容如下: GCD NSOperation NSThread 多线程与锁 1.GCD 作为一个开发者...

  • 多线程

    多线程 多线程:一个进程中开辟多条线程,同时完成不同的任务,就是多线程。 创建方式:pthread:POSIX线程...

  • 多进程和多线程的应用场景

    其实,使用多线程编程还是使用多进程编程,有一个简单的原则,如果能使用多线程实现的,就用多线程,不能使用多线程实现的...

  • iOS 开发基础(8)--多线程的理解

    多线程 什么是多线程?多线程就是一个进程中可以开启多条线程,每条线程可以并行执行不同的任务,提高执行效率;一个基本...

  • 多线程笔记1-线程的共享与协作

    什么是多线程的共享? 什么是多线程之间的协作? 多线程的共享:是指多个线程访问同一个对象。 多线程的协作是指:当A...

  • jenkov - 1.Java并发与多线程教程

    什么是多线程 多线程的意思是在一个程序里有多个线程执行。一个线程就像一个独立的CPU在执行你的程序。所以,多线程程...

  • Python多线程

    Python多线程 多任务不仅可以使用多进程完成,也可使用多线程完成。一个进程可以包含很多线程,但至少含有一个线程...

网友评论

      本文标题:一个多线程

      本文链接:https://www.haomeiwen.com/subject/doiwhrtx.html