声明:原创文章,转载请注明出处。http://www.jianshu.com/u/e02df63eaa87
在上一节中,介绍了Thrift框架的基本情况和使用,本节针对上节中最后实现的四则运算服务实现一个简单的连接池。
1、Apache Common-pool2简介
Apache Common-pool2包提供了一个通用对象池实现,可方便地基于此实现对象池。对象的创建和销毁在一定程度上会消耗系统的资源,虽然JVM想性能得到了很大的提升,对于多数对象来说,没必要利用对象池进行对象的创建和管理,但是对于线程、TCP连接、数据库连接等对象,其创建与销毁的代价是很大的,因此对象池技术还是有其存在的意义。
Common-pool2由三大模块组成:ObjectPool、PooledObject和PooledObjectFactory。
- ObjectPool:提供所有对象的存取管理。
- PooledObject:池化的对象,是对对象的一个包装,加上了对象的一些其他信息,包括对象的状态(已用、空闲),对象的创建时间等。
- PooledObjectFactory:工厂类,负责池化对象的创建,对象的初始化,对象状态的销毁和对象状态的验证。
ObjectPool会持有PooledObjectFactory,将具体的对象的创建、初始化、销毁等任务交给它处理,其操作对象是PooledObject,即具体的Object的包装类。
2、 Thrift连接池实现
2.1 Node节点
用于保存服务端的IP、Port等信息
public class Node {
private static final Logger logger = LogManager.getLogger(Node.class);
private String ip;
private int port;
public Node() {
}
public Node(String ip, int port) {
this.ip = ip;
this.port = port;
}
public static Logger getLogger() {
return logger;
}
public String getIp() {
return ip;
}
public void setIp(String ip) {
this.ip = ip;
}
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
@Override
public String toString() {
return "Node{" +
"ip='" + ip + '\'' +
", port=" + port +
'}';
}
}
2.2 Factory
回调。用于对象创建,销毁,验证,激活、钝化等。
public class ConnectionFactory implements PooledObjectFactory<TServiceClient> {
private static final Logger logger = LogManager.getLogger(ConnectionFactory.class);
private Node node;
private int timeout;
public ConnectionFactory(Node node, int timeout) {
this.node = node;
this.timeout = timeout;
}
public PooledObject<TServiceClient> makeObject() throws Exception {
try {
TTransport tTransport = new TFramedTransport(new TSocket(node.getIp(), node.getPort(), timeout));
tTransport.open();
// 具体的连接池
ComputeServer.Client client = new ComputeServer.Client(new TBinaryProtocol(tTransport));
logger.info("connect to server success.");
return new DefaultPooledObject<TServiceClient>(client);
} catch (Exception e) {
logger.error("connect to server failed.", e);
throw e;
}
}
public boolean validateObject(PooledObject<TServiceClient> pooledObject) {
TServiceClient client = pooledObject.getObject();
TTransport transport = client.getInputProtocol().getTransport();
return transport.isOpen();
}
public void activateObject(PooledObject<TServiceClient> pooledObject) throws Exception {
TServiceClient client = pooledObject.getObject();
TTransport transport = client.getInputProtocol().getTransport();
if (!transport.isOpen()) {
logger.info("transport is closed, reopen");
transport.open();
}
}
public void destroyObject(PooledObject<TServiceClient> pooledObject) throws Exception {
TServiceClient client = pooledObject.getObject();
TTransport transport = client.getInputProtocol().getTransport();
if (transport.isOpen()) {
transport.close();
logger.info("close renode connection." + node.toString());
}
}
public void passivateObject(PooledObject<TServiceClient> pooledObject) throws Exception {
TServiceClient client = pooledObject.getObject();
TTransport transport = client.getInputProtocol().getTransport();
if (!transport.isOpen()) {
logger.info("transport is closed, reopen");
transport.open();
}
}
}
备注:其中需要注意的地方是,在makeObject()中,根据不同的Client将进行创建。
2.3 代理
用于获取连接和归还连接。
public class ThriftClientProxy {
private static final Logger logger = LogManager.getLogger(ThriftClientProxy.class);
private static final int WAIT_TIME_MS = 20; // 等待时间
GenericObjectPoolConfig poolConfig; // 连接池配置
// Thrift客户端连接池,ConcurrentHashMap用于存储所有的对象(不含销毁的对象)
// LinkedBlockingDeque用于存储空闲的对象
GenericObjectPool<TServiceClient> pool;
ConnectionFactory connectionFactory; // 连接
Node node; // 节点
/* 构造连接池 */
public ThriftClientProxy(Node node, int timeout, int minPoolSize, int maxPoolSize) {
this.node = node;
poolConfig = new GenericObjectPoolConfig();
poolConfig.setMaxTotal(maxPoolSize);
poolConfig.setMinIdle(minPoolSize);
connectionFactory = new ConnectionFactory(this.node, timeout);
pool = new GenericObjectPool<TServiceClient>(connectionFactory, poolConfig);
}
/* 连接池初始化 */
public boolean connect() {
try {
pool.preparePool();
} catch (Exception e) {
logger.error("prepare client connection pool failed.", e);
close();
return false;
}
return true;
}
/* 关闭连接池 */
public void close() {
pool.close();
}
/* 获取连接池中的连接 */
public TServiceClient takeConnection() {
try {
return pool.borrowObject(WAIT_TIME_MS);
} catch (Exception e) {
logger.error("take connection from pool failed.", e);
return null;
}
}
/* 归还连接 */
public void returnConnection(TServiceClient client) {
if (client == null)
return;
pool.returnObject(client);
}
}
2.4 Main
用于测试连接池的功能。
public class Main {
private static final Logger logger = LogManager.getLogger(Main.class);
public static void main(String[] args) {
// 根据节点创建连接池
Node node = new Node("127.0.0.1", 9000);
ThriftClientProxy thriftClientProxy = new ThriftClientProxy(node, 200, 5, 5);
if (!thriftClientProxy.connect()) {
logger.error("connect to server failed.{}", node.toString());
return;
}
// 获取连接并使用
ComputeServer.Client computeClient = (ComputeServer.Client) thriftClientProxy.takeConnection();
ComputeRequest request = new ComputeRequest();
request.setX(1);
request.setY(2);
request.setComputeType(ComputeType.ADD);
try {
ComputeResponse response = computeClient.getComputeResult(request);
if (response != null) {
System.out.println(response.toString());
}
} catch (Exception e) {
logger.error(e);
} finally {
// 归还连接
thriftClientProxy.returnConnection(computeClient);
}
}
}
2.5 代码结构
包结构2.6 测试
测试引用
http://www.open-open.com/lib/view/open1415453575730.html
http://www.cnblogs.com/jinzhiming/p/5120623.html
http://commons.apache.org/proper/commons-pool/api-2.4.2/index.html
网友评论