美文网首页
kafka表引擎使用

kafka表引擎使用

作者: FyK_21f8 | 来源:发表于2021-03-10 01:04 被阅读0次

    1 创建kafka topic

    bin/kafka-topics.sh --zookeeper localhost:2181 --create --partitions 3 --replication-factor 1 --topic order
    

    2 验证生产者消费者

    # 生产者发送消息
    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic order
    
    # 订单消息
    {"id":1, "goods_name":"大力丸", "price":999.99, "user_name":"jack", "addr":"广东深圳",  "order_date":"2021-03-09 23:01"}
    
    # 创建消费者
    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic order --from-beginning
    

    消费结果

    {"id":1, "goods_name":"大力丸", "price":999.99, "user_name":"jack", "addr":"广东深圳",  "order_date":"2021-03-09 23:01"}
    

    消费者能成功消费到消息,kafka在本机可用

    3 创建clickhouse kafka表引擎

    我们需要使用kafka表引擎创建一张用于消费kafka数据的表,建表语句

    CREATE TABLE kafka_order_consumer
    (
        id UInt64 COMMENT '订单id',
        goods_name String COMMENT '商品名称',
        price Decimal32(2) COMMENT '商品价格',
        user_name String COMMENT '用户名称',
        addr String COMMENT '收货地址',
        order_date DateTime COMMENT '订单日期'
    )ENGINE = Kafka()
        SETTINGS
        kafka_broker_list = '192.168.211.10:9092',
        kafka_topic_list = 'order',
        kafka_group_name = 'ck',
        kafka_format = 'JSONEachRow'
    

    kafka_broker_list kafka地址
    kafka_topic_list topic主题名称
    kafka_group_name 消费者组
    kafka_format kafka消息的数据类型

    4 创建本地表

    CREATE TABLE kafka_order_mergetree
    (
        id UInt64 COMMENT '订单id',
        goods_name String COMMENT '商品名称',
        price Decimal32(2) COMMENT '商品价格',
        user_name String COMMENT '用户名称',
        addr String COMMENT '收货地址',
        order_date DateTime COMMENT '订单日期'
    )ENGINE = MergeTree()
        partition by toYYYYMM(order_date)
        order by id 
    

    5 创建物化视图

    CREATE MATERIALIZED VIEW consumer TO kafka_order_mergetree AS
        SELECT id, goods_name, price, user_name, addr, order_date FROM kafka_order_consumer;
    

    kafka_order_mergetree 最终存储的本地表
    kafka_order_consumer 消费kafka的消费者表

    6 kafka生产者发送数据

    # 进入终端
    bin/kafka-console-producer.sh --broker-list 192.168.211.10:9092 --topic order
    
    # json数据
    {"id":1, "goods_name":"大力丸", "price":999.99, "user_name":"jack", "addr":"广东深圳",  "order_date":"2021-03-09 23:01:00"}
    {"id":2, "goods_name":"小力丸", "price":888.88, "user_name":"jack", "addr":"广东深圳",  "order_date":"2021-03-09 23:39:00"}
    {"id":3, "goods_name":"营养快线", "price":666.66, "user_name":"jack", "addr":"广东深圳",  "order_date":"2021-03-09 23:46:00"}
    

    7 查看本地表数据

    select * from kafka_order_mergetree;
    

    查询结果

    ┌─id─┬─goods_name─┬──price─┬─user_name─┬─addr─────┬──────────order_date─┐
    │  2 │ 小力丸     │ 888.88 │ jack      │ 广东深圳 │ 2021-03-09 23:39:00 │
    └────┴────────────┴────────┴───────────┴──────────┴─────────────────────┘
    ┌─id─┬─goods_name─┬──price─┬─user_name─┬─addr─────┬──────────order_date─┐
    │  3 │ 营养快线   │ 666.66 │ jack      │ 广东深圳 │ 2021-03-09 23:46:00 │
    └────┴────────────┴────────┴───────────┴──────────┴─────────────────────┘
    ┌─id─┬─goods_name─┬──price─┬─user_name─┬─addr─────┬──────────order_date─┐
    │  1 │ 大力丸     │ 999.99 │ jack      │ 广东深圳 │ 2021-03-09 23:01:00 │
    └────┴────────────┴────────┴───────────┴──────────┴─────────────────────┘
    

    8 中间的小坑

    一开始kafka发送数据时,order_date字段的时间格式为yyyy-MM-dd HH:mm时间格式不正确,导致时间格式转换错误,修正的数据后重新发送数据,查看kafka消费表和本地表都没有数据,一度怀疑是clickhouse没有消费到kafka的数据,所以又通过kafka-console-consumer.sh的方式以同样的消费者组ck去消费消息,发现并没有消费到任何消息,说明消息已经被ck消费过了,接着重置了ck消费者组的offset,发现查kafka_order_consumer表报了错,怀疑是不是只要中间有一条数据有问题,将导致接下来的所有的数据都会消费失败;于是删除了topic,检查了数据order_date字段的日期格式,重新生产数据发现可以了;

    后面查看错误日志发现了...

    DB::UnionBlockInputStream::~UnionBlockInputStream(): Code: 41, e.displayText() = DB::ParsingException: Cannot parse datetime 2021-03-09 23:01"}{}:
    

    或许我一开始就应该去clickhouse的log目录下看日志的!!!!天色不早了,这个问题有待进一步深入研究...

    相关文章

      网友评论

          本文标题:kafka表引擎使用

          本文链接:https://www.haomeiwen.com/subject/ikdiqltx.html