Flink has supported Hive since 1.10.0. Below we set up a Flink-Hive environment, which also serves as preparation for the SQL client used later.
1. Compile and set up Flink 1.11.2; refer to the earlier post on compiling the Flink 1.9 source.
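As a rough sketch, the source build usually looks like the following (tag and flags assumed from the standard Flink build docs):
git clone https://github.com/apache/flink.git
cd flink
git checkout release-1.11.2
mvn clean install -DskipTests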
2. Copy the required Hive jars into Flink's lib directory:
cp /opt/cloudera/parcels/CDH/lib/hive/lib/hive-exec-1.1.0-cdh5.14.2.jar ./
cp /opt/cloudera/parcels/CDH/lib/hive/lib/hive-metastore-1.1.0-cdh5.14.2.jar ./
cp /opt/cloudera/parcels/CDH/lib/hive/lib/datanucleus-* ./
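A quick sanity check that the copies landed (the install path /opt/flink-1.11.2 is an assumption; substitute your own):
ls /opt/flink-1.11.2/lib | grep -E 'hive|datanucleus'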
3. The self-compiled flink-shaded-hadoop related jars are also needed:
flink-hadoop-compatibility_2.11-1.11.2.jar
flink-shaded-hadoop-2-uber-2.6.0-cdh5.14.2-9.0.jar
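A sketch of building the shaded Hadoop uber jar against the CDH version (assumes the Cloudera Maven repository has been added to the flink-shaded pom so the cdh artifacts resolve):
git clone https://github.com/apache/flink-shaded.git
cd flink-shaded
git checkout release-9.0
mvn clean install -DskipTests -Dhadoop.version=2.6.0-cdh5.14.2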
4. Other jars
Because the cluster is configured with LZO, the related jar must also be imported:
hadoop-lzo-0.4.15-cdh5.14.2.jar
The flink-connector jars:
flink-connector-kafka_2.11-1.11.2.jar
flink-connector-kafka-base_2.11-1.11.2.jar
flink-connector-hive_2.11-1.11.2.jar
The complete set of jars now under lib: (screenshot)
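For reference, a sketch of what lib should now contain: the jars copied above plus the stock jars of a standard Flink 1.11.2 distribution (the stock list below is an assumption and may differ slightly by build):
datanucleus-*.jar (the three DataNucleus jars from CDH)
flink-connector-hive_2.11-1.11.2.jar
flink-connector-kafka_2.11-1.11.2.jar
flink-connector-kafka-base_2.11-1.11.2.jar
flink-csv-1.11.2.jar
flink-dist_2.11-1.11.2.jar
flink-hadoop-compatibility_2.11-1.11.2.jar
flink-json-1.11.2.jar
flink-shaded-hadoop-2-uber-2.6.0-cdh5.14.2-9.0.jar
flink-shaded-zookeeper-3.4.14.jar
flink-table_2.11-1.11.2.jar
flink-table-blink_2.11-1.11.2.jar
hadoop-lzo-0.4.15-cdh5.14.2.jar
hive-exec-1.1.0-cdh5.14.2.jar
hive-metastore-1.1.0-cdh5.14.2.jar
log4j-*.jar (the Log4j 2 jars shipped with Flink)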
5. In the flink/conf directory, copy the default SQL client config:
cp sql-client-defaults.yaml sql-client-hive.yaml
Then edit sql-client-hive.yaml with the following catalog configuration:
catalogs:
  - name: myhive
    type: hive
    hive-conf-dir: /etc/hive/conf.cloudera.hive
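Optionally, the execution section of the same file can make the Hive catalog the default, so the use catalog step below is not needed (a sketch using the standard 1.11 sql-client keys):
execution:
  current-catalog: myhive
  current-database: default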
6. From the flink directory, start the SQL client:
./bin/sql-client.sh embedded -d conf/sql-client-hive.yaml
Flink SQL> show catalogs;
default_catalog
myhive
Flink SQL> use catalog myhive;
Flink SQL> set table.sql-dialect=hive;
Flink SQL> create table mytable(name string,num double);
Flink SQL> desc mytable;
root
|-- name: STRING
|-- num: DOUBLE
Flink SQL> insert into mytable values('yuchi',16);
Flink SQL> select * from mytable;
Result:
name num
yuchi 16.0
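Because the table was created through the HiveCatalog, it is a regular Hive table and can be cross-checked from the Hive side too (a sketch; assumes the Hive CLI is available on the same host):
hive -e "select * from default.mytable"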
7. Troubleshooting: note that creating a Hive table in flink-sql can fail with the following error:
(screenshot of the error message)
Fix: set the table.sql-dialect parameter, where hive selects Hive syntax and default selects Flink SQL syntax:
set table.sql-dialect=hive;
set table.sql-dialect=default;
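For example, a typical session toggles the dialect around the Hive DDL and then switches back (a minimal sketch; the table name is illustrative):
Flink SQL> set table.sql-dialect=hive;
Flink SQL> create table demo_t (name string, num double);
Flink SQL> set table.sql-dialect=default;
Flink SQL> select * from demo_t;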