美文网首页
Java线程间通信

Java线程间通信

作者: kdong | 来源:发表于2018-10-09 10:20 被阅读0次

    涉及到多个线程协同完成某个任务的时候,就用到了线程间通信的相关知识点。
    这其中涉及到的知识点有:
    (1)thread.join()
    (2)object.wait()
    (3)object.notify()
    (4)CountdownLatch()
    (5)CyclicBarrier
    (6)FutureTask
    (7)Callable

    一、如何让两个线程依次执行?
    假设有两个线程,线程A和线程B,依次让它们打印1到3:

     private static void demo1()
        {
            Thread A=new Thread(() -> printNumber("A"));
    
            Thread B=new Thread(() -> printNumber("B"));
    
            A.start();
            B.start();
        }
    
    private static void printNumber(String threadName)
        {
            int i=0;
            while(i++<3)
            {
                try{
                    Thread.sleep(100);
                }catch (InterruptedException e)
                {
                    e.printStackTrace();
                }
    
                System.out.println(threadName+" print:"+i);
            }
    
        }
    

    打印输出结果:

    B print:1
    A print:1
    B print:2
    A print:2
    A print:3
    B print:3
    

    可以看出A和B是同时打印的。如果想让A和B依次执行的话,则需要使用thread.join()方法。

      private static void demo2()
        {
            Thread A=new Thread(() -> printNumber("A"));
    
            Thread B=new Thread(() -> {
                try{
                    A.join();
                }catch (InterruptedException e)
                {
                    e.printStackTrace();
                }
    
                printNumber("B");
    
            });
    
            A.start();
            B.start();
        }
    

    现在的打印结果:

    A print:1
    A print:2
    A print:3
    B print:1
    B print:2
    B print:3
    

    可以看到现在是顺序执行。

    二、如何让线程按照指定的顺序执行?

    还是上面那个例子,如果想让A线程打印完A 1之后,接着执行完B线程,接着再执行A线程,thread.join()方法显然不能办到。这时候需要使用object的wait()和notify()方法。

    private static void demo3()
        {
            Object lock=new Object();
            Thread A=new Thread(() -> {
                synchronized (lock)
                {
                    System.out.println("A 1");
                    try{
                        lock.wait();
                    }catch (InterruptedException e)
                    {
                        e.printStackTrace();
                    }
    
                    System.out.println("A 2");
                    System.out.println("A 3");
    
                }
            });
    
            Thread B=new Thread(() -> {
                synchronized (lock)
                {
                    System.out.println("B 1");
                    System.out.println("B 2");
                    System.out.println("B 3");
                    lock.notify();
                }
            });
    
            A.start();
            B.start();
        }
    

    打印结果:

    A 1
    B 1
    B 2
    B 3
    A 2
    A 3
    

    lock为新建的线程A和线程B共享的对象锁。线程A获取锁,打印A 1,再调用lock.wait(),交出锁的控制权。线程B获取锁,执行完毕后,再调用lock.notify(),唤醒在等待的线程A,线程A继续执行完毕。
    为了显示更清楚,更方便理解,现加入log:

     private static void demo3Info() {
            Object lock = new Object();
            Thread A = new Thread(() -> {
                System.out.println("INFO: A 等待锁 ");
                synchronized (lock) {
                    System.out.println("INFO: A 得到了锁 lock");
                    System.out.println("A 1");
                    try {
                        System.out.println("INFO: A 准备进入等待状态,放弃锁 lock 的控制权 ");
                        lock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("INFO: 有人唤醒了 A, A 重新获得锁 lock");
                    System.out.println("A 2");
                    System.out.println("A 3");
                }
            });
            Thread B = new Thread(() -> {
                System.out.println("INFO: B 等待锁 ");
                synchronized (lock) {
                    System.out.println("INFO: B 得到了锁 lock");
                    System.out.println("B 1");
                    System.out.println("B 2");
                    System.out.println("B 3");
                    System.out.println("INFO: B 打印完毕,调用 notify 方法 ");
                    lock.notify();
                }
            });
            A.start();
            B.start();
        }
    

    打印结果:

    INFO: A 等待锁 
    INFO: A 得到了锁 lock
    A 1
    INFO: A 准备进入等待状态,放弃锁 lock 的控制权 
    INFO: B 等待锁 
    INFO: B 得到了锁 lock
    B 1
    B 2
    B 3
    INFO: B 打印完毕,调用 notify 方法 
    INFO: 有人唤醒了 A, A 重新获得锁 lock
    A 2
    A 3
    

    三、四个线程A、B、C、D,A、B、C线程执行完毕之后,再去执行D线程。
    thread.join()的作用是让一个线程执行完之后再执行另外一个线程,明显不符合当前的这个需求。在这里,我们希望A、B、C线程可以同时执行,执行完毕之后,再执行线程D。针对这种情况,我们可以使用CountdownLatch来实现这种通信要求。

    创建一个计数器,设置初始值,CountdownLatch countDownLatch = new CountDownLatch(2);
    在 等待线程 里调用 countDownLatch.await() 方法,进入等待状态,直到计数值变成 0;
    在 其他线程 里,调用 countDownLatch.countDown() 方法,该方法会将计数值减小 1;
    当 其他线程 的 countDown() 方法把计数值变成 0 时,等待线程 里的 countDownLatch.await() 立即退出,继续执行下面的代码。
    
    private static void runDAfterABC() {
            int worker = 3;
            CountDownLatch countDownLatch = new CountDownLatch(worker);
            new Thread(() -> {
                System.out.println("D is waiting for other three threads");
                try {
                    countDownLatch.await();
                    System.out.println("All done, D starts working");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
            for (char threadName='A'; threadName <= 'C'; threadName++) {
                final String tN = String.valueOf(threadName);
                new Thread(() -> {
                    System.out.println(tN + " is working");
                    try {
                        Thread.sleep(100);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    System.out.println(tN + " finished");
                    countDownLatch.countDown();
                }).start();
            }
        }
    

    打印结果:

    D is waiting for other three threads
    B is working
    C is working
    A is working
    B finished
    A finished
    C finished
    All done, D starts working
    

    CountDownLatch 适用于一个线程去等待多个线程的情况。线程D中调用await方法进入等待,线程A、B、C开始执行,每执行完一次,计时器减去1,当计时器为0时,触发await运行结束,继续执行线程D直到结束。

    四、A、B、C三线程准备好之后,同时运行
    为了实现线程之间的互相通信,我们可以利用 CyclicBarrier 数据结构,它的基本用法是:

    先创建一个公共 CyclicBarrier 对象,设置 同时等待 的线程数,CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
    这些线程同时开始自己做准备,自身准备完毕后,需要等待别人准备完毕,这时调用 cyclicBarrier.await(); 即可开始等待别人;
    当指定的 同时等待 的线程数都调用了 cyclicBarrier.await();时,意味着这些线程都准备完毕好,然后这些线程才 同时继续执行。
    
    private static void runABCWhenAllReady() {
            int runner = 3;
            CyclicBarrier cyclicBarrier = new CyclicBarrier(runner);
            final Random random = new Random();
            for (char runnerName='A'; runnerName <= 'C'; runnerName++) {
                final String rN = String.valueOf(runnerName);
                new Thread(() -> {
                    long prepareTime = random.nextInt(10000) + 100;
                    System.out.println(rN + " is preparing for time: " + prepareTime);
                    try {
                        Thread.sleep(prepareTime);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    try {
                        System.out.println(rN + " is prepared, waiting for others");
                        cyclicBarrier.await(); // 当前运动员准备完毕,等待别人准备好
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                    System.out.println(rN + " starts running"); // 所有运动员都准备好了,一起开始跑
                }).start();
            }
        }
    

    打印结果:

    A is preparing for time: 5606
    C is preparing for time: 8550
    B is preparing for time: 5172
    B is prepared, waiting for others
    A is prepared, waiting for others
    C is prepared, waiting for others
    C starts running
    A starts running
    B starts running
    

    五、子线程完成某项任务后,将得到的结果返回主线程

    在这里需要使用Callable和FutureTask一起使用,但需要注意的是,FutureTask获取结果的时候回阻塞主线程。
    例如,我们利用子线程将1加到100的结果返回给主线程:

    private static void doTaskWithResultInWorker() {
            Callable<Integer> callable = () -> {
                System.out.println("Task starts");
                Thread.sleep(1000);
                int result = 0;
                for (int i=0; i<=100; i++) {
                    result += i;
                }
                System.out.println("Task finished and return result");
                return result;
            };
            FutureTask<Integer> futureTask = new FutureTask<>(callable);
            new Thread(futureTask).start();
            try {
                System.out.println("Before futureTask.get()");
                System.out.println("Result: " + futureTask.get());
                System.out.println("After futureTask.get()");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    

    打印结果:

    Before futureTask.get()
    Task starts
    Task finished and return result
    Result: 5050
    After futureTask.get()
    

    可以看到,主线程调用 futureTask.get() 方法时阻塞主线程;然后 Callable 内部开始执行,并返回运算结果;此时 futureTask.get() 得到结果,主线程恢复运行。
    这里我们可以学到,通过 FutureTask 和 Callable 可以直接在主线程获得子线程的运算结果,只不过需要阻塞主线程。当然,如果不希望阻塞主线程,可以考虑利用 ExecutorService,把 FutureTask 放到线程池去管理执行。

    相关文章

      网友评论

          本文标题:Java线程间通信

          本文链接:https://www.haomeiwen.com/subject/ntxeaftx.html