美文网首页
spring kafka 集成

spring kafka 集成

作者: cifer_pan | 来源:发表于2021-05-08 19:25 被阅读0次

基础配置

1.mvn 坐标

        <!-- kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <!-- json 工具 -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.76</version>
        </dependency>

2.kafka 配置文件

spring:
  ### kafka 配置
  kafka:
    producer:
      # 重试次数
      retries: 0
      # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
      acks: 1
      # 批量大小
      batch-size: 16384
      # 生产端缓冲区大小
      buffer-memory: 33554432
      # Kafka提供的序列化和反序列化类
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      # 是否自动提交offset
      enable-auto-commit: true
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      group-id: test-consumer
    bootstrap-servers: 10.162.108.62:9092

1.简单生产模型

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;
    // 生产消息
    kafkaTemplate.send(topic1, msg);

生产消息

2.回调消息生产模型

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;
    // 生产消息
    kafkaTemplate.send(top, msg).addCallback(success -> {
            // 消息发送到的topic
            String topic = success.getRecordMetadata().topic();
            // 消息发送到的分区
            int partition = success.getRecordMetadata().partition();
            // 消息在分区内的offset
            long offset = success.getRecordMetadata().offset();
            log.info("发送消息成功:" + topic + "-" + partition + "-" + offset);
        }, failure -> {
            log.info("发送消息失败:" + failure.getMessage());
        });

3.事物消息生产模型

   @Resource
    private KafkaTemplate<String, String> kafkaTemplate;
    // 生产消息
   kafkaTemplate.executeInTransaction(operations -> {
            operations.send(topic1, msg);
            throw new RuntimeException("fail");
        });

消费消息

1.简单消费模型

    @KafkaListener(topics = {"topic1"},groupId = "1",errorHandler = "consumerAwareErrorHandler")
    public void onMessage1(ConsumerRecord<String, String> record) {
        log.info("简单消费 {}:" + record.topic() + "-" + record.partition() + "-" + record.value()); 
    }
    @Bean
    public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler() {
        return (message, exception, consumer) -> {
            log.info("消费异常:"+message.getPayload());
            return null;
        };
    }
topics:消费top
groupId:消费者分组groupId (id 一致时表示点对点通信,不一致时表示广播消息)
errorHandler:异常处理器

相关文章

网友评论

      本文标题:spring kafka 集成

      本文链接:https://www.haomeiwen.com/subject/sqkedltx.html