美文网首页
java线程之Semaphore类基本使用

java线程之Semaphore类基本使用

作者: dimdark | 来源:发表于2018-03-28 13:28 被阅读0次

    Semaphore 类的主要作用就是 控制线程并发的数量

    1. 内部类 Sync FairSync NonfairSyn

    Sync 内部类:

    abstract static class Sync extends AbstractQueuedSynchronizer {
            /**
             * 构造函数
             */
            Sync(int permits) {
                setState(permits);
            }
    
            /** 
             * 返回许可证的个数
             */
            final int getPermits() {
                return getState();
            }
    
            /**
             * 减少reductions个许可证
             */
            final void reducePermits(int reductions) {
                for (;;) {
                    int current = getState();
                    int next = current - reductions;
                    if (next > current) { // 发生溢出
                        throw new Error("Permit count underflow");
                    }
                    if (compareAndSetState(current, next)) { // 调用父类AQS的原子操作方法来更新state字段
                        return;
                    }
                }
            }
    
            /**
             * 消耗完所有的许可证, 返回所消耗的许可证的个数
             */
            final int drainPermits() {
                for (;;) {
                    int current = getState();
                    if (current == 0 || compareAndSetState(current, 0)) {
                        return current;
                    }
                }
            }
    
            /**
             * 非公平模式下尝试获取许可证的辅助方法
             */
            final int nonfairTryAcquireShared(int acquires) {
                for (;;) {
                    int available = getState();
                    int remaining = available - acquires;
                    if (remaining < 0 || compareAndSetState(available, remaining)) {
                        return remaining;
                    }
                }
            }
    
            /**
             * 重写AQS的tryReleaseShared方法来指出如何释放"锁"
             * sync的公平版本和非公平版本均用到这个方法
             */
            protected final boolean tryReleaseShared(int releases) {
                 for (;;) {
                    int current = getState();
                    int next = current + releases;
                    if (next < current) { // 发生溢出
                        throw new Error("Maximum permit count exceeded");
                    }
                    if (compareAndSetState(current, next)) {
                        return true;
                    }
                 }
            }
    
        }
    

    FairSync 内部类:

    FairSync内部类
    NonfairSync 内部类: NonfairSync内部类
    2. Semaphore 类的构造方法
    Semaphore类的构造方法
    3. Semaphore 类的常用方法
    acquire系列方法
    release系列方法
    其他常用方法
    4. 例子

    使用 Semaphore 实现 多生产者/多消费者模式

    /**
     * @author dimdark
     */
    public class CommonUtil {
    
        /**
         * 生产者数量
         */
        public static final int PRODUCER_COUNT = 10;
    
        /**
         * 消费者数量
         */
        public static final int CONSUMER_COUNT = 20;
    
        /**
         * 生产者与消费者在临界资源所用到的锁及相应的condition
         */
        public static final Lock lock = new ReentrantLock();
        public static final Condition consumeCondition = lock.newCondition();
        public static final Condition produceCondition = lock.newCondition();
    
    }
    
    /**
     * 容器, 用来存放食物
     * @author dimdark
     */
    public class Container {
    
        public static final String[] food = new String[5]; // 只能存放5份食物
    
        /**
         * 判断容器是否已经没有食物
         */
        public static boolean isFoodEmpty() {
            return IntStream.range(0, food.length).noneMatch(i -> food[i] != null);
        }
    
        /**
         * 判断容器是否已经装满食物
         */
        public static boolean isFoodFull() {
            return IntStream.range(0, food.length).allMatch(i -> food[i] != null);
        }
    
        /**
         * 往容器里添加食物
         * 注意: 该方法只能被生产者调用以确保容器此时至少有空间可以放下食物
         */
        public static void putFood() {
            for (int i = 0; i < food.length; ++i) {
                if (food[i] == null) {
                    food[i] = "food-" + i;
                    return;
                }
            }
        }
    
        /**
         * 向容器中取出食物
         * 注意: 该方法只能被消费者调用以确保容器此时至少有一份食物
         */
        public static void getFood() {
            for (int i = 0; i < food.length; ++i) {
                if (food[i] != null) {
                    food[i] = null;
                    return;
                }
            }
        }
    
    }
    
    /**
     * @author dimdark
     */
    public class Producer extends Thread {
    
        private static final Semaphore semaphore;
        private static final Lock lock;
        private static final Condition consumeCondition;
        private static final Condition produceCondition;
    
        static {
            semaphore = new Semaphore(CommonUtil.PRODUCER_COUNT);
            lock = CommonUtil.lock;
            consumeCondition = CommonUtil.consumeCondition;
            produceCondition = CommonUtil.produceCondition;
        }
    
        public void produce() {
            try {
                semaphore.acquire();
                lock.lock();
                while (Container.isFoodFull()) {
                    produceCondition.await();
                }
                Container.putFood();
                System.out.println("producer " + Thread.currentThread().getName() + " produce food");
                consumeCondition.signalAll();
            } catch(InterruptedException e) {
                System.err.println("producer " + Thread.currentThread().getName() + " happens some error!");
            } finally {
                lock.unlock();
                semaphore.release();
            }
        }
    
        @Override
        public void run() {
            while (true) {
                produce();
            }
        }
    
    }
    
    /**
     * @author dimdark
     */
    public class Consumer extends Thread {
    
        private static final Semaphore semaphore;
        private static final Lock lock;
        private static final Condition consumeCondition;
        private static final Condition produceCondition;
    
        static {
            semaphore = new Semaphore(CommonUtil.CONSUMER_COUNT);
            lock = CommonUtil.lock;
            consumeCondition = CommonUtil.consumeCondition;
            produceCondition = CommonUtil.produceCondition;
        }
    
        public void consume() {
            try {
                semaphore.acquire();
                lock.lock();
                while (Container.isFoodEmpty()) {
                    consumeCondition.await();
                }
                Container.getFood();
                System.out.println("consumer " + Thread.currentThread().getName() + " consume food");
                produceCondition.signalAll();
            } catch(InterruptedException e) {
                System.err.println("consumer " + Thread.currentThread().getName() + " happens some error!");
            } finally {
                lock.unlock();
                semaphore.release();
            }
        }
    
        @Override
        public void run() {
            while (true) {
                consume();
            }
        }
    
    }
    
    /**
     * @author dimdark
     */
    public class Main {
    
        public static void main(String[] args) {
            Thread[] producers = new Thread[CommonUtil.PRODUCER_COUNT];
            Thread[] consumers = new Thread[CommonUtil.CONSUMER_COUNT];
            for (Thread consumer : consumers) {
                consumer = new Consumer();
                consumer.start();
            }
            for (Thread producer : producers) {
                producer = new Producer();
                producer.start();
            }
        }
    
    }
    

    相关文章

      网友评论

          本文标题:java线程之Semaphore类基本使用

          本文链接:https://www.haomeiwen.com/subject/bgsfqftx.html