Semaphore 类的主要作用就是控制线程并发的数量
1. 内部类 Sync
FairSync
NonfairSync
Sync
内部类:
abstract static class Sync extends AbstractQueuedSynchronizer {

    /**
     * Creates the synchronizer and stores the initial number of permits
     * in the AQS state field.
     *
     * @param permits initial permit count
     */
    Sync(int permits) {
        setState(permits);
    }

    /**
     * Reports the number of permits, i.e. the current AQS state.
     *
     * @return the current permit count
     */
    final int getPermits() {
        return getState();
    }

    /**
     * Lowers the permit count by {@code reductions}, retrying the
     * compare-and-set until it succeeds.
     *
     * @param reductions number of permits to remove
     * @throws Error if the subtraction wraps around (result greater than the
     *         value it was subtracted from)
     */
    final void reducePermits(int reductions) {
        int before;
        int after;
        do {
            before = getState();
            after = before - reductions;
            if (after > before) { // arithmetic wrapped around
                throw new Error("Permit count underflow");
            }
            // Atomic update inherited from AQS; on failure re-read and retry.
        } while (!compareAndSetState(before, after));
    }

    /**
     * Takes every permit by setting the state to zero.
     *
     * @return the number of permits that were taken (zero if none were held)
     */
    final int drainPermits() {
        int held;
        do {
            held = getState();
            // Nothing to do when the state is already zero; otherwise retry
            // until the swap to zero succeeds.
        } while (held != 0 && !compareAndSetState(held, 0));
        return held;
    }

    /**
     * Helper for acquiring permits in non-fair mode.
     *
     * @param acquires number of permits requested
     * @return the permits left after the acquisition, or a negative value
     *         when fewer than {@code acquires} permits were available (in
     *         which case the state is left untouched)
     */
    final int nonfairTryAcquireShared(int acquires) {
        int available;
        int remaining;
        do {
            available = getState();
            remaining = available - acquires;
            // A negative remainder ends the loop without attempting the CAS.
        } while (remaining >= 0 && !compareAndSetState(available, remaining));
        return remaining;
    }

    /**
     * Overrides the AQS hook that defines how the shared "lock" is released:
     * adds {@code releases} permits back to the state. According to the
     * original note, both the fair and the non-fair synchronizer use this
     * method.
     *
     * @param releases number of permits to return
     * @return always {@code true} once the state has been updated
     * @throws Error if the addition wraps around (result smaller than the
     *         value it was added to)
     */
    @Override
    protected final boolean tryReleaseShared(int releases) {
        int before;
        int after;
        do {
            before = getState();
            after = before + releases;
            if (after < before) { // arithmetic wrapped around
                throw new Error("Maximum permit count exceeded");
            }
        } while (!compareAndSetState(before, after));
        return true;
    }
}
FairSync
内部类:
NonfairSync
内部类:
NonfairSync内部类
2. Semaphore
类的构造方法
Semaphore类的构造方法
3. Semaphore
类的常用方法
acquire系列方法
release系列方法
其他常用方法
4. 例子
使用 Semaphore 实现多生产者/多消费者模式
/**
 * Shared constants and synchronization objects for the producer/consumer
 * example. This class only holds static members and cannot be instantiated
 * or subclassed.
 *
 * @author dimdark
 */
public final class CommonUtil {

    /**
     * Number of producers.
     */
    public static final int PRODUCER_COUNT = 10;

    /**
     * Number of consumers.
     */
    public static final int CONSUMER_COUNT = 20;

    /**
     * Lock guarding the critical section shared by producers and consumers.
     */
    public static final Lock lock = new ReentrantLock();

    /**
     * Condition of {@link #lock} that consumers wait on.
     */
    public static final Condition consumeCondition = lock.newCondition();

    /**
     * Condition of {@link #lock} that producers wait on.
     */
    public static final Condition produceCondition = lock.newCondition();

    /** Not instantiable: this class is a holder of static members only. */
    private CommonUtil() {
    }
}
/**
 * Fixed-size container that stores food.
 * <p>
 * None of the methods synchronize on their own.
 *
 * @author dimdark
 */
public class Container {

    /** Backing slots; a {@code null} entry is a free slot. Holds at most 5 items. */
    public static final String[] food = new String[5];

    /**
     * Tells whether the container holds no food at all.
     *
     * @return {@code true} when every slot is {@code null}
     */
    public static boolean isFoodEmpty() {
        for (String slot : food) {
            if (slot != null) {
                return false;
            }
        }
        return true;
    }

    /**
     * Tells whether the container is completely filled.
     *
     * @return {@code true} when no slot is {@code null}
     */
    public static boolean isFoodFull() {
        for (String slot : food) {
            if (slot == null) {
                return false;
            }
        }
        return true;
    }

    /**
     * Puts one item into the lowest-indexed free slot; does nothing when
     * there is no free slot.
     * <p>
     * Note: intended to be called only by producers, which make sure that the
     * container has room for at least one item.
     */
    public static void putFood() {
        int slot = 0;
        while (slot < food.length && food[slot] != null) {
            slot++;
        }
        if (slot < food.length) {
            food[slot] = "food-" + slot;
        }
    }

    /**
     * Removes the item in the lowest-indexed occupied slot; does nothing when
     * every slot is free.
     * <p>
     * Note: intended to be called only by consumers, which make sure that the
     * container holds at least one item.
     */
    public static void getFood() {
        int slot = 0;
        while (slot < food.length && food[slot] == null) {
            slot++;
        }
        if (slot < food.length) {
            food[slot] = null;
        }
    }
}
/**
 * Producer thread: repeatedly puts food into {@link Container}.
 * <p>
 * All producers share one semaphore created with
 * {@code CommonUtil.PRODUCER_COUNT} permits, and use the lock and conditions
 * published by {@code CommonUtil}.
 *
 * @author dimdark
 */
public class Producer extends Thread {
    private static final Semaphore semaphore;
    private static final Lock lock;
    private static final Condition consumeCondition;
    private static final Condition produceCondition;

    static {
        semaphore = new Semaphore(CommonUtil.PRODUCER_COUNT);
        lock = CommonUtil.lock;
        consumeCondition = CommonUtil.consumeCondition;
        produceCondition = CommonUtil.produceCondition;
    }

    /**
     * Produces one item: takes a permit, waits under the lock until the
     * container is not full, stores the item and wakes up waiting consumers.
     * <p>
     * If the thread is interrupted while waiting for the permit or on the
     * condition, an error line is printed, the interrupt status is restored
     * and the method returns without producing. Only resources that were
     * actually obtained are given back.
     */
    public void produce() {
        // Acquire outside the try/finally: when acquire() is interrupted there
        // is no permit to release and no lock to unlock.
        try {
            semaphore.acquire();
        } catch (InterruptedException e) {
            reportInterrupted();
            return;
        }
        try {
            lock.lock();
            try {
                while (Container.isFoodFull()) {
                    produceCondition.await();
                }
                Container.putFood();
                System.out.println("producer " + Thread.currentThread().getName() + " produce food");
                consumeCondition.signalAll();
            } finally {
                // await() re-acquires the lock before throwing, so the lock is held here.
                lock.unlock();
            }
        } catch (InterruptedException e) {
            reportInterrupted();
        } finally {
            semaphore.release();
        }
    }

    /** Prints the error line and restores the interrupt status so run() can stop. */
    private static void reportInterrupted() {
        System.err.println("producer " + Thread.currentThread().getName() + " happens some error!");
        Thread.currentThread().interrupt();
    }

    /**
     * Keeps producing until the thread is interrupted.
     */
    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            produce();
        }
    }
}
/**
 * Consumer thread: repeatedly takes food out of {@link Container}.
 * <p>
 * All consumers share one semaphore created with
 * {@code CommonUtil.CONSUMER_COUNT} permits, and use the lock and conditions
 * published by {@code CommonUtil}.
 *
 * @author dimdark
 */
public class Consumer extends Thread {
    private static final Semaphore semaphore;
    private static final Lock lock;
    private static final Condition consumeCondition;
    private static final Condition produceCondition;

    static {
        semaphore = new Semaphore(CommonUtil.CONSUMER_COUNT);
        lock = CommonUtil.lock;
        consumeCondition = CommonUtil.consumeCondition;
        produceCondition = CommonUtil.produceCondition;
    }

    /**
     * Consumes one item: takes a permit, waits under the lock until the
     * container is not empty, removes an item and wakes up waiting producers.
     * <p>
     * If the thread is interrupted while waiting for the permit or on the
     * condition, an error line is printed, the interrupt status is restored
     * and the method returns without consuming. Only resources that were
     * actually obtained are given back.
     */
    public void consume() {
        // Acquire outside the try/finally: when acquire() is interrupted there
        // is no permit to release and no lock to unlock.
        try {
            semaphore.acquire();
        } catch (InterruptedException e) {
            reportInterrupted();
            return;
        }
        try {
            lock.lock();
            try {
                while (Container.isFoodEmpty()) {
                    consumeCondition.await();
                }
                Container.getFood();
                System.out.println("consumer " + Thread.currentThread().getName() + " consume food");
                produceCondition.signalAll();
            } finally {
                // await() re-acquires the lock before throwing, so the lock is held here.
                lock.unlock();
            }
        } catch (InterruptedException e) {
            reportInterrupted();
        } finally {
            semaphore.release();
        }
    }

    /** Prints the error line and restores the interrupt status so run() can stop. */
    private static void reportInterrupted() {
        System.err.println("consumer " + Thread.currentThread().getName() + " happens some error!");
        Thread.currentThread().interrupt();
    }

    /**
     * Keeps consuming until the thread is interrupted.
     */
    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            consume();
        }
    }
}
/**
 * Entry point of the producer/consumer example.
 *
 * @author dimdark
 */
public class Main {

    /**
     * Creates and starts {@code CommonUtil.CONSUMER_COUNT} consumer threads
     * followed by {@code CommonUtil.PRODUCER_COUNT} producer threads. The
     * method does not wait for the started threads.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        Thread[] producers = new Thread[CommonUtil.PRODUCER_COUNT];
        Thread[] consumers = new Thread[CommonUtil.CONSUMER_COUNT];
        // Indexed loops: assigning to an enhanced-for variable would leave the
        // array elements null, so the arrays would never reference the threads.
        for (int i = 0; i < consumers.length; i++) {
            consumers[i] = new Consumer();
            consumers[i].start();
        }
        for (int i = 0; i < producers.length; i++) {
            producers[i] = new Producer();
            producers[i].start();
        }
    }
}
网友评论