美文网首页
SpringKafka常用组件与配置

SpringKafka常用组件与配置

作者: BeRicher | 来源:发表于2019-01-18 16:51 被阅读0次

    kafka模块提供了一些开箱即用的功能,但大部分特性均来自spring-kafka。spring-kafka本身其实已经可以做足够多的事情。
    spring-kafka针对kafka的操作做了高级抽象,本文进行简要介绍一些常用功能,以方便开发人员熟知。

    发送消息

    spring-kafka提供了两种发送操作类:KafkaTemplate及ReplyingKafkaTemplate。前者为普通发送者,后者可以同步接收消费者的回复消息。

    消费消息

    初始消费

    kafka的auto.offset.reset允许我们设置kafka中没有初始offset或当前offset没有存储在服务器时的初始消费方式,一般我们设置为earliest:当分区中存在已消费的offset时,从当前位置开始消费;不存在时,从头拉取,此种方式防止我们晚于生产者进入消费,从而导致丢失消息。
    但是此种模式针对同一个topic的不同分组,消费者将会从头开始消费(比如某天有人兴起换了个分组名称)。
    故开发者在消费消息,处理业务时必须需要做幂等性处理。(如根据状态处理,根据唯一键进行写入等)

    提交方式

    此处所指的提交即为修改offset。
    spring-kafka的消费者可以指定其提交方式,默认为自动提交。可以通过修改enable-auto-commit配置来开启手动提交。
    需注意的是,仅开启此选项,listener的默认应答方式ack-mode为batch,仍是自动提交,只不过变成了每批记录传递给监听器之后批量提交。
    如果需要变为显示提交,需要设置ack-mode为MANUAL或MANUAL_IMMEDIATE,并在监听器中加入Acknowledgment参数,调用其acknowledgment方法进行手动提交。
    大体代码如下:

    application.yml

    spring:
      kafka:
        consumer:
          enable-auto-commit: false
        listener:
          ack-mode: manual_immediate
    

    listener

        @KafkaListener(topics = "acknowledgment-test")
        public void listen(UserModel userModel, Acknowledgment acknowledgment) {
            System.out.println(userModel);
            acknowledgment.acknowledge();
        }
    

    批量消费

    可以通过spring.kafka.listener.type=batch开启批量消费。
    可以通过spring.kafka.consumer.max-poll-recrods来修改每次拉取的消息。默认为500。
    可以通过spring.kafka.listener.concurrency来修改同时消费的监听器。建议与分区数量一致。

    事务

    kafka在早在0.11版本便引入了事务机制及消息仅发送一次(但唯一消息只有在同一分区才有效)。但是spring-kafa并没有提供此方式的配置。(可能只针对一个分区的唯一消息比较鸡肋?)

    spring-kafka提供了事务机制。允许我们可以使用kafka提供的事务机制回滚提交。

    消息事务的场景

    数据库事务的是针对于数据库的ACID属性。而消息系统的事务则是针对消息生产/消费的原则操作。两者为不同的数据源,不可混为一谈。

    我们使用kafka消息事务的场景有以下两种:

    1. 在一次业务中,存在消费消息,又存在生产消息。此时如果消息生产失败,那么消费者需要回滚。这种情况称为consumer-transform-producer
    2. 在一次业务中,存在多次生产消息,其中后续生产的消息抛出异常,前置生产的消息需要回滚。

    同步事务

    spring-kafka提供了一种同步链事务,可以允许kafka数据源事务与其他数据源结合,要么一起成功,要么一起失败。可以通过ChainedKafkaTransactionManager来实现。

    开启方式

    spring-kafka提供了spring.kafka.producer.transaction-id-prefix属性开启事务,仅需要配置事务前缀,并且在所有涉及到kafka操作及监听的方法上增加@Transcational注解。
    注意:spring-kafka的事务是针对单示例的,即每个@Transcational所标识的方法均会创建一个事务,并且生成一个事务id。这种方式比较适合于真实场景。

    大体代码如下:

    application.yml

    spring:
      kafka:
        producer:
          transaction-id-prefix: test-transacation 
    

    快速开发框架
    高质量图片压缩工具

    相关文章

      网友评论

          本文标题:SpringKafka常用组件与配置

          本文链接:https://www.haomeiwen.com/subject/sebflqtx.html