美文网首页Java【重复造轮子系列】Java 杂谈技术干货
【重复造轮子系列】——使用 wait 和 notify 实现 C

【重复造轮子系列】——使用 wait 和 notify 实现 C

作者: 苦行孙 | 来源:发表于2018-03-23 11:14 被阅读108次

前言

前几天看面试题的时候,偶然在知乎看到了一道某家大型互联网公司出的面试题 “一主多从 多线程协作”,题目是这样的

问:客户请求下单服务(OrderService),服务端会验证用户的身份(RemotePassportService),用户的银行信用(RemoteBankService),用户的贷款记录(RemoteLoanService)。为提高并发效率,要求三项服务验证工作同时进行,如其中任意一项验证失败,则立即返回失败,否则等待所有验证结束,成功返回。要求Java实现

知乎原链接

由于 CountDownLatch 内部是使用 AQS 同步器实现的,我就想到了能不能用 synchronized 内置锁,来模拟实现一下闭锁的功能,经过了一天的思考,使用 wait 和 notify 做了一个简单的例子.......

什么是 CountDownLatch ?

CountDownLatch 是一个非常实用的多线程控制工具类。在 JDK1.5 之后被引入,这个工具通常用来控制线程等待,它可以让一个线程或多个线程等待其它线程完成各自的操作以后,再继续执行。

在这里有关于 CountDownLatch 的使用方式和应用场景就不再描述了,话不多说,下面是具体的实现代码。

【我只是做了一个简单的实现, 至于性能和安全性方面, 还希望大神们多多指导】

模拟实现 CountDownLatch

首先定义一个基本的功能接口:


interface CountDownLatchSimulate {

    int getCount();

    void countDown();

    void await() throws InterruptedException;

    int threadCount();
}

使用 Synchronized 实现的工具类如下:


import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;

/**
 * @author Lin Shu
 * Please contact me if you have any questions.
 * My e-mail is syt0438@gmail.com
 * Date: 2018/3/22
 * Time: 11:37
 */
public class CountDownLatchSynchronized implements CountDownLatchSimulate {
    private static final Logger LOGGER = Logger.getGlobal();

    private final Object lock = new Object();
    /**
     * 通过 state 实现类似 CountDownLatch 的state,
     * 多个任务等待 state 为 0 时,恢复执行
     */
    private final AtomicInteger state;
    /**
     * 记录当前闭锁是否已经关闭,防止出现所有子线程 notify 已经发生,而 await 还没有发生的现象.
     */
    private final AtomicBoolean lockClosed = new AtomicBoolean(false);
    /**
     * 通过 Map 保证 await 的线程,一定是在其它线程执行完之后唤醒
     */
    private volatile ConcurrentHashMap<String, Thread> threads = new ConcurrentHashMap<>();

    public CountDownLatchSynchronized(int state) {
        this.state = new AtomicInteger(state);
    }

    @Override
    public int getCount() {
        return state.get();
    }

    /**
     * 1. 通过 state 记录闭锁状态
     * 2. 通过一个 Map 来保证 await 线程始终在其它countDown线程执行完成之后恢复执行
     * 3. 当所有其它线程都已经执行完成并且调用了 notify 方法之后,由于 await 线程
     * 还未执行到 await 方法未阻塞,从而错过了 notify 通知,因此会一直阻塞下去
     * 通过 lockClosed 来保存闭锁当前的状态,来解决丢失通知的问题
     */
    @Override
    public void countDown() {
        synchronized (lock) {
            final int val = state.decrementAndGet();

            if (val == 0) {
                /*
                 * 当闭锁状态等于 0 时,闭锁达到临界值,通知所有线程恢复执行
                 * */

                try {
                    LOGGER.warning("[" + threads.size() + "|" + getCount() + "]Notify All : " + Thread.currentThread());

                    lock.notifyAll();

                    LOGGER.info("val: " + val + " : threadCount: " + threadCount());
                } finally {
                    /*
                     * 保存闭锁的关闭状态
                     * */

                    lockClosed.compareAndSet(false, true);

                    LOGGER.warning("[" + threads.size() + "|" + getCount() + "]Count Down Closed: " + Thread.currentThread());
                }
            } else {

                /*
                 * 当闭锁状态大于 0 时,当前线程进入等待状态,
                 * 并将当前线程保存到 Map 当中,以保证 await 线程的执行顺序
                 * */

                try {
                    threads.put(Thread.currentThread().getName(), Thread.currentThread());
                    LOGGER.warning("[" + threads.size() + "|" + getCount() + "]Put Thread: " + Thread.currentThread());

                    lock.wait();
                } catch (InterruptedException e) {
                    Logger.getGlobal().warning("[" + threadCount() + "|" + getCount() + "]Count Down Thread Interupted: " + Thread.currentThread());
                } finally {
                    threads.remove(Thread.currentThread().getName(), Thread.currentThread());
                    LOGGER.warning("[" + threads.size() + "|" + getCount() + "]Remove Thread: " + Thread.currentThread());


                    if (threads.isEmpty()) {
                        lock.notifyAll();
                    }

                    LOGGER.info("val: " + val + " : threadCount: " + threadCount() + "|" + getCount());
                }
            }
        }
    }

    @Override
    public void await() throws InterruptedException {
        synchronized (lock) {
            if (!lockClosed.get()) {
                /*
                 * 如果闭锁状态 > 0 并且 lockClosed 为假,未到达关闭状态,则当前线程进入等待状态
                 * */
                lock.wait();
            } else {
                /*
                 * 如果闭锁状态 == 0 并且 lockClosed 为假,到达关闭状态,当前线程不等待,继续执行
                 * */

                LOGGER.info("[" + System.currentTimeMillis() + "|" + getCount() + "] Count Down Latch is Closed become to Skip wait [" + Thread.currentThread().getName() + "]让出CPU执行权!");
            }

            while (!threads.isEmpty()) {
                /*
                 * 如果其它线程还没有执行完成,则当前线程进入等待状态,让出 CUP 的执行权
                 * */

                LOGGER.info("[" + System.currentTimeMillis() + "|" + getCount() + "]Await线程[" + Thread.currentThread().getName() + "]让出CPU执行权!");

                if (!lockClosed.get()) {
                    lock.wait();
                } else {
                    break;
                }
            }

            LOGGER.info("[" + System.currentTimeMillis() + "|" + getCount() + "]Await线程[" + Thread.currentThread().getName() + "]退出等待状态!");

        }
    }

    @Override
    public int threadCount() {
        return threads.size();
    }
}


具体的代码之后,会上传到我的 github 上去,【我是传送门,点击我】

总结

通过本文的介绍,希望大家能够了解 CountDownLatch 的应用场景和工作机制。

第一次写技术文章,文笔和写作逻辑方面难免有一些问题的存在,还望各位同仁多多指教。

相关文章

网友评论

    本文标题:【重复造轮子系列】——使用 wait 和 notify 实现 C

    本文链接:https://www.haomeiwen.com/subject/xawtcftx.html