Using Hive JDBC with WebFlux

Author: GoddyWu | Published: 2019-01-09 11:25

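At work we have a setup where logs are stored as Hive tables through the Spark Thrift Server, which also exposes a Hive JDBC port. Spring currently recommends non-blocking asynchronous I/O, i.e. reactive programming (https://projectreactor.io/), and for web apps it provides the WebFlux framework (https://docs.spring.io/spring/docs/current/spring-framework-reference/web-reactive.html). So how do you set up a non-blocking, asynchronous Hive JDBC connection under WebFlux?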

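I followed an existing blog post for this.
Note: the code in that post does not run as written. The library it recommends is rxjava2-jdbc, project page: https://github.com/davidmoten/rxjava2-jdbc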

First, add the pom dependencies:

    <!-- This is the RxJava-based library recommended by that blog post -->
    <!-- https://mvnrepository.com/artifact/com.github.davidmoten/rxjava2-jdbc -->
    <dependency>
      <groupId>com.github.davidmoten</groupId>
      <artifactId>rxjava2-jdbc</artifactId>
      <version>0.2.1</version>
    </dependency>

    <!-- Pick the hive-jdbc version that matches your Hive installation. This artifact pulls in packages that conflict with WebFlux, so they are excluded below. -->
    <!-- https://mvnrepository.com/artifact/org.apache.hive/hive-jdbc -->
    <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-jdbc</artifactId>
      <version>1.2.1</version>
      <exclusions>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.eclipse.jetty.aggregate</groupId>
          <artifactId>jetty-all</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

    <!-- This version must also match hive-jdbc; look up the exact pairing for your setup. -->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>2.7.1</version>
      <exclusions>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

Addendum: the last two dependencies can probably be replaced by spring-data-hadoop. I have not tried this yet.

    <!-- https://mvnrepository.com/artifact/org.springframework.data/spring-data-hadoop -->
    <dependency>
      <groupId>org.springframework.data</groupId>
      <artifactId>spring-data-hadoop</artifactId>
      <version>2.5.0.RELEASE</version>
    </dependency>

Example code:

import io.reactivex.Flowable;
import java.sql.SQLException;
import lombok.Data;
import org.davidmoten.rx.jdbc.ConnectionProvider;
import org.davidmoten.rx.jdbc.Database;
import org.davidmoten.rx.jdbc.pool.NonBlockingConnectionPool;
import org.davidmoten.rx.jdbc.pool.Pools;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;

@RestController
public class Test2Controller {
  @Bean
  public Database database() {
    NonBlockingConnectionPool pool = Pools.nonBlocking()
        .maxPoolSize(Runtime.getRuntime().availableProcessors() * 5)
        .connectionProvider(ConnectionProvider.from("jdbc:hive2://localhost:10000/default"))
        .build();
    return Database.from(pool);
  }

  @Autowired
  private Database database;

  // This annotation comes from the Lombok plugin
  @Data
  private class KV {
    private String key;
    private String value;
  }

  @GetMapping("/1")
  public Flux<KV> test1() throws SQLException {
    String sql = "select * from src limit 20";

    Flowable<KV> kvFlowable = database.select(sql).get(row -> {
      KV kv = new KV();
      kv.setKey(row.getString("key"));
      kv.setValue(row.getString("value"));
      return kv;
    });
    return Flux.from(kvFlowable);
  }
}

With that, everything works.


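database.select(sql).get() hands you a JDBC ResultSet, which has to be converted into something usable. There are several ways to do that.

Option 1 is the approach shown above: map each column by hand. It is tedious.

Option 2: rxjava2-jdbc itself provides an autoMap method for mapping rows to objects. (I believe it uses Jackson underneath; I could not get it to work.)

Option 3: convert the result to a map.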

  private List<HashMap<String, Object>> resultSetToArrayList(ResultSet rs) throws SQLException{
    ResultSetMetaData md = rs.getMetaData();
    int columns = md.getColumnCount();
    ArrayList<HashMap<String, Object>> list = new ArrayList<>();
    while (rs.next()){
      HashMap<String, Object> row = new HashMap<>(columns);
      for(int i=1; i<=columns; ++i){
        row.put(md.getColumnName(i), rs.getObject(i));
      }
      list.add(row);
    }
    return list;
  }

---

    Flowable<List<HashMap<String, Object>>> kvFlowable = database.select(sql).get(
        this::resultSetToArrayList);
    return Mono.from(kvFlowable).flatMapMany(Flux::fromIterable);
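
One caveat about the snippet above, stated as an assumption rather than a verified fact: as far as I understand rxjava2-jdbc, the function passed to get() is invoked once per row, after the library has already advanced the cursor. If that holds, a helper that loops on rs.next() inside it would start reading at the second row. A minimal per-row sketch using only java.sql types is shown here; the class name RowMaps and the method currentRowToMap are hypothetical names chosen for illustration.

```java
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of rxjava2-jdbc or Spring.
public final class RowMaps {
  private RowMaps() {}

  /**
   * Converts only the row the cursor currently points at.
   * It never calls rs.next(), so the caller stays in charge of iteration.
   */
  public static Map<String, Object> currentRowToMap(ResultSet rs) throws SQLException {
    ResultSetMetaData md = rs.getMetaData();
    int columns = md.getColumnCount();
    // LinkedHashMap keeps the columns in select-list order
    Map<String, Object> row = new LinkedHashMap<>(columns);
    for (int i = 1; i <= columns; ++i) {
      // JDBC column indexes start at 1; getColumnLabel honours "AS" aliases
      row.put(md.getColumnLabel(i), rs.getObject(i));
    }
    return row;
  }
}
```

Under the same assumption, this could be passed as database.select(sql).get(RowMaps::currentRowToMap), which would yield a Flowable<Map<String, Object>> that Flux.from(...) can wrap directly, one map per row.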

Option 4: use the synchronous, blocking approach.

    Class.forName("org.apache.hive.jdbc.HiveDriver");
    List<HashMap<String, Object>> mapList;
    // try-with-resources closes the connection, statement and result set
    try (Connection connection = DriverManager
             .getConnection("jdbc:hive2://localhost:10000/default", "", "");
         Statement stmt = connection.createStatement();
         ResultSet rs = stmt.executeQuery("select * from src limit 20")) {
      // resultSetToArrayList advances the cursor itself; wrapping it in another
      // while (rs.next()) loop would silently drop the first row
      mapList = resultSetToArrayList(rs);
    }
    return Mono.just(mapList);

--
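Of course, you can also register a JdbcTemplate as a bean and query through it:

  @Bean
  public JdbcTemplate jdbcTemplate() {
    return new JdbcTemplate(new SimpleDriverDataSource(new HiveDriver(), "jdbc:hive2://localhost:10000"));
  }

  // Injected so the handler below uses the bean defined above
  @Autowired
  private JdbcTemplate jdbcTemplate;

  public Mono<List<Map<String, Object>>> test5() {
    List<Map<String, Object>> result = jdbcTemplate.queryForList("select * from src limit 20");
    return Mono.just(result);
  }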
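
Both blocking variants above run the query on whichever thread calls the handler, and Mono.just only wraps a result that has already been computed. A common way around this is to hand the blocking call to a dedicated, bounded thread pool. The following is a minimal sketch using only java.util.concurrent; the class name BlockingQueryRunner, the thread name "jdbc-worker" and the pool size are assumptions made for illustration, not something the original setup prescribes.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical helper that keeps blocking JDBC work off the request threads.
public final class BlockingQueryRunner implements AutoCloseable {
  // Dedicated, bounded pool reserved for blocking calls
  private final ExecutorService jdbcPool;

  public BlockingQueryRunner(int threads) {
    this.jdbcPool = Executors.newFixedThreadPool(threads, r -> {
      Thread t = new Thread(r, "jdbc-worker");
      t.setDaemon(true); // do not keep the JVM alive on shutdown
      return t;
    });
  }

  /** Runs the blocking query on the JDBC pool and completes the future with its result. */
  public <T> CompletableFuture<T> submit(Callable<T> blockingQuery) {
    return CompletableFuture.supplyAsync(() -> {
      try {
        return blockingQuery.call();
      } catch (Exception e) {
        // Surface checked exceptions (e.g. SQLException) through the future
        throw new CompletionException(e);
      }
    }, jdbcPool);
  }

  @Override
  public void close() {
    jdbcPool.shutdown();
  }
}
```

In a controller this might be used as Mono.fromFuture(runner.submit(() -> jdbcTemplate.queryForList(sql))). Reactor's Mono.fromCallable(...) combined with subscribeOn(...) on a bounded scheduler is an alternative that stays within Reactor.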

Option 5, strongly discouraged: HiveTemplate can only return basic types such as lists and strings; it cannot return maps.

  @Bean
  public HiveTemplate hiveTemplate() {
    DataSource dataSource = new SimpleDriverDataSource(new HiveDriver(), "jdbc:hive2://localhost:10000");
    HiveClientFactory factory = () -> new HiveClient(dataSource);
    return new HiveTemplate(factory);
  }

Article link: https://www.haomeiwen.com/subject/meuwrqtx.html