1. First, install the Flink service on the cluster.
2. After installation, place the required connector jars into Flink's lib directory.
This walkthrough uses Hive 3.1.0, Kafka 2.0.0, and cluster version 2.7.3.
Follow the official documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/hive/
If a jar cannot be found there, it can be downloaded from the Aliyun Maven repository: https://maven.aliyun.com/mvn/search
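Going by the Flink 1.10 documentation linked above, the Hive dependencies for Hive 3.1.0 should look roughly like the list below; the exact artifact names and versions are assumptions, so confirm them against the linked page.
# jars to copy into flink/lib
flink-connector-hive_2.11-1.10.0.jar    # Flink's Hive connector (assumed Scala 2.11 build)
hive-exec-3.1.0.jar                     # Hive dependency matching the cluster's Hive version
libfb303-0.9.3.jar                      # Thrift dependency; only needed if not bundled in hive-exec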
YAML configuration for connecting to Hive:
sql-client-hive.yaml
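A minimal sketch of sql-client-hive.yaml, following the Flink 1.10 SQL client format; hive-conf-dir is an assumption and must point at the directory containing your hive-site.xml. The catalog name myhive matches the use catalog statement further down.
# sql-client-hive.yaml (sketch)
catalogs:
  - name: myhive
    type: hive
    hive-conf-dir: /etc/hive/conf   # assumption: wherever hive-site.xml lives on your cluster
    hive-version: 3.1.0

execution:
  planner: blink
  type: batch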
3. Once the configuration is done, restart Hive and Flink.
Change to Flink's root directory (the parent directory of bin):
cd /usr/hdp/xxxx/flink
./bin/start-cluster.sh
# -s flink specifies the yarn session name; this option can be omitted
./bin/sql-client.sh embedded -e ./conf/sql-client-hive.yaml
After a successful start, verify the Hive catalog:
show catalogs;
use catalog myhive;
show tables;
create table students(id INT, name STRING) WITH ('is_generic'='false');
insert into students(id,name) values(1,'lilei');
insert into students(id,name) values(2,'tom');
insert into students(id,name) values(3,'liming');
select * from students;
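Since the table was created with 'is_generic'='false', Flink stores it as a Hive-compatible table, so it should also be visible from Hive itself; a quick cross-check from the Hive shell (beeline or hive CLI, not the Flink SQL client):
-- run on the Hive side, in the database the catalog points at
show tables;
select * from students;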
4. Connect Kafka and Hive
Follow the official documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
The Kafka version here is 2.0.0, so choose the universal connector version and put its 3 jars into Flink's lib directory (sketched below).
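Going by the connector matrix on the linked page, the universal Kafka connector for Flink 1.10 consists of the three jars below; the artifact versions are assumptions, so verify them against the page. If the csv format later complains about a missing format factory, the flink-csv sql jar may be needed in lib as well.
# jars to copy into flink/lib
flink-connector-kafka_2.11-1.10.0.jar        # the universal Kafka connector
flink-connector-kafka-base_2.11-1.10.0.jar   # shared connector base classes
kafka-clients-2.0.0.jar                      # Kafka client compatible with the 2.0.0 brokers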
First make sure Kafka itself is installed and working end to end: producing and consuming data should both succeed.
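A quick sanity check with Kafka's console tools, assuming the same topic and addresses used by the table definition below (ip is a placeholder for your broker/zookeeper host):
# create the topic (Kafka 2.0.0 still manages topics through zookeeper)
./bin/kafka-topics.sh --create --zookeeper ip:2181 --replication-factor 1 --partitions 1 --topic flink_in
# type a few test messages into the producer, then read them back with the consumer
./bin/kafka-console-producer.sh --broker-list ip:9092 --topic flink_in
./bin/kafka-console-consumer.sh --bootstrap-server ip:9092 --topic flink_in --from-beginning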
Start the SQL client the same way: ./bin/sql-client.sh embedded -d ./conf/sql-client-hive.yaml
Create the table that connects to Kafka:
CREATE TABLE mykafka (name STRING, age INT) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'flink_in',
  'connector.properties.zookeeper.connect' = 'ip:2181',
  'connector.properties.bootstrap.servers' = 'ip:9092',
  'format.type' = 'csv',
  'update-mode' = 'append'
);
Start consuming from Kafka by querying the table:
select * from mykafka;
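With the select running, push a csv row matching the (name, age) schema into the topic from another shell, reusing the console producer from the sanity check above (ip is a placeholder):
echo 'lilei,18' | ./bin/kafka-console-producer.sh --broker-list ip:9092 --topic flink_in
The row should show up in the SQL client's result view shortly afterwards.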
Reference: https://cloud.tencent.com/developer/article/1616330