美文网首页
线程之间的通信

线程之间的通信

作者: 云芈山人 | 来源:发表于2021-07-03 17:41 被阅读0次

当多个线程一起协调完成一件事时,那么线程之间的沟通就显得尤为重要。

1. 假设线程A,线程B,两个线程分别打印1-3三个数字。

    private static void ThreadDemo1(){
        Thread A = new Thread(new Runnable() {
            public void run() {
                printNum("A");
            }
        });
        Thread B = new Thread(new Runnable() {
            public void run() {
                printNum("B");
            }
        });
        A.start();
        B.start();
    }

    //数字打印  
    private static void printNum(String threadname){
        int i = 0;
        while ( i++ < 3 ){
            try{
                Thread.sleep(100);
            }catch(InterruptedException e){
                e.printStackTrace();
            }
            System.out.println( threadname + "print:" + i );
        }
    }
    
    public static void main(String[] args) {
        ThreadDemo1();
    }

运行得到的结果:

Aprint:1
Bprint:1
Bprint:2
Aprint:2
Aprint:3
Bprint:3

分析:

可以看出线程A和线程B是同时运行的,且顺序是随机的,每次运行得到的结果可能不一样。

2. 如何让A、B两个线程依次执行?

如果希望线程B在线程A全部打印完成后再开始打印。可以利用thread.join()方法。

   private static void ThreadDemo2(){
        final Thread A = new Thread(new Runnable() {
            public void run() {
                printNum("A");
            }
        });
        
        Thread B = new Thread(new Runnable() {
            public void run() {
                System.out.println("B 开始等待 A");
                try{
                    A.join();
                }catch(InterruptedException e){
                    e.printStackTrace();
                }
                printNum("B");
            }
        });
        B.start();
        A.start();
    }

   private static void printNum(String threadname){
        int i = 0;
        while ( i++ < 3 ){
            try{
                Thread.sleep(100);
            }catch(InterruptedException e){
                e.printStackTrace();
            }
            System.out.println( threadname + "print:" + i );
        }
    }
    
    public static void main(String[] args) {
        ThreadDemo2();
    }

运行得到的结果:

B 开始等待 A
Aprint:1
Aprint:2
Aprint:3
Bprint:1
Bprint:2
Bprint:3

分析:

Thread.join
含义:
简单来说,就是线程没有执行完之前,会一直阻塞在join方法处。

join对于线程的作用.png

Thread.join源码

public class Thread implements Runnable {
    public final void join() throws InterruptedException {
        join(0);
    }

    public final synchronized void join(long millis) throws InterruptedException {
        long base = System.currentTimeMillis();
        long now = 0;

        if (millis < 0) {
            throw new IllegalArgumentException("timeout value is negative");
        }

        if (millis == 0) {//判断是否携带阻塞的超时时间,等于0表示没有设置超时时间
            while (isAlive()) {//isAlive获取线程状态,无线等待直到AThread线程结束
                wait(0); //调用Object中的wait方法实现线程的阻塞
            }
        } else {//阻塞直到超时
            while (isAlive()) {
                long delay = millis - now;
                if (delay <= 0) {
                    break;
                }
                wait(delay);
                now = System.currentTimeMillis() - base;
            }
        }
    }

}

总结:Thread.join其实底层是通过wait/notifyall来实现线程的通信达到线程阻塞的目的;当线程执行结束以后,会触发两个事情,第一个是设置native线程对象为null、第二个是通过notifyall方法,让等待在A对象锁上的wait方法被唤醒。

3. 那如何让 A、B两个线程按照指定方式有序交叉运行呢?

希望 A 在打印完 1 后,再让 B 打印 1, 2, 3,最后再回到 A 继续打印 2, 3。这种需求下,显然 Thread.join() 已经不能满足了。我们需要更细粒度的锁来控制执行顺序。

这里,我们可以利用 object.wait() 和 object.notify() 两个方法来实现。

private static void ThreadDemo3(){
        final Object lock = new Object();  
        Thread A = new Thread(new Runnable() {
            public void run() {
                synchronized(lock){
                    System.out.println("Aprint: 1");
                    try{
                        System.out.println("Aprint: wait");
                        lock.wait();
                    }catch(InterruptedException e){
                        e.printStackTrace();
                    }
                    System.out.println("Aprint: 2");
                    System.out.println("Aprint: 3");
                }
            }
        });
        
        Thread B = new Thread(new Runnable() {
            public void run() {
                synchronized (lock) {
                    System.out.println("Bprint: 1");
                    System.out.println("Bprint: 2");
                    System.out.println("Bprint: 3");
                    lock.notify();
                }
            }
        });
        A.start();
        B.start();
    }
    
    public static void main(String[] args) {
        ThreadDemo3();
    }

运行得到的结果:

Aprint: 1
Aprint: wait
Bprint: 1
Bprint: 2
Bprint: 3
Aprint: 2
Aprint: 3

分析:

  1. 首先建立一个A和B共享的对象锁 lock = new Object();
  2. 当A得到锁后,先打印1,然后调用 lock.wait()方法,交出锁的控制权,进入wait状态;
  3. 对B而言,由于A最开始得到了锁,导致B无法执行;直到A调用 lock.wait()释放控制权后,B才得到了锁;
  4. 在B得到锁后,B开始打印1,2,3;然后调用 lock.notify()方法,唤醒正在wait的A;
  5. A被唤醒后,继续打印剩下的2,3。

wait():使调用该方法的线程释放共享资源锁,然后从运行状态退出,进入等待队列,直到被再次唤醒。
notify():随机唤醒等待队列中等待同一共享资源的一个线程,并使该线程退出等待队列,进入可运行状态,也就是notify()方法仅通知一个线程。

4. 四个线程 A B C D,其中 D 要等到 A B C 全执行完毕后才执行,而且 A B C 是同步运行的。

如果使用 thread.join(),可以让一个线程等另一个线程运行完毕后再继续执行,那我们可以在 D 线程里依次 join A B C,不过这也就使得 A B C 必须依次执行,而我们要的是这三者能同步运行。

我们希望达到的目的是:A B C 三个线程同时运行,各自独立运行完后通知 D;对 D 而言,只要 A B C 都运行完了,D 再开始运行。针对这种情况,我们可以利用 CountdownLatch 来实现这类通信方式。它的基本用法是:

  1. 创建一个计数器,设置初始值,CountDownLatch countDownLatch = new CountDownLatch(2);
  2. 在等待线程里调用countDownLatch.await()方法,进入等待状态,直到计数器变成0;
  3. 在其他线程里,调用countDownLatch.countDown()方法,该方法会将计数值减少1;
  4. 在其他线程里countDown()方法把计数值变成0时,等待线程里的countDownLatch.await()立即退出,继续执行下面的代码。
private static void runDAfterABC() {  
        int worker = 3;  
        final CountDownLatch countDownLatch = new CountDownLatch(worker);  
        new Thread(new Runnable() {  
            @Override  
            public void run() {  
                System.out.println("D is waiting for ABC");  
                try {  
                    countDownLatch.await();  
                    System.out.println("All done, D starts working");  
                } catch (InterruptedException e) {  
                    e.printStackTrace();  
                }  
            }  
        }).start();  
        for (char threadName='A'; threadName <= 'C'; threadName++) {  
            final String tN = String.valueOf(threadName);  
            new Thread(new Runnable() {  
                @Override  
                public void run() {  
                    System.out.println(tN + " is working");  
                    try {  
                        Thread.sleep(100);  
                    } catch (Exception e) {  
                        e.printStackTrace();  
                    }  
                    System.out.println(tN + " finished");  
                    countDownLatch.countDown();  
                }  
            }).start();  
        }  
    }  
    
    public static void main(String[] args) {
        runDAfterABC();
    }

运行结果:

D is waiting for ABC
A is working
C is working
B is working
C finished
B finished
A finished
All done, D starts working

分析:

CountDownLatch可以用来倒计时,但当计数完毕,只有一个线程的await()会得到响应,无法让多个线程同时触发。 【CountDownLatch 适用于一个线程去等待多个线程的情况。】

5. 线程 A B C 各自开始准备,直到三者都准备完毕,然后再同时运行 。

为了实现线程间互相等待这种需求,我们可以利用 CyclicBarrier 数据结构。
它的基本用法是:

  1. 先创建一个公共CyclicBarrier对象,设置同时等待的线程数,CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
  2. 这些线程同时开始自己做准备,自身准备完毕后,需要等待别人准备完毕,这是调用cyclicBarrier.await(),即可开始等待别人;
  3. 当指定的同时等待的线程数都调用了cyclicBarrier.await()时,意味着这些线程都准备完毕,然后这些线程才同时继续执行。
    public static void runABCWhenAllReady(){
        int runner = 3;
        final CyclicBarrier cyclicBarrier = new CyclicBarrier(runner);
        final Random random = new Random();
        for ( char runnername = 'A' ;runnername <= 'C' ; runnername++ ){
            final String rN = String.valueOf(runnername);
            new Thread(new Runnable(){
                @Override
                public void run() {
                    long prepareTime = random.nextInt(10000)+100;
                    System.out.println(rN + " is preparing for time:"+prepareTime);
                    try{
                        Thread.sleep(prepareTime);
                    }catch(Exception e){
                        e.printStackTrace();
                    }
                    try{
                        System.out.println(rN + " is preparing,waiting for others");
                        //当前线程准备完毕后,等待别人准备好
                        cyclicBarrier.await();
                    }catch(InterruptedException e){
                        e.printStackTrace();
                    }catch(BrokenBarrierException e){
                        e.printStackTrace();
                    }
                    //所有线程都准备好了,准备一起跑
                    System.out.println(rN+" starts running");
                }
            }).start();;
        }
    }
    
    public static void main(String[] args) {
        runABCWhenAllReady();
    }

运行结果:

A is preparing for time:1150
C is preparing for time:10096
B is preparing for time:4127
A is preparing,waiting for others
B is preparing,waiting for others
C is preparing,waiting for others
C starts running
A starts running
B starts running

6. 子线程完成某件任务后,把得到的结果回传给主线程。

可以利用接口类Callable

private static void doTaskWithResultInWorker() {  
        Callable<Integer> callable = new Callable<Integer>() {  
            @Override  
            public Integer call() throws Exception {  
                System.out.println("Task starts");  
                Thread.sleep(1000);  
                int result = 0;  
                for (int i=0; i<=100; i++) {  
                    result += i;  
                }  
                System.out.println("Task finished and return result");  
                return result;  
            }  
        };  
        FutureTask<Integer> futureTask = new FutureTask(callable);  
        new Thread(futureTask).start();  
        try {  
            System.out.println("Before futureTask.get()");  
            System.out.println("Result: " + futureTask.get());  
            System.out.println("After futureTask.get()");  
        } catch (InterruptedException e) {  
            e.printStackTrace();  
        } catch (ExecutionException e) {  
            e.printStackTrace();  
        }  
    } 
    
    public static void main(String[] args) {
        doTaskWithResultInWorker();
    }

运行结果:

Before futureTask.get()
Task starts
Task finished and return result
Result: 5050
After futureTask.get()

分析:

Runnable源码

@FunctionalInterface
public interface Runnable {
    /**
     * When an object implementing interface <code>Runnable</code> is used
     * to create a thread, starting the thread causes the object's
     * <code>run</code> method to be called in that separately executing
     * thread.
     * <p>
     * The general contract of the method <code>run</code> is that it may
     * take any action whatsoever.
     *
     * @see     java.lang.Thread#run()
     */
    public abstract void run();
}

Callable源码

@FunctionalInterface
public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}

Callable与Runnable最大的区别就是返回一泛型V结果。

那么,如何把子线程的结果回传回来呢?在Java里,有个类配合Callable使用:FutureTask,不过需注意,它获取结果的get方法会阻塞主线程。

public class FutureTask<V> implements RunnableFuture<V> {
    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;

    private Callable<V> callable;
    /** The result to return or exception to throw from get() */
    private Object outcome; // non-volatile, protected by state reads/writes
    /** The thread running the callable; CASed during run() */
    private volatile Thread runner;
    /** Treiber stack of waiting threads */
    private volatile WaitNode waiters;
    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }

    /**
     * @throws CancellationException {@inheritDoc}
     */
    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }

    @SuppressWarnings("unchecked")
    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }

 private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }

            int s = state;
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            else if (q == null)
                q = new WaitNode();
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            }
            else
                LockSupport.park(this);
        }
    }
}

从运行结果可以看出,主线程调用futureTask.get()方法时阻塞主线程;然后Callable内部开始执行,并返回运行结果;此时futureTask.get()得到结果,主线程恢复运行。

通过FutureTask和Callable可以直接在主线程获得子线程对的运算结果,只不过需要阻塞主线程。当然不希望阻塞主线程,可以考虑利用ExecutorService,把FutureTask放到线程池去管理执行。

相关文章

  • Java内存模型

    线程之间的通信和同步 线程之间的通信和同步是并发编程领域的关键问题。 线程之间的通信 通信是指线程之间以何种机制来...

  • 线程间通信

    线程间通信就是子线程和主线程之间的通信

  • 说一下线程之间的通信

    线程之间的通信基本概念: 在一个进程中,线程往往不是孤立存在的,多个线程之间经常需要进行通信; 线程之间通信的体现...

  • 线程与线程,进程与进程之间的通信

    线程与线程之间的通信 一,为什么要线程通信? 1>多个线程...

  • iOS开发多线程--线程通信

    线程之间的通信 简单说明线程间通信:在1个进程中,线程往往不是孤立存在的,多个线程之间需要经常进行通信。 线程间通...

  • java多线程(八)JMM和底层实现原理

    线程之间的通信线程的通信是指线程之间以何种机制来交换信息。在编程中,线程之间的通信机制有两种,共享内存和消息传递。...

  • ios 多线程的故事4

    线程间通信 线程间通信:在1个进程中,线程往往不是孤立存在的,多个线程之间需要经常进行通信 线程间通信的体现 1个...

  • 最详细分析Java 内存模型

    并发编程中, 线程之间如何通信及线程之间如何同步, 通信是指线程之间以何种机制来交换信息。在命令式编程中,线程之间...

  • Java线程通信-线程协调API

    线程通信 想要实现多个线程之间的协同,如:线程执行顺序,获取某个线程执行结果等,则需要使用线程之间互相通信。 文件...

  • JMM和底层实现原理

    1.并发编程领域的关键问题 1.1 线程之间的通信 线程的通信是指线程之间以何种机制来交换信息。在编程中,线程之间...

网友评论

      本文标题:线程之间的通信

      本文链接:https://www.haomeiwen.com/subject/ayoyultx.html