锁定的核心语言惯用法一直是用于方法和静态语句块的synchronized关键字。这个关键字实际上是硬连接到HotSpot JVM中的。我们在代码中分配的每个对象,无论是String,Array还是成熟的JSON文档,都在本机GC级别的标头中内置锁定功能。对于JIT编译器(根据特定锁的特定状态和争用级别来编译和重新编译字节码)也是如此。
同步块的问题在于它们全部或全部不存在—关键部分中不能有多个线程。尤其在消费者/生产者场景中,某些线程尝试专门编辑一些数据,而其他线程仅尝试读取数据并且可以共享访问权限。
ReadWriteLocks是一个完美解决方案。您可以指定哪些线程阻止其他所有线程(写线程),以及哪些线程与其他线程一起使用以消费内容(读线程)。
与同步块不同,RW锁不是JVM内置的,但是有相同的功能。尽管如此,要实现锁定习惯,您仍然需要指示CPU自动执行特定操作或以特定顺序执行操作,以避免出现竞争情况。传统上,这是通过进入JVM的神奇门户完成的—the unsafe class。RW锁使用“比较交换”(CAS)操作将值直接设置为内存,这是其线程排队算法的一部分。
即便如此,RW锁仍然不够快,有时甚至被证明确实很慢,有时候常常为之困扰。但是,JDK1.8有了新的StampedLock。该RW锁采用了Java 8 JDK中新增的一组算法和内存防护功能,使该锁更快更可靠。
使用锁。 从表面上看,StampedLocks的使用更为复杂。他们采用的是long型的stamp概念,可以用作任何锁定/解锁操作所使用的票证。这意味着要解锁R / W操作,您需要为其传递相关的锁定标记。传递错误的stamp,您将遇到异常甚至更糟的意外行为的风险。
另一个需要牢记的关键是,与RW锁不同,StampedLocks不可重入。因此,尽管它们可能更快,但它们有一个缺点,那就是线程现在可以使自己陷入死锁。实际上,这意味着比以往任何时候都更应该确保锁和stamp不会逸出其封闭的代码块。
long stamp = lock.writeLock(); //blocking lock, returns a stamp
try {
write(stamp); // this is a bad move, you’re letting the stamp escape
} finally {
lock.unlock(stamp); // release the lock in the same block - way better
}
我对这种设计的另一个讨厌之处是,stamp被定义为long型的值,对您实际上没有任何意义。我希望使用锁定操作来返回一个描述stamp的对象-其类型(R / W),锁定时间,拥有者线程等。这样可以使调试和日志记录更加容易。但是,这可能是有意的,目的是防止开发人员在代码的不同部分之间传递stamp,并节省分配对象的成本。
乐观锁。 就此锁的新功能而言,最重要的部分是新的乐观锁模式。研究和实践经验表明,读操作在大多数情况下都无法与写操作相平衡。结果,获得一个成熟的读取锁可能被证明是过大的。更好的方法可能是继续执行读取,并在读取的最后查看该值是否实际上已被修改。如果是这种情况,您可以重试读取,或者升级到较重的锁。
long stamp = lock.tryOptimisticRead(); // non blocking
read();
if (!lock.validate(stamp)) { // if a write occurred, try again with a read lock
long stamp = lock.readLock();
try {
read();
} finally {
lock.unlock(stamp);
}
}
选择锁的最大麻烦之一是,其在生产中的实际行为将根据应用程序状态而有所不同。这意味着锁的选择不能凭空选择,而必须考虑代码将在其下执行的实际条件。
并发读取器线程与写入器线程的数量将更改您应使用的锁—Synchronized同步块或RW锁。由于这些数字在JVM的生命周期中会发生变化,因此这变得更加困难,具体取决于应用程序状态和线程争用。
为了说明这一点,我对不同锁定级别和R / W线程组合下的四种锁定模式进行了压力测试—Symchronized同步块锁定,RW锁定,Stamped RW锁定和RW乐观锁定。读取器线程将消耗计数器的值,而写入器线程将其从0递增到100000000。
代码
计数器接口
public interface Counter {
/**
* 获取当前计数
*/
public long getCounter();
/**
* 计数自增
*/
public void increment();
}
实现一、Synchronized同步代码块
public class Synchronized implements Counter {
private Object lock = new Object();
private int counter;
@Override
public long getCounter() {
synchronized (lock) {
return counter;
}
}
@Override
public void increment() {
synchronized (lock) {
++counter;
}
}
}
实现二、ReentrantReadWriteLock读写锁
public class RWLock implements Counter {
private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private Lock rlock = lock.readLock();
private Lock wlock = lock.writeLock();
private long counter;
@Override
public long getCounter() {
try {
rlock.lock();
return counter;
} finally {
rlock.unlock();
}
}
@Override
public void increment() {
try {
wlock.lock();
++counter;
} finally {
wlock.unlock();
}
}
}
实现三、StampedLock悲观锁
public class Stamped implements Counter {
private StampedLock lock = new StampedLock();
private long counter;
public long s, t;
@Override
public long getCounter() {
long stamp = lock.tryOptimisticRead();
try {
long result = counter;
if (lock.validate(stamp)) {
return result;
}
stamp = lock.readLock();
result = counter;
lock.unlockRead(stamp);
return counter;
} finally {
}
}
@Override
public void increment() {
long stamp = lock.writeLock();
try {
++counter;
} finally {
lock.unlockWrite(stamp);
}
}
}
实现四、StampedLock乐观锁
public class OptimisticStamped implements Counter {
private StampedLock rwlock = new StampedLock();
private long counter;
private long success;
private long total;
@Override
public long getCounter() {
long stamp = rwlock.tryOptimisticRead();
total++;
if (rwlock.validate(stamp)) {
success++;
return counter;
}
return counter;
}
@Override
public void increment() {
long stamp = rwlock.writeLock();
try {
++counter;
} finally {
rwlock.unlockWrite(stamp);
}
}
public long getSuccess() {
return success;
}
public long getTotal() {
return total;
}
}
读任务
public class Reader implements Runnable {
private final Counter counter;
public Reader(Counter counter)
{
this.counter = counter;
}
@Override
public void run() {
while (true) {
if (Thread.interrupted()) {
break;
}
long count = this.counter.getCounter();
if (count > Main.TARGET_NUMBER) {
Main.publish(System.currentTimeMillis());
break;
}
}
}
}
写任务
public class Writer implements Runnable {
private final Counter counter;
public Writer(Counter counter) {
this.counter = counter;
}
@Override
public void run() {
// 写线程,不断自增
while (true) {
if (Thread.interrupted()) {
break;
}
counter.increment();
}
}
}
测试类
public class Main {
// 默认计数最大值
public static long TARGET_NUMBER = 100000000L;
// 默认线程数
public static int READTHREADS = 10;
// 默认线程数
public static int WRITETHREADS = 10;
// 默认测试轮数
public static int ROUNDS = 10;
// 计数器
private static String COUNTER = Counters.DIRTY.toString();
private static ExecutorService es;
private static int round;
// 计时开始
private static long start;
private static Boolean[] rounds;
/**
* 计数器类型(锁类型)
*/
private enum Counters {
DIRTY,
VOLATILE,
SYNCHRONIZED,
RWLOCK,
ATOMIC,
ADDER,
STAMPED,
OPTIMISTIC
}
public static void main(String[] args) {
COUNTER = Counters.DIRTY.toString();
// 设置参数
if (args.length > 0)
{
COUNTER = args[0];
}
if (args.length > 1)
{
READTHREADS = Integer.valueOf(args[1]);
}
if (args.length > 2)
{
WRITETHREADS = Integer.valueOf(args[2]);
}
if (args.length > 3)
{
ROUNDS = Integer.valueOf(args[3]);
}
if (args.length > 4)
{
TARGET_NUMBER = Long.valueOf(args[4]);
}
rounds = new Boolean[ROUNDS];
System.out.println("Using " + COUNTER + ". readthreads: " + READTHREADS + ". writethreads: " + WRITETHREADS +
". rounds: " + ROUNDS + ". Target: " + TARGET_NUMBER);
for (round = 0; round < ROUNDS; round++) {
rounds[round] = Boolean.FALSE;
// 获取计数器类型
Counter counter = getCounter();
// 创建线程池
es = Executors.newFixedThreadPool(READTHREADS + WRITETHREADS);
start = System.currentTimeMillis();
for (int j = 0; j < READTHREADS; j ++)
{
es.execute(new Reader(counter));
}
for (int j = 0; j < WRITETHREADS; j ++)
{
es.execute(new Writer(counter));
}
try {
es.awaitTermination(10, TimeUnit.MINUTES);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* 实例化计数器
*/
public static Counter getCounter() {
Counters counterTypeEnum = Counters.valueOf(COUNTER);
switch (counterTypeEnum) {
case ADDER:
return new Adder();
case ATOMIC:
return new Atomic();
case DIRTY:
return new Dirty();
case RWLOCK:
return new RWLock();
case SYNCHRONIZED:
return new Synchronized();
case VOLATILE:
return new Volatile();
case STAMPED:
return new Stamped();
case OPTIMISTIC:
return new OptimisticStamped();
default:
return null;
}
}
public static void publish(long end) {
synchronized (rounds[round]) {
if (Boolean.FALSE.equals(rounds[round])) {
// 打印耗费时长
System.out.println("round" + round + " cost: " + (end - start));
rounds[round] = Boolean.TRUE;
es.shutdownNow();
}
}
}
}
测试
5 readers vs. 5 writers:
5 readers vs. 5 writers.png堆叠五个并发的读取器线程和五个写入器线程,我们看到Stamped的锁性能比同步块提高了3倍。RW锁定也表现良好。奇怪的是,乐观锁在表面上应该是最快的,但实际上却是最慢的。
10 readers vs. 10 writers:
10 readers vs. 10 writers.png接下来,我将争用级别增加到十个写线程和十个读线程。在这里,情况开始发生重大变化。RW锁现在比在相同级别上执行的Stamped锁和同步锁慢一个数量级。请注意,令人惊讶的是乐观锁定仍然是最慢的的Stamped RW锁定。
16 readers vs. 4 writers:
16 readers vs. 4 writers.png接下来,我保持高水平的争论,同时通过读者线程来实现平衡:16位读者与4位作者。RW锁继续证明了其被替换的原因—慢了一百倍。Stamped和Optimistic的性能很好,Synchronized同步块也不差。
19 readers vs. 1 writers:
19 readers vs. 1 writers.png最后,我研究了单个写者线程如何针对19个读者进行处理。注意,结果要慢得多,因为单线程需要更长的时间才能完成工作。在这里,我们得到一些非常有趣的结果。毫不奇怪,RW锁需要无穷大才能完成。Stamped锁定并没有做得更好…乐观锁定在这里显然是赢家,比RW锁定高100倍。即使如此,请记住,此锁定模式可能会使您失败,因为在此期间可能会发生写入器。Synchronized,我们忠实的老客户,同步化继续取得可喜的成果。
结论:似乎平均而言,最佳性能仍然是由内部同步锁提供的。 即使这样,这里的意思并不是说它将在所有情况下都表现最佳。 主要是为了表明,在将代码投入生产之前,应该基于测试预期的争用级别以及读取器和写入器线程之间的划分来选择锁定习惯用法。 否则,您将冒着严重的生产调试痛苦的风险。
网友评论