Semaphore使用

作者: 桐桑入梦 | 来源:发表于2020-03-20 09:51 被阅读0次

首先创建一个对象池,用于管理和分配创建代价高昂的对象或临界资源;池中可用对象的数量由 Semaphore 的许可数来控制。

package concurrency;

import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;

/**
 * A fixed-size pool of pre-built objects whose hand-out is throttled by a
 * fair counting {@link Semaphore}.
 *
 * <p>All {@code size} instances are created up front through the no-argument
 * constructor of the supplied class. {@link #checkOut()} blocks while every
 * instance is in use; {@link #checkIn(Object)} hands an instance back.
 *
 * <p>Slots are matched by reference identity, not {@code equals()}, so two
 * pooled objects that happen to compare equal are never confused with each
 * other.
 *
 * @param <T> type of the pooled objects
 */
public class Pool<T> {
    private final int size;
    private final List<T> items = new ArrayList<T>();
    // checkedOut[i] is true while items.get(i) is handed out. Only read and
    // written inside the synchronized helpers below, so the monitor (not
    // volatile) provides the visibility guarantee for the elements.
    private final boolean[] checkedOut;
    private final Semaphore available;

    /**
     * Builds the pool and eagerly creates all pooled instances.
     *
     * @param classObject class whose no-argument constructor is invoked
     *                    {@code size} times
     * @param size        number of instances (and semaphore permits)
     * @throws RuntimeException wrapping any failure raised while
     *                          instantiating {@code classObject}
     */
    public Pool(Class<T> classObject, int size) {
        this.size = size;
        checkedOut = new boolean[size];
        available = new Semaphore(size, true);

        try {
            for (int i = 0; i < size; i++) {
                items.add(classObject.getDeclaredConstructor().newInstance());
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Takes one instance out of the pool, waiting until one is free.
     *
     * @return an instance that is not currently checked out
     * @throws InterruptedException if the thread is interrupted while waiting
     *                              for a permit
     */
    public T checkOut() throws InterruptedException {
        available.acquire(); // P operation
        return getItem();
    }

    /**
     * Returns an instance to the pool. Objects that do not belong to this
     * pool, or that are not currently checked out, are ignored.
     *
     * @param item the instance being returned
     */
    public void checkIn(T item) {
        // Release a permit only after the slot was really freed; otherwise a
        // bogus or repeated check-in would create a permit without a free
        // item behind it.
        if (releaseItem(item)) {
            available.release(); // V operation
        }
    }

    /** Marks the first free slot as taken and returns its object. */
    private synchronized T getItem() {
        for (int i = 0; i < size; i++) {
            if (!checkedOut[i]) {
                checkedOut[i] = true;
                return items.get(i);
            }
        }
        // Only reached if no slot is free; callers acquire a permit first.
        return null;
    }

    /**
     * Frees the slot that holds exactly this object.
     *
     * @return {@code true} if the object belongs to the pool and was checked
     *         out, {@code false} otherwise
     */
    private synchronized boolean releaseItem(T item) {
        for (int i = 0; i < size; i++) {
            // Identity comparison: List.indexOf would use equals() and could
            // pick the slot of a different, still-in-use object.
            if (items.get(i) == item) {
                if (!checkedOut[i]) {
                    return false;
                }
                checkedOut[i] = false;
                return true;
            }
        }
        return false;
    }
}

例子:一个创建代价高昂的对象类型

package concurrency;

/**
 * Stand-in for an object that is expensive to construct: the constructor
 * burns CPU on a floating-point loop. Each instance gets a sequential id.
 */
public class Fat {
    // volatile keeps the compiler/JIT from optimizing the busy-work away
    private volatile double d;
    // NOTE(review): plain int counter, not synchronized; ids are only
    // guaranteed unique when instances are created from a single thread.
    private static int counter = 0;
    private final int id = counter++;

    /** Performs the deliberately costly initialization. */
    public Fat() {
        // Start at 1: with i == 0 the first term divides by zero and d
        // becomes Infinity for the rest of the loop.
        for (int i = 1; i < 10000; i++) {
            d += (Math.E + Math.PI) / (double) i;
        }
    }

    /** Prints this object's textual form to standard output. */
    public void operation() {
        System.out.println(this);
    }

    /** @return {@code "Fat "} followed by this instance's id */
    @Override
    public String toString() {
        return "Fat " + id;
    }
}

测试Pool

package concurrency;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
 * Task that borrows one object from a {@link Pool}, holds it for one second,
 * and hands it back.
 *
 * @param <T> type of the pooled objects
 */
class CheckOutTask<T> implements Runnable {
    private static int counter = 0;
    private final int id = counter++;
    private final Pool<T> pool;

    /**
     * @param pool the pool to borrow from and return to
     */
    public CheckOutTask(Pool<T> pool) {
        this.pool = pool;
    }

    /**
     * Checks an item out, sleeps for one second, then checks it back in.
     * If the thread is interrupted, the interrupt flag is restored and any
     * item already obtained is still returned to the pool.
     */
    @Override
    public void run() {
        T item;
        try {
            item = pool.checkOut();
        } catch (InterruptedException e) {
            // Nothing was obtained; just propagate the interrupt status.
            Thread.currentThread().interrupt();
            return;
        }
        try {
            System.out.println("CheckOut " + item);
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            // Always give the item back, otherwise an interrupted sleep
            // would permanently remove it (and its permit) from the pool.
            System.out.println("CheckIn " + item);
            pool.checkIn(item);
        }
    }
}

/**
 * Demo driver for {@link Pool}: worker tasks and the main thread compete
 * for the pooled {@link Fat} objects, one extra check-out is left blocked
 * and then cancelled, and finally everything is checked in twice.
 */
public class SemaphoreDemo {
    final static int SIZE = 25;

    public static void main(String[] args) throws InterruptedException {
        // Resource pool that hands out Fat objects
        final Pool<Fat> pool = new Pool<Fat>(Fat.class, SIZE);
        // Thread pool that runs the tasks
        ExecutorService executor = Executors.newCachedThreadPool();

        // Each submitted task borrows one Fat and returns it again
        int submitted = 0;
        while (submitted < SIZE) {
            executor.execute(new CheckOutTask<Fat>(pool));
            submitted++;
        }
        System.out.println("All CheckOutTasks created.");

        // The main thread borrows every object as well, waiting whenever
        // none is currently free
        List<Fat> held = new ArrayList<Fat>();
        while (held.size() < SIZE) {
            Fat fat = pool.checkOut();
            System.out.println(held.size() + ": main() thread checkedOut()");
            fat.operation();
            held.add(fat);
        }

        // The main thread now holds every pooled object, so this extra
        // check-out has to block
        Runnable blockedCheckOut = new Runnable() {
            @Override
            public void run() {
                try {
                    pool.checkOut();
                } catch (InterruptedException e) {
                    System.out.println("CheckOut() interrupted.");
                }
            }
        };
        Future<?> pending = executor.submit(blockedCheckOut);

        // Let it stay blocked for 2 seconds ...
        TimeUnit.SECONDS.sleep(2);
        // ... then interrupt the blocked call
        pending.cancel(true);

        // Hand everything back
        System.out.println("Checking in objects in " + held);
        checkInAll(pool, held);

        // Second, redundant check-in of the same objects
        checkInAll(pool, held);

        executor.shutdown();
    }

    /** Checks every object of {@code fats} into {@code pool}, in list order. */
    private static void checkInAll(Pool<Fat> pool, List<Fat> fats) {
        for (int i = 0; i < fats.size(); i++) {
            pool.checkIn(fats.get(i));
        }
    }
}

相关文章

网友评论

    本文标题:Semaphore使用

    本文链接:https://www.haomeiwen.com/subject/uqudyhtx.html