需求背景
- 由于大数据集群机房的迁移,迁移之前需要先进行一波数据压缩的工作。具体的内容就是将用户Textfile格式的表格,压缩成orc+snappy的格式。在整个压缩的流程中,需要多次的连接HiveServer2查询partition的元数据信息。考虑到每次获取一个连接对象的损耗是比较好的,因此采用了对象池的方法来提高速度。
- 由于涉及到对多集群数据压缩,因此需要设置不同的数据源的池对象。
- 此项目为深度数据运维工具,后续将接入到我司开源的中间件,则能很好的解决此问题
- 当然每种压缩算法都有特定的适应场景,有些确实用Textfile格式最好,得根据使用场景来选择符合的格式
-
图1:Hive各种压缩算法效果比较,可以看出orc+snappy格式的压缩率是最高的
压缩算法比较
多数据源源对象池实战
- 在maven/grade中引入commons-pool2
#maven
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.6.2</version>
</dependency>
#grade
dependencies{
compile group: 'org.apache.commons', name: 'commons-pool2', version: '2.6.2'
}
- 创建对象池配置文件
-
关于对象池配置说明。我们设置了最大空闲数为5个,也就是项目在启动的时候,就会自动的为每个数据源创建5个对象。其它的可以看参数的说明。
对象池属性配置
public class PoolProperties {
//最大空空闲
private int maxIdle = 5;
//最大连接数
private int maxTotal = 50;
// 最小空闲
private int minIdle = 2;
// 初始化连接数
private int initialSize = 5;
//最大等待时间,设置为30秒
private int maxWaitMillis = 30000;
//省略get/set方法
}
- 创建不用数据源的工厂类
- 假设我们当前有集群A,B,它们都需要连接到HiveServer2
- 对象池中管理的对象为HiveServer2的JDBC连接对象Connection
#A集群的连接池对象
public class AClusterObjectPool extends GenericObjectPool<Connection> {
public AClusterObjectPool (PooledObjectFactory<Connection> factory, GenericObjectPoolConfig<Connection> config) {
super(factory, config);
}
}
#B集群的连接池对象
public class BClusterObjectPool extends GenericObjectPool<Connection> {
public BClusterObjectPool (PooledObjectFactory<Connection> factory, GenericObjectPoolConfig<Connection> config) {
super(factory, config);
}
}
- 创建connection对象类
public class PoolableObjectFactory extends BasePooledObjectFactory<Connection> {
private final String driverName = "org.apache.hive.jdbc.HiveDriver";
private String url;
private String username;
private String password;
public PoolableObjectFactory(String url, String username, String password) {
this.url = url;
this.username = username;
this.password = password;
}
/**
* 创建一个对象实例
*/
@Override
public Connection create() throws Exception {
Class.forName(driverName);
java.util.Properties info = new java.util.Properties();
info.put("user", username);
info.put("password", password);
info.put("hive.metastore.client.socket.timeout", 3000);
Connection con = DriverManager.getConnection(url, info);
return con;
}
/**
* 包裹创建的对象实例,返回一个pooledobject
*/
@Override
public PooledObject<Connection> wrap(Connection obj) {
return new DefaultPooledObject<Connection>(obj);
}
}
- 创建池对象的工具类
@Configuration
@EnableConfigurationProperties(PoolProperties.class)
public class PooledObjectUtil {
private static final Logger LOGGER = LoggerFactory.getLogger(PooledObjectUtil.class);
@Autowired
private ClusterService clusterService;
@Value("${a.cluster.name}")
private String aClusterName;
@Value("${b.cluster.name}")
private String bClusterName;
private final PoolProperties poolProperties;
AClusterObjectPool aPool;
BClusterObjectPool bPool;
@Autowired
public PooledObjectUtil(PoolProperties poolProperties) {
this.poolProperties = poolProperties;
}
private GenericObjectPoolConfig generatePoolConfig() {
GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
// 最大空闲数
poolConfig.setMaxIdle(poolProperties.getMaxIdle());
// 最小空闲数, 池中只有一个空闲对象的时候,池会在创建一个对象,并借出一个对象,从而保证池中最小空闲数为1
poolConfig.setMinIdle(poolProperties.getMinIdle());
// 最大池对象总数
poolConfig.setMaxTotal(poolProperties.getMaxTotal());
// 在获取对象的时候检查有效性, 默认false
poolConfig.setTestOnBorrow(true);
// 在归还对象的时候检查有效性, 默认false
poolConfig.setTestOnReturn(false);
// 在空闲时检查有效性, 默认false
poolConfig.setTestWhileIdle(false);
// 最大等待时间, 默认的值为-1,表示无限等待。
poolConfig.setMaxWaitMillis(poolProperties.getMaxWaitMillis());
// 是否启用后进先出, 默认true
poolConfig.setLifo(true);
// MXBean already registered的错误
poolConfig.setJmxEnabled(false);
return poolConfig;
}
@Bean
public AClusterObjectPool aPooledObjectFactory() {
Cluster cluster = clusterService.findByClusterName(aClusterName);
GenericObjectPoolConfig poolConfig = generatePoolConfig();
PooledObjectFactory<Connection> factory = new PoolableObjectFactory(cluster.getHiveServer2Url(), cluster.getAdminUser(), cluster.getAdminUmPassword());
aPool = new AClusterObjectPool(factory, poolConfig);
initPool(aPool , poolProperties.getInitialSize(), poolConfig.getMaxIdle());
LOGGER.info("Successful build aCluster connection,size is {}",aPool .getNumActive());
return aPool ;
}
@Bean
public BClusterObjectPool bPooledObjectFactory() {
Cluster cluster = clusterService.findByClusterName(bClusterName);
GenericObjectPoolConfig poolConfig = generatePoolConfig();
PooledObjectFactory<Connection> factory = new PoolableObjectFactory(cluster.getHiveServer2Url(), cluster.getAdminUser(), cluster.getAdminUmPassword());
bPool = new BClusterObjectPool(factory, poolConfig);
initPool(bPool , poolProperties.getInitialSize(), poolConfig.getMaxIdle());
LOGGER.info("Successful build b cluster connection,size is {}",bPool .getNumActive());
return bPool ;
}
/**
* 预先加载connection对象到对象池中
* @param initialSize 初始化连接数
* @param maxIdle 最大空闲连接数
*/
private void initPool(AClusterObjectPool pool, int initialSize, int maxIdle) {
if (initialSize <= 0) {
return;
}
int size = Math.min(initialSize, maxIdle);
for (int i = 0; i < size; i++) {
try {
pool.addObject();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
/**
* 预先加载connection对象到对象池中
* @param initialSize 初始化连接数
* @param maxIdle 最大空闲连接数
*/
private void initPool(BClusterObjectPool pool, int initialSize, int maxIdle) {
if (initialSize <= 0) {
return;
}
int size = Math.min(initialSize, maxIdle);
for (int i = 0; i < size; i++) {
try {
pool.addObject();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
}
在项目中如何使用
- 在项目中将不用的工厂类注入进来,后根据集群的名字进行判断,获取对应的工厂类
@Value(cluster.name.a)
private String aClusterName;
@Autowired
private AClusterObjectPool aClusterObjectPool;
@Autowired
private BClusterObjectPool bClusterObjectPool;
#从对象池中借用对象调用borrow方法
private Connection borrowObject(String clusterName) throws Exception {
Connection connection;
if (aClusterName.equals(clusterName)) {
connection = aClusterObjectPool.borrowObject();
} else {
connection = bClusterObjectPool.borrowObject();
}
LOGGER.info("Borrow hive client object success,clusterName is {}",clusterName);
return connection;
}
#向对象池归还对象调用return方法
private void returnObject(Connection connection,String clusterName){
if (aClusterObjectPool.equals(clusterName)) {
aClusterObjectPool.returnObject(connection);
} else {
bClusterObjectPool.returnObject(connection);
}
LOGGER.info("Return hive client object success,clusterName is {}",clusterName);
}
实际效果
- 在进行分区对比过程中执行从对象池借用对象,完成使命后进行关闭即可,能快速的处理好单服务遇到的瓶颈问题。
遇到的一些问题
- 测试环境对象池空置一段时间后,再连接的时候,会报session超时:HiveSQLException: Invalid SessionHandle
- 解决办法:修改hiveServer中客户端超时的验证时间,我的修改如下
hive.server2.session.check.interval = 1 hour
hive.server2.idle.operation.timeout = 1 day
hive.server2.idle.session.timeout = 3 days
- 如果HiveServer2重启的话,则创建的connection将失效,也需要重启项目,这个是后续的改进方向,也可能接入到其它的连接中间件中。
参考资料
- Hive格式各种格式下不同压缩算法的比较:https://my.oschina.net/hulubo/blog/956692
- Hive支持的文件格式与压缩算法:https://my.oschina.net/hulubo/blog/915072
- Hive session异常:https://blog.csdn.net/weixin_39588015/article/details/79264139
- 探索对象池技术 (很好的源码解释):https://www.cnblogs.com/aspirant/p/10655761.html
网友评论