美文网首页Android开发
线程串行并行调度实现

线程串行并行调度实现

作者: AnonyPer | 来源:发表于2019-10-17 19:21 被阅读0次

线程串行并行调度实现

问题描述

问题描述:线程A、B、C并行执行,然后和线程D串行执行,如何实现。

问题具体化:现在有A、B、C三个线程,每一个线程分别完成打印0~4的任务,有一个线程D,在A、B、C完成打印之后打印“Hello world!”。

解决思路

我们都知道线程就是用来实现并发执行的,而要实现的结果是在A、B、C三个线程并行执行完之后才执行线程D的任务,那么就需要获取等到A、B、C三个线程的结果,之后再调用线程D的执行方法,就需要想办法让线程D暂不执行,等待A、B、C三个线程执行完才执行,根据这个思路,我们可以想到的解决方案方法如下:

  • 通过一些flag变量控制判断A、B、C三个线程是否执行完

这是一个理解起来最简单也是最原始的方法,通过不断循环判断A、B、C三个线程对应的标识符是否完成执行来决定是否该执行线程D。

  • 在线程D执行前阻塞线程,等待A、B、C三个线程执行完在执行

这个思路和上面差不多,只不过一个是循环判断,一个是阻塞线程。阻塞线程的方法有很多,sleep了,上锁了之类的,但是我们除了阻塞线程外,还需要有一个时机去唤醒阻塞,这个时机的触发点就是A、B、C三个线程执行完,综合考虑,能满足这样条件的阻塞方式有以下几种:

1、thread.join() 阻塞等待当前线程执行完

2、利用FeatureTask实现的线程,通过get()方法可以阻塞等待线程结果返回

3、CountDownLatch,通过闭锁计数器的方式,通过await方法阻塞线程,在A、B、C三个线程执行完之后减少计数,唤醒阻塞

4、CyclicBarrier,栅栏原理,通过其await方法阻塞线程,在A、B、C三个线程执行完之后触发到达栅栏数,唤醒阻塞

以上就是能想到的实现方式,接下来一个一个用代码的方式实现并简要介绍其原理和特性。

解决方案及原理分析

通过flag变量控制

  • 代码实现
//线程类
class MyRunnable implements Runnable {
    Flag flag;//变量
    CountDownLatch countDownLatch;//闭锁
    CyclicBarrier cyclicBarrier;//栅栏

    /**
     * 变量控制的方法
     *
     * @param flag
     */
    public MyRunnable(Flag flag) {
        this.flag = flag;
    }

    /**
     * 通过其他阻塞的方法时调用
     */
    public MyRunnable() {

    }
    /**
     * 通过CountDownLatch的方式
     */
    public MyRunnable(CountDownLatch countDownLatch) {
        this.countDownLatch = countDownLatch;
    }
    /**
     * 通过CyclicBarrier的方式
     */
    public MyRunnable(CyclicBarrier cyclicBarrier) {
        this.cyclicBarrier = cyclicBarrier;
    }

    @Override
    public void run() {
        for (int i = 0; i < 5; i++) {
            try {
                Thread.sleep(new Random().nextInt(100));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("当前线程:" + Thread.currentThread().getName() + " >> " + i);
        }
        if (countDownLatch != null) {//通过闭锁方式
            countDownLatch.countDown();
        }
        if (cyclicBarrier != null) {//通过栅栏方式
            try {
                cyclicBarrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
        if (flag != null) {//通过flag变量方式
            flag.setFlag(true);
            System.out.println("当前线程:" + Thread.currentThread().getName() + " >> " + flag.isFlag());
        }
    }
}

class Flag {
    boolean flag = false;

    public void setFlag(boolean flag) {
        this.flag = flag;
    }

    public boolean isFlag() {
        return flag;
    }
}

/**
 * 线程调度方法1 flag变量控制
 * Created by anonyper on 2019/10/17.
 */
public class ThreadTest {
    Flag flagA = new Flag();//考虑一下这里为什么用一个对象包装一个boolean而不是直接用boolean对象来传递呢
    Flag flagB = new Flag();
    Flag flagC = new Flag();

    public void testThread() {
        Thread threadA = new Thread(new MyRunnable(flagA), "A");
        Thread threadB = new Thread(new MyRunnable(flagB), "B");
        Thread threadC = new Thread(new MyRunnable(flagC), "C");
        Thread threadD = new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("当前线程:" + Thread.currentThread().getName() + " >> Hello World!");
            }
        }, "D");
        threadA.start();
        threadB.start();
        threadC.start();
        while (!flagA.isFlag() || !flagB.isFlag() || !flagC.isFlag()) {
            Thread.yield();//释放CPU资源
            //this.wait(0L); //这样也可以等待 释放CPU资源
            //sleep(0L);//这样也可以等待 释放CPU资源
        }
        threadD.start();
    }

    public static void main(String[] args) {
        new ThreadTest().testThread();
    }
}

//执行结果:
当前线程:A >> 0
当前线程:B >> 0
当前线程:C >> 0
当前线程:A >> 1
当前线程:A >> 2
当前线程:B >> 1
当前线程:C >> 1
当前线程:B >> 2
当前线程:A >> 3
当前线程:B >> 3
当前线程:A >> 4
当前线程:A >> true
当前线程:B >> 4
当前线程:B >> true
当前线程:C >> 2
当前线程:C >> 3
当前线程:C >> 4
当前线程:C >> true
当前线程:D >> Hello World!
  • 原理分析

通过wihle循环以及变量控制,让当前线程等待A、B、C三个线程执行完之后在执行D线程。这种方法是理解简单,但是实现挺麻烦的,线程越多要控制的变量就越多,非常不便。

注意实现Runnable中参数传递,这里涉及到值传递和对象传递的知识点。

Thread.join方法

  • 代码实现
/**
 * 线程调度方法1 flag变量控制
 * Created by anonyper on 2019/10/17.
 */
public class ThreadTest {

    public void testThread() {
        Thread threadA = new Thread(new MyRunnable(), "A");
        Thread threadB = new Thread(new MyRunnable(), "B");
        Thread threadC = new Thread(new MyRunnable(), "C");
        Thread threadD = new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("当前线程:" + Thread.currentThread().getName() + " >> Hello World!");
            }
        }, "D");
        threadA.start();
        threadB.start();
        threadC.start();
        try {
            threadA.join();//阻塞等待线程A运行完
            threadB.join();//阻塞等待线程B运行完
            threadC.join();//阻塞等待线程C运行完
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        threadD.start();
    }

    public static void main(String[] args) {
        new ThreadTest().testThread();
    }
}

//执行结果
当前线程:A >> 0
当前线程:C >> 0
当前线程:B >> 0
当前线程:B >> 1
当前线程:A >> 1
当前线程:C >> 1
当前线程:B >> 2
当前线程:C >> 2
当前线程:A >> 2
当前线程:C >> 3
当前线程:B >> 3
当前线程:C >> 4
当前线程:A >> 3
当前线程:A >> 4
当前线程:B >> 4
当前线程:D >> Hello World!
  • 原理分析

线程的join方法会阻塞等待当前线程执行完成,其源代码如下:

public final void join() throws InterruptedException {
    this.join(0L);
}
public final synchronized void join(long var1) throws InterruptedException {
        long var3 = System.currentTimeMillis();//当前时间
        long var5 = 0L;
        if (var1 < 0L) {//等待时间不能小于0
            throw new IllegalArgumentException("timeout value is negative");
        } else {
            if (var1 == 0L) {//如果为零,则等待执行完成
                while(this.isAlive()) {//判断线程是否激活,涉及到线程状态,执行完之后返回false
                    this.wait(0L);//等待0秒后唤醒
                }
            } else {//等待一定时间后判断是否执行完
                while(this.isAlive()) {
                    long var7 = var1 - var5;
                    if (var7 <= 0L) {
                        break;
                    }

                    this.wait(var7);
                    var5 = System.currentTimeMillis() - var3;
                }
            }

        }
    }

我们看到join方法如果不传时间则默认等待线程执行完,但是其过程可能会被Interrupted,所以使用该方法需要针对异常做好判断。

FeatureTask的get方法

  • 代码实现
/**
 * 实现Callable接口,重写call方法
 */
class MyCall implements Callable<Boolean> {
    @Override
    public Boolean call() {
        for (int i = 0; i < 5; i++) {
            try {
                Thread.sleep(new Random().nextInt(100));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("当前线程:" + Thread.currentThread().getName() + " >> " + i);
        }
        return true;
    }
}

/**
 * 线程调度方法1 flag变量控制
 * Created by anonyper on 2019/10/17.
 */
public class ThreadTest {

    public void testThread() {
        FutureTask futureTaskA = new FutureTask(new MyCall());
        FutureTask futureTaskB = new FutureTask(new MyCall());
        FutureTask futureTaskC = new FutureTask(new MyCall());
        Thread threadA = new Thread(futureTaskA, "A");
        Thread threadB = new Thread(futureTaskB, "B");
        Thread threadC = new Thread(futureTaskC, "C");
        Thread threadD = new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("当前线程:" + Thread.currentThread().getName() + " >> Hello World!");
            }
        }, "D");
        threadA.start();
        threadB.start();
        threadC.start();
        try {
            futureTaskA.get();
            futureTaskB.get();
            futureTaskC.get();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        threadD.start();
    }

    public static void main(String[] args) {
        new ThreadTest().testThread();
    }
}

//运行结果
当前线程:A >> 0
当前线程:C >> 0
当前线程:B >> 0
当前线程:A >> 1
当前线程:A >> 2
当前线程:C >> 1
当前线程:A >> 3
当前线程:B >> 1
当前线程:C >> 2
当前线程:C >> 3
当前线程:B >> 2
当前线程:A >> 4
当前线程:B >> 3
当前线程:C >> 4
当前线程:B >> 4
当前线程:D >> Hello World!
  • 原理分析

1、Callable接口

public interface Callable<V> {
    V call() throws Exception;
}

2、Future接口

public interface Future<V> {
    boolean cancel(boolean var1);//取消

    boolean isCancelled();//是否取消

    boolean isDone();//是否完成

    V get() throws InterruptedException, ExecutionException;//返回结果

    V get(long var1, TimeUnit var3) throws InterruptedException, ExecutionException, TimeoutException;//等待具体时间内返回结果
}

2、RunnableFuture接口实现了Runnable、Future接口

public interface RunnableFuture<V> extends Runnable, Future<V> {
    void run();
}

3、FutureTask实现了RunnableFuture接口,构造函数接受一个Callable对象

public class FutureTask<V> implements RunnableFuture<V> {
    private Callable<V> callable;
    public FutureTask(Callable<V> var1) {
        if (var1 == null) {
            throw new NullPointerException();
        } else {
            this.callable = var1;
            this.state = 0;
        }
    }
    public void run() {
      ...
      Callable var1 = this.callable;
      var2 = var1.call;
      ...
    }
        public V get() throws InterruptedException, ExecutionException {//get方法
        int var1 = this.state;
        if (var1 <= COMPLETING) {//状态未完成时等待完成
            var1 = this.awaitDone(false, 0L);
        }

        return this.report(var1);//完成后返回结果
    }
    private int awaitDone(boolean var1, long var2) throws InterruptedException {
        long var4 = var1 ? System.nanoTime() + var2 : 0L;
        FutureTask.WaitNode var6 = null;
        boolean var7 = false;

        while(!Thread.interrupted()) {
            int var8 = this.state;
            if (var8 > COMPLETING) {//完成或被打断,返回结果
                if (var6 != null) {
                    var6.thread = null;
                }
                return var8;
            }

            if (var8 == COMPLETING) {//快要完成了,等待一会
                Thread.yield();
            } else if (var6 == null) {
                var6 = new FutureTask.WaitNode();
            } else if (!var7) {
                var7 = UNSAFE.compareAndSwapObject(this, waitersOffset, var6.next = this.waiters, var6);
            } else if (var1) {//有等待时间限制
                var2 = var4 - System.nanoTime();
                if (var2 <= 0L) {
                    this.removeWaiter(var6);
                    return this.state;
                }

                LockSupport.parkNanos(this, var2);
            } else {
                LockSupport.park(this);//通过LockSupport.park方法阻塞
            }
        }
                //最后没有等待结果时被唤醒(LockSupport.uppark())抛出打断异常
        this.removeWaiter(var6);
        throw new InterruptedException();
    }

4、FutureTask对象作为runnable实现类,传入Thread中,调用run方法调用的是callable的call方法,get方法在call方法没有返回时阻塞线程,等待结果返回。

该方法学会了理解起来也简单,但是相对写的类比较多,如果比较在意线程结果返回值来做判断条件的话,可以使用。

CountDownLatch实现

  • 代码实现
//MyRunnable的代码
        /**
     * 通过CountDownLatch的方式
     */
    public MyRunnable(CountDownLatch countDownLatch) {
        this.countDownLatch = countDownLatch;
    }
        @Override
    public void run() {
        ...
        if (countDownLatch != null) {//完成后减1
            countDownLatch.countDown();
        }
        ...
    }

public class ThreadTest {

    public void testThread() {
        CountDownLatch countDownLatch = new CountDownLatch(3);//闭锁的数量 三个线程
        //MyRunnable的实现见第一种方式代码,此处不在重复
        Thread threadA = new Thread(new MyRunnable(countDownLatch), "A");
        Thread threadB = new Thread(new MyRunnable(countDownLatch), "B");
        Thread threadC = new Thread(new MyRunnable(countDownLatch), "C");
        Thread threadD = new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("当前线程:" + Thread.currentThread().getName() + " >> Hello World!");
            }
        }, "D");
        threadA.start();
        threadB.start();
        threadC.start();
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        threadD.start();
    }

    public static void main(String[] args) {
        new ThreadTest().testThread();
    }
}

//执行结果
当前线程:A >> 0
当前线程:B >> 0
当前线程:C >> 0
当前线程:A >> 1
当前线程:B >> 1
当前线程:A >> 2
当前线程:A >> 3
当前线程:C >> 1
当前线程:B >> 2
当前线程:A >> 4
当前线程:C >> 2
当前线程:B >> 3
当前线程:C >> 3
当前线程:B >> 4
当前线程:C >> 4
当前线程:D >> Hello World!
  • 原理分析

CountDownLatch的核心方法有三个:

1、构造方式传入一个计数数量

private final CountDownLatch.Sync sync;

public CountDownLatch(int var1) {
    if (var1 < 0) {
        throw new IllegalArgumentException("count < 0");
    } else {
        this.sync = new CountDownLatch.Sync(var1);
    }
}

2、countDown计数减1

public void countDown() {
    this.sync.releaseShared(1);
}

3、await阻塞等待,还有一个方法是await(long var1, TimeUnit var3),等待具体的时间

public void await() throws InterruptedException {
    this.sync.acquireSharedInterruptibly(1);
}

public boolean await(long var1, TimeUnit var3) throws InterruptedException {
    return this.sync.tryAcquireSharedNanos(1, var3.toNanos(var1));
}

这三个方法都关联一个类:Sync,该类继承AbstractQueuedSynchronizer类,简称AQS,AQS是用来构建锁或者其他同步组件(信号量、事件等)的基础框架类,JDK中许多并发工具类的内部实现都依赖于AQS,如ReentrantLock, Semaphore, CountDownLatch等等,具体源码此处不做分析。

CountDownLatch简单理解就是await方法阻塞线程,在等待计数为0时唤醒等待,减少计数的方法可以是在不同的线程中,也可以在同一个线程中。使用也比较简单,理解也容易。

CyclicBarrier实现

  • 代码实现
//MyRunnable中的代码
/**
  * 通过CyclicBarrier的方式
  */
public MyRunnable(CyclicBarrier cyclicBarrier) {
  this.cyclicBarrier = cyclicBarrier;
}


@Override
public void run() {
  ...
    if (cyclicBarrier != null) {//通过栅栏方式
    try {
        cyclicBarrier.await();
      } catch (InterruptedException e) {
        e.printStackTrace();
      } catch (BrokenBarrierException e) {
        e.printStackTrace();
      }
    }
  ...
}

public class ThreadTest {

    public void testThread() {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(4);//这个地方是4,不是3
        Thread threadA = new Thread(new MyRunnable(cyclicBarrier), "A");
        Thread threadB = new Thread(new MyRunnable(cyclicBarrier), "B");
        Thread threadC = new Thread(new MyRunnable(cyclicBarrier), "C");
        Thread threadD = new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("当前线程:" + Thread.currentThread().getName() + " >> Hello World!");
            }
        }, "D");
        threadA.start();
        threadB.start();
        threadC.start();
        try {
            cyclicBarrier.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
        threadD.start();
    }

    public static void main(String[] args) {
        new ThreadTest().testThread();
    }
}

//执行结果
当前线程:B >> 0
当前线程:B >> 1
当前线程:C >> 0
当前线程:B >> 2
当前线程:C >> 1
当前线程:B >> 3
当前线程:B >> 4
当前线程:A >> 0
当前线程:C >> 2
当前线程:C >> 3
当前线程:A >> 1
当前线程:C >> 4
当前线程:A >> 2
当前线程:A >> 3
当前线程:A >> 4
当前线程:D >> Hello World!
  • 原理分析

CyclicBarrier是通过ReentrantLock锁来实现的,具体源码就不分析了,其里面也有几个重要的方法:

1、构造方法

public CyclicBarrier(int var1){//传入等待条件数
  ...
}

2、等待方法,当wait方法调用次数达到设定的次数之后,统一唤醒所有等待地方。

public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return this.dowait(false, 0L);
    } catch (TimeoutException var2) {
        throw new Error(var2);
    }
}

CyclicBarrier和CountDownLatch有点类似,但是也有点不一样,用在跑步比赛上的区别就是:

1、CountDownLatch 计数减到0之后,阻塞的地方就可以开始跑了(阻塞了一个地方)

2、CyclicBarrier 当有运动员没有准备好(调用await方法)时,其他的运动员都等着(await阻塞),只有都准备好了再开始跑。

3、CountDownLatch计数只能用一次,CyclicBarrier可以循环使用。

所以用CyclicBarrier的实现思路,相当于让A、B、C三个线程阻塞在最后一步,然后线程D就绪,A、B、C三个线程收个尾之后线程D开始运行,严格意义上并不是A、B、C三个线程和D串行,在理论上实现了串行。

总结

以上就是我能想到的实现A、B、C三个线程并发然后和线程D串行执行的几种方法,代码都以贴出。如果有其他方法,欢迎提出补充。

相关文章

  • 线程串行并行调度实现

    线程串行并行调度实现 问题描述 问题描述:线程A、B、C并行执行,然后和线程D串行执行,如何实现。 问题具体化:现...

  • 并发

    并发的解释早期的单核用时间片调度模拟并发(宏观并行,微观串行)。目前的多核CPU加上超线程技术已实现真正的并行,所...

  • iOS 多线程技术总结

    概览 进程与线程的概念 多线程的由来 并行与并发 多线程的实现 串行与并行 线程的几种状态 串行队列与并发队列区别...

  • iOS多线程小结

    同步异步串行并行 同步串行:不开启线程 同步并行:不开启线程 异步串行:最多开启一个线程 异步并行:开启线程 同步...

  • 多线程GCD的使用

    一、同步/异步、串行/并行的区别 1.同步/异步 同步/异步是指线程与线程之间的关系。 2.串行/并行 串行、并行...

  • 常用的GCD记录一下

    子线程并行 串行 主线程 串行队列 子线程 并行队列 子线程 栅栏函数 控制执行顺序 避免数据竞争 多线...

  • GCD

    1、同步串行队列 2、同步并行队列 3、异步串行队列 4、异步并行队列 5、死锁 主线程中创建同步串行队列 主线程...

  • GCD 小结

    一、 同步/异步、串行/并行的区别 1.同步/异步 同步/异步是指线程与线程之间的关系. 2.串行/并行 串行/并...

  • 多线程面试题高级

    1、首先认识清楚,多线程分为:并行和串行,,并行和串行又可以包括同步线程和异步线程。GCD仅仅支持FIFO(先入先...

  • 【iOS出租屋进阶】之多线程GCD详解

    线程、任务和队列的概念 异步、同步 & 并行、串行的特点 组合 |并行队列|串行队列 |主队列----|----|...

网友评论

    本文标题:线程串行并行调度实现

    本文链接:https://www.haomeiwen.com/subject/thgvmctx.html