美文网首页
Java基础-线程并发工具类

Java基础-线程并发工具类

作者: 涛涛123759 | 来源:发表于2021-04-21 11:14 被阅读0次

    Android知识总结

    一、分而治之原理(fork/join )

    十大经典算法:快速排序、推排序、归并排序、二叉排序、线性查找、深度优先、广度优先、Dijkstra、动态规化、朴素贝叶斯分类。

    在计算机十大经典算法中,快速排序归并排序二分查找用的是分而治之原理。

    1、定义

    在Java的Fork/Join框架中,使用两个类完成上述操作

    • 1、ForkJoinTask:我们要使用Fork/Join框架,首先需要创建一个ForkJoin任务。该类提供了在任务中执行fork和join的机制。通常情况下我们不需要直接集成ForkJoinTask类,只需要继承它的子类,Fork/Join框架提供了两个子类:
    • RecursiveTask同步用法同时演示有返回结果,统计整形数组中所有元素的和。
    • RecursiveAction异步用法同时演示不要求返回值,遍历指定目标(含子目录)寻找指定类型文件。
    • 2、ForkJoinPool:ForkJoinTask需要通过ForkJoinPool来执行。
    • 任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务(工作窃取算法)。

    2、归并排序-同步用法

    2.1、数组集合

    public class MakeArray {
        public static final int MAX_COUNT = 40000;
    
        public static int[] getArrays(){
            int[] nums = new int[MAX_COUNT];
            Random random = new Random();
            for (int i = 0; i < MAX_COUNT; i++) {
                nums[i] = random.nextInt(MAX_COUNT);
            }
            return nums;
        }
    }
    

    2.2、求数组中的和

    public class SunArray {
        public static class SumTask extends RecursiveTask<Integer>{
            private static final int THRESHOLD = MakeArray.MAX_COUNT/10;
            private int[] nums;
            private int fromIndex;
            private int toIndex;
    
            public SumTask(int[] nums, int fromIndex, int toIndex) {
                this.nums = nums;
                this.fromIndex = fromIndex;
                this.toIndex = toIndex;
            }
    
            @Override
            protected Integer compute() { //运用递归算法
                if (toIndex - fromIndex < THRESHOLD){
                    System.out.println("form index = " + fromIndex + "toIndex = " + toIndex);
                    int count = 0;
                    for (int i = fromIndex; i < toIndex; i++) {
                        count += nums[i];
                    }
                    return count;
                } else {
                    int mid = (toIndex + fromIndex) / 2;
                    SumTask left = new SumTask(nums, fromIndex, mid);
                    SumTask right = new SumTask(nums, mid, toIndex);
                    invokeAll(left, right);
                    return left.join() + right.join();
                }
            }
        }
        
        public static void main(String[] argc){
            int[] arrays = MakeArray.getArrays();
            ForkJoinPool forkJoinPool = new ForkJoinPool();
            SumTask sumTask = new SumTask(arrays, 0, arrays.length);
            long start = System.currentTimeMillis();
            forkJoinPool.invoke(sumTask);
            System.out.println("The count is" + sumTask.join() +
                    "spend time" + (System.currentTimeMillis() - start) + "ms");
        }
    }
    

    3、异步用法

    /**
     *类说明:遍历指定目录(含子目录)找寻指定类型文件
     */
    public class FindDirsFiles extends RecursiveAction {
        private File path;
        public FindDirsFiles(File path) {
            this.path = path;
        }
        @Override
        protected void compute() {
            List<FindDirsFiles> subTasks = new ArrayList<>();
            File[] files = path.listFiles();
            if (files!=null){
                for (File file : files) {
                    if (file.isDirectory()) {
                        // 对每个子目录都新建一个子任务。
                        subTasks.add(new FindDirsFiles(file));
                    } else {
                        // 遇到文件,检查。
                        if (file.getAbsolutePath().endsWith("txt")){
                            System.out.println("文件:" + file.getAbsolutePath());
                        }
                    }
                }
                if (!subTasks.isEmpty()) {
                    // 在当前的 ForkJoinPool 上调度所有的子任务。
                    for (FindDirsFiles subTask : invokeAll(subTasks)) {
                        subTask.join();
                    }
                }
            }
        }
    
        public static void main(String [] args){
            try {
                // 用一个 ForkJoinPool 实例调度总任务
                ForkJoinPool pool = new ForkJoinPool();
                FindDirsFiles task = new FindDirsFiles(new File("F:/"));
    
                /*异步提交*/
                pool.execute(task);
                /*主线程做自己的业务工作*/
                System.out.println("Task is Running......");
                Thread.sleep(1);
                int otherWork = 0;
                for(int i=0;i<100;i++){
                    otherWork = otherWork+i;
                }
                System.out.println("Main Thread done sth......,otherWork=" +otherWork);
                task.join();//阻塞方法
                System.out.println("Task end");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    

    二、CountDownLatch 计数器

    CountDownLatch 示意图
    • countDownLatch这个类使一个线程等待其他线程各自执行完毕后再执行。
    • 是通过一个计数器来实现的,计数器的初始值是线程的数量。每当一个线程执行完毕后,计数器的值就-1,当计数器的值为0时,闭锁上等待的线程就可以恢复工作了。
    • 使用AQS的共享方式,内部实现了AbstractQueuedSynchronizer的内部类。
        private static final class Sync extends AbstractQueuedSynchronizer {
            private static final long serialVersionUID = 4982264981922014374L;
            Sync(int count) {
                setState(count);
            }
            int getCount() {
                return getState();
            }
            protected int tryAcquireShared(int acquires) {
                return (getState() == 0) ? 1 : -1;
            }
            protected boolean tryReleaseShared(int releases) {
                // Decrement count; signal when transition to zero
                for (;;) {
                    int c = getState();
                    if (c == 0)
                        return false;
                    int nextc = c - 1;
                    if (compareAndSetState(c, nextc))
                        return nextc == 0;
                }
            }
        }
    

    注意:一个线程可以多次减一;闭锁线程可以有多个且闭锁线程执行任务时其他线程可能还在执行

    示例演示

    /**
     *类说明:演示CountDownLatch用法,
     * 共5个初始化子线程,6个闭锁扣除点,扣除完毕后,主线程和业务线程才能继续执行
     */
    public class UseCountDownLatch {
        static CountDownLatch latch = new CountDownLatch(6);
        /*初始化线程*/
        private static class MyRunnable implements Runnable {
            @Override
            public void run() {
                System.out.println("Thread_" + Thread.currentThread().getId()
                        + " ready init work......");
                latch.countDown();
                for (int i = 0; i < 2; i++) {
                    System.out.println("Thread_" + Thread.currentThread().getId()
                            + " ........continue do its work");
                }
            }
        }
    
        /*业务线程等待latch的计数器为0完成*/
        private static class MyThread implements Runnable {
            @Override
            public void run() {
                try {
                    latch.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                for (int i = 0; i < 3; i++) {
                    System.out.println("BusiThread_" + Thread.currentThread().getId()
                            + " do business-----");
                }
            }
        }
    
        public static void main(String[] args) throws InterruptedException {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(1);
                        System.out.println("Thread_" + Thread.currentThread().getId()
                                + " ready init work step 1st......");
                        latch.countDown();
                        System.out.println("begin step 2nd.......");
                        Thread.sleep(1);
                        System.out.println("Thread_" + Thread.currentThread().getId()
                                + " ready init work step 2nd......");
                        latch.countDown();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
            new Thread(new MyThread()).start();
            for (int i = 0; i <= 3; i++) {
                Thread thread = new Thread(new MyRunnable());
                thread.start();
            }
            latch.await();
            System.out.println("Main do ites work........");
        }
    }
    

    三、CyclicBarrier

    • CyclicBarrier 可以反复的调用,可理解为循环栅栏
    • CyclicBarrier是有工作线程本身协调完成的,CountDownLatch 工作线程是由外面的线程协调完成的。
    • CyclicBarrier 在工作线程之后可以用 barrierAction 来完成汇总的;CountDownLatch 在运行时不能做其他的操作的。。
    • CyclicBarrier 是参与线程的个数是相等的,CountDownLatch 可以不相同。
    CyclicBarrier 示意图
    • 示例1
    public class UseCyclicBarrier {
        public static CyclicBarrier barrier = new CyclicBarrier(4, new CollectThread());
        //存放子线程工作结果的容器
        private static ConcurrentHashMap<String, Long> resultMap = new ConcurrentHashMap<>();
    
        public static void main(String[] args) {
            for (int i = 0; i <= 4; i++) {
                Thread thread = new Thread(new SubThread());
                thread.start();
            }
    
        }
    
        private static class CollectThread implements Runnable {
    
            @Override
            public void run() {
                StringBuilder result = new StringBuilder();
                for (Map.Entry<String, Long> workResult : resultMap.entrySet()) {
                    result.append("[" + workResult.getValue() + "]");
                }
                System.out.println(" the result = " + result);
                System.out.println("do other business........");
            }
        }
    
        private static class SubThread implements Runnable {
    
            @Override
            public void run() {
                long id = Thread.currentThread().getId();
                resultMap.put(Thread.currentThread().getId() + "", id);
                Random r = new Random();
                try {
                    Thread.sleep(1000 + id);
                    System.out.println("Thread_" + id + " ....do something ");
                    barrier.await();
                    Thread.sleep(1000 + id);
                    barrier.await(); //可以反复的调用
                    System.out.println("Thread_" + id + " ....do its business ");
                } catch (Exception e) {
                    e.printStackTrace();
                }
    
            }
        }
    }
    
    • 示例2
    public class CyclicBarrierDemo {
        public static class Soldier implements Runnable {
            private String soldier;
            private final CyclicBarrier cyclic;
    
            Soldier(CyclicBarrier cyclic, String soldierName) {
                this.cyclic = cyclic;
                this.soldier = soldierName;
            }
    
            public void run() {
                try {
                    //等待所有士兵到齐
                    cyclic.await();
                    doWork();
                    //等待所有士兵完成工作
                    cyclic.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }
    
            void doWork() {
                try {
                    Thread.sleep(Math.abs(new Random().nextInt() % 10000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(soldier + ":任务完成");
            }
        }
    
        public static class BarrierRun implements Runnable {
            boolean flag;
            int N;
    
            public BarrierRun(boolean flag, int N) {
                this.flag = flag;
                this.N = N;
            }
    
            public void run() {
                if (flag) {
                    System.out.println("司令:[士兵" + N + "个,任务完成!]");
                } else {
                    System.out.println("司令:[士兵" + N + "个,集合完毕!]");
                    flag = true;
                }
            }
        }
    
        public static void main(String args[]) throws InterruptedException {
            final int N = 10;
            Thread[] allSoldier = new Thread[N];
            boolean flag = false;
            CyclicBarrier cyclic = new CyclicBarrier(N, new BarrierRun(flag, N));
            //设置屏障点,主要是为了执行这个方法
            System.out.println("集合队伍!");
            for (int i = 0; i < N; ++i) {
                System.out.println("士兵 " + i + " 报道!");
                allSoldier[i] = new Thread(new Soldier(cyclic, "士兵 " + i));
                allSoldier[i].start();
            }
        }
    }
    

    四、Semaphore 信号量

    Semaphore 可以通过其限制执行的线程数量,达到限流(流控)的效果。

    当一个线程执行时先通过其方法进行获取许可操作,获取到许可的线程继续执行业务逻辑,当线程执行完成后进行释放许可操作,未获取达到许可的线程进行等待或者直接结束。


    • 示例1
    private static class MyRunnable implements Runnable {
        // 成员属性 Semaphore对象
        private final Semaphore semaphore;
    
        public MyRunnable(Semaphore semaphore) {
            this.semaphore = semaphore;
        }
    
        public void run() {
            String threadName = Thread.currentThread().getName();
            // 获取许可
            boolean acquire = semaphore.tryAcquire();
            // 未获取到许可 结束
            if (!acquire) {
                System.out.println("线程【" + threadName + "】未获取到许可,结束");
                return;
            }
            // 获取到许可
            try {
                System.out.println("线程【" + threadName + "】获取到许可");
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                // 释放许可
                semaphore.release();
                System.out.println("线程【" + threadName + "】释放许可");
            }
        }
    }
    
    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(2);
        for (int i = 0; i <= 10; i ++) {
            MyRunnable runnable = new MyRunnable(semaphore);
            Thread thread = new Thread(runnable, "Thread-" + i);
            thread.start();
        }
    }
    
    • 示例2-数据库连接池
    //数据库连接实现
    public class SqlConnectImpl implements Connection{
        
        /*拿一个数据库连接*/
        public static final Connection fetchConnection(){
            return new SqlConnectImpl();
        }
        ...
    }
    
    //一个数据库连接池的实现
    public class DBPoolSemaphore {
        
        private final static int POOL_SIZE = 10;
        //两个指示器,分别表示池子还有可用连接和已用连接
        private final Semaphore useful, useless;
        //存放数据库连接的容器
        private static LinkedList<Connection> pool = new LinkedList<Connection>();
        //初始化池
        static {
            for (int i = 0; i < POOL_SIZE; i++) {
                pool.addLast(SqlConnectImpl.fetchConnection());
            }
        }
        public DBPoolSemaphore() {
            this.useful = new Semaphore(10);
            this.useless = new Semaphore(0);
        }
        
        /*归还连接*/
        public void returnConnect(Connection connection) throws InterruptedException {
            if(connection!=null) {
                System.out.println("当前有"+useful.getQueueLength()+"个线程等待数据库连接!!"
                        +"可用连接数:"+useful.availablePermits());
                useless.acquire();
                synchronized (pool) {
                    pool.addLast(connection);
                }
                useful.release();
            }
        }
        
        /*从池子拿连接*/
        public Connection takeConnect() throws InterruptedException {
            useful.acquire();
            Connection connection;
            synchronized (pool) {
                connection = pool.removeFirst();
            }
            useless.release();
            return connection;
        }
        
    }
    
    //测试类
    public class AppTest {
    
        private static DBPoolSemaphore dbPool = new DBPoolSemaphore();
        
        private static class BusiThread extends Thread{
            @Override
            public void run() {
                Random r = new Random();//让每个线程持有连接的时间不一样
                long start = System.currentTimeMillis();
                try {
                    Connection connect = dbPool.takeConnect();
                    System.out.println("Thread_"+Thread.currentThread().getId()
                            +"_获取数据库连接共耗时【"+(System.currentTimeMillis()-start)+"】ms.");
                    SleepTools.ms(100+r.nextInt(100));//模拟业务操作,线程持有连接查询数据
                    System.out.println("查询数据完成,归还连接!");
                    dbPool.returnConnect(connect);
                } catch (InterruptedException e) {
                }
            }
        }
        
        public static void main(String[] args) {
            for (int i = 0; i < 50; i++) {
                Thread thread = new BusiThread();
                thread.start();
            }
        }
    }
    

    五、Exchange

    主要用于两个线程间的数据交换


    public class UseExchange {
        private static final Exchanger<Set<String>> exchange = new Exchanger<Set<String>>();
    
        public static void main(String[] args) {
    
            new Thread(new Runnable() {
                @Override
                public void run() {
                    Set<String> setA = new HashSet<String>();//存放数据的容器
                    try {
                        //添加数据
                        setA.add("dataA");
                        setA = exchange.exchange(setA);//交换set
                        /*处理交换后的数据*/
                        for (String str : setA) {
                            System.out.println(Thread.currentThread() + " A :" + str);
                        }
                        System.out.println(Thread.currentThread() + " A :" + exchange);
                    } catch (InterruptedException e) {
                    }
                }
            }).start();
    
            new Thread(new Runnable() {
                @Override
                public void run() {
                    Set<String> setB = new HashSet<String>();//存放数据的容器
                    try {
                        //添加数据
                        setB.add("dataB1");
                        setB.add("dataB2");
                        setB = exchange.exchange(setB);//交换set
                        /*处理交换后的数据*/
                        for (String str : setB) {
                            System.out.println(Thread.currentThread() + " B :" + str);
                        }
                        System.out.println(Thread.currentThread() + " B :" + exchange);
                    } catch (InterruptedException e) {
                    }
                }
            }).start();
    
        }
    }
    

    六、Callable、 Future和FutureTask

    public class UseFuture {
        /*实现Callable接口,允许有返回值*/
        private static class UseCallable implements Callable<Integer> {
            private int sum;
    
            @Override
            public Integer call() throws Exception {
                System.out.println("Callable子线程开始计算!");
                Thread.sleep(2000);
                for (int i = 0; i < 5000; i++) {
                    sum = sum + i;
                }
                System.out.println("Callable子线程计算结束!结果为: " + sum);
                return sum;
            }
        }
    
        public static void main(String[] args)
                throws InterruptedException, ExecutionException {
    
            UseCallable useCallable = new UseCallable();
            FutureTask<Integer> futureTask //用FutureTask包装Callable
                    = new FutureTask<>(useCallable);
            new Thread(futureTask).start();//交给Thread去运行
            Random r = new Random();
            Thread.sleep(1000);
            if (r.nextBoolean()) {//用随机的方式决定是获得结果还是终止任务
                System.out.println("Get UseCallable result = " + futureTask.get());
            } else {
                System.out.println("中断计算。  ");
                futureTask.cancel(true);
            }
    
        }
    }
    

    相关文章

      网友评论

          本文标题:Java基础-线程并发工具类

          本文链接:https://www.haomeiwen.com/subject/bsudlltx.html