美文网首页
HiveServer2多数据源对象池实战

HiveServer2多数据源对象池实战

作者: _Kantin | 来源:发表于2020-05-17 20:57 被阅读0次

需求背景

  • 由于大数据集群机房的迁移,迁移之前需要先进行一波数据压缩的工作。具体的内容就是将用户Textfile格式的表格,压缩成orc+snappy的格式。在整个压缩的流程中,需要多次的连接HiveServer2查询partition的元数据信息。考虑到每次获取一个连接对象的损耗是比较好的,因此采用了对象池的方法来提高速度。
  • 由于涉及到对多集群数据压缩,因此需要设置不同的数据源的池对象。
  • 此项目为深度数据运维工具,后续将接入到我司开源的中间件,则能很好的解决此问题
  • 当然每种压缩算法都有特定的适应场景,有些确实用Textfile格式最好,得根据使用场景来选择符合的格式
  • 图1:Hive各种压缩算法效果比较,可以看出orc+snappy格式的压缩率是最高的


    压缩算法比较

多数据源源对象池实战

  1. 在maven/grade中引入commons-pool2
#maven
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
    <version>2.6.2</version>
</dependency>

#grade
dependencies{
    compile group: 'org.apache.commons', name: 'commons-pool2', version: '2.6.2'
}
  1. 创建对象池配置文件
  • 关于对象池配置说明。我们设置了最大空闲数为5个,也就是项目在启动的时候,就会自动的为每个数据源创建5个对象。其它的可以看参数的说明。


    对象池属性配置
    public class PoolProperties {
         //最大空空闲
          private int maxIdle = 5;
          //最大连接数
          private int maxTotal = 50;
           // 最小空闲
          private int minIdle = 2;
          // 初始化连接数
          private int initialSize = 5;
          //最大等待时间,设置为30秒
          private int maxWaitMillis = 30000;
   
          //省略get/set方法
    }
  1. 创建不用数据源的工厂类
  • 假设我们当前有集群A,B,它们都需要连接到HiveServer2
  • 对象池中管理的对象为HiveServer2的JDBC连接对象Connection
#A集群的连接池对象
public class AClusterObjectPool extends GenericObjectPool<Connection> {
    public AClusterObjectPool (PooledObjectFactory<Connection> factory, GenericObjectPoolConfig<Connection> config) {
        super(factory, config);
    }
}

#B集群的连接池对象
public class BClusterObjectPool extends GenericObjectPool<Connection> {
    public BClusterObjectPool (PooledObjectFactory<Connection> factory, GenericObjectPoolConfig<Connection> config) {
        super(factory, config);
    }
}
  1. 创建connection对象类
public class PoolableObjectFactory extends BasePooledObjectFactory<Connection> {

    private final String driverName = "org.apache.hive.jdbc.HiveDriver";
    private  String url;
    private  String username;
    private  String password;


    public PoolableObjectFactory(String url, String username, String password) {
        this.url = url;
        this.username = username;
        this.password = password;
    }
    /**
     * 创建一个对象实例
     */
    @Override
    public Connection create() throws Exception {
        Class.forName(driverName);
        java.util.Properties info = new java.util.Properties();
        info.put("user", username);
        info.put("password", password);
        info.put("hive.metastore.client.socket.timeout", 3000);
        Connection con = DriverManager.getConnection(url, info);
        return con;
    }
    /**
     * 包裹创建的对象实例,返回一个pooledobject
     */
    @Override
    public PooledObject<Connection> wrap(Connection obj) {
        return new DefaultPooledObject<Connection>(obj);
    }
}
  1. 创建池对象的工具类
@Configuration
@EnableConfigurationProperties(PoolProperties.class)
public class PooledObjectUtil {
    private static final Logger LOGGER = LoggerFactory.getLogger(PooledObjectUtil.class);

    @Autowired
    private ClusterService clusterService;

    @Value("${a.cluster.name}")
    private String aClusterName;

    @Value("${b.cluster.name}")
    private String bClusterName;

    private final PoolProperties poolProperties;

    AClusterObjectPool  aPool;

    BClusterObjectPool  bPool;

    @Autowired
    public PooledObjectUtil(PoolProperties poolProperties) {
        this.poolProperties = poolProperties;
    }

    private GenericObjectPoolConfig generatePoolConfig() {
        GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
        // 最大空闲数
        poolConfig.setMaxIdle(poolProperties.getMaxIdle());
        // 最小空闲数, 池中只有一个空闲对象的时候,池会在创建一个对象,并借出一个对象,从而保证池中最小空闲数为1
        poolConfig.setMinIdle(poolProperties.getMinIdle());
        // 最大池对象总数
        poolConfig.setMaxTotal(poolProperties.getMaxTotal());
        // 在获取对象的时候检查有效性, 默认false
        poolConfig.setTestOnBorrow(true);
        // 在归还对象的时候检查有效性, 默认false
        poolConfig.setTestOnReturn(false);
        // 在空闲时检查有效性, 默认false
        poolConfig.setTestWhileIdle(false);
        // 最大等待时间, 默认的值为-1,表示无限等待。
        poolConfig.setMaxWaitMillis(poolProperties.getMaxWaitMillis());
        // 是否启用后进先出, 默认true
        poolConfig.setLifo(true);
        // MXBean already registered的错误
        poolConfig.setJmxEnabled(false);
        return poolConfig;
    }

    @Bean
    public AClusterObjectPool  aPooledObjectFactory() {
        Cluster cluster = clusterService.findByClusterName(aClusterName);
        GenericObjectPoolConfig poolConfig = generatePoolConfig();
        PooledObjectFactory<Connection> factory = new PoolableObjectFactory(cluster.getHiveServer2Url(), cluster.getAdminUser(), cluster.getAdminUmPassword());
        aPool = new AClusterObjectPool(factory, poolConfig);
        initPool(aPool , poolProperties.getInitialSize(), poolConfig.getMaxIdle());
        LOGGER.info("Successful build aCluster connection,size is {}",aPool .getNumActive());
        return aPool ;
    }

    @Bean
    public BClusterObjectPool  bPooledObjectFactory() {
        Cluster cluster = clusterService.findByClusterName(bClusterName);
        GenericObjectPoolConfig poolConfig = generatePoolConfig();
        PooledObjectFactory<Connection> factory = new PoolableObjectFactory(cluster.getHiveServer2Url(), cluster.getAdminUser(), cluster.getAdminUmPassword());
        bPool = new BClusterObjectPool(factory, poolConfig);
        initPool(bPool , poolProperties.getInitialSize(), poolConfig.getMaxIdle());
        LOGGER.info("Successful build b cluster connection,size is {}",bPool .getNumActive());
        return bPool ;
    }
    /**
     * 预先加载connection对象到对象池中
     * @param initialSize 初始化连接数
     * @param maxIdle     最大空闲连接数
     */
    private void initPool(AClusterObjectPool pool, int initialSize, int maxIdle) {
        if (initialSize <= 0) {
            return;
        }
        int size = Math.min(initialSize, maxIdle);
        for (int i = 0; i < size; i++) {
            try {
                pool.addObject();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * 预先加载connection对象到对象池中
     * @param initialSize 初始化连接数
     * @param maxIdle     最大空闲连接数
     */
    private void initPool(BClusterObjectPool pool, int initialSize, int maxIdle) {
        if (initialSize <= 0) {
            return;
        }
        int size = Math.min(initialSize, maxIdle);
        for (int i = 0; i < size; i++) {
            try {
                pool.addObject();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }
}

在项目中如何使用

  • 在项目中将不用的工厂类注入进来,后根据集群的名字进行判断,获取对应的工厂类
    @Value(cluster.name.a)
    private String aClusterName;
    @Autowired
    private AClusterObjectPool aClusterObjectPool;
    @Autowired
    private BClusterObjectPool bClusterObjectPool;

    #从对象池中借用对象调用borrow方法
    private Connection borrowObject(String clusterName) throws Exception {
        Connection connection;
        if (aClusterName.equals(clusterName)) {
            connection = aClusterObjectPool.borrowObject();
        } else {
            connection = bClusterObjectPool.borrowObject();
        }
        LOGGER.info("Borrow hive client object success,clusterName is {}",clusterName);
        return connection;
    }
    #向对象池归还对象调用return方法
    private void returnObject(Connection connection,String clusterName){
        if (aClusterObjectPool.equals(clusterName)) {
            aClusterObjectPool.returnObject(connection);
        } else {
            bClusterObjectPool.returnObject(connection);
        }
        LOGGER.info("Return hive client object success,clusterName is {}",clusterName);
    }

实际效果

  • 在进行分区对比过程中执行从对象池借用对象,完成使命后进行关闭即可,能快速的处理好单服务遇到的瓶颈问题。

遇到的一些问题

  • 测试环境对象池空置一段时间后,再连接的时候,会报session超时:HiveSQLException: Invalid SessionHandle
  • 解决办法:修改hiveServer中客户端超时的验证时间,我的修改如下
hive.server2.session.check.interval = 1 hour
hive.server2.idle.operation.timeout = 1 day
hive.server2.idle.session.timeout = 3 days

  • 如果HiveServer2重启的话,则创建的connection将失效,也需要重启项目,这个是后续的改进方向,也可能接入到其它的连接中间件中。

参考资料

相关文章

网友评论

      本文标题:HiveServer2多数据源对象池实战

      本文链接:https://www.haomeiwen.com/subject/mdxeohtx.html