Using Hive JDBC with WebFlux

Author: GoddyWu | Published 2019-01-09 11:25

    At work we have a setup where logs are stored as Hive tables through a Spark Thrift Server, which exposes Hive's JDBC port. Spring currently recommends non-blocking, asynchronous IO, i.e. reactive programming: https://projectreactor.io/ . For web apps, Spring provides the WebFlux framework: https://docs.spring.io/spring/docs/current/spring-framework-reference/web-reactive.html. But how do you get a non-blocking, asynchronous Hive JDBC connection under WebFlux?
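Before reaching for an RxJava wrapper, it helps to see the underlying idea: a JDBC driver is inherently blocking, so the usual pattern is to run the blocking call on a dedicated worker pool and hand back an asynchronous result (in WebFlux terms, `Mono.fromCallable(...).subscribeOn(Schedulers.boundedElastic())`). A minimal stdlib-only sketch of that idea, with the Hive query simulated by a stub method (no real Hive connection is made here):

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class OffloadSketch {
    // Stand-in for a blocking JDBC query; a real version would use
    // DriverManager.getConnection("jdbc:hive2://...") plus a Statement.
    static List<String> blockingQuery() {
        try {
            Thread.sleep(50); // simulate Hive/network latency
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return List.of("row1", "row2");
    }

    public static void main(String[] args) {
        // A dedicated pool, so blocking work never ties up event-loop threads.
        ExecutorService jdbcPool = Executors.newFixedThreadPool(4);
        CompletableFuture<List<String>> rows =
            CompletableFuture.supplyAsync(OffloadSketch::blockingQuery, jdbcPool);
        System.out.println(rows.join()); // [row1, row2]
        jdbcPool.shutdown();
    }
}
```

rxjava2-jdbc, used below, packages essentially this pattern behind a non-blocking connection pool and a `Flowable` API.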

    This post draws on another blog (the link is missing from the original).
    Note: the code in that blog does not actually work; what it usefully recommends is rxjava2-jdbc: https://github.com/davidmoten/rxjava2-jdbc

    First, add the POM dependencies:

    This is the RxJava-based JDBC wrapper recommended by the blog:
        <!-- https://mvnrepository.com/artifact/com.github.davidmoten/rxjava2-jdbc -->
        <dependency>
          <groupId>com.github.davidmoten</groupId>
          <artifactId>rxjava2-jdbc</artifactId>
          <version>0.2.1</version>
        </dependency>
    
    Note: choose the hive-jdbc version that matches your Hive installation. This artifact has dependency conflicts with WebFlux, so we exclude the offending transitive jars:
        <!-- https://mvnrepository.com/artifact/org.apache.hive/hive-jdbc -->
        <dependency>
          <groupId>org.apache.hive</groupId>
          <artifactId>hive-jdbc</artifactId>
          <version>1.2.1</version>
          <exclusions>
            <exclusion>
              <groupId>org.slf4j</groupId>
              <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
            <exclusion>
              <groupId>org.eclipse.jetty.aggregate</groupId>
              <artifactId>jetty-all</artifactId>
            </exclusion>
          </exclusions>
        </dependency>
    
    The hadoop-common version must also match your hive-jdbc version; look up the exact pairing for your distribution:
        <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-common</artifactId>
          <version>2.7.1</version>
          <exclusions>
            <exclusion>
              <groupId>org.slf4j</groupId>
              <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
          </exclusions>
        </dependency>
    

    Addendum: the last two dependencies could probably be replaced with spring-data-hadoop; I have not tried this yet:

        <!-- https://mvnrepository.com/artifact/org.springframework.data/spring-data-hadoop -->
        <dependency>
          <groupId>org.springframework.data</groupId>
          <artifactId>spring-data-hadoop</artifactId>
          <version>2.5.0.RELEASE</version>
        </dependency>
    

    Sample code:

    import io.reactivex.Flowable;
    import java.sql.SQLException;
    import lombok.Data;
    import org.davidmoten.rx.jdbc.ConnectionProvider;
    import org.davidmoten.rx.jdbc.Database;
    import org.davidmoten.rx.jdbc.pool.NonBlockingConnectionPool;
    import org.davidmoten.rx.jdbc.pool.Pools;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;
    import reactor.core.publisher.Flux;
    
    @RestController
    public class Test2Controller {
      @Bean
      public Database database() {
        NonBlockingConnectionPool pool = Pools.nonBlocking()
            .maxPoolSize(Runtime.getRuntime().availableProcessors() * 5)
            .connectionProvider(ConnectionProvider.from("jdbc:hive2://localhost:10000/default"))
            .build();
        return Database.from(pool);
      }
    
      @Autowired
      private Database database;
    
      // @Data comes from the Lombok plugin and generates getters, setters, equals/hashCode
      @Data
      private static class KV {
        private String key;
        private String value;
      }
    
      @GetMapping("/1")
      public Flux<KV> test1() throws SQLException {
        String sql = "select * from src limit 20";
    
        Flowable<KV> kvFlowable = database.select(sql).get(row -> {
          KV kv = new KV();
          kv.setKey(row.getString("key"));
          kv.setValue(row.getString("value"));
          return kv;
        });
        return Flux.from(kvFlowable);
      }
    }
    

    With that, everything works as expected.


    In database.select(sql).get(...) you are handed a JDBC ResultSet positioned at the current row, and need to convert it into something useful. There are several ways to do this.

    The first approach is the one shown above: map each row by hand. It works, but it is verbose.

    The second: rxjava2-jdbc itself provides an autoMap method to map rows onto objects. (I could not get it to work in this setup.)
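For reference, this is roughly what the autoMap route looks like according to the rxjava2-jdbc README: you declare an interface whose `@Column`-annotated methods match the selected columns, and the library proxies each row onto it. This is a sketch only, untested against Hive (as noted, I could not get it working here); the column names `key` and `value` match the `src` table used elsewhere in this post:

```java
import io.reactivex.Flowable;
import org.davidmoten.rx.jdbc.Database;
import org.davidmoten.rx.jdbc.annotations.Column;

public class AutoMapSketch {
  // Each annotated method maps to one column of the result set.
  interface KV {
    @Column("key")
    String key();

    @Column("value")
    String value();
  }

  static Flowable<KV> query(Database database) {
    return database
        .select("select key, value from src limit 20")
        .autoMap(KV.class);
  }
}
```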

    The third: convert each row to a Map.

      // Drains the ResultSet into a list of column-name -> value maps.
      private List<HashMap<String, Object>> resultSetToArrayList(ResultSet rs) throws SQLException {
        ResultSetMetaData md = rs.getMetaData();
        int columns = md.getColumnCount();
        ArrayList<HashMap<String, Object>> list = new ArrayList<>();
        while (rs.next()) {
          HashMap<String, Object> row = new HashMap<>(columns);
          for (int i = 1; i <= columns; ++i) { // JDBC columns are 1-indexed
            row.put(md.getColumnName(i), rs.getObject(i));
          }
          list.add(row);
        }
        return list;
      }
    
    ---
    
        Flowable<List<HashMap<String, Object>>> rowsFlowable = database.select(sql).get(
            this::resultSetToArrayList);
        return Mono.from(rowsFlowable).flatMapMany(Flux::fromIterable);
    

    The fourth: plain synchronous, blocking JDBC.

        Class.forName("org.apache.hive.jdbc.HiveDriver");
        Connection connection = DriverManager
            .getConnection("jdbc:hive2://localhost:10000/default", "", "");

        Statement stmt = connection.createStatement();

        ResultSet rs = stmt.executeQuery("select * from src limit 20");
        // resultSetToArrayList drains the ResultSet itself; wrapping it in a
        // while (rs.next()) loop, as the original did, silently skips the first row.
        List<HashMap<String, Object>> mapList = resultSetToArrayList(rs);
        return Mono.just(mapList);
    
    --
    Of course, you can also register a JdbcTemplate bean:

        @Bean
        JdbcTemplate jdbcTemplate() {
          return new JdbcTemplate(new SimpleDriverDataSource(new HiveDriver(), "jdbc:hive2://localhost:10000"));
        }

        public Mono test5() {
          // The original called this field hiveTemplate; it is the JdbcTemplate bean above.
          List<Map<String, Object>> result = jdbcTemplate.queryForList("select * from src limit 20");
          return Mono.just(result);
        }
    

    The fifth: strongly discouraged, because HiveTemplate can only return List, String and other basic types; it cannot produce Map results.

        @Bean
        public HiveTemplate hiveTemplate() {
          DataSource dataSource = new SimpleDriverDataSource(new HiveDriver(), "jdbc:hive2://localhost:10000");
          HiveClientFactory factory = () -> new HiveClient(dataSource);
          return new HiveTemplate(factory);
        }
    

    Source: https://www.haomeiwen.com/subject/meuwrqtx.html