At work we have a setup where logs are stored as Hive tables through Spark Thrift Server, which exposes Hive's JDBC port. Spring these days recommends non-blocking asynchronous I/O, i.e. reactive programming: https://projectreactor.io/ . For web apps, Spring provides the WebFlux framework: https://docs.spring.io/spring/docs/current/spring-framework-reference/web-reactive.html . So how do we get a non-blocking, asynchronous Hive JDBC connection from WebFlux?
This post follows these references:
- https://medium.com/netifi/spring-webflux-and-rxjava2-jdbc-83a94e71ba04
- https://gist.github.com/robertroeser/580116df0045187e4850bd4016704ce1#file-employeerepository-java
Note: the code in that blog post does not work as-is; what it actually recommends is rxjava2-jdbc, hosted at https://github.com/davidmoten/rxjava2-jdbc
First, add the pom dependencies:
This is the RxJava wrapper the blog post recommends:
<!-- https://mvnrepository.com/artifact/com.github.davidmoten/rxjava2-jdbc -->
<dependency>
<groupId>com.github.davidmoten</groupId>
<artifactId>rxjava2-jdbc</artifactId>
<version>0.2.1</version>
</dependency>
Note that the hive-jdbc version must match your Hive deployment. This artifact conflicts with WebFlux's dependencies (logging and Jetty), so we exclude the offending packages:
<!-- https://mvnrepository.com/artifact/org.apache.hive/hive-jdbc -->
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>1.2.1</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jetty.aggregate</groupId>
<artifactId>jetty-all</artifactId>
</exclusion>
</exclusions>
</dependency>
This version also has to correspond to hive-jdbc; check the compatibility matrix for the exact pairing:
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.1</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
Addendum: the last two dependencies can probably be replaced by spring-data-hadoop; I have not tried this yet:
<!-- https://mvnrepository.com/artifact/org.springframework.data/spring-data-hadoop -->
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-hadoop</artifactId>
<version>2.5.0.RELEASE</version>
</dependency>
Sample code:
import io.reactivex.Flowable;
import java.sql.SQLException;
import lombok.Data;
import org.davidmoten.rx.jdbc.ConnectionProvider;
import org.davidmoten.rx.jdbc.Database;
import org.davidmoten.rx.jdbc.pool.NonBlockingConnectionPool;
import org.davidmoten.rx.jdbc.pool.Pools;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;

@RestController
public class Test2Controller {

    @Bean
    public Database database() {
        NonBlockingConnectionPool pool = Pools.nonBlocking()
                .maxPoolSize(Runtime.getRuntime().availableProcessors() * 5)
                .connectionProvider(ConnectionProvider.from("jdbc:hive2://localhost:10000/default"))
                .build();
        return Database.from(pool);
    }

    @Autowired
    private Database database;

    // @Data comes from the Lombok plugin; it generates the getters/setters used below
    @Data
    private static class KV {
        private String key;
        private String value;
    }

    @GetMapping("/1")
    public Flux<KV> test1() throws SQLException {
        String sql = "select * from src limit 20";
        // map each ResultSet row to a KV; Flowable implements
        // org.reactivestreams.Publisher, so Flux.from() can wrap it directly
        Flowable<KV> kvFlowable = database.select(sql).get(row -> {
            KV kv = new KV();
            kv.setKey(row.getString("key"));
            kv.setValue(row.getString("value"));
            return kv;
        });
        return Flux.from(kvFlowable);
    }
}
At this point, everything works end to end.
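Flux.from(kvFlowable) needs no adapter because RxJava's Flowable and Reactor's Flux both implement org.reactivestreams.Publisher. The JDK's java.util.concurrent.Flow interfaces mirror that same contract, so as a rough, stdlib-only illustration of the publisher/subscriber handshake underneath (with plain strings standing in for the KV rows), the handshake looks like this:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class FlowBridgeSketch {

    // Collects every item a publisher emits, requesting one at a time --
    // the backpressure handshake Reactive Streams is built around.
    public static List<String> collect(List<String> rows) throws InterruptedException {
        List<String> received = new ArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<String>() {
                private Flow.Subscription subscription;

                @Override
                public void onSubscribe(Flow.Subscription s) {
                    subscription = s;
                    s.request(1);                 // ask for the first row
                }

                @Override
                public void onNext(String item) {
                    received.add(item);
                    subscription.request(1);      // ask for the next row
                }

                @Override
                public void onError(Throwable t) { done.countDown(); }

                @Override
                public void onComplete() { done.countDown(); }
            });
            rows.forEach(publisher::submit);      // emit each "row"
        }                                         // close() triggers onComplete
        done.await();
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(collect(List.of("k1=v1", "k2=v2")));
    }
}
```

This is only an analogy for how the two libraries interoperate; the real pipeline above delegates all of the request/emit bookkeeping to rxjava2-jdbc and Reactor.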
At database.select(sql).get(...) you get hold of the JDBC ResultSet, and some conversion has to happen there. There are several options.
Option 1 is the approach shown above: map each row by hand. It works, but it is tedious.
Option 2: rxjava2-jdbc itself provides an autoMap method to map rows onto objects. (It is said to use Jackson under the hood; I could not get it working here.)
Option 3: convert each row to a map:
private List<HashMap<String, Object>> resultSetToArrayList(ResultSet rs) throws SQLException {
    ResultSetMetaData md = rs.getMetaData();
    int columns = md.getColumnCount();
    ArrayList<HashMap<String, Object>> list = new ArrayList<>();
    while (rs.next()) {
        HashMap<String, Object> row = new HashMap<>(columns);
        for (int i = 1; i <= columns; ++i) {   // JDBC columns are 1-indexed
            row.put(md.getColumnName(i), rs.getObject(i));
        }
        list.add(row);
    }
    return list;
}
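A real ResultSet needs a live connection, so here is a stdlib-only sketch that replays the same metadata-driven loop over plain arrays (hypothetical column names plus one row of values) just to show what each row turns into:

```java
import java.util.HashMap;
import java.util.Map;

public class RowToMapSketch {

    // Same loop as resultSetToArrayList, minus the JDBC plumbing:
    // pair each column name with the value in that position.
    public static Map<String, Object> rowToMap(String[] columns, Object[] values) {
        Map<String, Object> row = new HashMap<>(columns.length);
        for (int i = 0; i < columns.length; i++) {   // JDBC indexes from 1; arrays from 0
            row.put(columns[i], values[i]);
        }
        return row;
    }

    public static void main(String[] args) {
        // hypothetical row from "select * from src limit 20"
        System.out.println(rowToMap(new String[]{"key", "value"},
                                    new Object[]{"238", "val_238"}));
    }
}
```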
---
Flowable<List<HashMap<String, Object>>> kvFlowable =
        database.select(sql).get(this::resultSetToArrayList);
// the mapper drains the whole ResultSet into one list, so the Flowable
// emits a single List; unwrap it and re-emit the rows individually
return Mono.from(kvFlowable).flatMapMany(Flux::fromIterable);
Option 4: plain synchronous, blocking JDBC.
Class.forName("org.apache.hive.jdbc.HiveDriver");
Connection connection = DriverManager
        .getConnection("jdbc:hive2://localhost:10000/default", "", "");
Statement stmt = connection.createStatement();
ResultSet rs = stmt.executeQuery("select * from src limit 20");
// resultSetToArrayList already loops over rs.next() itself,
// so call it once rather than inside another while (rs.next()) loop
List<HashMap<String, Object>> mapList = resultSetToArrayList(rs);
return Mono.just(mapList);
---
Of course, you can also register a JdbcTemplate bean instead:
@Bean
JdbcTemplate jdbcTemplate() {
    return new JdbcTemplate(new SimpleDriverDataSource(new HiveDriver(), "jdbc:hive2://localhost:10000"));
}

// jdbcTemplate injected via @Autowired
public Mono<List<Map<String, Object>>> test5() {
    List<Map<String, Object>> result = jdbcTemplate.queryForList("select * from src limit 20");
    return Mono.just(result);
}
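One caveat with the synchronous variants: Mono.just(result) is only built after the blocking JDBC call has finished, so on WebFlux the query would run on (and stall) an event-loop thread. In Reactor the usual fix is Mono.fromCallable(...).subscribeOn(Schedulers.boundedElastic()). The sketch below shows the same offloading idea with JDK types only; blockingQuery() is a hypothetical stand-in for the Hive call:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class BlockingOffloadSketch {

    // Dedicated pool for blocking JDBC work, playing the role of
    // Reactor's Schedulers.boundedElastic()
    private static final ExecutorService JDBC_POOL = Executors.newFixedThreadPool(4);

    // Hypothetical stand-in for the blocking Hive query
    static List<String> blockingQuery() {
        return List.of("row1", "row2");
    }

    // The caller's thread only schedules the work and returns immediately;
    // the blocking call runs on the dedicated pool.
    public static CompletableFuture<List<String>> queryAsync() {
        return CompletableFuture.supplyAsync(BlockingOffloadSketch::blockingQuery, JDBC_POOL);
    }

    public static void main(String[] args) {
        System.out.println(queryAsync().join());
        JDBC_POOL.shutdown();
    }
}
```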
Option 5 (strongly discouraged): HiveTemplate can only return basic types such as List and String; it cannot return maps.
@Bean
public HiveTemplate hiveTemplate() {
    DataSource dataSource = new SimpleDriverDataSource(new HiveDriver(), "jdbc:hive2://localhost:10000");
    HiveClientFactory factory = () -> new HiveClient(dataSource);
    return new HiveTemplate(factory);
}