美文网首页
等待超时模式&一个带超时时间的数据库连接池Demo

等待超时模式&一个带超时时间的数据库连接池Demo

作者: 火火说技术 | 来源:发表于2018-04-11 21:22 被阅读0次

等待超时模式&一个带超时时间的数据库连接池Demo

调用一个方法的时候等待一定的时间,如果该方法可以再给定时间之内获取到结果,则将结果立刻进行返回,反之,超时返回默认的结果。

在等待/通知的经典范式的基础上

增加两个变量

REMAINING = T (等待时间)
FUTURE = now + T (等待时间)

执行过程简化为, wait(REMAINING),在执行 REMAINING = FURURE - now 如果REMAINING小于等于0表示已经超时了,name就需要直接退出,否则再继续执行 wait(REMAINING)的操作。

伪代码:

public synchronized Object get(long time) throws InterruptedException{
    long future = System.currentTimeMillis() + time;
    long remaining = mills;
    // 当超时的时间大于0并且result返回值不满足要求的时候, 继续等待
    while ((result == null) && remaining > 0){
        // 等待remaining时间
        wait(remaining);
        
        // 控制睡眠时间
        remaining = future - System.currentTimeMillis();
    }
    return result;
}

以上的方式在原来等待/通知的经典范式上增加了等待时间,而不是一直进行wait操作,比原有的范式增加了部分的灵活性。按照调用者的要求“按时”返回

一个简单的数据库连接池的示例

利用超时等待模式,来构建一个简单的数据库的连接池,并且在客户端获取连接的时候,可以进行指定时间的获取,如果在指定时间内没有获取到,则返回给客户端一个null。设定连接池的大小为5个。

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.util.LinkedList;
import java.util.concurrent.TimeUnit;

/**
 * 一个简单的数据库连接池
 *
 * @author gongyan
 * @date 2018/4/10
 */
public class DBConnectionPool {

    /**
     * 内部连接池队列
     */
    private LinkedList<Connection> pool = new LinkedList<Connection>();

    /**
     * 构造连接数
     *
     * @param initSize
     */
    public DBConnectionPool(int initSize) {
        if (initSize > 0) {
            for (int i = 0; i < initSize; i++) {
                pool.add(ConnectionDriver.ConnectionHandler.createConnection());
            }
        }
    }

    /**
     * 归还数据库的连接
     *
     * @param connection
     */
    public void releaseConnection(Connection connection) {
        if (connection != null) {
            synchronized (pool) {
                // 链接释放后需要进行通知,这样其他消费者可以感知到连接池中也已经归还了一个链接方便获取
                pool.addLast(connection);

                // 通知其他的线程可以来获取连接了
                pool.notifyAll();
            }
        }
    }

    /**
     * 获取数据库的链接
     *
     * @return
     */
    public Connection getConnection(long timeout) throws InterruptedException {
        synchronized (pool) {
            if (timeout <= 0) {
                while (pool.isEmpty()) {
                    pool.wait();
                }
                return pool.removeFirst();
            } else {
                // 设置超时时间
                long future = timeout + System.currentTimeMillis();

                // 设置停顿时间
                long remaining = timeout;
                while (pool.isEmpty() && remaining > 0) {
                    pool.wait(remaining);
                    remaining = future - System.currentTimeMillis();
                }
                Connection conn = null;
                if (!pool.isEmpty()) {
                    conn = pool.removeFirst();
                }
                return conn;
            }
        }
    }

}


/**
 * 链接的创建类
 */
class ConnectionDriver {

    /**
     * 生成一个代理类
     */
    static class ConnectionHandler implements InvocationHandler {

        /**
         * 代理
         *
         * @param proxy
         * @param method
         * @param args
         * @return
         * @throws Throwable
         */
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            if (method.getName().equals("commit")) {
                TimeUnit.SECONDS.sleep(1);
                System.out.println(Thread.currentThread() + " 睡眠结束");
            }
            return null;
        }

        /**
         * 创建代理对象
         *
         * @return
         */
        public static final Connection createConnection() {
            return (Connection) Proxy.newProxyInstance(ConnectionDriver.class.getClass().getClassLoader(), new Class[]{Connection.class}, new ConnectionHandler());
        }
    }
}

运行入口类

import java.sql.Connection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 连接池测试类
 *
 * @author gongyan
 * @date 2018/4/10
 */
public class MainTest {

    /**
     * 数据库连接池
     */
    private static DBConnectionPool dbConnectionPool = new DBConnectionPool(5);

    /**
     * 统一开始信号
     */
    static CountDownLatch start = new CountDownLatch(1);

    static CountDownLatch end;

    public static void main(String[] args) throws Exception {

        int threadCount = 6;
        end = new CountDownLatch(threadCount);

        /** 每个线程要去拿多少次 */
        int count = 2;

        /** 设置获取和非获取 */
        AtomicInteger got = new AtomicInteger(0);
        AtomicInteger notGot = new AtomicInteger(0);

        /** 线程数 */
        for (int i = 0; i < threadCount; i++) {
            new Thread(new ConnectionRunner(count, got, notGot), "getThread" + i).start();
        }

        /** 全部线程统一开始 */
        start.countDown();

        /** 等待全部结束 */
        end.await();

        System.out.println("total invoke: " + (threadCount * count));
        System.out.println("got connection: " + got);
        System.out.println("notGot connection: " + notGot);

    }

    /**
     * 获取连接的执行器
     */
    static class ConnectionRunner implements Runnable {

        /**
         * 拿到了多少次
         */
        AtomicInteger got;
        /**
         * 没拿到多少次
         */
        AtomicInteger notGot;
        /**
         * 总共拿多少次
         */
        int count;

        public ConnectionRunner(int count, AtomicInteger got, AtomicInteger notGot) {
            this.count = count;
            this.got = got;
            this.notGot = notGot;
        }

        public void run() {

            /** 等待统一的指令 */
            try {
                start.await();
            } catch (Exception e) {

            }

            while (count > 0) {
                try {

                    /** 超时时间1s 获取连接,如果没有获取到则返回null*/
                    Connection connection = dbConnectionPool.getConnection(1);
                    if (connection != null) {
                        System.out.println(Thread.currentThread() + ",获取到了连接");
                        try {
                            connection.createStatement();
                            connection.commit();
                            got.incrementAndGet();
                        } finally {
                            dbConnectionPool.releaseConnection(connection);
                        }
                    } else {
                        System.out.println(Thread.currentThread() + ",没获取到了连接");
                        notGot.incrementAndGet();
                    }

                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    count--;
                }
            }

            end.countDown();
        }
    }
}

Thread[getThread4,5,main],获取到了连接
Thread[getThread2,5,main],获取到了连接
Thread[getThread5,5,main],获取到了连接
Thread[getThread1,5,main],获取到了连接
Thread[getThread0,5,main],获取到了连接
Thread[getThread3,5,main],没获取到了连接
Thread[getThread3,5,main],没获取到了连接
Thread[getThread0,5,main] 睡眠结束
Thread[getThread5,5,main] 睡眠结束
Thread[getThread0,5,main],获取到了连接
Thread[getThread4,5,main] 睡眠结束
Thread[getThread2,5,main] 睡眠结束
Thread[getThread4,5,main],获取到了连接
Thread[getThread5,5,main],获取到了连接
Thread[getThread1,5,main] 睡眠结束
Thread[getThread2,5,main],获取到了连接
Thread[getThread1,5,main],获取到了连接
Thread[getThread4,5,main] 睡眠结束
Thread[getThread5,5,main] 睡眠结束
Thread[getThread2,5,main] 睡眠结束
Thread[getThread0,5,main] 睡眠结束
Thread[getThread1,5,main] 睡眠结束
total invoke: 12
got connection: 10
notGot connection: 2

代码都可以跑起来的,感兴趣的同学可以跑跑试试

相关文章

网友评论

      本文标题:等待超时模式&一个带超时时间的数据库连接池Demo

      本文链接:https://www.haomeiwen.com/subject/zvttkftx.html