软件下载略过,直接进入安装
部署
- 到config目录下配置
application.yml 选择元数据存储类型 ,我选pgsql
编辑application-pgsql.yml,配置pg连接信息
2.执行pgsql脚本到数据库
3.复制flink/lib下jar到 dinky/extends里
测试
1.新建flinksql任务
CREATE TABLE Orders (
order_number BIGINT,
price DECIMAL(32,2),
order_time TIMESTAMP(3)
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'number-of-rows' = '50'
);
select * from Orders;
local模式预览(因为select,只能预览)
image.png
上面这个未明确具体原因, 把元数据库换到mysql 后就可以正常运行了。
CDC配置
将dinky lib目录下的 dinky-client-base-1.0.0-rc4.jar、dinky-common-1.0.0-rc4.jar以及 dinky/extends/flink版本/dinky/dinky-client-1.15-1.0.0-rc4.jar 放到了flink的lib下,
mysql的cdc包 和jdbc包在dinky的extend和 flink/lib下都要有。
重启flink, 重启dinky
《注意》:提交standalone模式下, 任务在flink里, print也在对应的任务节点的stdout里显示。
《注意》:dinky的jar和flink lib下的jar 不能用软链接,浪费了一上午时间
kafka配置
dinky写到kafka
报错:Unexpected error in InitProducerIdResponse; The transaction timeout is larger than the maximum value allowed by the broker (as configured by transaction.max.timeout.ms).
看这篇文章里https://blog.csdn.net/m0_37759590/article/details/127791947 ,
需要在kafka的 server.property里设置一个属性值transaction.max.timeout.ms=7200000,
在flink sql中设置对应属性的值
网友评论