美文网首页Java 杂谈我爱编程
Java 并发:多线程锁计数器

Java 并发:多线程锁计数器

作者: 凌云_00 | 来源:发表于2018-04-16 15:05 被阅读27次

      在公司一个数据导入的场景中因为需要导入的数据量非常大,在本地导入一次需要十几分钟,估算线上导入的时间会翻倍,为了缩短导入时间,需要使用并发,但是导入完成后需要给用户反馈,而反馈代码又写在主线程中,所以就需要,在线程启动后,主线程挂起,在所有线程完成后,主线程恢复执行。
      问题提出后,心中有了个大致方案,并且在测试后能够顺利执行,但是在TestCase完成后,发现JDK 1.5中就有相似的场景解决方案,所以学习研究了一番,再次记录,并分享给大家

    原有方案

      原有的解决方案是利用以下这几个包来实现的,通过Condition锁和while(true)的等待通知模式,实现了主线程的挂起和恢复

    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    

    主线程 挂起机制

      主线程通过while(true) 和 index计数 实现了不停的挂起的操作

            lock.lock();
    
            int index = 0;
    
            while (true) {
                
                // threaNum 线程数量
                if (index < threadNum) {
                    condition.await();
                } else {
                    break;
                }
                
                index++;
            }
    
            lock.unlock();
    
    

    线程 唤醒机制

      在执行的线程最后执行唤醒机制,唤醒主线程,

            lock.lock();
    
            condition.signalAll();
    
            lock.unlock();
    

      通过线程的唤醒,和主线程的挂起操作,主线程不停被唤醒,然后再次挂起,直到最后一个线程唤醒主线程,index = threadNum 主线程跳出循环,继续执行

    JDK 原生方案

      JDK 1.5中 提供了CountDownLatch这个并发工具类,解决了多线程并发,主线程等待最后执行的效果。

    构造器

    // count 为 线程数量
    public CountDownLatch(int count) 
    

    常用方法

    // //调用此方法的线程会被挂起,直到count值为0才继续执行
    public void await();
    
    // 在等待timeOut时间后,如果count值还没到0 立即执行当前线程
    public boolean await(long timeout, TimeUnit unit)
    
    // count减一
    public void countDown()
    

    案例

    public class TestCase {
    
         public static void main(String[] args) {  
             
             // 创建对象,并声明有2个线程需要执行
             final CountDownLatch countDownLatch= new CountDownLatch(2);
     
             new Thread(){
    
                 public void run() {
    
                     try {
                         System.out.println("我是小弟1:"+Thread.currentThread().getName()+"正在执行");
                        Thread.sleep(3000);
                        System.out.println("我是小弟1:"+Thread.currentThread().getName()+"执行完毕");
                        countDownLatch.countDown();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                 };
             }.start();
     
             new Thread(){
    
                 public void run() {
    
                     try {
                         System.out.println("我是小弟2:"+Thread.currentThread().getName()+"正在执行");
                         Thread.sleep(3000);
                         System.out.println("我是小弟2:"+Thread.currentThread().getName()+"执行完毕");
                         countDownLatch.countDown();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                 };
             }.start();
     
             try {
    
                System.out.println("等待2小弟干活呢...");
    
                countDownLatch.await();
    
                System.out.println("2个小弟干完了");
    
                System.out.println("老大我要继续干活了");
    
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
         }
    }
    
    执行结果
    我是小弟1:Thread-0正在执行
    我是小弟2:Thread-1正在执行
    
    等待2小弟干活呢...
    
    我是小弟1:Thread-0执行完毕
    我是小弟2:Thread-1执行完毕
    
    2个小弟干完了
    
    老大我要继续干活了
    

      大家可以发现 使用CountDownLatch 类来解决问题,更简洁和方便,不需要在写额外的循环和锁机制

    concurrent中其他有趣和实用的工具类

    CyclicBarrier

      CyclicBarrier翻译回环栅栏,实现的功能是让多个线程运行到某一标志点后挂起,当所有线程都到此标志位后再一起运行,类似起跑线,当所有人都各就位后才能唤醒,开始奔跑。

    应用场景 - 多个线程做任务,等到达集合点同步后交给后面的线程做汇总

    构造器

    // count 指明多少个线程要到达特定标志
    public CyclicBarrier(int count) {}
    
    // barrierAction为当所有线程到特定标志位后执行的内容
    public CyclicBarrier(int count, Runnable barrierAction) {}
    

    方法

    // 挂起当前线程,知道所有线程到达标志位,在执行
    public int await() ;
    
    // 挂起timeOut时间,如果所有线程还未到位,则到位的线程直接继续执行
    public int await(long timeout, TimeUnit unit);
    
    

    案例1-指定线程数

    public class Test {
    
        public static void main(String[] args) {
    
            int N = 4;
            CyclicBarrier barrier  = new CyclicBarrier(N);
    
            for(int i=0;i<N;i++) new Writer(barrier).start();
    
        }
    
        static class Writer extends Thread{
    
            private CyclicBarrier cyclicBarrier;
    
            public Writer(CyclicBarrier cyclicBarrier) {
                this.cyclicBarrier = cyclicBarrier;
            }
     
            @Override
            public void run() {
    
                System.out.println("线程"+Thread.currentThread().getName()+"正在写入数据...");
    
                try {
    
                    Thread.sleep(5000);      //以睡眠来模拟写入数据操作
    
                    System.out.println("线程"+Thread.currentThread().getName()+"写入数据完毕,等待其他线程写入完毕");
    
                    cyclicBarrier.await();
    
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }catch(BrokenBarrierException e){
                    e.printStackTrace();
                }
                System.out.println("所有线程写入完毕,继续处理其他任务...");
    
            }
        }
    }
    
    

    运行结果

    线程Thread-0正在写入数据...
    线程Thread-3正在写入数据...
    线程Thread-2正在写入数据...
    线程Thread-1正在写入数据...
    线程Thread-2写入数据完毕,等待其他线程写入完毕
    线程Thread-0写入数据完毕,等待其他线程写入完毕
    线程Thread-3写入数据完毕,等待其他线程写入完毕
    线程Thread-1写入数据完毕,等待其他线程写入完毕
    所有线程写入完毕,继续处理其他任务...
    所有线程写入完毕,继续处理其他任务...
    所有线程写入完毕,继续处理其他任务...
    所有线程写入完毕,继续处理其他任务...
    
    案例2-指定执行内容
    public class Test {
    
        public static void main(String[] args) {
    
            int N = 4;
            CyclicBarrier barrier  = new CyclicBarrier(N,new Runnable() {
    
                @Override
                public void run() {
                    System.out.println("当前线程"+Thread.currentThread().getName());   
                }
    
            });
     
            for(int i=0;i<N;i++) new Writer(barrier).start();
        }
    
        static class Writer extends Thread{
    
            private CyclicBarrier cyclicBarrier;
    
            public Writer(CyclicBarrier cyclicBarrier) {
                this.cyclicBarrier = cyclicBarrier;
            }
     
            @Override
            public void run() {
    
                System.out.println("线程"+Thread.currentThread().getName()+"正在写入数据...");
    
                try {
                    Thread.sleep(5000);      //以睡眠来模拟写入数据操作
    
                    System.out.println("线程"+Thread.currentThread().getName()+"写入数据完毕,等待其他线程写入完毕");
    
                    cyclicBarrier.await();
    
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }catch(BrokenBarrierException e){
                    e.printStackTrace();
                }
    
                System.out.println("所有线程写入完毕,继续处理其他任务...");
    
            }
        }
    }
    

    运行结果

    线程Thread-0正在写入数据...
    线程Thread-1正在写入数据...
    线程Thread-2正在写入数据...
    线程Thread-3正在写入数据...
    
    线程Thread-0写入数据完毕,等待其他线程写入完毕
    线程Thread-1写入数据完毕,等待其他线程写入完毕
    线程Thread-2写入数据完毕,等待其他线程写入完毕
    线程Thread-3写入数据完毕,等待其他线程写入完毕
    
    当前线程Thread-3
    
    所有线程写入完毕,继续处理其他任务...
    所有线程写入完毕,继续处理其他任务...
    所有线程写入完毕,继续处理其他任务...
    所有线程写入完毕,继续处理其他任务...
    

      从结果可以看出,当四个线程都到达barrier状态后,会从四个线程中选择一个线程去执行Runnable。

    案例3-CyclicBarrier重用

    public class Test {
    
        public static void main(String[] args) {
    
            int N = 4;
            CyclicBarrier barrier  = new CyclicBarrier(N);
     
            for(int i=0;i<N;i++) {
                new Writer(barrier).start();
            }
     
            try {
                Thread.sleep(25000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
     
            System.out.println("CyclicBarrier重用");
     
            for(int i=0;i<N;i++) {
                new Writer(barrier).start();
            }
    
        }
    
        static class Writer extends Thread{
    
            private CyclicBarrier cyclicBarrier;
    
            public Writer(CyclicBarrier cyclicBarrier) {
                this.cyclicBarrier = cyclicBarrier;
            }
     
            @Override
            public void run() {
    
                System.out.println("线程"+Thread.currentThread().getName()+"正在写入数据...");
    
                try {
    
                    Thread.sleep(5000);      //以睡眠来模拟写入数据操作
    
                    System.out.println("线程"+Thread.currentThread().getName()+"写入数据完毕,等待其他线程写入完毕");
     
                    cyclicBarrier.await();
    
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }catch(BrokenBarrierException e){
                    e.printStackTrace();
                }
    
                System.out.println(Thread.currentThread().getName()+"所有线程写入完毕,继续处理其他任务...");
    
            }
        }
    }
    

    运行结果

    线程Thread-0正在写入数据...
    线程Thread-1正在写入数据...
    线程Thread-3正在写入数据...
    线程Thread-2正在写入数据...
    
    线程Thread-1写入数据完毕,等待其他线程写入完毕
    线程Thread-3写入数据完毕,等待其他线程写入完毕
    线程Thread-2写入数据完毕,等待其他线程写入完毕
    线程Thread-0写入数据完毕,等待其他线程写入完毕
    
    Thread-0所有线程写入完毕,继续处理其他任务...
    Thread-3所有线程写入完毕,继续处理其他任务...
    Thread-1所有线程写入完毕,继续处理其他任务...
    Thread-2所有线程写入完毕,继续处理其他任务...
    
    CyclicBarrier重用
    
    线程Thread-4正在写入数据...
    线程Thread-5正在写入数据...
    线程Thread-6正在写入数据...
    线程Thread-7正在写入数据...
    
    线程Thread-7写入数据完毕,等待其他线程写入完毕
    线程Thread-5写入数据完毕,等待其他线程写入完毕
    线程Thread-6写入数据完毕,等待其他线程写入完毕
    线程Thread-4写入数据完毕,等待其他线程写入完毕
    
    Thread-4所有线程写入完毕,继续处理其他任务...
    Thread-5所有线程写入完毕,继续处理其他任务...
    Thread-6所有线程写入完毕,继续处理其他任务...
    Thread-7所有线程写入完毕,继续处理其他任务...
    

      从执行结果可以看出,在初次的4个线程越过barrier状态后,又可以用来进行新一轮的使用。而CountDownLatch无法进行重复使用。

    Semaphore

      Semaphore翻译成字面意思为 信号量,信号量就是可以声明多把锁(包括一把锁:此时为互斥信号量)。
      举个例子:一个房间如果只能容纳5个人,多出来的人必须在门外面等着。如何去做呢?一个解决办法就是:房间外面挂着五把钥匙,每进去一个人就取走一把钥匙,没有钥匙的不能进入该房间而是在外面等待。每出来一个人就把钥匙放回原处以方便别人再次进入。

    应用场景 - 流量控制,即控制能够访问的最大线程数。

    构造器

    //参数permits表示许可数目,即同时可以允许多少线程进行访问
    public Semaphore(int permits) {}
    
    //这个多了一个参数fair表示是否是公平的,即等待时间越久的越先获取许可,默认是非公平的
    public Semaphore(int permits, boolean fair) {}
    

    常用方法

    //获取一个许可,若无许可能够获得,则会一直等待,直到获得许可
    public void acquire();
    
    //获取x个许可
    public void acquire(int x);
    
    //释放一个许可,注意,在释放许可之前,必须先获获得许可。
    public void release() ; 
    
    //释放x个许可
    public void release(int x) ;  
    
    
    以上4个方法都会被阻塞,如果想立即得到执行结果,可以使用下面几个方法
    //尝试获取一个许可,若获取成功,则立即返回true,若获取失败,则立即返回false
    public boolean tryAcquire()   
    
    //尝试获取一个许可,若在指定的时间内获取成功,则立即返回true,否则则立即返回false
    public boolean tryAcquire(long timeout, TimeUnit unit)
    
    //尝试获取permits个许可,若获取成功,则立即返回true,若获取失败,则立即返回false
    public boolean tryAcquire(int permits) 
    
    //尝试获取permits个许可,若在指定的时间内获取成功,则立即返回true,否则则立即返回false
    public boolean tryAcquire(int permits, long timeout, TimeUnit unit) 
    

      另外还可以通过availablePermits()方法得到可用的许可数目。

    案例

      假若一个工厂有5台机器,但是有8个工人,一台机器同时只能被一个工人使用,只有使用完了,其他工人才能继续使用。那么我们就可以通过Semaphore来实现

    public class Test {
    
        public static void main(String[] args) {
    
            int N = 8;            //工人数
            Semaphore semaphore = new Semaphore(5); //机器数目
    
            for(int i=0;i<N;i++) new Worker(i,semaphore).start();
        }
     
        static class Worker extends Thread{
    
            private int num;
            private Semaphore semaphore;
    
            public Worker(int num,Semaphore semaphore){
                this.num = num;
                this.semaphore = semaphore;
            }
     
            @Override
            public void run() {
    
                try {
    
                    semaphore.acquire();
    
                    System.out.println("工人"+this.num+"占用一个机器在生产...");
    
                    Thread.sleep(2000);
    
                    System.out.println("工人"+this.num+"释放出机器");
    
                    semaphore.release();       
        
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
    
            }
        }
    }
    

    运行结果

    工人0占用一个机器在生产...
    工人1占用一个机器在生产...
    工人2占用一个机器在生产...
    工人4占用一个机器在生产...
    工人5占用一个机器在生产...
    
    工人0释放出机器
    工人2释放出机器
    
    工人3占用一个机器在生产...
    工人7占用一个机器在生产...
    
    工人4释放出机器
    工人5释放出机器
    工人1释放出机器
    
    工人6占用一个机器在生产...
    
    工人3释放出机器
    工人7释放出机器
    工人6释放出机器
    

    1. CountDownLatch和CyclicBarrier都能够实现线程之间的等待,只不过它们侧重点不同:
      - CountDownLatch一般用于某个线程A等待若干个其他线程执行完任务之后,它才执行;
      - 而CyclicBarrier一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行;
      - CountDownLatch是不能够重用的,而CyclicBarrier是可以重用的。
    
    2. Semaphore和锁类似,一般用于控制对某组资源的访问权限。

    Phaser

      Phaser是更加复杂和强大的同步辅助类。它允许并发执行多阶段任务。当我们有并发任务并且需要分解成几步执行时(CyclicBarrier是分成两步)就可以选择使用Phaser。
      Phaser类机制是在每一步结束的位置对线程进行同步,当所有的线程都完成了这一步,才允许执行下一步。
      跟其他同步工具一样,必须对Phaser类中参与同步操作的任务数进行初始化,不同的是,可以动态的增加或者减少任务数。

    Phaser状态

      -  活跃态:当存在参与同步的线程的时候,Phaser就是活跃的,并且在每个阶段结束的时候进行同步。
      -  终止态:当所有的线程都取消注册的时候,Phaser就处于终止态,此时Phaser没有任何参与者。
    

    常用方法

    // 类似于CyclicBarrier的await()方法,等待其它线程都到来之后同步继续执行
    arriveAndAwaitAdvance()
    
    // 把执行到此的线程从Phaser中注销掉
    arriveAndDeregister()
    
    // 判断Phaser是否终止
    isTerminated()
    
    // 将一个新的参与者注册到Phaser中,这个新的参与者将被当成没有执行完本阶段的线程
    register()
    
    // 强制Phaser进入终止态
    forceTermination()
    

    案例

      使用Phaser类同步三个并发任务。这三个任务将在三个不同的文件夹及其子文件夹中查找过去24小时内修改过扩展为为.log的文件。这个任务分成以下三个步骤:
    1. 在执行的文件夹及其子文件夹中获取扩展名为.log的文件
    2. 对每一步的结果进行过滤,删除修改时间超过24小时的文件
    3. 将结果打印到控制台

      在第一步和第二步结束的时候,都会检查所查找到的结果列表是不是有元素存在。如果结果列表是空的,对应的线程将结束执行,并从Phaser中删除。(也就是动态减少任务数)

    文件查找类
    public class FileSearch implements Runnable {
        private String initPath;
    
        private String end;
        
        private List<String> results;
    
        private Phaser phaser;
    
        public FileSearch(String initPath, String end, Phaser phaser) {
            this.initPath = initPath;
            this.end = end;
            this.phaser=phaser;
            results=new ArrayList<>();
        }
        @Override
        public void run() {
    
            phaser.arriveAndAwaitAdvance();//等待所有的线程创建完成,确保在进行文件查找的时候所有的线程都已经创建完成了
            
            System.out.printf("%s: Starting.\n",Thread.currentThread().getName());
            
            // 1st Phase: 查找文件
            File file = new File(initPath);
            if (file.isDirectory()) {
                directoryProcess(file);
            }
            
            // 如果查找结果为false,那么就把该线程从Phaser中移除掉并且结束该线程的运行
            if (!checkResults()){
                return;
            }
            
            // 2nd Phase: 过滤结果,过滤出符合条件的(一天内的)结果集
            filterResults();
            
            // 如果过滤结果集结果是空的,那么把该线程从Phaser中移除,不让它进入下一阶段的执行
            if (!checkResults()){
                return;
            }
            
            // 3rd Phase: 显示结果
            showInfo();
            phaser.arriveAndDeregister();//任务完成,注销掉所有的线程
            System.out.printf("%s: Work completed.\n",Thread.currentThread().getName());
        }
        private void showInfo() {
            for (int i=0; i<results.size(); i++){
                File file=new File(results.get(i));
                System.out.printf("%s: %s\n",Thread.currentThread().getName(),file.getAbsolutePath());
            }
            // Waits for the end of all the FileSearch threads that are registered in the phaser
            phaser.arriveAndAwaitAdvance();
        }
        private boolean checkResults() {
            if (results.isEmpty()) {
                System.out.printf("%s: Phase %d: 0 results.\n",Thread.currentThread().getName(),phaser.getPhase());
                System.out.printf("%s: Phase %d: End.\n",Thread.currentThread().getName(),phaser.getPhase());
                //结果为空,Phaser完成并把该线程从Phaser中移除掉
                phaser.arriveAndDeregister();
                return false;
            } else {
                // 等待所有线程查找完成
                System.out.printf("%s: Phase %d: %d results.\n",Thread.currentThread().getName(),phaser.getPhase(),results.size());
                phaser.arriveAndAwaitAdvance();
                return true;
            }        
        }
        private void filterResults() {
            List<String> newResults=new ArrayList<>();
            long actualDate=new Date().getTime();
            for (int i=0; i<results.size(); i++){
                File file=new File(results.get(i));
                long fileDate=file.lastModified();
                
                if (actualDate-fileDate<TimeUnit.MILLISECONDS.convert(1,TimeUnit.DAYS)){
                    newResults.add(results.get(i));
                }
            }
            results=newResults;
        }
        private void directoryProcess(File file) {
            // Get the content of the directory
            File list[] = file.listFiles();
            if (list != null) {
                for (int i = 0; i < list.length; i++) {
                    if (list[i].isDirectory()) {
                        // If is a directory, process it
                        directoryProcess(list[i]);
                    } else {
                        // If is a file, process it
                        fileProcess(list[i]);
                    }
                }
            }
        }
        private void fileProcess(File file) {
            if (file.getName().endsWith(end)) {
                results.add(file.getAbsolutePath());
            }
        }
    }
    
    主函数
    public static void main(String[] args) {
            Phaser phaser = new Phaser(3);
    
            FileSearch system = new FileSearch("C:\\Windows", "log", phaser);
            FileSearch apps = new FileSearch("C:\\Program Files", "log", phaser);
            FileSearch documents = new FileSearch("C:\\Documents And Settings", "log", phaser);
    
            Thread systemThread = new Thread(system, "System");
            systemThread.start();
            Thread appsThread = new Thread(apps, "Apps");
            appsThread.start();        
            Thread documentsThread = new Thread(documents, "Documents");
            documentsThread.start();
            try {
                systemThread.join();
                appsThread.join();
                documentsThread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.printf("Terminated: %s\n", phaser.isTerminated());
        }
    

      例子中Phaser分了三个步骤:查找文件、过滤文件、打印结果。并且在查找文件和过滤文件结束后对结果进行分析,如果是空的,将此线程从Phaser中注销掉。也就是说,下一阶段,该线程将不参与运行。

      在run()方法中,开头调用了phaser的arriveAndAwaitAdvance()方法来保证所有线程都启动了之后再开始查找文件。在查找文件和过滤文件阶段结束之后,都对结果进行了处理。即:如果结果是空的,那么就把该条线程移除,如果不空,那么等待该阶段所有线程都执行完该步骤之后在统一执行下一步。最后,任务执行完后,把Phaser中的线程均注销掉。

      Phaser其实有两个状态:活跃态和终止态。当存在参与同步的线程时,Phaser就是活跃的。并且在每个阶段结束的时候同步。当所有参与同步的线程都取消注册的时候,Phase就处于终止状态。在这种状态下,Phaser没有任务参与者。

      Phaser主要功能就是执行多阶段任务,并保证每个阶段点的线程同步。在每个阶段点还可以条件或者移除参与者。主要涉及方法arriveAndAwaitAdvance()和register()和arriveAndDeregister()

    相关文章

      网友评论

      本文标题:Java 并发:多线程锁计数器

      本文链接:https://www.haomeiwen.com/subject/advskftx.html