美文网首页
Semaphore 的作用、应用场景和实战

Semaphore 的作用、应用场景和实战

作者: wuchao226 | 来源:发表于2021-04-13 16:04 被阅读0次

    CyclicBarrier 的作用、应用场景和实战
    CountDownLatch 的作用、应用场景和实战

    概述

    Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。

    过程如下图所示:

    Semaphore 常用方法

    构造方法:

    public Semaphore(int permits)
    public Semaphore(int permits, boolean fair)
    
    • permits 表示许可线程的数量
    • fair 表示公平性,如果这个设为 true 的话,下次执行的线程会是等待最久的线程

    常用方法:

    public void acquire() throws InterruptedException
    public boolean tryAcquire()
    public void release()
    public int availablePermits()
    public final int getQueueLength() 
    public final boolean hasQueuedThreads()
    protected void reducePermits(int reduction)
    protected Collection<Thread> getQueuedThreads()
    
    • acquire() 表示阻塞并获取许可
    • tryAcquire() 方法在没有许可的情况下会立即返回 false,要获取许可的线程不会阻塞
    • release() 表示释放许可
    • int availablePermits():返回此信号量中当前可用的许可证数。
    • int getQueueLength():返回正在等待获取许可证的线程数。
    • boolean hasQueuedThreads():是否有线程正在等待获取许可证。
    • void reducePermit(int reduction):减少 reduction 个许可证
    • Collection getQueuedThreads():返回所有等待获取许可证的线程集合

    Semaphore 使用场景

    可以用于做流量控制,特别是公用资源有限的应用场景,比如数据库连接。假如有一个需求, 要读取几万个文件的数据,因为都是 IO 密集型任务,我们可以启动几十个线程并发地读取,但是如果读到内存后,还需要存储到数据库中,而数据库的连接数只有 10 个,这时我们必须控制只有 10 个线程同时获取数据库连接保存数据,否则会报错无法获取数据库连接。这个时候,就可以使用 Semaphore 来做流量控制。

    代码示例:

    /**
     * 类说明:演示Semaphore用法,一个数据库连接池的实现
     */
    public class DBPoolSemaphore {
    
        private final static int POOL_SIZE = 10;
        //两个指示器,分别表示池子还有可用连接和已用连接
        private final Semaphore useful, useless;
        //存放数据库连接的容器
        private static LinkedList<Connection> pool = new LinkedList<Connection>();
    
        //初始化池
        static {
            for (int i = 0; i < POOL_SIZE; i++) {
                pool.addLast(SqlConnectImpl.fetchConnection());
            }
        }
    
        public DBPoolSemaphore() {
            this.useful = new Semaphore(10);
            this.useless = new Semaphore(0);
        }
    
        /*归还连接*/
        public void returnConnect(Connection connection) throws InterruptedException {
            if (connection != null) {
                System.out.println("当前有" + useful.getQueueLength() + "个线程等待数据库连接!!"
                        + "可用连接数:" + useful.availablePermits());
                useless.acquire();
                synchronized (pool) {
                    pool.addLast(connection);
                }
                useful.release();
            }
        }
    
        /*从池子拿连接*/
        public Connection takeConnect() throws InterruptedException {
            useful.acquire();
            Connection connection;
            synchronized (pool) {
                connection = pool.removeFirst();
            }
            useless.release();
            return connection;
        }
    }
    
    /**
     * 类说明:测试数据库连接池
     */
    public class AppTest {
    
        private static DBPoolSemaphore dbPool = new DBPoolSemaphore();
    
        private static class BusiThread extends Thread {
            @Override
            public void run() {
                Random r = new Random();//让每个线程持有连接的时间不一样
                long start = System.currentTimeMillis();
                try {
                    Connection connect = dbPool.takeConnect();
                    System.out.println("Thread_" + Thread.currentThread().getId()
                            + "_获取数据库连接共耗时【" + (System.currentTimeMillis() - start) + "】ms.");
                    SleepTools.ms(100 + r.nextInt(100));//模拟业务操作,线程持有连接查询数据
                    System.out.println("查询数据完成,归还连接!");
                    dbPool.returnConnect(connect);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    
        public static void main(String[] args) {
            for (int i = 0; i < 50; i++) {
                Thread thread = new BusiThread();
                thread.start();
            }
        }
    }
    

    打印结果:

    Thread_12_获取数据库连接共耗时【0】ms.
    Thread_14_获取数据库连接共耗时【0】ms.
    Thread_17_获取数据库连接共耗时【0】ms.
    Thread_15_获取数据库连接共耗时【0】ms.
    Thread_18_获取数据库连接共耗时【0】ms.
    Thread_20_获取数据库连接共耗时【0】ms.
    Thread_13_获取数据库连接共耗时【0】ms.
    Thread_16_获取数据库连接共耗时【0】ms.
    Thread_11_获取数据库连接共耗时【0】ms.
    Thread_19_获取数据库连接共耗时【0】ms.
    查询数据完成,归还连接!
    当前有5个线程等待数据库连接!!可用连接数:0
    Thread_21_获取数据库连接共耗时【121】ms.
    查询数据完成,归还连接!
    当前有4个线程等待数据库连接!!可用连接数:0
    Thread_22_获取数据库连接共耗时【133】ms.
    查询数据完成,归还连接!
    当前有3个线程等待数据库连接!!可用连接数:0
    查询数据完成,归还连接!
    当前有2个线程等待数据库连接!!可用连接数:0
    Thread_23_获取数据库连接共耗时【145】ms.
    Thread_24_获取数据库连接共耗时【144】ms.
    查询数据完成,归还连接!
    查询数据完成,归还连接!
    当前有1个线程等待数据库连接!!可用连接数:0
    当前有1个线程等待数据库连接!!可用连接数:0
    Thread_25_获取数据库连接共耗时【158】ms.
    查询数据完成,归还连接!
    当前有0个线程等待数据库连接!!可用连接数:1
    查询数据完成,归还连接!
    查询数据完成,归还连接!
    当前有0个线程等待数据库连接!!可用连接数:2
    当前有0个线程等待数据库连接!!可用连接数:2
    查询数据完成,归还连接!
    当前有0个线程等待数据库连接!!可用连接数:4
    查询数据完成,归还连接!
    当前有0个线程等待数据库连接!!可用连接数:5
    查询数据完成,归还连接!
    当前有0个线程等待数据库连接!!可用连接数:6
    查询数据完成,归还连接!
    当前有0个线程等待数据库连接!!可用连接数:7
    查询数据完成,归还连接!
    当前有0个线程等待数据库连接!!可用连接数:8
    查询数据完成,归还连接!
    当前有0个线程等待数据库连接!!可用连接数:9
    

    从打印结果可以看出,一次只有 10 个线程执行 acquire(),只有线程进行 release() 方法后才会有别的线程执行 acquire()。

    注意: Semaphore 只是对资源并发访问的线程数进行监控,并不会保证线程安全。

    相关文章

      网友评论

          本文标题:Semaphore 的作用、应用场景和实战

          本文链接:https://www.haomeiwen.com/subject/sgcclltx.html