我们使用等待超时模式来构造一个简单的数据库连接池,在示例中模拟从连接池中获取、使用和释放连接的过程。而客户端获取连接的过程被设定为等待超时模式,也就是在1000毫秒内如果无法获取到可用的连接,将会返回客户端一个null。设定连接池的大小为10个,然后通过调节客户端的线程数来模拟无法获取连接的场景。
一、Connection接口
由于java.sql.Connection是一个接口,最终的实现是由数据库驱动提供方来实现的,考虑到只是一个示例,我们通过动态代理构造了一个Connection,该Connection的代理实现仅仅是在commit()方法调用时休眠100毫秒。
public class ConnectionDriver {
static class ConnectionHandler implements InvocationHandler {
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable{
if (method.getName().equals("commit")) {
TimeUnit.MILLISECONDS.sleep(100);
}
return null;
}
}
public static final Connection createConnection() {
return (Connection) Proxy.newProxyInstance(ConnectionDriver.class.getClassLoader(),
new Class[]{Connection.class},
new ConnectionHandler());
}
}
二、连接池定义
连接池通过构造函数初始化连接池的最大上限,通过一个双向队列来维护连接,调用方需要先调用fetchConnection(long)方法来指定在多少毫秒内超时获取连接,当连接使用完成以后,需要调用releaseConnection(Connection)方法将连接放回线程池。
public class ConnectionPool {
//用来存放数据库连接
private LinkedList<Connection> pool = new LinkedList<Connection>();
//初始化数据连接池连接
public ConnectionPool(int initialSize) {
if (initialSize > 0) {
for (int i = 0; i < initialSize; i++) {
pool.addLast(ConnectionDriver.createConnection());
}
}
}
//释放连接
public void releaseConnection(Connection connection) {
if (connection != null) {
synchronized (pool) {
//将连接放回连接池
pool.addLast(connection);
//通知连接池对象已经释放了连接
pool.notifyAll();
}
}
}
//超时获取连接
public Connection fetchConnection(long mills) throws InterruptedException {
synchronized (pool) {
// 完全超时
if (mills <= 0) {
while (pool.isEmpty()) {
pool.wait();
}
return pool.removeFirst();
} else {
//等到后超时时间后的将来的时间
long future = System.currentTimeMillis() + mills;
//等待时间
long remaining = mills;
//如果连接池为空 而且等待时间大于0则需要等待
while (pool.isEmpty() && remaining > 0) {
pool.wait(remaining);
remaining = future - System.currentTimeMillis();
}
//连接池有连接或者已经等待完超时时间,则获取数据库连接
Connection result = null;
if (!pool.isEmpty()) {
result = pool.removeFirst();
}
return result;
}
}
}
}
三、测试类
public class ConnectionPoolTest {
static ConnectionPool pool = new ConnectionPool(10);
static CountDownLatch start = new CountDownLatch(1);
static CountDownLatch end;
public static void main(String[] args) throws Exception {
int threadCount = 10;
end = new CountDownLatch(threadCount);
int count = 20;
AtomicInteger got = new AtomicInteger();
AtomicInteger notGot = new AtomicInteger();
for (int i = 0; i < threadCount; i++) {
Thread thread = new Thread(new ConnetionRunner(count, got, notGot),
"ConnectionRunnerThread");
thread.start();
}
start.countDown();
//等待所有获取连接线程执行完
end.await();
System.out.println("total invoke: " + (threadCount * count));
System.out.println("got connection: " + got);
System.out.println("not got connection " + notGot);
}
static class ConnetionRunner implements Runnable {
int count;
AtomicInteger got;
AtomicInteger notGot;
public ConnetionRunner(int count, AtomicInteger got, AtomicInteger notGot) {
this.count = count;
this.got = got;
this.notGot = notGot;
}
public void run() {
try {
//所有线程在此等待并发去获取连接
start.await();
} catch (Exception ex) {
}
while (count > 0) {
try {
Connection connection = pool.fetchConnection(1000);
if (connection != null) {
try {
connection.createStatement();
connection.commit();
} finally {
pool.releaseConnection(connection);
got.incrementAndGet();
}
} else {
notGot.incrementAndGet();
}
} catch (Exception ex) {
} finally {
count--;
}
}
end.countDown();
}
}
}
上述示例中使用了CountDownLatch来确保ConnectionRunnerThread能够同时开始执行,并且在全部结束之后,才使main线程从等待状态中返回。
网友评论