首先创建一个池(对象创建代价高昂或者是临界资源),用于分配资源。
package concurrency;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;
public class Pool<T> {
private int size;
private List<T> items = new ArrayList<T>();
private volatile boolean[] checkedOut;
private Semaphore avaliable;
public Pool(Class<T> classObject, int size){
this.size = size;
checkedOut = new boolean[size];
avaliable = new Semaphore(size, true);
try{
for(int i = 0; i < size; i++) {
items.add(classObject.getDeclaredConstructor().newInstance());
}
}catch(Exception e){
throw new RuntimeException(e);
}
}
public T checkOut() throws InterruptedException {
avaliable.acquire(); //P操作
return getItem();
}
public void checkIn(T item){
//确认归还成功再进行V操作,防止出现资源没有归还成功,但是又被申请的情况
if(releaseItem(item)) //V操作
avaliable.release();
}
private synchronized T getItem(){
for(int i = 0; i < size; i++){
if(!checkedOut[i]){
checkedOut[i] = true;
return items.get(i);
}
}
return null;
}
private synchronized boolean releaseItem(T item){
int index = items.indexOf(item);
if(index == -1) return false;
if(checkedOut[index]){
checkedOut[index] = false;
return true;
}
return false;
}
}
例子:一个创建代价高昂的对象类型
package concurrency;
public class Fat {
//使用volatile是为了防止优化
private volatile double d;
private static int counter = 0;
private final int id = counter++;
public Fat(){
for(int i = 0; i < 10000; i++){
d += (Math.E + Math.PI) / (double)i;
}
}
public void operation(){
System.out.println(this);
}
public String toString(){
return "Fat " + id;
}
}
测试Pool
package concurrency;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
class CheckOutTask<T> implements Runnable{
private static int counter = 0;
private final int id = counter++;
private Pool<T> pool;
public CheckOutTask(Pool<T> pool){
this.pool = pool;
}
@Override
public void run(){
try{
T item = pool.checkOut();
System.out.println("CheckOut " + item);
TimeUnit.SECONDS.sleep(1);
System.out.println("CheckIn " + item);
pool.checkIn(item);
}catch(InterruptedException e){
//后续处理
}
}
}
public class SemaphoreDemo {
final static int SIZE = 25;
public static void main(String[] args) throws InterruptedException{
//创建资源池,用于分配Fat对象
final Pool<Fat> pool = new Pool<Fat>(Fat.class, SIZE);
//创建线程池,用于分配线程
ExecutorService exec = Executors.newCachedThreadPool();
//提交任务,每个任务用于申请获得Fat对象和归还Fat对象
for(int i = 0; i < SIZE; i++)
exec.execute(new CheckOutTask<Fat>(pool));
System.out.println("All CheckOutTasks created.");
//申请所有对象池中的对象,如果已被申请,则等待
List<Fat> list = new ArrayList<Fat>();
for(int i = 0; i < SIZE; i++){
Fat f = pool.checkOut();
System.out.println(i + ": main() thread checkedOut()");
f.operation();
list.add(f);
}
//执行到这里的时候,对象池pool中的资源已经全部被申请,故下面的申请被阻塞
Future<?> blocked = exec.submit(new Runnable() {
@Override
public void run() {
try {
pool.checkOut();
} catch (InterruptedException e) {
System.out.println("CheckOut() interrupted.");
}
}
});
//演示被阻塞了2 seconds
TimeUnit.SECONDS.sleep(2);
//中断blocked进行的阻塞操作
blocked.cancel(true);
//归还
System.out.println("Checking in objects in " + list);
for(Fat f : list)
pool.checkIn(f);
//重复的归还,忽略该操作
for(Fat f : list)
pool.checkIn(f);
exec.shutdown();
}
}
网友评论