美文网首页Android开发
线程串行并行调度实现

线程串行并行调度实现

作者: AnonyPer | 来源:发表于2019-10-17 19:21 被阅读0次

    线程串行并行调度实现

    问题描述

    问题描述:线程A、B、C并行执行,然后和线程D串行执行,如何实现。

    问题具体化:现在有A、B、C三个线程,每一个线程分别完成打印0~4的任务,有一个线程D,在A、B、C完成打印之后打印“Hello world!”。

    解决思路

    我们都知道线程就是用来实现并发执行的,而要实现的结果是在A、B、C三个线程并行执行完之后才执行线程D的任务,那么就需要获取等到A、B、C三个线程的结果,之后再调用线程D的执行方法,就需要想办法让线程D暂不执行,等待A、B、C三个线程执行完才执行,根据这个思路,我们可以想到的解决方案方法如下:

    • 通过一些flag变量控制判断A、B、C三个线程是否执行完

    这是一个理解起来最简单也是最原始的方法,通过不断循环判断A、B、C三个线程对应的标识符是否完成执行来决定是否该执行线程D。

    • 在线程D执行前阻塞线程,等待A、B、C三个线程执行完在执行

    这个思路和上面差不多,只不过一个是循环判断,一个是阻塞线程。阻塞线程的方法有很多,sleep了,上锁了之类的,但是我们除了阻塞线程外,还需要有一个时机去唤醒阻塞,这个时机的触发点就是A、B、C三个线程执行完,综合考虑,能满足这样条件的阻塞方式有以下几种:

    1、thread.join() 阻塞等待当前线程执行完

    2、利用FeatureTask实现的线程,通过get()方法可以阻塞等待线程结果返回

    3、CountDownLatch,通过闭锁计数器的方式,通过await方法阻塞线程,在A、B、C三个线程执行完之后减少计数,唤醒阻塞

    4、CyclicBarrier,栅栏原理,通过其await方法阻塞线程,在A、B、C三个线程执行完之后触发到达栅栏数,唤醒阻塞

    以上就是能想到的实现方式,接下来一个一个用代码的方式实现并简要介绍其原理和特性。

    解决方案及原理分析

    通过flag变量控制

    • 代码实现
    //线程类
    class MyRunnable implements Runnable {
        Flag flag;//变量
        CountDownLatch countDownLatch;//闭锁
        CyclicBarrier cyclicBarrier;//栅栏
    
        /**
         * 变量控制的方法
         *
         * @param flag
         */
        public MyRunnable(Flag flag) {
            this.flag = flag;
        }
    
        /**
         * 通过其他阻塞的方法时调用
         */
        public MyRunnable() {
    
        }
        /**
         * 通过CountDownLatch的方式
         */
        public MyRunnable(CountDownLatch countDownLatch) {
            this.countDownLatch = countDownLatch;
        }
        /**
         * 通过CyclicBarrier的方式
         */
        public MyRunnable(CyclicBarrier cyclicBarrier) {
            this.cyclicBarrier = cyclicBarrier;
        }
    
        @Override
        public void run() {
            for (int i = 0; i < 5; i++) {
                try {
                    Thread.sleep(new Random().nextInt(100));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("当前线程:" + Thread.currentThread().getName() + " >> " + i);
            }
            if (countDownLatch != null) {//通过闭锁方式
                countDownLatch.countDown();
            }
            if (cyclicBarrier != null) {//通过栅栏方式
                try {
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }
            if (flag != null) {//通过flag变量方式
                flag.setFlag(true);
                System.out.println("当前线程:" + Thread.currentThread().getName() + " >> " + flag.isFlag());
            }
        }
    }
    
    class Flag {
        boolean flag = false;
    
        public void setFlag(boolean flag) {
            this.flag = flag;
        }
    
        public boolean isFlag() {
            return flag;
        }
    }
    
    /**
     * 线程调度方法1 flag变量控制
     * Created by anonyper on 2019/10/17.
     */
    public class ThreadTest {
        Flag flagA = new Flag();//考虑一下这里为什么用一个对象包装一个boolean而不是直接用boolean对象来传递呢
        Flag flagB = new Flag();
        Flag flagC = new Flag();
    
        public void testThread() {
            Thread threadA = new Thread(new MyRunnable(flagA), "A");
            Thread threadB = new Thread(new MyRunnable(flagB), "B");
            Thread threadC = new Thread(new MyRunnable(flagC), "C");
            Thread threadD = new Thread(new Runnable() {
                @Override
                public void run() {
                    System.out.println("当前线程:" + Thread.currentThread().getName() + " >> Hello World!");
                }
            }, "D");
            threadA.start();
            threadB.start();
            threadC.start();
            while (!flagA.isFlag() || !flagB.isFlag() || !flagC.isFlag()) {
                Thread.yield();//释放CPU资源
                //this.wait(0L); //这样也可以等待 释放CPU资源
                //sleep(0L);//这样也可以等待 释放CPU资源
            }
            threadD.start();
        }
    
        public static void main(String[] args) {
            new ThreadTest().testThread();
        }
    }
    
    //执行结果:
    当前线程:A >> 0
    当前线程:B >> 0
    当前线程:C >> 0
    当前线程:A >> 1
    当前线程:A >> 2
    当前线程:B >> 1
    当前线程:C >> 1
    当前线程:B >> 2
    当前线程:A >> 3
    当前线程:B >> 3
    当前线程:A >> 4
    当前线程:A >> true
    当前线程:B >> 4
    当前线程:B >> true
    当前线程:C >> 2
    当前线程:C >> 3
    当前线程:C >> 4
    当前线程:C >> true
    当前线程:D >> Hello World!
    
    • 原理分析

    通过wihle循环以及变量控制,让当前线程等待A、B、C三个线程执行完之后在执行D线程。这种方法是理解简单,但是实现挺麻烦的,线程越多要控制的变量就越多,非常不便。

    注意实现Runnable中参数传递,这里涉及到值传递和对象传递的知识点。

    Thread.join方法

    • 代码实现
    /**
     * 线程调度方法1 flag变量控制
     * Created by anonyper on 2019/10/17.
     */
    public class ThreadTest {
    
        public void testThread() {
            Thread threadA = new Thread(new MyRunnable(), "A");
            Thread threadB = new Thread(new MyRunnable(), "B");
            Thread threadC = new Thread(new MyRunnable(), "C");
            Thread threadD = new Thread(new Runnable() {
                @Override
                public void run() {
                    System.out.println("当前线程:" + Thread.currentThread().getName() + " >> Hello World!");
                }
            }, "D");
            threadA.start();
            threadB.start();
            threadC.start();
            try {
                threadA.join();//阻塞等待线程A运行完
                threadB.join();//阻塞等待线程B运行完
                threadC.join();//阻塞等待线程C运行完
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            threadD.start();
        }
    
        public static void main(String[] args) {
            new ThreadTest().testThread();
        }
    }
    
    //执行结果
    当前线程:A >> 0
    当前线程:C >> 0
    当前线程:B >> 0
    当前线程:B >> 1
    当前线程:A >> 1
    当前线程:C >> 1
    当前线程:B >> 2
    当前线程:C >> 2
    当前线程:A >> 2
    当前线程:C >> 3
    当前线程:B >> 3
    当前线程:C >> 4
    当前线程:A >> 3
    当前线程:A >> 4
    当前线程:B >> 4
    当前线程:D >> Hello World!
    
    • 原理分析

    线程的join方法会阻塞等待当前线程执行完成,其源代码如下:

    public final void join() throws InterruptedException {
        this.join(0L);
    }
    public final synchronized void join(long var1) throws InterruptedException {
            long var3 = System.currentTimeMillis();//当前时间
            long var5 = 0L;
            if (var1 < 0L) {//等待时间不能小于0
                throw new IllegalArgumentException("timeout value is negative");
            } else {
                if (var1 == 0L) {//如果为零,则等待执行完成
                    while(this.isAlive()) {//判断线程是否激活,涉及到线程状态,执行完之后返回false
                        this.wait(0L);//等待0秒后唤醒
                    }
                } else {//等待一定时间后判断是否执行完
                    while(this.isAlive()) {
                        long var7 = var1 - var5;
                        if (var7 <= 0L) {
                            break;
                        }
    
                        this.wait(var7);
                        var5 = System.currentTimeMillis() - var3;
                    }
                }
    
            }
        }
    

    我们看到join方法如果不传时间则默认等待线程执行完,但是其过程可能会被Interrupted,所以使用该方法需要针对异常做好判断。

    FeatureTask的get方法

    • 代码实现
    /**
     * 实现Callable接口,重写call方法
     */
    class MyCall implements Callable<Boolean> {
        @Override
        public Boolean call() {
            for (int i = 0; i < 5; i++) {
                try {
                    Thread.sleep(new Random().nextInt(100));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("当前线程:" + Thread.currentThread().getName() + " >> " + i);
            }
            return true;
        }
    }
    
    /**
     * 线程调度方法1 flag变量控制
     * Created by anonyper on 2019/10/17.
     */
    public class ThreadTest {
    
        public void testThread() {
            FutureTask futureTaskA = new FutureTask(new MyCall());
            FutureTask futureTaskB = new FutureTask(new MyCall());
            FutureTask futureTaskC = new FutureTask(new MyCall());
            Thread threadA = new Thread(futureTaskA, "A");
            Thread threadB = new Thread(futureTaskB, "B");
            Thread threadC = new Thread(futureTaskC, "C");
            Thread threadD = new Thread(new Runnable() {
                @Override
                public void run() {
                    System.out.println("当前线程:" + Thread.currentThread().getName() + " >> Hello World!");
                }
            }, "D");
            threadA.start();
            threadB.start();
            threadC.start();
            try {
                futureTaskA.get();
                futureTaskB.get();
                futureTaskC.get();
            } catch (ExecutionException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            threadD.start();
        }
    
        public static void main(String[] args) {
            new ThreadTest().testThread();
        }
    }
    
    //运行结果
    当前线程:A >> 0
    当前线程:C >> 0
    当前线程:B >> 0
    当前线程:A >> 1
    当前线程:A >> 2
    当前线程:C >> 1
    当前线程:A >> 3
    当前线程:B >> 1
    当前线程:C >> 2
    当前线程:C >> 3
    当前线程:B >> 2
    当前线程:A >> 4
    当前线程:B >> 3
    当前线程:C >> 4
    当前线程:B >> 4
    当前线程:D >> Hello World!
    
    • 原理分析

    1、Callable接口

    public interface Callable<V> {
        V call() throws Exception;
    }
    

    2、Future接口

    public interface Future<V> {
        boolean cancel(boolean var1);//取消
    
        boolean isCancelled();//是否取消
    
        boolean isDone();//是否完成
    
        V get() throws InterruptedException, ExecutionException;//返回结果
    
        V get(long var1, TimeUnit var3) throws InterruptedException, ExecutionException, TimeoutException;//等待具体时间内返回结果
    }
    

    2、RunnableFuture接口实现了Runnable、Future接口

    public interface RunnableFuture<V> extends Runnable, Future<V> {
        void run();
    }
    

    3、FutureTask实现了RunnableFuture接口,构造函数接受一个Callable对象

    public class FutureTask<V> implements RunnableFuture<V> {
        private Callable<V> callable;
        public FutureTask(Callable<V> var1) {
            if (var1 == null) {
                throw new NullPointerException();
            } else {
                this.callable = var1;
                this.state = 0;
            }
        }
        public void run() {
          ...
          Callable var1 = this.callable;
          var2 = var1.call;
          ...
        }
            public V get() throws InterruptedException, ExecutionException {//get方法
            int var1 = this.state;
            if (var1 <= COMPLETING) {//状态未完成时等待完成
                var1 = this.awaitDone(false, 0L);
            }
    
            return this.report(var1);//完成后返回结果
        }
        private int awaitDone(boolean var1, long var2) throws InterruptedException {
            long var4 = var1 ? System.nanoTime() + var2 : 0L;
            FutureTask.WaitNode var6 = null;
            boolean var7 = false;
    
            while(!Thread.interrupted()) {
                int var8 = this.state;
                if (var8 > COMPLETING) {//完成或被打断,返回结果
                    if (var6 != null) {
                        var6.thread = null;
                    }
                    return var8;
                }
    
                if (var8 == COMPLETING) {//快要完成了,等待一会
                    Thread.yield();
                } else if (var6 == null) {
                    var6 = new FutureTask.WaitNode();
                } else if (!var7) {
                    var7 = UNSAFE.compareAndSwapObject(this, waitersOffset, var6.next = this.waiters, var6);
                } else if (var1) {//有等待时间限制
                    var2 = var4 - System.nanoTime();
                    if (var2 <= 0L) {
                        this.removeWaiter(var6);
                        return this.state;
                    }
    
                    LockSupport.parkNanos(this, var2);
                } else {
                    LockSupport.park(this);//通过LockSupport.park方法阻塞
                }
            }
                    //最后没有等待结果时被唤醒(LockSupport.uppark())抛出打断异常
            this.removeWaiter(var6);
            throw new InterruptedException();
        }
    
    

    4、FutureTask对象作为runnable实现类,传入Thread中,调用run方法调用的是callable的call方法,get方法在call方法没有返回时阻塞线程,等待结果返回。

    该方法学会了理解起来也简单,但是相对写的类比较多,如果比较在意线程结果返回值来做判断条件的话,可以使用。

    CountDownLatch实现

    • 代码实现
    //MyRunnable的代码
            /**
         * 通过CountDownLatch的方式
         */
        public MyRunnable(CountDownLatch countDownLatch) {
            this.countDownLatch = countDownLatch;
        }
            @Override
        public void run() {
            ...
            if (countDownLatch != null) {//完成后减1
                countDownLatch.countDown();
            }
            ...
        }
    
    public class ThreadTest {
    
        public void testThread() {
            CountDownLatch countDownLatch = new CountDownLatch(3);//闭锁的数量 三个线程
            //MyRunnable的实现见第一种方式代码,此处不在重复
            Thread threadA = new Thread(new MyRunnable(countDownLatch), "A");
            Thread threadB = new Thread(new MyRunnable(countDownLatch), "B");
            Thread threadC = new Thread(new MyRunnable(countDownLatch), "C");
            Thread threadD = new Thread(new Runnable() {
                @Override
                public void run() {
                    System.out.println("当前线程:" + Thread.currentThread().getName() + " >> Hello World!");
                }
            }, "D");
            threadA.start();
            threadB.start();
            threadC.start();
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            threadD.start();
        }
    
        public static void main(String[] args) {
            new ThreadTest().testThread();
        }
    }
    
    //执行结果
    当前线程:A >> 0
    当前线程:B >> 0
    当前线程:C >> 0
    当前线程:A >> 1
    当前线程:B >> 1
    当前线程:A >> 2
    当前线程:A >> 3
    当前线程:C >> 1
    当前线程:B >> 2
    当前线程:A >> 4
    当前线程:C >> 2
    当前线程:B >> 3
    当前线程:C >> 3
    当前线程:B >> 4
    当前线程:C >> 4
    当前线程:D >> Hello World!
    
    • 原理分析

    CountDownLatch的核心方法有三个:

    1、构造方式传入一个计数数量

    private final CountDownLatch.Sync sync;
    
    public CountDownLatch(int var1) {
        if (var1 < 0) {
            throw new IllegalArgumentException("count < 0");
        } else {
            this.sync = new CountDownLatch.Sync(var1);
        }
    }
    

    2、countDown计数减1

    public void countDown() {
        this.sync.releaseShared(1);
    }
    

    3、await阻塞等待,还有一个方法是await(long var1, TimeUnit var3),等待具体的时间

    public void await() throws InterruptedException {
        this.sync.acquireSharedInterruptibly(1);
    }
    
    public boolean await(long var1, TimeUnit var3) throws InterruptedException {
        return this.sync.tryAcquireSharedNanos(1, var3.toNanos(var1));
    }
    

    这三个方法都关联一个类:Sync,该类继承AbstractQueuedSynchronizer类,简称AQS,AQS是用来构建锁或者其他同步组件(信号量、事件等)的基础框架类,JDK中许多并发工具类的内部实现都依赖于AQS,如ReentrantLock, Semaphore, CountDownLatch等等,具体源码此处不做分析。

    CountDownLatch简单理解就是await方法阻塞线程,在等待计数为0时唤醒等待,减少计数的方法可以是在不同的线程中,也可以在同一个线程中。使用也比较简单,理解也容易。

    CyclicBarrier实现

    • 代码实现
    //MyRunnable中的代码
    /**
      * 通过CyclicBarrier的方式
      */
    public MyRunnable(CyclicBarrier cyclicBarrier) {
      this.cyclicBarrier = cyclicBarrier;
    }
    
    
    @Override
    public void run() {
      ...
        if (cyclicBarrier != null) {//通过栅栏方式
        try {
            cyclicBarrier.await();
          } catch (InterruptedException e) {
            e.printStackTrace();
          } catch (BrokenBarrierException e) {
            e.printStackTrace();
          }
        }
      ...
    }
    
    public class ThreadTest {
    
        public void testThread() {
            CyclicBarrier cyclicBarrier = new CyclicBarrier(4);//这个地方是4,不是3
            Thread threadA = new Thread(new MyRunnable(cyclicBarrier), "A");
            Thread threadB = new Thread(new MyRunnable(cyclicBarrier), "B");
            Thread threadC = new Thread(new MyRunnable(cyclicBarrier), "C");
            Thread threadD = new Thread(new Runnable() {
                @Override
                public void run() {
                    System.out.println("当前线程:" + Thread.currentThread().getName() + " >> Hello World!");
                }
            }, "D");
            threadA.start();
            threadB.start();
            threadC.start();
            try {
                cyclicBarrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
            threadD.start();
        }
    
        public static void main(String[] args) {
            new ThreadTest().testThread();
        }
    }
    
    //执行结果
    当前线程:B >> 0
    当前线程:B >> 1
    当前线程:C >> 0
    当前线程:B >> 2
    当前线程:C >> 1
    当前线程:B >> 3
    当前线程:B >> 4
    当前线程:A >> 0
    当前线程:C >> 2
    当前线程:C >> 3
    当前线程:A >> 1
    当前线程:C >> 4
    当前线程:A >> 2
    当前线程:A >> 3
    当前线程:A >> 4
    当前线程:D >> Hello World!
    
    • 原理分析

    CyclicBarrier是通过ReentrantLock锁来实现的,具体源码就不分析了,其里面也有几个重要的方法:

    1、构造方法

    public CyclicBarrier(int var1){//传入等待条件数
      ...
    }
    
    

    2、等待方法,当wait方法调用次数达到设定的次数之后,统一唤醒所有等待地方。

    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return this.dowait(false, 0L);
        } catch (TimeoutException var2) {
            throw new Error(var2);
        }
    }
    

    CyclicBarrier和CountDownLatch有点类似,但是也有点不一样,用在跑步比赛上的区别就是:

    1、CountDownLatch 计数减到0之后,阻塞的地方就可以开始跑了(阻塞了一个地方)

    2、CyclicBarrier 当有运动员没有准备好(调用await方法)时,其他的运动员都等着(await阻塞),只有都准备好了再开始跑。

    3、CountDownLatch计数只能用一次,CyclicBarrier可以循环使用。

    所以用CyclicBarrier的实现思路,相当于让A、B、C三个线程阻塞在最后一步,然后线程D就绪,A、B、C三个线程收个尾之后线程D开始运行,严格意义上并不是A、B、C三个线程和D串行,在理论上实现了串行。

    总结

    以上就是我能想到的实现A、B、C三个线程并发然后和线程D串行执行的几种方法,代码都以贴出。如果有其他方法,欢迎提出补充。

    相关文章

      网友评论

        本文标题:线程串行并行调度实现

        本文链接:https://www.haomeiwen.com/subject/thgvmctx.html