kafka模块提供了一些开箱即用的功能,但大部分特性均来自spring-kafka。spring-kafka本身其实已经可以做足够多的事情。
spring-kafka针对kafka的操作做了高级抽象,本文进行简要介绍一些常用功能,以方便开发人员熟知。
发送消息
spring-kafka提供了两种发送操作类:KafkaTemplate及ReplyingKafkaTemplate。前者为普通发送者,后者可以同步接收消费者的回复消息。
消费消息
初始消费
kafka的auto.offset.reset允许我们设置kafka中没有初始offset或当前offset没有存储在服务器时的初始消费方式,一般我们设置为earliest:当分区中存在已消费的offset时,从当前位置开始消费;不存在时,从头拉取,此种方式防止我们晚于生产者进入消费,从而导致丢失消息。
但是此种模式针对同一个topic的不同分组,消费者将会从头开始消费(比如某天有人兴起换了个分组名称)。
故开发者在消费消息,处理业务时必须需要做幂等性处理。(如根据状态处理,根据唯一键进行写入等)
提交方式
此处所指的提交即为修改offset。
spring-kafka的消费者可以指定其提交方式,默认为自动提交。可以通过修改enable-auto-commit配置来开启手动提交。
需注意的是,仅开启此选项,listener的默认应答方式ack-mode为batch,仍是自动提交,只不过变成了每批记录传递给监听器之后批量提交。
如果需要变为显示提交,需要设置ack-mode为MANUAL或MANUAL_IMMEDIATE,并在监听器中加入Acknowledgment参数,调用其acknowledgment方法进行手动提交。
大体代码如下:
application.yml
spring:
kafka:
consumer:
enable-auto-commit: false
listener:
ack-mode: manual_immediate
listener
@KafkaListener(topics = "acknowledgment-test")
public void listen(UserModel userModel, Acknowledgment acknowledgment) {
System.out.println(userModel);
acknowledgment.acknowledge();
}
批量消费
可以通过spring.kafka.listener.type=batch开启批量消费。
可以通过spring.kafka.consumer.max-poll-recrods来修改每次拉取的消息。默认为500。
可以通过spring.kafka.listener.concurrency来修改同时消费的监听器。建议与分区数量一致。
事务
kafka在早在0.11版本便引入了事务机制及消息仅发送一次(但唯一消息只有在同一分区才有效)。但是spring-kafa并没有提供此方式的配置。(可能只针对一个分区的唯一消息比较鸡肋?)
spring-kafka提供了事务机制。允许我们可以使用kafka提供的事务机制回滚提交。
消息事务的场景
数据库事务的是针对于数据库的ACID属性。而消息系统的事务则是针对消息生产/消费的原则操作。两者为不同的数据源,不可混为一谈。
我们使用kafka消息事务的场景有以下两种:
- 在一次业务中,存在消费消息,又存在生产消息。此时如果消息生产失败,那么消费者需要回滚。这种情况称为consumer-transform-producer
- 在一次业务中,存在多次生产消息,其中后续生产的消息抛出异常,前置生产的消息需要回滚。
同步事务
spring-kafka提供了一种同步链事务,可以允许kafka数据源事务与其他数据源结合,要么一起成功,要么一起失败。可以通过ChainedKafkaTransactionManager来实现。
开启方式
spring-kafka提供了spring.kafka.producer.transaction-id-prefix属性开启事务,仅需要配置事务前缀,并且在所有涉及到kafka操作及监听的方法上增加@Transcational注解。
注意:spring-kafka的事务是针对单示例的,即每个@Transcational所标识的方法均会创建一个事务,并且生成一个事务id。这种方式比较适合于真实场景。
大体代码如下:
application.yml
spring:
kafka:
producer:
transaction-id-prefix: test-transacation
网友评论