分布式分析引擎Kylin Spring DataSource封装

作者: 高广超 | 来源:发表于2019-03-15 21:15 被阅读8次

    Kylin 概述

    Apache Kylin™是一个开源的分布式分析引擎,提供Hadoop/Spark之上的SQL查询接口及多维分析(OLAP)能力以支持超大规模数据,最初由eBay Inc. 开发并贡献至开源社区。它能在亚秒内查询巨大的Hive表。

    image.png

    Kylin 特性

    image.png

    JDBC

    认证

    基于Apache Kylin认证RESTFUL服务。支持的参数:

    user : 用户名
    password : 密码
    ssl: true或false。 默认为flas;如果为true,所有的服务调用都会使用https。
    

    连接url格式:

    jdbc:kylin://<hostname>:<port>/<kylin_project_name>
    

    如果“ssl”为true,“port”应该是Kylin server的HTTPS端口。
    如果“port”未被指定,driver会使用默认的端口:HTTP 80,HTTPS 443。
    必须指定“kylin_project_name”并且用户需要确保它在Kylin server上存在。

    1. 使用Statement查询

    Driver driver = (Driver) Class.forName("org.apache.kylin.jdbc.Driver").newInstance();
    
    Properties info = new Properties();
    info.put("user", "ADMIN");
    info.put("password", "KYLIN");
    Connection conn = driver.connect("jdbc:kylin://localhost:7070/kylin_project_name", info);
    Statement state = conn.createStatement();
    ResultSet resultSet = state.executeQuery("select * from test_table");
    
    while (resultSet.next()) {
        assertEquals("foo", resultSet.getString(1));
        assertEquals("bar", resultSet.getString(2));
        assertEquals("tool", resultSet.getString(3));
    }
    

    2. 使用PreparedStatementv查询

    支持的PreparedStatement参数:
    setString
    setInt
    setShort
    setLong
    setFloat
    setDouble
    setBoolean
    setByte
    setDate
    setTime
    setTimestamp

    Driver driver = (Driver) Class.forName("org.apache.kylin.jdbc.Driver").newInstance();
    Properties info = new Properties();
    info.put("user", "ADMIN");
    info.put("password", "KYLIN");
    Connection conn = driver.connect("jdbc:kylin://localhost:7070/kylin_project_name", info);
    PreparedStatement state = conn.prepareStatement("select * from test_table where id=?");
    state.setInt(1, 10);
    ResultSet resultSet = state.executeQuery();
    
    while (resultSet.next()) {
        assertEquals("foo", resultSet.getString(1));
        assertEquals("bar", resultSet.getString(2));
        assertEquals("tool", resultSet.getString(3));
    }
    

    3. 获取查询结果元数据

    Kylin jdbc driver支持元数据列表方法:
    通过sql模式过滤器(比如 %)列出catalog、schema、table和column。

    Driver driver = (Driver) Class.forName("org.apache.kylin.jdbc.Driver").newInstance();
    Properties info = new Properties();
    info.put("user", "ADMIN");
    info.put("password", "KYLIN");
    Connection conn = driver.connect("jdbc:kylin://localhost:7070/kylin_project_name", info);
    Statement state = conn.createStatement();
    ResultSet resultSet = state.executeQuery("select * from test_table");
    
    ResultSet tables = conn.getMetaData().getTables(null, null, "dummy", null);
    while (tables.next()) {
        for (int i = 0; i < 10; i++) {
            assertEquals("dummy", tables.getString(i + 1));
        }
    }
    

    Spring DataSource封装

    JDBC方式在开发使用中十分不便,而如果能封装为Spring 提供的DataSource方式,使用过程中就会便捷很多。

    创建SqlProperties,封装jdbc连接的参数

    @Data
    public class KylinSqlProperties {
    
        private static final String DEFAULT_DRIVER_CLASS_NAME = "org.apache.kylin.jdbc.Driver";
        private static final int DEFAULT_POOL_SIZE = 10;
        private static final Long DEFAULT_MAX_WAIT_TIME = 10000L;
    
        /**
         * 用户名
         */
        private String userName;
    
        /**
         * 密码
         */
        private String password;
    
        /**
         * 是否加密
         */
        private boolean decrypt;
    
        /**
         * 主库连接地址
         */
        private String connectionUrl;
    
    
        /**
         * 最长等待连接时间
         */
        private long maxWaitTime = DEFAULT_MAX_WAIT_TIME;
    
        private int poolSize = DEFAULT_POOL_SIZE;
    
        private String driverClassName = DEFAULT_DRIVER_CLASS_NAME;
    }
    

    实现 DataSource 接口,创建连接池

    @Slf4j
    public class KylinDataSource implements DataSource {
    
        private LinkedList<Connection> connectionPoolList = new LinkedList<>();
        private long maxWaitTime;
    
        public KylinDataSource(KylinSqlProperties sqlProperties) {
            try {
                this.maxWaitTime = sqlProperties.getMaxWaitTime();
                Driver driverManager = (Driver) Class.forName(sqlProperties.getDriverClassName())
                        .newInstance();
                Properties info = new Properties();
                info.put("user", sqlProperties.getUserName());
                info.put("password", sqlProperties.getPassword());
                for (int i = 0; i < sqlProperties.getPoolSize(); i++) {
                    Connection connection = driverManager
                            .connect(sqlProperties.getConnectionUrl(), info);
                    connectionPoolList.add(ConnectionProxy.getProxy(connection, connectionPoolList));
                }
                log.info("PrestoDataSource has initialized {} size connection pool",
                        connectionPoolList.size());
            } catch (Exception e) {
                log.error("kylinDataSource initialize error, ex: ", e);
            }
        }
    
        @Override
        public Connection getConnection() throws SQLException {
            synchronized (connectionPoolList) {
                if (connectionPoolList.size() <= 0) {
                    try {
                        connectionPoolList.wait(maxWaitTime);
                    } catch (InterruptedException e) {
                        throw new SQLException("getConnection timeout..." + e.getMessage());
                    }
                }
    
                return connectionPoolList.removeFirst();
            }
        }
    
    
        @Override
        public Connection getConnection(String username, String password) throws SQLException {
            return getConnection();
        }
    
        @Override
        public <T> T unwrap(Class<T> iface) throws SQLException {
            throw new RuntimeException("Unsupport operation.");
        }
    
        @Override
        public boolean isWrapperFor(Class<?> iface) throws SQLException {
            return DataSource.class.equals(iface);
        }
    
        @Override
        public PrintWriter getLogWriter() throws SQLException {
            throw new RuntimeException("Unsupport operation.");
        }
    
        @Override
        public void setLogWriter(PrintWriter out) throws SQLException {
    
        }
    
        @Override
        public void setLoginTimeout(int seconds) throws SQLException {
            throw new RuntimeException("Unsupport operation.");
        }
    
        @Override
        public int getLoginTimeout() throws SQLException {
            return 0;
        }
    
        @Override
        public Logger getParentLogger() throws SQLFeatureNotSupportedException {
            return null;
        }
    
    
        static class ConnectionProxy implements InvocationHandler {
    
            private Object obj;
            private LinkedList<Connection> pool;
            private String DEFAULT_CLOSE_METHOD = "close";
    
            private ConnectionProxy(Object obj, LinkedList<Connection> pool) {
                this.obj = obj;
                this.pool = pool;
            }
    
            public static Connection getProxy(Object o, LinkedList<Connection> pool) {
                Object proxed = Proxy
                        .newProxyInstance(o.getClass().getClassLoader(), new Class[]{Connection.class},
                                new ConnectionProxy(o, pool));
                return (Connection) proxed;
            }
    
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                if (method.getName().equals(DEFAULT_CLOSE_METHOD)) {
                    synchronized (pool) {
                        pool.add((Connection) proxy);
                        pool.notify();
                    }
                    return null;
                } else {
                    return method.invoke(obj, args);
                }
            }
        }
    
    

    创建JdbcPoolConfiguration类,注册template bean

    @Slf4j
    @Configuration
    @Component
    public class KylinJdbcPoolConfiguration implements BeanFactoryPostProcessor, EnvironmentAware {
    
        private ConfigurableEnvironment environment;
    
        @Value("${kylin.decrypt}")
        private boolean decrypt = false;
    
        private final static String prefixName = "kylin";
    
        @Override
        public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
            KylinSqlProperties properties = new KylinSqlProperties();
            properties.setUserName("xxxxx");
            properties.setPassword("xxxx");
            properties.setConnectionUrl("xxxx");
            properties.setDecrypt(decrypt);
            createDataSourceBean(beanFactory, properties);
        }
    
        public void createDataSourceBean(ConfigurableListableBeanFactory beanFactory,
                                         KylinSqlProperties sqlProperties) {
    
            DataSource baseDataSource = new KylinDataSource(sqlProperties);
            register(beanFactory, new JdbcTemplate(baseDataSource), prefixName + "JdbcTemplateFactory", prefixName);
        }
    
        private void register(ConfigurableListableBeanFactory beanFactory, Object bean, String name,
                              String alias) {
            beanFactory.registerSingleton(name, bean);
            if (!beanFactory.containsSingleton(alias)) {
                beanFactory.registerAlias(name, alias);
            }
        }
    
        @Override
        public void setEnvironment(Environment environment) {
            this.environment = (ConfigurableEnvironment) environment;
        }
    
    }
    

    RowMapper实现

    
    public class CommonBeanPropertyRowMapper<T> implements RowMapper<T> {
    
        protected final Log logger = LogFactory.getLog(this.getClass());
        private Class<T> mappedClass;
        private boolean checkFullyPopulated = false;
        private boolean primitivesDefaultedForNullValue = false;
        private ConversionService conversionService = DefaultConversionService.getSharedInstance();
        private Map<String, PropertyDescriptor> mappedFields;
        private Set<String> mappedProperties;
    
        public CommonBeanPropertyRowMapper() {
        }
    
        public CommonBeanPropertyRowMapper(Class<T> mappedClass) throws Exception {
            this.initialize(mappedClass);
        }
    
        public CommonBeanPropertyRowMapper(Class<T> mappedClass, boolean checkFullyPopulated)
                throws Exception {
            this.initialize(mappedClass);
            this.checkFullyPopulated = checkFullyPopulated;
        }
    
        public void setMappedClass(Class<T> mappedClass) throws Exception {
            if (this.mappedClass == null) {
                this.initialize(mappedClass);
            } else if (this.mappedClass != mappedClass) {
                throw new InvalidDataAccessApiUsageException(
                        "The mapped class can not be reassigned to map to " + mappedClass
                                + " since it is already providing mapping for " + this.mappedClass);
            }
    
        }
    
        public final Class<T> getMappedClass() {
            return this.mappedClass;
        }
    
        public void setCheckFullyPopulated(boolean checkFullyPopulated) {
            this.checkFullyPopulated = checkFullyPopulated;
        }
    
        public boolean isCheckFullyPopulated() {
            return this.checkFullyPopulated;
        }
    
        public void setPrimitivesDefaultedForNullValue(boolean primitivesDefaultedForNullValue) {
            this.primitivesDefaultedForNullValue = primitivesDefaultedForNullValue;
        }
    
        public boolean isPrimitivesDefaultedForNullValue() {
            return this.primitivesDefaultedForNullValue;
        }
    
        public void setConversionService(ConversionService conversionService) {
            this.conversionService = conversionService;
        }
    
        public ConversionService getConversionService() {
            return this.conversionService;
        }
    
        protected void initialize(Class<T> mappedClass) throws Exception {
            this.mappedClass = mappedClass;
            this.mappedFields = new HashMap();
            this.mappedProperties = new HashSet();
            PropertyDescriptor[] pds = BeanUtils.getPropertyDescriptors(mappedClass);
            PropertyDescriptor[] var3 = pds;
            int var4 = pds.length;
    
            for (int var5 = 0; var5 < var4; ++var5) {
                PropertyDescriptor pd = var3[var5];
                if (pd.getWriteMethod() != null) {
                    Field field = mappedClass.getDeclaredField(pd.getName());
                    SerializedName annotation = field.getAnnotation(SerializedName.class);
                    if (annotation != null) {
                        this.mappedFields.put(annotation.value(), pd);
                    } else {
    
                        this.mappedFields.put(this.lowerCaseName(pd.getName()), pd);
                        String underscoredName = this.underscoreName(pd.getName());
                        if (!this.lowerCaseName(pd.getName()).equals(underscoredName)) {
                            this.mappedFields.put(underscoredName, pd);
                        }
                    }
    
                    this.mappedProperties.add(pd.getName());
                }
            }
        }
    
        protected String underscoreName(String name) {
            if (!StringUtils.hasLength(name)) {
                return "";
            } else {
                StringBuilder result = new StringBuilder();
                result.append(this.lowerCaseName(name.substring(0, 1)));
    
                for (int i = 1; i < name.length(); ++i) {
                    String s = name.substring(i, i + 1);
                    String slc = this.lowerCaseName(s);
                    if (!s.equals(slc)) {
                        result.append("_").append(slc);
                    } else {
                        result.append(s);
                    }
                }
    
                return result.toString();
            }
        }
    
        protected String lowerCaseName(String name) {
            return name.toLowerCase(Locale.US);
        }
    
        @Override
        public T mapRow(ResultSet rs, int rowNumber) throws SQLException {
            Assert.state(this.mappedClass != null, "Mapped class was not specified");
            T mappedObject = BeanUtils.instantiateClass(this.mappedClass);
            BeanWrapper bw = PropertyAccessorFactory.forBeanPropertyAccess(mappedObject);
            this.initBeanWrapper(bw);
            ResultSetMetaData rsmd = rs.getMetaData();
            int columnCount = rsmd.getColumnCount();
            HashSet populatedProperties = this.isCheckFullyPopulated() ? new HashSet() : null;
    
            for (int index = 1; index <= columnCount; ++index) {
                String column = JdbcUtils.lookupColumnName(rsmd, index);
                String field = this.lowerCaseName(column.replaceAll(" ", ""));
                PropertyDescriptor pd = (PropertyDescriptor) this.mappedFields.get(field);
                if (pd == null) {
                    if (rowNumber == 0 && this.logger.isDebugEnabled()) {
                        this.logger.debug(
                                "No property found for column \'" + column + "\' mapped to field \'" + field + "\'");
                    }
                } else {
                    try {
                        Object ex = this.getColumnValue(rs, index, pd);
                        if (rowNumber == 0 && this.logger.isDebugEnabled()) {
                            this.logger.debug(
                                    "Mapping column \'" + column + "\' to property \'" + pd.getName() + "\' of type \'"
                                            + ClassUtils
                                            .getQualifiedName(pd.getPropertyType()) + "\'");
                        }
    
                        try {
                            bw.setPropertyValue(pd.getName(), ex);
                        } catch (TypeMismatchException var14) {
                            if (ex != null || !this.primitivesDefaultedForNullValue) {
                                throw var14;
                            }
    
                            if (this.logger.isDebugEnabled()) {
                                this.logger.debug(
                                        "Intercepted TypeMismatchException for row " + rowNumber + " and column \'"
                                                + column + "\' with null value when setting property \'" + pd.getName()
                                                + "\' of type \'" + ClassUtils.getQualifiedName(pd.getPropertyType())
                                                + "\' on object: " + mappedObject, var14);
                            }
                        }
    
                        if (populatedProperties != null) {
                            populatedProperties.add(pd.getName());
                        }
                    } catch (NotWritablePropertyException var15) {
                        throw new DataRetrievalFailureException(
                                "Unable to map column \'" + column + "\' to property \'" + pd.getName() + "\'",
                                var15);
                    }
                }
            }
    
            if (populatedProperties != null && !populatedProperties.equals(this.mappedProperties)) {
                throw new InvalidDataAccessApiUsageException(
                        "Given ResultSet does not contain all fields necessary to populate object of class ["
                                + this.mappedClass.getName() + "]: " + this.mappedProperties);
            } else {
                return mappedObject;
            }
        }
    
        protected void initBeanWrapper(BeanWrapper bw) {
            ConversionService cs = this.getConversionService();
            if (cs != null) {
                bw.setConversionService(cs);
            }
    
        }
    
        protected Object getColumnValue(ResultSet rs, int index, PropertyDescriptor pd)
                throws SQLException {
            return JdbcUtils.getResultSetValue(rs, index, pd.getPropertyType());
        }
    
        public static <T> org.springframework.jdbc.core.BeanPropertyRowMapper<T> newInstance(
                Class<T> mappedClass) {
            return new org.springframework.jdbc.core.BeanPropertyRowMapper(mappedClass);
        }
    }
    
    

    RowMapper子类

    
    public class RowMapper<T> extends CommonBeanPropertyRowMapper<T> {
    
        private List<MapperPlugin> mapperPlugins;
    
        private RowMapper(Class<T> tClass, List<MapperPlugin> mapperPlugins) throws Exception {
            super(tClass);
            this.mapperPlugins = mapperPlugins;
        }
    
        @Override
        protected Object getColumnValue(ResultSet rs, int index, PropertyDescriptor pd)
                throws SQLException {
            Object object = rs.getObject(index);
            return mapperPlugins.stream()
                    .filter(mapperPlugin -> mapperPlugin.test(pd))
                    .map(mapperPlugin -> mapperPlugin.getColumnValue(object, pd))
                    .findFirst()
                    .orElse(super.getColumnValue(rs, index, pd));
        }
    
        public static <T> RowMapper<T> getDefault(Class<T> tClass) {
            return RowMapper.<T>builder().tClass(tClass)
                    .mapperPlugins(JSONObjectPlugin)
                    .mapperPlugins(ListPlugin)
                    .mapperPlugins(SetPlugin)
                    .mapperPlugins(MapPlugin)
                    .mapperPlugins(EnumPlugin)
                    .mapperPlugins(JsonPlugin)
                    .build();
        }
    
        public static <T> RowMapper<T> withDefault(Class<T> tClass, MapperPlugin... mapperPlugins) {
            RhllorRowMapperBuilder<T> builder = RowMapper.<T>builder().tClass(tClass);
            for (final MapperPlugin mapperPlugin : mapperPlugins) {
                builder.mapperPlugins(mapperPlugin);
            }
            return builder
                    .mapperPlugins(JSONObjectPlugin)
                    .mapperPlugins(ListPlugin)
                    .mapperPlugins(SetPlugin)
                    .mapperPlugins(MapPlugin)
                    .mapperPlugins(EnumPlugin)
                    .mapperPlugins(JsonPlugin)
                    .build();
        }
    
        public static <T> RowMapper.RhllorRowMapperBuilder<T> builder() {
            return new RowMapper.RhllorRowMapperBuilder<>();
        }
    
        public static class RhllorRowMapperBuilder<T> {
    
            private Class<T> tClass;
            private ArrayList<MapperPlugin> mapperPlugins;
    
            RhllorRowMapperBuilder() {
            }
    
            public RowMapper.RhllorRowMapperBuilder<T> tClass(Class<T> tClass) {
                this.tClass = tClass;
                return this;
            }
    
            public RowMapper.RhllorRowMapperBuilder<T> mapperPlugins(MapperPlugin mapperPlugin) {
                if (this.mapperPlugins == null) {
                    this.mapperPlugins = new ArrayList();
                }
                this.mapperPlugins.add(mapperPlugin);
                return this;
            }
    
            public RowMapper<T> build() {
                List<MapperPlugin> mapperPlugins;
                switch (this.mapperPlugins == null ? 0 : this.mapperPlugins.size()) {
                    case 0:
                        mapperPlugins = Collections.emptyList();
                        break;
                    case 1:
                        mapperPlugins = Collections.singletonList(this.mapperPlugins.get(0));
                        break;
                    default:
                        mapperPlugins = Collections.unmodifiableList(new ArrayList<>(this.mapperPlugins));
                }
                try {
                    return new RowMapper<>(this.tClass, mapperPlugins);
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
    
                return null;
            }
    
            @Override
            public String toString() {
                return "PrestoRowMapper.KylinRowMapperBuilder(tClass=" + this.tClass + ", mapperPlugins="
                        + this.mapperPlugins + ")";
            }
        }
    
    
    }
    
    

    MapperPlugin

    
    public class MapperPlugin {
    
        private static final Function<Object, String> bytes2UTF8String =
                bytes -> bytes instanceof String ? bytes.toString() :
                        new String((byte[]) bytes, Charset.forName("UTF-8"));
    
        private static final Function<PropertyDescriptor, Class> pd2Generic =
                pd -> getCollectionGeneric(pd.getReadMethod());
    
    
        private final Predicate<PropertyDescriptor> predicate;
        private final ColumnValue columnValue;
    
        private MapperPlugin(Predicate<PropertyDescriptor> predicate,
                             ColumnValue columnValue) {
            this.predicate = predicate;
            this.columnValue = columnValue;
        }
    
        boolean test(PropertyDescriptor pd) {
            return predicate.test(pd);
        }
    
        Object getColumnValue(Object object, PropertyDescriptor pd) {
            return columnValue.get(object, pd);
        }
    
        public static MapperPluginsBuilder of(Predicate<PropertyDescriptor> predicate) {
            return new MapperPluginsBuilder(predicate);
        }
    
        public static MapperPluginsBuilder ofNot(Predicate<PropertyDescriptor> predicate) {
            return of(predicate.negate());
        }
    
        public static MapperPluginsBuilder of(Class clazz) {
            return of(pd -> clazz.isAssignableFrom(pd.getPropertyType()));
        }
    
        @FunctionalInterface
        public interface ColumnValue {
    
            Object get(Object object, PropertyDescriptor pd);
        }
    
        public static class MapperPluginsBuilder {
    
            Predicate<PropertyDescriptor> predicate;
    
            public MapperPluginsBuilder(Predicate<PropertyDescriptor> predicate) {
                this.predicate = predicate;
            }
    
            public MapperPlugin columnValue(ColumnValue columnValue) {
                return new MapperPlugin(predicate, columnValue);
            }
        }
    
        static final MapperPlugin JsonPlugin =
                MapperPlugin.ofNot(pd -> pd.getPropertyType().isPrimitive() ||
                        Primitives.isWrapperType(pd.getPropertyType()) ||
                        String.class.isAssignableFrom(pd.getPropertyType()) ||
                        Date.class.isAssignableFrom(pd.getPropertyType()))
                        .columnValue((object, pd) ->
                                Optional.ofNullable(object)
                                        .map(bytes2UTF8String)
                                        .map(json -> JSON.parseObject(json, pd.getPropertyType()))
                                        .orElse(null));
    
        static final MapperPlugin JSONObjectPlugin =
                MapperPlugin.of(JSONObject.class)
                        .columnValue((object, pd) ->
                                Optional.ofNullable(object)
                                        .map(bytes2UTF8String)
                                        .map(JSONObject::parseObject)
                                        .orElse(new JSONObject()));
    
        static final MapperPlugin ListPlugin =
                MapperPlugin.of(List.class)
                        .columnValue((object, pd) ->
                                Optional.ofNullable(object)
                                        .map(bytes2UTF8String)
                                        .map(json -> JSON.parseArray(json, pd2Generic.apply(pd)))
                                        .orElse(new ArrayList<>()));
    
        static final MapperPlugin SetPlugin =
                MapperPlugin.of(Set.class)
                        .columnValue((object, pd) ->
                                Optional.ofNullable(object)
                                        .map(bytes2UTF8String)
                                        .map(json -> JSON.parseArray(json, pd2Generic.apply(pd)))
                                        .map(list -> Sets.newHashSet(List.class.cast(list)))
                                        .orElse(new HashSet<>()));
    
        static final MapperPlugin MapPlugin =
                MapperPlugin.of(Map.class)
                        .columnValue((object, pd) ->
                                Optional.ofNullable(object)
                                        .map(bytes2UTF8String)
                                        .map(json -> JSONObject.parseObject(json, Map.class))
                                        .orElse(new HashMap<>()));
    
        static final MapperPlugin EnumPlugin =
                MapperPlugin.of(Enum.class)
                        .columnValue((o, pd) -> {
                            try {
                                if (o == null) {
                                    return null;
                                }
                                if (o instanceof Number) {
                                    Number number = (Number) o;
                                    Method method = pd.getPropertyType()
                                            .getMethod("valueByIndex", Integer.TYPE);
                                    return method.invoke(null, number.intValue());
                                } else {
                                    String val = o.toString();
                                    Method method = pd.getPropertyType().getMethod("fromString", String.class);
                                    return method.invoke(null, val);
                                }
                            } catch (NoSuchMethodException e) {
                                throw new RuntimeException(
                                        "getColumnValue error, NoSuchMethod : valueByIndex or fromString", e);
                            } catch (InvocationTargetException e) {
                                throw new RuntimeException(
                                        "getColumnValue error, InvocationTargetException ", e);
                            } catch (IllegalAccessException e) {
                                throw new RuntimeException(
                                        "getColumnValue error, IllegalAccessException ", e);
                            }
                        });
    
        private static Class<?> getCollectionGeneric(Method method) {
            if (Collection.class.isAssignableFrom(method.getReturnType())) {
                Type fc = method.getGenericReturnType();
                if (fc == null) {
                    return Object.class;
                }
                if (fc instanceof ParameterizedType) {
                    ParameterizedType pt = (ParameterizedType) fc;
                    return (Class) pt.getActualTypeArguments()[0];
                }
                return Object.class;
            }
            return Object.class;
        }
    
    }
    
    

    具体使用

    
    @Component
    @Log4j2
    public class MetricDaoImpl {
       
        @Resource(name = "kylinJdbcTemplateFactory")
        private JdbcTemplate kylinJdbcTemplate;
    
        public List<TotalModelMetricEntity> getDistinctIds() {
            StringBuilder sqlBuilder = new StringBuilder()
                    .append(" select * ")
                    .append(" from LOG_DID_VIEW ")
                    .append(" ORDER BY DT ,total DESC limit 1000");
            log.info(sqlBuilder);
            return kylinJdbcTemplate.query(sqlBuilder.toString(), RowMapper.getDefault(TotalModelMetricEntity.class));
        }
    
    

    综上我们就完成了对Kylin JDBC的封装,同样的如Presto等其他支持JDBC的查询引擎封装方式类似。


    欢迎关注 高广超的简书博客 与 收藏文章 !
    欢迎关注 头条号:互联网技术栈

    个人介绍:

    高广超:多年一线互联网研发与架构设计经验,擅长设计与落地高可用、高性能、可扩展的互联网架构。目前从事大数据相关研发与架构工作。

    本文首发在 高广超的简书博客 转载请注明!

    相关文章

      网友评论

        本文标题:分布式分析引擎Kylin Spring DataSource封装

        本文链接:https://www.haomeiwen.com/subject/jnqdmqtx.html