多线程对于共享资源可以进行写操作和读操作,容易存在并发问题。必须要确保以下三点:
- 多线程可以同时对共享资源进行读操作;
- 多线程不能同时对共享资源进行写操作;
- 多线程不能同时对共享资源进行写操作和读操作。
读/写操作能否并行化进行:
读/写 | READ | WRITE |
---|---|---|
READ | Y | N |
WRITE | N | N |
因此读操作加锁和写操作总加锁的逻辑是存在区别的,设计一个读写分离的锁时很有必要的。这样一个锁不仅要满足以上三点,还要实现写操作的高优先级设定。
读写锁实现
public class ReadWriteLock {
private int readingReaders = 0; // 正在读的线程
private int waitingReaders = 0; // 等待读的线程
private int writingWriters = 0; // 正在写的线程
private int waitingWriters = 0; // 等待写的线程
private boolean writePrefer = true; // 写优先级
/**
* 默认构造函数,写优先于读
*/
public ReadWriteLock() {
this(true);
}
public ReadWriteLock(boolean writeprefer) {
this.writePrefer = writeprefer;
}
/**
* 读加锁
*/
public synchronized void readLock() throws InterruptedException {
this.waitingReaders++;
try {
// 存在正在运行的写线程 || 写优先级高,存在正在等待写锁的写线程
while (writingWriters > 0 || (writePrefer && waitingWriters > 0)) {
this.wait();
}
this.readingReaders++;
} finally {
this.waitingReaders--;
}
}
/**
* 读释放锁
*/
public synchronized void readUnlock() {
this.readingReaders--;
this.notifyAll();
}
/**
* 写加锁
*/
public synchronized void writeLock() throws InterruptedException {
this.waitingWriters++;
try {
// 存在正在运行的写线程/读线程
while (writingWriters > 0 || readingReaders > 0) {
this.wait();
}
this.writingWriters++;
} finally {
this.waitingWriters--;
}
}
/**
* 写释放锁
*/
public synchronized void writeUnlock() {
this.writingWriters--;
this.notifyAll();
}
}
定义共享资源
public class ShardData {
private char[] buffer;
private final ReadWriteLock lock = new ReadWriteLock();
public ShardData(int size) {
buffer = new char[size];
for (int i = 0; i < size; i++) {
buffer[i] = '*';
}
}
public char[] read() throws InterruptedException {
try {
lock.readLock(); // 读之前加读锁
return this.doRead();
} finally {
lock.readUnlock(); // 释放读锁
}
}
private char[] doRead() {
char[] newBuffer = new char[buffer.length];
for (int i = 0; i < buffer.length; i++) {
newBuffer[i] = buffer[i];
}
slowly(50);
return newBuffer;
}
public void write(char c) throws InterruptedException {
try {
lock.writeLock(); // 写之前加写锁
this.doWrite(c);
} finally {
lock.writeUnlock(); // 释放写锁
}
}
private void doWrite(char c) {
for (int i = 0; i < buffer.length; i++) {
buffer[i] = c;
slowly(10);
}
}
private void slowly(int mills) {
try {
Thread.sleep(mills);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
定义写线程和读线程
public class WriterWorker extends Thread {
private static final Random random = new Random(System.currentTimeMillis());
private int index = 0;
private final ShardData shardData;
private final String filter;
public WriterWorker(ShardData shardData, String filter) {
this.shardData = shardData;
this.filter = filter;
}
@Override
public void run() {
try {
while (true) {
char c = nextChar();
shardData.write(c);
Thread.sleep(random.nextInt(1000));
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private char nextChar() {
char c = filter.charAt(index);
index++;
if (index >= filter.length())
index = 0;
return c;
}
}
public class ReaderWorker extends Thread {
private final ShardData data;
public ReaderWorker(ShardData data) {
this.data = data;
}
@Override
public void run() {
try {
while (true) {
char[] readBuf = data.read();
System.out.println(Thread.currentThread().getName() + " reads " + String.valueOf(readBuf));
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
客户端测试
public class Client {
public static void main(String[] args) {
// 共享资源
final ShardData data = new ShardData(10);
// 启5个读线程
new ReaderWorker(data).start();
new ReaderWorker(data).start();
new ReaderWorker(data).start();
new ReaderWorker(data).start();
new ReaderWorker(data).start();
// 启两个写线程
new WriterWorker(data, "ilovechina").start();
new WriterWorker(data, "ilovenanjing").start();
}
}
测试结果
![](https://img.haomeiwen.com/i12022128/526c0f6801d7d004.png)
网友评论