美文网首页
Volatile Happens-before 原则

Volatile Happens-before 原则

作者: leeehao | 来源:发表于2020-08-08 16:15 被阅读0次

在研究 synchronizedReentrantLock 时产生一个疑问, synchronized 通过 monitor 机制保证线程间可见性,但是 ReentrantLock 是如何保证的呢。

ReentrantLock 如何保证线程间可见性

ReentrantLock 基于 AQS 实现,AQS 的核心之一就是 state 管理。
state 被 volatile 修饰保证此变量在线程间可见,但是在查阅相关框架和库时发现某些作者竟然可以通过 ReentrantLock 实现和 synchronized 一致的线程可见性。
原来是我理解错了,volatile 并不仅仅保证被修饰的变量在线程间可见性,而是保证在写入 volatile 变量后,其他线程读取此 volatile 变量后的所有操作都保证可见性。
此特性正是由于 Happens-before 原则,ReentrantLock 受此原则约束。

java.util.concurrent.locks.Lock 接口的Javadoc中有这样一段话:

All Lock implementations must enforce the same memory synchronization semantics as provided by the built-in monitor lock :

  • A successful lock operation acts like a successful monitorEnter action
  • A successful unlock operation acts like a successful monitorExit action

Unsuccessful locking and unlocking operations, and reentrant locking/unlocking operations, do not require any memory synchronization effects.

这段话的核心是j.u.c.locks.Lock接口的实现类具有和synchronized内置锁一样的内存同步语义。

文字表达些许干燥,上示例代码

Volatile Happens-before 示例

import java.util.concurrent.TimeUnit;

public class HappensBefore {

    public volatile int a = 1;
    public boolean run = false;

    public static void main(String[] args) {
        HappensBefore happensBefore = new HappensBefore();

        new Thread(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            happensBefore.run = true;
            happensBefore.a = 2;
        }).start();

        new Thread(() -> {
            while (true) {
                while (happensBefore.run) {
                    System.out.println("check running");
                }
            }
        }).start();

        new Thread(() -> {
            while (true) {
                while (happensBefore.a == 2) {
                    if (happensBefore.run) {
                        System.out.println("is running");
                        return;
                    }
                }
            }
        }).start();
    }

}

本段代码会输出 is running 但程序不会停止运行,因为 run 变量无法保证在线程2的可见性。
但是线程3正常会正常输出,验证了 Happens-before 原则。

反向验证

如果对一个变量的读操作先行发生于后面对这个变量的读操作,是否成立 Happens-before 呢

public class HappensBefore {

    public volatile int a = 1;
    public boolean run = false;

    public static void main(String[] args) {
        HappensBefore happensBefore = new HappensBefore();

        new Thread(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            happensBefore.run = true;
            System.out.println(happensBefore.a);
            // happensBefore.a = 2;
        }).start();

        new Thread(() -> {
            while (true) {
                while (happensBefore.run) {
                    System.out.println("check running");
                }
            }
        }).start();

        new Thread(() -> {
            while (true) {
                while (happensBefore.a == 2) {
                    if (happensBefore.run) {
                        System.out.println("is running");
                        return;
                    }
                }
            }
        }).start();
    }
}

程序仅输出了 1 并没有输出 check running 说明遵守:

volatile变量规则:对一个变量的写操作先行发生于后面对这个变量的读操作;

什么是 Happens-before

  1. 程序次序规则:一个线程内,按照代码顺序,书写在前面的操作先行发生于书写在后面的操作;
  2. 锁定规则:一个unLock操作先行发生于后面对同一个锁额lock操作;
  3. volatile变量规则:对一个变量的写操作先行发生于后面对这个变量的读操作;
  4. 传递规则:如果操作A先行发生于操作B,而操作B又先行发生于操作C,则可以得出操作A先行发生于操作C;
  5. 线程启动规则:Thread对象的start()方法先行发生于此线程的每个一个动作;
  6. 线程中断规则:对线程interrupt()方法的调用先行发生于被中断线程的代码检测到中断事件的发生;
  7. 线程终结规则:线程中所有的操作都先行发生于线程的终止检测,我们可以通过Thread.join()方法结束、
  8. Thread.isAlive()的返回值手段检测到线程已经终止执行;
  9. 对象终结规则:一个对象的初始化完成先行发生于他的finalize()方法的开始;

利用 ReentranLock 保证可见性生产示例

package com.baidu.brpc.utils;

import lombok.Getter;
import lombok.Setter;

import java.util.ArrayList;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Since {@linkplain java.util.concurrent.ExecutorService ExecutorService} is a
 * bit slow, we have to invent some wheels here.
 * <p>
 * NOT implement the {@linkplain java.util.concurrent.Executor Executor}
 * interface unless we see the necessity
 * @author Zhangyi Chen (chenzhangyi01@baidu.com)
 * @update Wenwei hu (huwenwei@baidu.com)
 */
public final class ThreadPool {
    private static final int DEFAULT_QUEUE_SIZE = 1024;

    // Stand alone lock
    private BoundedQueue<Runnable> produced;
    private BoundedQueue<Runnable> toConsume;
    private Lock consumerLock;
    private Lock producerLock;
    private Condition isProducerNotFullCondition;
    private Condition isProducerNotEmptyCondition;
    private ArrayList<Thread> threads;

    private volatile boolean stopped;

    public ThreadPool(int initialThreadNum, ThreadFactory threadFactory) {
        this(initialThreadNum, threadFactory, 0);
    }

    public ThreadPool(int initialThreadNum, ThreadFactory threadFactory, int queueSize) {
        if (initialThreadNum <= 0) {
            throw new IllegalArgumentException(
                    "initialThreadNum=" + initialThreadNum
                     + " should be positive" );
        }
        threads = new ArrayList<Thread>(initialThreadNum);
        stopped = false;

        if (queueSize <= 0) {
            queueSize = DEFAULT_QUEUE_SIZE;
        }
        produced = new BoundedQueue<Runnable>(queueSize);
        toConsume = new BoundedQueue<Runnable>(queueSize);
        consumerLock = new ReentrantLock();
        producerLock = new ReentrantLock();
        isProducerNotEmptyCondition = producerLock.newCondition();
        isProducerNotFullCondition = producerLock.newCondition();
        // Start working threads at last, don't put any code after, or there
        // will be race condition
        for (int i = 0; i < initialThreadNum; ++i) {
            Thread tr = threadFactory.newThread(new Runnable() {
                @Override
                public void run() {
                    consume();
                }
            });
            tr.start();
            threads.add(tr);
        }
    }

    private void consume() {
        while (true) {
            Runnable task = null;
            while (true) {
                consumerLock.lock();
                try {
                    if (!toConsume.isEmpty()) {
                        task = toConsume.pop();
                        break;
                    }
                } finally {
                    consumerLock.unlock();
                }

                producerLock.lock();
                try {
                    while (!stopped && produced.isEmpty()) {
                        try {
                            isProducerNotEmptyCondition.await();
                        } catch (InterruptedException ex) {
                            // ignore
                        }
                    }
                    if (!produced.isEmpty()) {
                        if (produced.isFull()) {
                            isProducerNotFullCondition.signalAll();
                        }
                        consumerLock.lock();
                        try {
                            BoundedQueue<Runnable> tmp = produced;
                            produced = toConsume;
                            toConsume = tmp;
                        } finally {
                            consumerLock.unlock();
                        }
                    } else {
                        // stopped must be true
                        break;
                    }
                } finally {
                    producerLock.unlock();
                }
            }
            if (task != null) {
                task.run();
            } else {
                // The thread pool was shut down
                break;
            }
        }
    }

    public void stop() {
        stopped = true;
        producerLock.lock();
        try {
            isProducerNotEmptyCondition.signalAll();
            isProducerNotFullCondition.signalAll();
        } finally {
            producerLock.unlock();
        }
    }

    public void join() {
        synchronized (threads) {
            for (Thread tr : threads) {
                try {
                    tr.join();
                } catch (InterruptedException e) {
                    // pass
                }
            }
            threads.clear();
        }
    }

    public boolean submit(Runnable task) {
        Runnable[] tasks = { task };
        return submit(tasks, 0, 1) == 1;
    }

    public long submit(Runnable []tasks, int offset, int len) {
        int cur = offset;
        int end = offset + len;
        while (!stopped && cur < end) {
            producerLock.lock();
            try {
                while (produced.isFull()) {
                    try {
                        isProducerNotFullCondition.await();
                    } catch (InterruptedException ex) {
                        // ignore
                    }
                }
                int toProduce = Math.min(produced.remainingCapacity(),
                        end - cur);
                if (toProduce > 0) {
                    boolean wasEmpty = produced.isEmpty();
                    produced.addAll(tasks, cur, toProduce);
                    if (wasEmpty) {
                        isProducerNotEmptyCondition.signalAll();
                    }
                }
                cur += toProduce;
            } finally {
                producerLock.unlock();
            }
        }
        return cur - offset;
    }

    public StatInfo getStatInfo() {
        StatInfo statInfo = new StatInfo();
        statInfo.setThreadNum(threads.size());
        statInfo.setDefaultQueueCapacity(DEFAULT_QUEUE_SIZE);
        statInfo.setProducerQueueSize(produced.size());
        statInfo.setConsumerQueueSize(toConsume.size());
        return statInfo;
    }

    @Setter
    @Getter
    public static class StatInfo {
        private int threadNum;
        private int defaultQueueCapacity;
        private int producerQueueSize;
        private int consumerQueueSize;
    }

    public boolean isStopped() {
        return stopped;
    }

}

相关参考

http://ifeve.com/java%E9%94%81%E6%98%AF%E5%A6%82%E4%BD%95%E4%BF%9D%E8%AF%81%E6%95%B0%E6%8D%AE%E5%8F%AF%E8%A7%81%E6%80%A7%E7%9A%84/
https://www.cnblogs.com/chenssy/p/6393321.html
https://github.com/baidu/brpc-java/blob/master/brpc-java-communication/src/main/java/com/baidu/brpc/utils/ThreadPool.java

相关文章

网友评论

      本文标题:Volatile Happens-before 原则

      本文链接:https://www.haomeiwen.com/subject/kpehdktx.html