Semaphore使用

作者: 桐桑入梦 | 来源:发表于2020-03-20 09:51 被阅读0次

    首先创建一个对象池(Pool),用于管理创建代价高昂的对象或临界资源;池内部通过Semaphore控制这些资源的分配与归还。

    package concurrency;
    
    import java.lang.reflect.InvocationTargetException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Semaphore;
    
    /**
     * A fixed-size pool of pre-built objects whose hand-out is gated by a fair
     * counting semaphore.
     *
     * <p>All {@code size} items are created eagerly in the constructor through the
     * element class's no-argument constructor. {@link #checkOut()} blocks while every
     * item is in use; {@link #checkIn(Object)} gives an item back.
     *
     * <p>Pooled items are matched by reference identity, not by {@code equals()}, so
     * element types whose instances compare equal to each other can be pooled safely.
     *
     * @param <T> type of the pooled objects
     */
    public class Pool<T> {
        private final int size;
        private final List<T> items = new ArrayList<T>();
        // Only read or written inside the synchronized helpers below, so the monitor
        // already provides visibility; volatile on the array reference adds nothing.
        private final boolean[] checkedOut;
        private final Semaphore available;

        /**
         * Builds the pool and instantiates every pooled object up front.
         *
         * @param classObject class of the pooled objects; instantiated via its
         *                    no-argument constructor
         * @param size        number of objects (and semaphore permits) in the pool
         * @throws RuntimeException wrapping the reflective failure if an instance
         *                          cannot be created
         */
        public Pool(Class<T> classObject,  int size){
            this.size = size;
            checkedOut = new boolean[size];
            available = new Semaphore(size, true);

            try{
                for(int i = 0; i < size; i++) {
                    items.add(classObject.getDeclaredConstructor().newInstance());
                }
            }catch(ReflectiveOperationException e){
                throw new RuntimeException(e);
            }
        }

        /**
         * Takes one item out of the pool, waiting for a permit if none is free.
         *
         * @return an item that is now marked as checked out
         * @throws InterruptedException if the thread is interrupted while waiting
         *                              for a permit
         */
        public T checkOut() throws InterruptedException {
            available.acquire(); // P operation
            return getItem();
        }

        /**
         * Returns an item to the pool. Objects that are not part of this pool, and
         * items that are not currently checked out, are ignored.
         *
         * @param item the object being handed back
         */
        public void checkIn(T item){
            // Release a permit only after the slot was really freed; otherwise the
            // permit count could exceed the number of items actually available.
            if(releaseItem(item)) { // V operation
                available.release();
            }
        }

        /**
         * Marks the first free slot as taken and returns its item.
         *
         * @return the claimed item, or {@code null} if every slot is already taken
         */
        private synchronized T getItem(){
            for(int i = 0; i < size; i++){
                if(!checkedOut[i]){
                    checkedOut[i] = true;
                    return items.get(i);
                }
            }
            return null;
        }

        /**
         * Frees the slot that holds exactly the given object.
         *
         * @param item the object being handed back
         * @return {@code true} if the object belongs to the pool and was checked out
         */
        private synchronized boolean releaseItem(T item){
            for(int i = 0; i < size; i++){
                // Identity comparison: an equals()-based lookup would resolve two
                // distinct-but-equal pooled objects to the same (first) slot.
                if(items.get(i) == item){
                    if(!checkedOut[i]) {
                        return false;
                    }
                    checkedOut[i] = false;
                    return true;
                }
            }
            return false;
        }
    }
    

    例子:一个创建代价高昂的对象类型

    package concurrency;
    
    /**
     * Stand-in for an object that is costly to construct: the constructor runs a
     * 10000-step floating-point loop. Each instance receives the next value of a
     * class-wide counter as its id.
     */
    public class Fat {
        // Id that the next constructed instance will receive.
        private static int counter = 0;
        // Declared volatile with the intent of keeping the constructor's arithmetic
        // from being optimized away.
        private volatile double d;
        private final int id;

        /** Assigns the next id, then performs the deliberately expensive computation. */
        public Fat(){
            id = counter++;
            int step = 0;
            while(step < 10000){
                d = d + (Math.E + Math.PI) / (double)step;
                step++;
            }
        }

        /** Prints this object's string form to standard output. */
        public void operation(){
            System.out.println(toString());
        }

        /** @return the text {@code "Fat "} followed by this instance's id */
        @Override
        public String toString(){
            return new StringBuilder("Fat ").append(id).toString();
        }
    }
    

    测试Pool

    package concurrency;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    
    /**
     * Task that borrows one item from a {@code Pool}, holds it for one second and
     * hands it back.
     *
     * @param <T> type of the pooled objects
     */
    class CheckOutTask<T> implements Runnable{
        private static int counter = 0;
        // Sequence number of this task; not read anywhere in this class.
        private final int id = counter++;
        private Pool<T> pool;

        /**
         * @param pool the pool this task borrows from and returns to
         */
        public CheckOutTask(Pool<T> pool){
            this.pool = pool;
        }

        /**
         * Checks an item out, sleeps for one second, then checks it back in.
         * The item is returned even if the sleep is interrupted.
         */
        @Override
        public void run(){
            try{
                T item = pool.checkOut();
                try{
                    System.out.println("CheckOut " + item);
                    TimeUnit.SECONDS.sleep(1);
                }finally{
                    // Always give the item back; without this an interrupt during
                    // the sleep would leave the item checked out for good.
                    System.out.println("CheckIn " + item);
                    pool.checkIn(item);
                }
            }catch(InterruptedException e){
                // Restore the interrupt flag so the executing thread's owner can
                // still observe the interruption.
                Thread.currentThread().interrupt();
            }
        }
    }
    
    /**
     * Demo driver: worker tasks and the main thread compete for the objects of one
     * {@code Pool<Fat>}, then a further check-out is submitted and cancelled.
     */
    public class SemaphoreDemo {
        final static int SIZE = 25;

        public static void main(String[] args) throws InterruptedException{
            // Resource pool that hands out Fat objects.
            final Pool<Fat> pool = new Pool<Fat>(Fat.class, SIZE);
            // Thread pool that runs the tasks.
            ExecutorService exec = Executors.newCachedThreadPool();

            // One task per pooled object; each borrows a Fat and returns it.
            int submitted = 0;
            while(submitted < SIZE){
                exec.execute(new CheckOutTask<Fat>(pool));
                submitted++;
            }
            System.out.println("All CheckOutTasks created.");

            // The main thread now borrows SIZE objects itself, waiting whenever
            // none is free.
            List<Fat> held = new ArrayList<Fat>();
            for(int n = 0; n < SIZE; n++){
                Fat fat = pool.checkOut();
                System.out.println(n + ": main() thread checkedOut()");
                fat.operation();
                held.add(fat);
            }

            // Submit one more check-out, give it two seconds, then cancel it with
            // interruption.
            Future<?> blocked = exec.submit(blockingCheckOut(pool));
            TimeUnit.SECONDS.sleep(2);
            blocked.cancel(true);

            System.out.println("Checking in objects in " + held);
            returnAll(pool, held);
            // Second pass hands the same objects in again.
            returnAll(pool, held);

            exec.shutdown();
        }

        /** Builds a task that performs a single check-out and reports if interrupted. */
        private static Runnable blockingCheckOut(final Pool<Fat> pool){
            return new Runnable() {
                @Override
                public void run() {
                    try {
                        pool.checkOut();
                    } catch (InterruptedException e) {
                        System.out.println("CheckOut() interrupted.");
                    }
                }
            };
        }

        /** Checks every object of the list back into the pool, in list order. */
        private static void returnAll(Pool<Fat> pool, List<Fat> objects){
            for(int k = 0; k < objects.size(); k++){
                pool.checkIn(objects.get(k));
            }
        }
    }
    

    相关文章

      网友评论

        本文标题:Semaphore使用

        本文链接:https://www.haomeiwen.com/subject/uqudyhtx.html